Tipi e opzioni di connessione per ETL in AWS Glue per Spark - AWS Glue

Tipi e opzioni di connessione per ETL in AWS Glue per Spark

In AWS Glue per Spark, diversi metodi e trasformazioni PySpark e Scala specificano il tipo di connessione utilizzando un parametro connectionType. Specificano le opzioni di connessione utilizzando un parametro connectionOptions o options.

Il parametro connectionType può assumere i valori indicati nella tabella seguente. I valori dei parametri associati connectionOptions (o options) per ciascun tipo sono documentati nelle sezioni seguenti. Salvo indicazione contraria, i parametri si applicano quando la connessione viene utilizzata come sorgente o sink.

Per il codice di esempio che illustra l'impostazione e l'utilizzo delle opzioni di connessione, consulta la home page per ogni tipo di connessione.

connectionType Si connette a
dynamodb Amazon DynamoDB database
kinesis Flusso di dati Amazon Kinesis
s3 Amazon S3
documentdb Amazon DocumentDB (compatibile con MongoDB) database
opensearch Servizio OpenSearch di Amazon.
redshift Database Amazon Redshift
kafka Kafka o Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos per NoSQL.
azuresql Azure SQL.
bigquery Google BigQuery.
mongodb Database MongoDB, incluso MongoDB Atlas.
sqlserver Microsoft SQL Server database (vedere Connessioni JDBC)
mysql MySQL database (vedere Connessioni JDBC)
oracle Oracle database (vedere Connessioni JDBC)
postgresql PostgreSQL database (vedere Connessioni JDBC)
saphana SAP HANA.
snowflake Data lake Snowflake
teradata Teradata Vantage.
vertica Vertica.
personalizzato.* Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS
marketplace.* Archivi dati Spark, Athena o JDBC (consulta Valori di personalizzazione e connectionType Marketplace AWS)

Opzioni DataFrame per ETL in AWS Glue 5.0 per Spark

Un DataFrame è un set di dati organizzato in modo simile a una tabella e supporta operazioni di tipo funzionale (mappatura, riduzione, filtro e così via) e operazioni SQL (selezione, proiezione, aggregazione).

Per creare un DataFrame per un'origine dati supportata da Glue, è necessario quanto segue:

  • Connettore dell'origine dati ClassName

  • Connessione dell'origine dati Options

Allo stesso modo, per scrivere un DataFrame in un data sink supportato da Glue, sono necessari gli stessi elementi:

  • Connettore del data sink ClassName

  • Connessione del data sink Options

Attenzione: alcune funzionalità di AWS Glue, come i segnalibri di processo, e opzioni DynamicFrame, come connectionName, non sono supportate in DataFrame. Per maggiori informazioni su DataFrame e sulle operazioni supportate, consultare la documentazione Spark relativa a DataFrame.

Specificare il connettore ClassName

Per specificare il connettore ClassName di un'origine dati o di un data sink, utilizzare l'opzione .format per indicare il connettore ClassName corrispondente che definisce l'origine dati o il data sink.

Connettori JDBC

Per i connettori JDBC, specificare jdbc come valore dell'opzione .format e indicare il driver JDBC ClassName nell'opzione driver.

df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...

La seguente tabella riporta il driver JDBC ClassName dell'origine dati supportata in AWS Glue per DataFrame.

Origine dati ClassName del driver
PostgreSQL org.postgresql.Driver
Oracle oracle.jdbc.driver.OracleDriver
SQLServer com.microsoft.sqlserver.jdbc.SQLServerDriver
MySQL com.mysql.jdbc.Driver
SAPHana com.sap.db.jdbc.Driver
Teradata com.teradata.jdbc.TeraDriver
Connettori Spark

Per i connettori Spark, specificare il ClassName del connettore come valore dell'opzione .format.

df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...

La seguente tabella riporta il connettore Spark ClassName dell'origine dati supportata in AWS Glue per DataFrame.

Origine dati ClassName
MongoDB/DocumentDB glue.spark.mongodb
Redshift io.github.spark_redshift_community.spark.redshift
AzureCosmos cosmos.oltp
AzureSQL com.microsoft.sqlserver.jdbc.spark
BigQuery com.google.cloud.spark.bigquery
OpenSearch org.opensearch.spark.sql
Snowflake net.snowflake.spark.snowflake
Vertica com.vertica.spark.datasource.VerticaSource

Specificare le opzioni di connessione

