Utilisation des connecteurs de source de données pour Apache Spark
Certains connecteurs de source de données Athena sont disponibles sous forme de connecteurs Spark DSV2. Les noms des connecteurs Spark DSV2 ont un suffixe -dsv2 (par exemple,athena-dynamodb-dsv2).
Voici ci-dessous les connecteurs DSV2 actuellement disponibles, le nom de leur classe .format() Spark et les liens vers leur documentation Amazon Athena Federated Query correspondante :
| Connecteur DSV2 | Nom de classe .format () Spark | Documentation |
|---|---|---|
| athena-cloudwatch-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider |
CloudWatch |
| athena-cloudwatch-metrics-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider |
Métriques CloudWatch |
| athena-aws-cmdb-dsv2 | com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider |
CMDB |
| athena-dynamodb-dsv2 | com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider |
DynamoDB |
Pour télécharger des fichiers .jar pour les connecteurs DSV2, rendez-vous sur la page GitHub DSV2 d'Amazon Athena Query Federation<version>, Assets.
Spécification du fichier jar dans Spark
Pour utiliser les connecteurs Athena DSV2 avec Spark, vous devez transmettre le fichier .jar du connecteur à l'environnement Spark que vous utilisez. Les sections suivantes décrivent les cas spécifiques.
Athena pour Spark
Pour plus d'informations sur l'ajout de fichiers .jar personnalisés et d'une configuration personnalisée dans Amazon Athena pour Apache Spark, consultez. Utilisation de propriétés Spark pour spécifier une configuration personnalisée
General Spark
Pour transmettre le fichier .jar du connecteur à Spark, utilisez la commande spark-submit et spécifiez le fichier .jar dans l'option --jars, comme dans l'exemple suivant :
spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/some_version/athena-dynamodb-dsv2-some_version.jar
Amazon EMR Spark
Pour exécuter une commande spark-submit avec le paramètre --jars sur Amazon EMR, vous devez ajouter une étape à votre cluster Amazon EMR Spark. Pour en savoir plus sur l'utilisation de spark-submit sur Amazon EMR, consultez Ajouter une étape Spark dans le Guide de version d'Amazon EMR.
AWS Glue ETL Spark
Pour AWS Glue ETL, vous pouvez transmettre l'URL GitHub.com du fichier .jar à l'argument --extra-jars de la commande aws glue start-job-run. La documentation AWS Glue décrit le paramètre --extra-jars comme empruntant un chemin Amazon S3, mais il peut également prendre une URL HTTPS. Pour plus d'informations, consultez la référence du paramètre de tâche dans le Guide du développeur AWS Glue.
Interrogation du connecteur sur Spark
Pour envoyer l'équivalent de votre requête fédérée Athena existante sur Apache Spark, utilisez la fonction spark.sql(). Supposons par exemple que vous ayez la requête Athena que vous souhaitez utiliser sur Apache Spark.
SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1
Pour exécuter la même requête sur Spark à l'aide du connecteur Amazon Athena DynamoDB DSV2, utilisez le code suivant :
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')
Specify parameters
Les versions DSV2 des connecteurs de source de données Athena utilisent les mêmes paramètres que les connecteurs de sources de données Athena correspondants. Pour obtenir des informations sur les paramètres, reportez-vous à la documentation du connecteur de source de données Athena correspondant.
Dans votre code PySpark, utilisez la syntaxe suivante pour configurer vos paramètres.
spark.read.option("athena.connectors.conf.parameter", "value")
Par exemple, le code suivant définit le paramètre disable_projection_and_casing du connecteur Amazon Athena DynamoDB sur always.
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())