Lavorare con connettori origine dati per Apache Spark - Amazon Athena

Lavorare con connettori origine dati per Apache Spark

Alcuni connettori di origine dati Athena sono disponibili come connettori Spark DSV2. I nomi dei connettori Spark DSV2 hanno come suffisso -dsv2 (ad esempio athena-dynamodb-dsv2).

Di seguito sono riportati i connettori DSV2 attualmente disponibili, il relativo nome della classe .format() Spark e i link alla documentazione corrispondente di Amazon Athena Federated Query:

Connettore DSV2 Spark .format() nome classe Documentazione
athena-cloudwatch-dsv2 com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider CloudWatch
athena-cloudwatch-metrics-dsv2 com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider Metriche CloudWatch
athena-aws-cmdb-dsv2 com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider CMDB
athena-dynamodb-dsv2 com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider DynamoDB

Per scaricare i file .jar dei connettori DSV2, visita la pagina GitHub DSV2 di Amazon Athena Query Federation e consulta la sezione Rilasci, <versione> di rilascio, sezione Asset.

Specificare il jar a Spark

Per utilizzare i connettori Athena DSV2 con Spark, devi inviare il file .jar del connettore all'ambiente Spark che stai utilizzando. Le sezioni seguenti descrivono casi specifici.

Athena per Spark

Per informazioni sull'aggiunta di file .jar personalizzati e configurazioni personalizzate ad Amazon Athena per Apache Spark, consulta Utilizzare le proprietà Spark per specificare una configurazione personalizzata.

General Spark

Per passare il file .jar del connettore a Spark, usa il comando spark-submit e specifica il file .jar nell'opzione --jars, come nell'esempio seguente:

spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/some_version/athena-dynamodb-dsv2-some_version.jar

Spark di Amazon EMR

Per eseguire un comando spark-submit con il parametro --jars su Amazon EMR, devi aggiungere un passaggio al cluster Spark di Amazon EMR. Per informazioni dettagliate su come utilizzare spark-submit su Amazon EMR, consulta la Aggiungi fase Spark nella Guida di rilascio di Amazon EMR.

AWS Glue ETL Spark

Per AWS Glue ETL, puoi passare l'URL GitHub.com del file .jar all'argomento --extra-jars del comando aws glue start-job-run. La documentazione AWS Glue descrive il parametro --extra-jars come se avesse un percorso Amazon S3, ma il parametro può anche avere un URL HTTPS. Per ulteriori informazioni, consulta la Documentazione di riferimento dei parametri dei processi nella Guida per gli sviluppatori di AWS Glue.

Eseguire query del connettore su Spark

Per inviare l'equivalente della tua query esistente federata di Athena su Apache Spark, utilizza la funzione spark.sql(). Ad esempio, supponi che intendi utilizzare la seguente query Athena su Apache Spark.

SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1

Per eseguire la stessa query su Spark utilizzando il connettore DynamoDB DSV2 di Amazon Athena, utilizza il codice seguente:

dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')

Specificare i parametri

Le versioni DSV2 dei connettori di origine dati Athena utilizzano gli stessi parametri dei connettori corrispondenti di origine dati Athena. Per informazioni sui parametri, consulta la documentazione del connettore corrispondente di origine dati Athena.

Nel codice PySpark, utilizza la seguente sintassi per configurare i parametri.

spark.read.option("athena.connectors.conf.parameter", "value")

Ad esempio, il codice seguente imposta il parametro disable_projection_and_casing del connettore DynamoDB di Amazon Athena su always.

dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())