Per specificare le Options della connessione a un'origine dati o a un data sink, utilizzare .option(<KEY>, <VALUE>) per indicare singole opzioni oppure .options(<MAP>) per indicare più opzioni come mappa chiave-valore.

Ogni origine dati o data sink supporta il proprio set di Options di connessione. Per ulteriori informazioni sulle Options disponibili, fare riferimento alla documentazione pubblica del connettore Spark specifico dell'origine dati o del data sink riportato nella seguente tabella.

Esempi

Gli esempi riportati di seguito leggono da PostgreSQL e scrivono in SnowFlake:

Python

Esempio:

from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala

Esempio:

import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()

Valori di personalizzazione e connectionType Marketplace AWS

Questi sono i seguenti:

  • "connectionType": "marketplace.athena": designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "marketplace.spark": designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "marketplace.jdbc": designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore di Marketplace AWS.

  • "connectionType": "custom.athena": designa una connessione a un archivio dati Amazon Athena. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

  • "connectionType": "custom.spark": designa una connessione a un archivio dati Apache Spark. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

  • "connectionType": "custom.jdbc": designa una connessione a un archivio dati JDBC. La connessione utilizza un connettore personalizzato che va caricato in AWS Glue Studio.

Opzioni di connessione per il tipo custom.jdbc o marketplace.jdbc

  • className: stringa, obbligatorio, nome della classe driver.

  • connectionName: stringa, obbligatorio, nome della connessione associata al connettore.

  • url: stringa, obbligatorio, URL JDBC con segnaposto (${}) che vengono utilizzati per creare la connessione all'origine dati. Il segnaposto ${secretKey} viene sostituito con il segreto con lo stesso nome in AWS Secrets Manager. Per ulteriori informazioni sulla creazione dell'URL, fare riferimento alla documentazione dell'archivio dati.

  • secretId o user/password: stringa, obbligatorio, utilizzato per recuperare le credenziali per l'URL.

  • dbTable o query: stringa, obbligatorio, la tabella o la query SQL da cui ottenere i dati. Puoi specificare dbTable o query, ma non entrambi.

  • partitionColumn: stringa, facoltativo, il nome di una colonna intera utilizzata per il partizionamento. Questa opzione funziona solo quando è inclusa con lowerBound, upperBound e numPartitions. Questa opzione funziona allo stesso modo del lettore Spark SQL JDBC. Per ulteriori informazioni, consulta Da JDBC ad altri database nel manuale Apache Spark SQL, DataFrames and Datasets Guide.

    I valori lowerBound e upperBound vengono utilizzati per decidere lo stride della partizione, non per filtrare le righe nella tabella. Tutte le righe della tabella vengono partizionate e restituite.

    Nota

    Quando si utilizza una query anziché un nome di tabella, è necessario verificare che la query funzioni con la condizione di partizionamento specificata. Ad esempio:

    • Se il formato della query è "SELECT col1 FROM table1", testa la query aggiungendo una clausola WHERE alla fine della query che utilizza la colonna della partizione.

    • Se il formato della query è "SELECT col1 FROM table1 WHERE col2=val", testa la query estendendo la clausola WHERE con AND e un'espressione che utilizza la colonna della partizione.

  • lowerBound: intero, facoltativo, il valore minimo di partitionColumn che viene utilizzato per decidere lo stride della partizione.

  • upperBound: intero, facoltativo, il valore massimo di partitionColumn che viene utilizzato per decidere lo stride della partizione.

  • numPartitions: intero, facoltativo, il numero di partizioni. Questo valore, insieme a lowerBound (incluso) e upperBound (escluso), forma stride di partizione per espressioni con le clausole WHERE generate che vengono utilizzate per dividere la partitionColumn.

    Importante

    Presta attenzione al numero di partizioni perché troppe partizioni potrebbero causare problemi nei sistemi di database esterni.

  • filterPredicate: stringa, opzionale, clausola condizione extra per filtrare i dati dall'origine. Ad esempio:

    BillingCity='Mountain View'

    Quando si utilizza una query anziché un nome di table, è necessario verificare che la query funzioni con il filterPredicate specificato. Ad esempio:

    • Se il formato della query è "SELECT col1 FROM table1", testa la query aggiungendo una clausola WHERE alla fine della query che utilizza il predicato filtro.

    • Se il formato della query è "SELECT col1 FROM table1 WHERE col2=val", testa la query estendendo la clausola WHERE con AND e un'espressione che utilizza il predicato filtro.

  • dataTypeMapping: dizionario, opzionale, mappatura del tipo di dati personalizzata che crea una mappatura da un tipo di dati JDBC a un tipo di dati Glue. Ad esempio, l'opzione "dataTypeMapping":{"FLOAT":"STRING"} mappa i campi di dati di tipo JDBC FLOAT nel tipo Java String chiamando il metodo ResultSet.getString() del driver e lo usa per costruire registri di AWS Glue. L'oggetto ResultSet viene implementato da ciascun driver, quindi il comportamento è specifico del driver utilizzato. Consulta la documentazione relativa al driver JDBC per capire come il driver esegue le conversioni.

  • I tipi di dati AWS Glue correntemente supportati sono:

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    I tipi di dati JDBC supportati sono Java8 java.sql.types.

    Le mappature di default dei tipi di dati (da JDBC a AWS Glue) sono:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    Se si utilizza un mapping del tipo di dati personalizzato con l'opzione dataTypeMapping, è possibile sovrascrivere una mappatura di default del tipo di dati. Sono interessati solo i tipi di dati JDBC elencati nell'opzione dataTypeMapping; per tutti gli altri tipi di dati JDBC viene utilizzata la mappatura di default. Se necessario, è possibile aggiungere mappature per tipi di dati JDBC aggiuntivi. Se un tipo di dati JDBC non è incluso nella mappatura di default o in una mappatura personalizzata, per impostazione predefinita viene convertito nel tipo di dati STRING AWS Glue.

