

# AWS Glue for Spark での ETL の接続タイプとオプション
<a name="aws-glue-programming-etl-connect"></a>

AWS Glue for Spark では、PySpark と Scala のさまざまなメソッドと変換で、`connectionType` パラメータを使用しながら接続タイプを指定します。`connectionOptions` または `options` パラメータを使用して接続オプションを指定します。

`connectionType` パラメータには、次の表に示す値を指定できます。各タイプの関連する `connectionOptions` (または `options`) パラメータ値については、以下のセクションで説明します。特に明記されていない限り、パラメータは、接続がソースまたはシンクとして使用されるときに適用されます。

接続オプションの設定と使用方法を示すサンプルコードについては、「各接続タイプのホームページ」を参照してください。


| `connectionType` | 接続先 | 
| --- | --- | 
| [dynamodb](aws-glue-programming-etl-connect-dynamodb-home.md) | [Amazon DynamoDB](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/) データベース | 
| [kinesis](aws-glue-programming-etl-connect-kinesis-home.md) | [Amazon Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/introduction.html) | 
| [s3](aws-glue-programming-etl-connect-s3-home.md) | [Simple Storage Service (Amazon S3)](https://docs.aws.amazon.com/AmazonS3/latest/dev/) | 
| [documentdb](aws-glue-programming-etl-connect-documentdb-home.md#aws-glue-programming-etl-connect-documentdb) | [Amazon DocumentDB (MongoDB 互換)](https://docs.aws.amazon.com/documentdb/latest/developerguide/) データベース | 
| [opensearch](aws-glue-programming-etl-connect-opensearch-home.md) | [Amazon OpenSearch Service](https://docs.aws.amazon.com/opensearch-service/latest/developerguide/)。 | 
| [Redshift](aws-glue-programming-etl-connect-redshift-home.md) | [Amazon Redshift](https://aws.amazon.com/redshift/) データベース | 
| [kafka](aws-glue-programming-etl-connect-kafka-home.md) |  [Kafka](https://kafka.apache.org/) または [Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com/msk/latest/developerguide/what-is-msk.html) | 
| [azurecosmos](aws-glue-programming-etl-connect-azurecosmos-home.md) | Azure Cosmos for NoSQL。 | 
| [azuresql](aws-glue-programming-etl-connect-azuresql-home.md) | Azure SQL。 | 
| [bigquery](aws-glue-programming-etl-connect-bigquery-home.md) | Google BigQuery。 | 
| [mongodb](aws-glue-programming-etl-connect-mongodb-home.md) | [MongoDB](https://www.mongodb.com/what-is-mongodb) データベース (MongoDB Atlas を含む)。 | 
| [sqlserver](aws-glue-programming-etl-connect-jdbc-home.md) |  Microsoft SQL Server データベース (「[JDBC 接続](aws-glue-programming-etl-connect-jdbc-home.md)」を参照) | 
| [mysql](aws-glue-programming-etl-connect-jdbc-home.md) | [MySQL](https://www.mysql.com/) データベース (「[JDBC 接続](aws-glue-programming-etl-connect-jdbc-home.md)」を参照) | 
| [oracle](aws-glue-programming-etl-connect-jdbc-home.md) | [Oracle](https://www.oracle.com/database/) データベース (「[JDBC 接続](aws-glue-programming-etl-connect-jdbc-home.md)」を参照) | 
| [postgresql](aws-glue-programming-etl-connect-jdbc-home.md) |  [PostgreSQL](https://www.postgresql.org/) データベース (「[JDBC 接続](aws-glue-programming-etl-connect-jdbc-home.md)」を参照) | 
| [saphana](aws-glue-programming-etl-connect-saphana-home.md) | SAP HANA。 | 
| [snowflake](aws-glue-programming-etl-connect-snowflake-home.md) | [Snowflake](https://www.snowflake.com/) データレイク | 
| [teradata](aws-glue-programming-etl-connect-teradata-home.md) | Teradata Vantage。 | 
| [vertica](aws-glue-programming-etl-connect-vertica-home.md) | Vertica。 | 
| [カスタム。\*](#aws-glue-programming-etl-connect-market) | Spark、Athena、または JDBC データストア (「[カスタムと AWS Marketplace での、connectionType の値](#aws-glue-programming-etl-connect-market)」 を参照  | 
| [marketplace.\*](#aws-glue-programming-etl-connect-market) | Spark、Athena、または JDBC データストア (「[カスタムと AWS Marketplace での、connectionType の値](#aws-glue-programming-etl-connect-market)」を参照)  | 

## AWS Glue 5.0 for Spark の ETL の DataFrame オプション
<a name="aws-glue-programming-etl-connect-dataframe"></a>

DataFrame は、名前の付いた列にまとめられた、テーブルに似たデータセットで、機能スタイル (マップ/リデュース/フィルター/その他) 操作と SQL 操作 (選択、プロジェクト、集計) をサポートしています。

Glue でサポートされているデータソースの DataFrame を作成するには、以下が必要です。
+ データソースコネクタ `ClassName`
+ データソース接続 `Options`

同様に、Glue でサポートされているデータシンクに DataFrame を書き込むときも、同じものが必要です。
+ データシンクコネクタ `ClassName`
+ データシンク接続 `Options`

ジョブのブックマークなどの AWS Glue 機能や、`connectionName` などの DynamicFrame オプションは DataFrame ではサポートされていないことに注意してください。DataFrame とサポートされているオペレーションの詳細については、[DataFrame](https://spark.apache.org/docs/3.5.2/api/python/reference/pyspark.sql/dataframe.html) の Spark のドキュメントを参照してください。

### コネクタ ClassName の指定
<a name="aws-glue-programming-etl-connect-dataframe-classname"></a>

データソース/シンクの `ClassName` を指定するには、`.format` オプションを使用して、データソース/シンクを定義するための対応するコネクタ `ClassName` を指定します。

**JDBC コネクタ**  
JDBC コネクタの場合は、`.format` オプションの値として `jdbc` を指定し、`driver` オプションで JDBC ドライバー `ClassName` を指定します。

```
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")...

df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
```

次の表は、AWS Glue for DataFrames でサポートされているデータソースの JDBC ドライバー `ClassName` の一覧です。

| データソース | ドライバーの ClassName | 
| --- |--- |
| PostgreSQL | org.postgresql.Driver | 
| Oracle | oracle.jdbc.driver.OracleDriver | 
| SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver | 
| MySQL | com.mysql.jdbc.Driver | 
| SAPHana | com.sap.db.jdbc.Driver | 
| Teradata | com.teradata.jdbc.TeraDriver | 

**スパークコネクタ**  
Spark コネクタの場合は、コネクタの `ClassName` を `.format` オプションの値として指定します。

```
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")...

df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
```

次の表は、AWS Glue for DataFrames でサポートされているデータソースの Spark コネクタ `ClassName` の一覧です。

| データソース | ClassName | 
| --- |--- |
| MongoDB/DocumentDB | glue.spark.mongodb | 
| Redshift | io.github.spark\_redshift\_community.spark.redshift | 
| AzureCosmos | cosmos.oltp | 
| AzureSQL | com.microsoft.sqlserver.jdbc.spark | 
| BigQuery | com.google.cloud.spark.bigquery | 
| OpenSearch | org.opensearch.spark.sql | 
| Snowflake | net.snowflake.spark.snowflake | 
| Vertica | com.vertica.spark.datasource.VerticaSource | 

### 接続オプションの指定
<a name="aws-glue-programming-etl-connect-dataframe-connection-options"></a>

データソース/シンクへの接続の `Options` を指定するには、`.option(<KEY>, <VALUE>)` を使用して個々のオプションを指定するか、`.options(<MAP>)` を使用してキーと値のマップとして複数のオプションを指定します。

各データソース/シンクは、接続 `Options` の独自のセットをサポートします。利用可能な `Options` の詳細については、次の表に記載されている特定のデータソース/シンクの Spark コネクタの公開ドキュメントを参照してください。
+ [JDBC](https://spark.apache.org/docs/3.5.2/sql-data-sources-jdbc.html)
+ [MongoDB/DocumentDB](https://www.mongodb.com/docs/spark-connector/v10.4/)
+ [Redshift](https://docs.aws.amazon.com/redshift/latest/mgmt/spark-redshift-connector.html)
+ [AzureCosmos](https://github.com/Azure/azure-cosmosdb-spark)
+ [AzureSQL](https://learn.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver16)
+ [BigQuery](https://cloud.google.com/dataproc/docs/tutorials/bigquery-connector-spark-example)
+ [OpenSearch](https://github.com/opensearch-project/opensearch-hadoop/blob/main/USER_GUIDE.md#apache-spark)
+ [Snowflake](https://docs.snowflake.com/en/user-guide/spark-connector-use#setting-configuration-options-for-the-connector)
+ [Vertica](https://github.com/vertica/spark-connector)

### 例
<a name="aws-glue-programming-etl-connect-dataframe-examples"></a>

次の例では、PostgreSQL から読み取り、SnowFlake に書き込みます。

**Python**  
例:

```
from awsglue.context import GlueContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

dataSourceClassName = "jdbc"
dataSourceOptions = {
  "driver": "org.postgresql.Driver",
  "url": "<url>",
  "user": "<user>",
  "password": "<password>",
  "dbtable": "<dbtable>",
}

dataframe = spark.read.format(className).options(**options).load()

dataSinkClassName = "net.snowflake.spark.snowflake"
dataSinkOptions = {
  "sfUrl": "<url>",
  "sfUsername": "<username>",
  "sfPassword": "<password>",
  "sfDatabase" -> "<database>",                              
  "sfSchema" -> "<schema>",                       
  "sfWarehouse" -> "<warehouse>"  
}

dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
```

**Scala**  
例:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val dataSourceClassName = "jdbc"
val dataSourceOptions = Map(
  "driver" -> "org.postgresql.Driver",
  "url" -> "<url>",
  "user" -> "<user>",
  "password" -> "<password>",
  "dbtable" -> "<dbtable>"
)

val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load()

val dataSinkClassName = "net.snowflake.spark.snowflake"
val dataSinkOptions = Map(
  "sfUrl" -> "<url>",
  "sfUsername" -> "<username>",
  "sfPassword" -> "<password>",
  "sfDatabase" -> "<database>",
  "sfSchema" -> "<schema>",
  "sfWarehouse" -> "<warehouse>"
)

dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
```

## カスタムと AWS Marketplace での、connectionType の値
<a name="aws-glue-programming-etl-connect-market"></a>

これには以下が含まれます。
+ `"connectionType": "marketplace.athena"`: Amazon Athena データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。
+ `"connectionType": "marketplace.spark"`: Apache Spark データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。
+ `"connectionType": "marketplace.jdbc"`: JDBC データストアへの接続を指定します。この接続では、AWS Marketplace が提供するコネクタを使用します。
+ `"connectionType": "custom.athena"`: Amazon Athena データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。
+ `"connectionType": "custom.spark"`: Apache Spark データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。
+ `"connectionType": "custom.jdbc"`: JDBC データストアへの接続を指定します。接続には、AWS Glue Studio にアップロードしたカスタムコネクタを使用します。

### custom.jdbc または marketplace.jdbc 型での接続オプション
<a name="marketplace-jdbc-connect-options"></a>
+ `className` – 文字列、必須、JDBC ドライバーの完全修飾 Java クラス名 (`com.mysql.cj.jdbc.Driver` など)。カスタムコネクタまたは AWS Marketplace JDBC コネクタを使用する場合、このオプションを指定する必要があります。このオプションを省略した場合、パラメータ化された JDBC URL を使用すると、ジョブが `No suitable driver` エラーで失敗する可能性があります。
+ `connectionName` – (必須) コネクタに関連付けられている接続の名前を示す文字列。
+ `url` – (必須) データソースへの接続を構築するために使用される、プレースホルダを含む JDBC URL (`${}`) を示す文字列。プレースホルダー `${secretKey}` は、AWS Secrets Manager 内にある同じ名前のシークレットにより置き換えられます。URL の構築の詳細については、データストアのドキュメントを参照してください。
+ `secretId` または `user/password` – (必須) URL の認証情報を取得するために使用される文字列。
+ `dbTable`または`query` – (必須) データを取得するテーブルまたは SQL クエリを示す文字列。`dbTable` または `query` を指定できます。両方を指定することはできません。
+ `partitionColumn` – (オプション) パーティション化に使用される整数カラムの名前を示す文字列。このオプションは、`lowerBound`、`upperBound`、および `numPartitions` に含まれている場合にのみ機能します。このオプションの機能は、Spark SQL JDBC リーダーのものと同様です。詳細については、*Apache Spark SQL, DataFrames and Datasets Guide* の「[JDBC To Other Databases](https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html)」を参照してください。

  `lowerBound` および `upperBound` 値は、パーティションのストライドを決定するために使用されます (テーブル内の行のフィルタリングには使用しません)。返されるテーブル内のすべての行は、パーティション化されています。
**注記**  
テーブル名の代わりにクエリを使用する場合は、指定されたパーティショニング条件でクエリが動作することを確認する必要があります。例:   
`"SELECT col1 FROM table1"` の形式のクエリでパーティション列を使用する場合、末尾に `WHERE` 句を追加してそのクエリをテストします。
クエリ形式が `SELECT col1 FROM table1 WHERE col2=val"` の場合は、`WHERE` 句を `AND` とパーティション列を使用する式で拡張することで、そのクエリをテストします。
+ `lowerBound` – パーティションストライドを決定するために使用される `partitionColumn` の最小値を示す整数 (オプション)。
+ `upperBound` – パーティションストライドを決定するために使用される `partitionColumn` の最大値を示す整数 (オプション)。
+ `numPartitions` – パーティション数を示す整数 (オプション)。この値は、(範囲に含まれる) `lowerBound` と (範囲に含まれない) `upperBound` とともに使用され、`partitionColumn` の分割で使用するために生成された `WHERE` 句の式のための、パーティションストライドを形成します。
**重要**  
パーティションが多すぎると、外部データベースシステムで問題が発生する可能性があるため、パーティションの数には注意を払ってください。
+ `filterPredicate` – ソースからのデータをフィルタリングする、追加の条件句を示す文字列 (オプション)。例: 

  ```
  BillingCity='Mountain View'
  ```

  *table* 名の代わりに *query* を使用した場合は、指定された `filterPredicate` でクエリが動作することを確認します。例: 
  + クエリの形式が `"SELECT col1 FROM table1"` の場合は、フィルタ述語を使用するクエリの末尾に `WHERE` 句を追加して、そのクエリをテストします。
  + クエリ形式が `"SELECT col1 FROM table1 WHERE col2=val"` の場合は、`WHERE` 句を `AND` およびフィルター述語を使用する式で拡張して、そのクエリをテストします。
+ `dataTypeMapping` – (ディクショナリ、オプション) **JDBC** データ型 から **Glue** データ型に対するマッピングを構築する、カスタムのデータ型マッピング。例えば、オプション `"dataTypeMapping":{"FLOAT":"STRING"}` では、ドライバーの `ResultSet.getString()` メソッドを呼び出すことで、JDBC タイプ `FLOAT` のデータフィールドが Java `String` 型にマッピングされ、それを使用して AWS Glue レコードが構築されます。`ResultSet` オブジェクトは各ドライバによって実装されるため、その動作は使用するドライバにより決定されます。ドライバによる変換の実行方法については、JDBC ドライバのドキュメントを参照してください。
+ AWS Glue で現在サポートされているデータ型は次のとおりです。
  + DATE
  + STRING
  + TIMESTAMP
  + INT
  + FLOAT
  + LONG
  + BIGDECIMAL
  + BYTE
  + SHORT
  + DOUBLE

   JDBC データ型としては、[Java8 java.sql.types](https://docs.oracle.com/javase/8/docs/api/java/sql/Types.html) がサポートされています。

  デフォルトの (JDBC から AWS Glue への) データ型マッピングは以下のとおりです。
  +  DATE -> DATE
  +  VARCHAR -> STRING
  +  CHAR -> STRING
  +  LONGNVARCHAR -> STRING
  +  TIMESTAMP -> TIMESTAMP
  +  INTEGER -> INT
  +  FLOAT -> FLOAT
  +  REAL -> FLOAT
  +  BIT -> BOOLEAN
  +  BOOLEAN -> BOOLEAN
  +  BIGINT -> LONG
  +  DECIMAL -> BIGDECIMAL
  +  NUMERIC -> BIGDECIMAL
  +  TINYINT -> SHORT
  +  SMALLINT -> SHORT
  +  DOUBLE -> DOUBLE

  カスタムのデータタイプマッピングで `dataTypeMapping` オプションを使用すると、デフォルトのデータ型マッピングをオーバーライドできます。この影響を受けるのは、`dataTypeMapping` オプションでリストされた JDBC データ型のみです。他のすべての JDBC データ型に対しては、デフォルトのマッピングが使用されます。必要に応じて、別の JDBC データ型のマッピングを追加することも可能です。デフォルトまたはカスタムのマッピングのいずれにも JDBC データ型が含まれていない場合、データ型はデフォルトで AWS Glue`STRING` データ型に変換されます。

次の Python コード例は、AWS Marketplace のJDBC ドライバーを使用して、JDBC データベースからの読み取りを実行する方法を示しています。ここでは、データベースからの読み取りと、S3 ロケーションへの書き込みの方法を知ることができます。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.jdbc", connection_options = 
     {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, 
       name, department from department where id < 200","numPartitions":"4",
       "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"},
        transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},
      "upperBound":"200","query":"select id, name, department from department where 
       id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0",
       "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### custom.athena または marketplace.athena タイプ用の接続オプション
<a name="marketplace-athena-connect-options"></a>
+ `className` – (必須) ドライバクラス名を示す文字列。Athena-CloudWatch コネクタを使用している場合、このパラメータ値はクラス名にプレフィックスされます (例: `"com.amazonaws.athena.connectors"`)。Athena-CloudWatch コネクタは、メタデータハンドラーとレコードハンドラーの 2 つのクラスで構成されています。共通のプレフィックスを指定することで、API がそのプレフィックスに基づいた適切なクラスをロードします。
+ `tableName` – (必須) 読み込む CloudWatch ログストリームの名前を示す文字列。このコードスニペットでは、ビューに特別な名前 `all_log_streams` を使用しています。この場合、返された動的データフレームには、ロググループ内のすべてのログストリームからのデータが含まれます。
+ `schemaName` – (必須) 読み取りのソースとなる CloudWatch ロググループの名前を示す文字列。例えば、`/aws-glue/jobs/output`。
+ `connectionName` – (必須) コネクタに関連付けられている接続の名前を示す文字列。

このコネクタの追加オプションについては、GitHub の [Amazon Athena CloudWatch Connector README](https://github.com/awslabs/aws-athena-query-federation/tree/master/athena-cloudwatch) ファイルを参照してください。

次の Python コード例は、AWS Marketplace コネクタを使用しながら、Athena データストアからの読み取りを実行する方法を示しています。こここでは、Athena からの読み取りと、S3 ロケーションへの書き込みを行う方法を知ることができます。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.athena", connection_options = 
     {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output",
      "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.athena", connection_options = {"tableName":"all_log_streams",,
      "schemaName":"/aws-glue/jobs/output","connectionName":
      "test-connection-athena"}, transformation_ctx = "DataSource0")
    ## @type: ApplyMapping
    ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string",
      "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"]
    ## @return: Transform0
    ## @inputs: [frame = DataSource0]
    Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string",
      "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], 
       transformation_ctx = "Transform0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
     "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = Transform0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, 
      connection_type = "s3", format = "json", connection_options = {"path": 
      "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

### custom.spark または marketplace.spark タイプ用の接続オプション
<a name="marketplace-spark-connect-options"></a>
+ `className` – (必須) コネクタのクラス名を支援す文字列。
+ `secretId` – (オプション) コネクタ接続の認証情報を取得するために使用される文字列。
+ `connectionName` – (必須) コネクタに関連付けられている接続の名前を示す文字列。
+ その他のオプションは、データストアによって異なります。例えば、[Elasticsearch for Apache Hadoop](https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html) ドキュメントの説明にあるように、OpenSearch の設定オプションは「`es`」でプレフィックスされます。Spark から Snowflake への接続では、*Connecting to Snowflake* ガイドの「[Using the Spark Connector](https://docs.snowflake.com/en/user-guide/spark-connector-use.html)」で説明されているように、`sfUser` および `sfPassword` のオプションを使用します。

次の Python コード例に、`marketplace.spark` 接続を使用して、OpenSearch のデータストアからの読み取りを実行する方法を示します。

```
    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
     
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
     
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    ## @type: DataSource
    ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test",
      "es.nodes.wan.only":"true","es.nodes":"https://{{<AWS endpoint>}}",
      "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"]
    ## @return: DataSource0
    ## @inputs: []
    DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = 
      "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only":
      "true","es.nodes":"https://{{<AWS endpoint>}}","connectionName":
      "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0")
    ## @type: DataSink
    ## @args: [connection_type = "s3", format = "json", connection_options = {"path": 
         "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0"]
    ## @return: DataSink0
    ## @inputs: [frame = DataSource0]
    DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, 
       connection_type = "s3", format = "json", connection_options = {"path": 
       "s3://{{<S3 path>}}/", "partitionKeys": []}, transformation_ctx = "DataSink0")
    job.commit()
```

## 汎用オプション
<a name="aws-glue-programming-etl-connect-general-options"></a>

このセクションのオプションは `connection_options` として提供されていますが、特定のコネクタには当てはまりません。

以下のパラメータは、ブックマークを設定する際に一般的に使用されます。Amazon S3 または JDBC ワークフローに適用される場合があります。詳細については、「[ジョブのブックマークを使用する](programming-etl-connect-bookmarks.md)」を参照してください。
+ `jobBookmarkKeys` - 列名の配列。
+ `jobBookmarkKeysSortOrder` — ソート順序に基づいて値を比較する方法を定義する文字列。有効な値: `"asc"`、`"desc"`。
+ `useS3ListImplementation` — Amazon S3 バケットの内容を一覧表示する際のメモリパフォーマンスの管理に使用されます。詳しくは、「[Optimize memory management in AWS Glue](https://aws.amazon.com/blogs/big-data/optimize-memory-management-in-aws-glue/)」を参照してください。