Arbeiten mit Datenquellenkonnektoren für Apache Spark
Einige Athena-Datenquellenkonnektoren sind als Spark-DSV2-Konnektoren verfügbar. Die Namen der Spark-DSV2-Konnektoren haben ein -dsv2-Suffix (z. B. athena-dynamodb-dsv2).
Im Folgenden finden Sie die derzeit verfügbaren DSV2-Konnektoren, ihren .format()-Spark-Klassennamen und Links zur entsprechenden Amazon-Athena-Verbundabfragen-Dokumentation:
| DSV2-Konnektor | Spark .format() class name | Dokumentation |
|---|---|---|
| athena-cloudwatch-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider |
CloudWatch |
| athena-cloudwatch-metrics-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider |
CloudWatch-Metriken |
| athena-aws-cmdb-dsv2 | com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider |
CMDB |
| athena-dynamodb-dsv2 | com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider |
DynamoDB |
Um .jar-Dateien für die DSV2-Konnektoren herunterzuladen, besuchen Sie die GitHub-Seite Amazon-Athena-Verbundabfragen-DSV2<version>, Assets an.
JAR für Spark angeben
Um die Athena-DSV2-Konnektoren mit Spark zu verwenden, senden Sie die .jar-Datei für den Konnektor an die Spark-Umgebung, die Sie verwenden. In den folgenden Abschnitten werden spezifische Fälle beschrieben.
Athena für Spark
Informationen zum Hinzufügen von benutzerdefinierten .jar-Dateien und zur benutzerdefinierten Konfiguration zu Amazon Athena für Apache Spark finden Sie unter Spark-Eigenschaften verwenden um eine benutzerdefinierte Konfiguration anzugeben.
General Spark
Um die .jar-Konnektor-Datei an Spark zu übergeben, verwenden Sie den spark-submit-Befehl und geben Sie die Datei .jar in der Option --jars an, wie im folgenden Beispiel:
spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/some_version/athena-dynamodb-dsv2-some_version.jar
Amazon EMR Spark
Um einen spark-submit-Befehl mit dem Parameter --jars auf Amazon EMR auszuführen, müssen Sie Ihrem Amazon-EMR-Spark-Cluster einen Schritt hinzufügen. Einzelheiten zur Verwendung von spark-submit auf Amazon EMR finden Sie unter Spark-Schritt hinzufügen im Amazon-EMR-Versionshandbuch.
AWS Glue ETL Spark
Für AWS Glue ETL können Sie die GitHub.com-URL der .jar-Datei an das --extra-jars-Argument des aws glue start-job-run-Befehls übergeben. In der AWS Glue-Dokumentation wird beschrieben, dass der Parameter --extra-jars einen Amazon-S3-Pfad verwendet, aber der Parameter kann auch eine HTTPS-URL verwenden. Weitere Informationen finden Sie in der Aufgabenparameter-Referenz im AWS Glue-Entwicklerhandbuch.
Den Konnektor auf Spark abfragen
Verwenden Sie die Funktion spark.sql(), um das Äquivalent Ihrer vorhandenen Athena-Verbundabfrage auf Apache Spark einzureichen. Nehmen wir beispielsweise an, die folgende Athena-Abfrage wurde mit Apache Spark erstellt.
SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1
Verwenden Sie den folgenden Code, um dieselbe Abfrage auf Spark mithilfe des Konnektors von Amazon Athena DynamoDB DSV2 durchzuführen:
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')
Angeben von Parametern
Die DSV2-Versionen der Athena-Datenquellenkonnektoren verwenden dieselben Parameter wie die entsprechenden Athena-Datenquellen-Konnektoren. Informationen zu den Parametern finden Sie in der Dokumentation für den entsprechenden Athena-Datenquellenkonnektor.
Verwenden Sie in Ihrem PySpark-Code die folgende Syntax, um Ihre Parameter zu konfigurieren.
spark.read.option("athena.connectors.conf.parameter", "value")
Der folgende Code setzt zum Beispiel den Parameter des Amazon-Athena-DynamoDB-Konnektors disable_projection_and_casing auf always.
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())