Negli esempi di codice Python riportati di seguito viene illustrato come leggere dai database JDBC con driver JDBC Marketplace AWS. Mostra la lettura da un database e la scrittura in una posizione S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni di connessione per il tipo custom.athena o marketplace.athena

  • className – Stringa, obbligatorio, nome della classe driver. Quando si utilizza il connettore Athena-CloudWatch, questo valore di parametro è il prefisso della classe Name (ad esempio, "com.amazonaws.athena.connectors"). Il connettore Athena-CloudWatch è composto da due classi: un gestore di metadati e un gestore di registri. Se si fornisce qui il prefisso comune, l'API carica le classi corrette in base a tale prefisso.

  • tableName: stringa, obbligatorio, il nome del flusso di log CloudWatch da leggere. In questo frammento di codice viene utilizzato il nome della vista speciale all_log_streams, il che significa che il frame di dati dinamico restituito conterrà i dati di tutti i flussi di log nel gruppo di log.

  • schemaName: stringa, obbligatorio, il nome del gruppo di log CloudWatch da cui leggere. Ad esempio, /aws-glue/jobs/output.

  • connectionName – Stringa, obbligatorio, nome della connessione associata al connettore.

Per ulteriori opzioni per questo connettore, consultare il README del connettore Amazon Athena CloudWatch su GitHub.

Il seguente esempio di codice Python mostra come leggere da un archivio dati Athena utilizzando un connettore Marketplace AWS. Mostra la lettura da Athena e la scrittura in una posizione S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni di connessione per il tipo custom.spark o marketplace.spark

  • className: stringa, obbligatorio, nome della classe del connettore.

  • secretId: stringa, facoltativo, utilizzato per recuperare le credenziali per la connessione del connettore.

  • connectionName – Stringa, obbligatorio, nome della connessione associata al connettore.

  • Altre opzioni dipendono dall'archivio dati. Ad esempio, le opzioni di configurazione di OpenSearch iniziano con il prefisso es, come descritto nella documentazione di Elasticsearch per Apache Hadoop. Le connessioni Spark a Snowflake utilizzano opzioni come sfUser e sfPassword, come descritto in Using the Spark Connector nella guida Connecting to Snowflake.

Il seguente esempio di codice Python mostra come leggere da un archivio dati OpenSearch utilizzando una connessione marketplace.spark.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opzioni generali

Le opzioni in questa sezione sono fornite come connettore connection_options, ma non si applicano specificamente a tale connettore.

I seguenti parametri vengono generalmente utilizzati per la configurazione dei segnalibri. Possono applicarsi ai flussi di lavoro Amazon S3 o JDBC. Per ulteriori informazioni, consulta Utilizzo di segnalibri di processo.

  • jobBookmarkKeys: un array di nomi di colonna.

  • jobBookmarkKeysSortOrder: una stringa che definisce come confrontare i valori in base all'ordinamento. Valori validi: "asc", "desc".

  • useS3ListImplementation: utilizzato per gestire le prestazioni della memoria quando si elencano i contenuti dei bucket Amazon S3. Per ulteriori informazioni, consulta la pagina Optimize memory management in AWS Glue.