Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verbindungstypen und Optionen für ETL in AWS Glue für Spark
In AWS Glue für Spark geben verschiedene PySpark- und Scala-Methoden sowie -Transformationen den Verbindungstyp mithilfe eines connectionType-Parameters an. Sie geben Verbindungsoptionen mit einem connectionOptions- oder options-Parameter an.
Der connectionType-Parameter kann die in der folgenden Tabelle angegebenen Werte annehmen. Die zugehörigen connectionOptions- (oder options)-Parameterwerte für jeden Typ sind in den folgenden Abschnitten dokumentiert. Sofern nicht anders angegeben, gelten die Parameter, wenn die Verbindung als Quelle oder Senke verwendet wird.
Beispielcode, der das Festlegen und Verwenden von Verbindungsoptionen veranschaulicht, finden Sie auf der Homepage für jeden Verbindungstyp.
DataFrame-Optionen für ETL in AWS Glue 5.0 für Spark
Ein DataFrame ist ein Datensatz, der in benannte Spalten angeordnet ist, ähnlich wie eine Tabelle. Er unterstützt funktionale Operationen (map/reduce/filter/usw.) und SQL-Operationen (select, project, aggregate).
Um einen DataFrame für eine von Glue unterstützte Datenquelle zu erstellen, ist Folgendes erforderlich:
Datenquellen-Connector
ClassNameDatenquellen-Verbindung
Options
Diese sind ebenfalls erforderlich, um einen DataFrame in eine von Glue unterstützte Datensenke zu schreiben:
Datensenke-Connector
ClassNameDatensenke-Verbindung
Options
Beachten Sie, dass AWS-Glue-Features wie Auftrags-Lesezeichen und DynamicFrame-Optionen wie connectionName in DataFrame nicht unterstützt werden. Weitere Informationen zu DataFrame und den unterstützten Vorgängen finden Sie in der Spark-Dokumentation für DataFrame
Angeben des ClassName-Connectors
Um ClassName einer Datenquelle/Datensenke anzugeben, verwenden Sie die .format-Option, um den entsprechenden ClassName-Connector bereitzustellen, der die Datenquelle/Datensenke definiert.
JDBC-Konnektoren
Geben Sie für JDBC-Connectors jdbc als Wert der Option .format an und geben Sie den JDBC-Treiber ClassName in der driver-Option an.
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
In der folgenden Tabelle ist der JDBC-Treiber ClassName der unterstützten Datenquelle in AWS Glue für DataFrames aufgelistet.
| So sieht eine Snowball-Appliance aus. | Treiber ClassName |
|---|---|
| PostgreSQL | org.postgresql.Driver |
| Oracle | oracle.jdbc.driver.OracleDriver |
| SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver |
| MySQL | com.mysql.jdbc.Driver |
| SAPHana | com.sap.db.jdbc.Driver |
| Teradata | com.teradata.jdbc.TeraDriver |
Spark-Connectors
Geben Sie für Spark-Connectors den ClassName des Connectors als Wert der .format-Option an.
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
In der folgenden Tabelle ist der Spark-Connector ClassName der unterstützten Datenquelle in AWS Glue für DataFrames aufgelistet.
| So sieht eine Snowball-Appliance aus. | ClassName |
|---|---|
| MongoDB/DocumentDB | glue.spark.mongodb |
| Redshift | io.github.spark_redshift_community.spark.redshift |
| AzureCosmos | cosmos.oltp |
| AzureSQL | com.microsoft.sqlserver.jdbc.spark |
| BigQuery | com.google.cloud.spark.bigquery |
| OpenSearch | org.opensearch.spark.sql |
| Snowflake | net.snowflake.spark.snowflake |
| Vertica | com.vertica.spark.datasource.VerticaSource |
Angeben der Verbindungsoptionen
Um die Options der Verbindung zu einer Datenquelle/Datensenke anzugeben, verwenden Sie .option(<KEY>, <VALUE>), um einzelne Optionen bereitzustellen, oder .options(<MAP>), um mehrere Optionen als Schlüssel-Wert-Zuordnung bereitzustellen.
Jede Datenquelle/Datensenke unterstützt einen eigene Gruppe von Options der Verbindung. Einzelheiten zu den verfügbaren Options finden Sie in der öffentlichen Dokumentation für die spezifische Datenquelle/Datensenke des Spark-Connectors, der in der folgenden Tabelle aufgeführt ist.
Beispiele
Mit den folgenden Beispielen wird aus PostgreSQL gelesen und in Snowflake geschrieben:
Python
Beispiel:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
Beispiel:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
Benutzerdefinierte und AWS Marketplace connectionType-Werte
Diese umfassen u. a. folgende:
-
"connectionType": "marketplace.athena": Bezeichnet eine Verbindung zu einem Amazon-Athena-Datenspeicher. Die Verbindung verwendet einen Konnektor von AWS Marketplace. -
"connectionType": "marketplace.spark": Bezeichnet eine Verbindung zu einem Apache-Spark-Datenspeicher. Die Verbindung verwendet einen Konnektor von AWS Marketplace. -
"connectionType": "marketplace.jdbc": Bezeichnet eine Verbindung zu einem JDBC-Datenspeicher. Die Verbindung verwendet einen Konnektor von AWS Marketplace. -
"connectionType": "custom.athena": Bezeichnet eine Verbindung zu einem Amazon-Athena-Datenspeicher. Die Verbindung verwendet einen benutzerdefinierten Konnektor, den Sie in AWS Glue Studio hochladen. -
"connectionType": "custom.spark": Bezeichnet eine Verbindung zu einem Apache-Spark-Datenspeicher. Die Verbindung verwendet einen benutzerdefinierten Konnektor, den Sie in AWS Glue Studio hochladen. -
"connectionType": "custom.jdbc": Bezeichnet eine Verbindung zu einem JDBC-Datenspeicher. Die Verbindung verwendet einen benutzerdefinierten Konnektor, den Sie in AWS Glue Studio hochladen.
Verbindungsoptionen für den Typ custom.jdbc oder marketplace.jdbc
-
className– Zeichenfolge, erforderlich, Name der Treiberklasse. -
connectionName– Zeichenfolge, erforderlich, Name der Verbindung, die dem Konnektor zugeordnet ist. -
url– Zeichenfolge, erforderlich, JDBC-URL mit Platzhaltern (${}), die verwendet werden, um die Verbindung zur Datenquelle herzustellen. Der Platzhalter${secretKey}wird durch das Secret des gleichen Namens in AWS Secrets Manager ersetzt. Weitere Informationen zum Erstellen der URL finden Sie in der Dokumentation zum Datenspeicher. -
secretIdoderuser/password– Zeichenfolge, erforderlich, zum Abrufen der Anmeldeinformationen für die URL. -
dbTableoderquery– Zeichenfolge, erforderlich, die Tabelle oder SQL-Abfrage, aus der die Daten abgerufen werden. Sie könnendbTableoderqueryangeben, aber nicht beides. -
partitionColumn– Zeichenfolge, optional, der Name einer Ganzzahlspalte, die für die Partitionierung verwendet wird. Diese Option funktioniert nur, wenn sie inlowerBound,upperBoundundnumPartitionsenthalten ist. Diese Option funktioniert auf die gleiche Weise wie im Spark SQL JDBC Reader. Weitere Informationen finden Sie unter JDBC To Other Databases (JDBC in anderen Datenbanken)im Handbuch zu Apache Spark SQL, DataFrames und Datasets. Die Werte für
lowerBoundundupperBoundwerden verwendet, um den Partitionsschritt zu bestimmen, nicht zum Filtern der Zeilen in der Tabelle. Alle Zeilen der Tabelle werden partitioniert und zurückgegeben.Anmerkung
Wenn Sie eine Abfrage anstelle eines Tabellennamens verwenden, sollten Sie überprüfen, ob die Abfrage mit der angegebenen Partitionierungsbedingung funktioniert. Zum Beispiel:
-
Wenn Ihr Abfrageformat
"SELECT col1 FROM table1"lautet, dann testen Sie die Abfrage, indem Sie eineWHERE-Klausel am Ende der Abfrage stellen, die die Partitionsspalte verwendet. -
Wenn Ihr Abfrageformat
SELECT col1 FROM table1 WHERE col2=val"lautet, dann testen Sie die Abfrage, indem Sie dieWHERE-Klausel mitANDund einem Ausdruck erweitern, der die Partitionsspalte verwendet.
-
-
lowerBound– Ganzzahl, optional, der Mindestwert vonpartitionColumn, der verwendet wird, um Partitionsschritte festzulegen. -
upperBound– Ganzzahl, optional, der Maximalwert vonpartitionColumn, der verwendet wird, um Partitionsschritte festzulegen. -
numPartitions– Ganzzahl, optional, die Anzahl der Partitionen. Dieser Wert, zusammen mitlowerBound(inklusive) undupperBound(exklusiv), bilden Partitionsschritte für generierteWHERE-Klauselausdrücke, die verwendet werden, um diepartitionColumnaufzuteilen.Wichtig
Seien Sie vorsichtig mit der Anzahl der Partitionen, da zu viele Partitionen Probleme auf Ihren externen Datenbanksystemen verursachen können.
-
filterPredicate– Zeichenfolge, optional, zusätzliche Bedingungsklausel zum Filtern von Daten aus der Quelle. Zum Beispiel:BillingCity='Mountain View'Wenn Sie eine Abfrage anstelle eines Tabellennamens verwenden, sollten Sie überprüfen, ob die Abfrage mit dem angegebenen
filterPredicatefunktioniert. Zum Beispiel:-
Wenn Ihr Abfrageformat
"SELECT col1 FROM table1"lautet, dann testen Sie die Abfrage, indem Sie eineWHERE-Klausel am Ende der Abfrage stellen, die das Filterprädikat verwendet. -
Wenn Ihr Abfrageformat
"SELECT col1 FROM table1 WHERE col2=val"lautet, dann testen Sie die Abfrage, indem Sie dieWHERE-Klausel mitANDund einem Ausdruck erweitern, der das Filterprädikat verwendet.
-
-
dataTypeMapping– Wörterbuch, optional, benutzerdefiniertes Datentyp-Mapping, das ein Mapping aus einem JDBC-Datentyp auf einen Glue-Datentyp durchführt. Beispielsweise ordnet die Option"dataTypeMapping":{"FLOAT":"STRING"}-Datenfelder des JDBC-TypsFLOATin den Java-TypStringzu, indem dieResultSet.getString()-Methode des Treibers abgerufen und für die Entwicklung des AWS Glue-Datensatzes verwendet wird. DasResultSet-Objekt wird von jedem Treiber implementiert, sodass das Verhalten spezifisch für den von Ihnen verwendeten Treiber ist. Informieren Sie sich in der Dokumentation für Ihren JDBC-Treiber, um zu verstehen, wie der Treiber die Konvertierungen durchführt. -
Die derzeit unterstützten AWS Glue-Datentypen sind:
-
DATUM
-
STRING
-
TIMESTAMP (ZEITSTEMPEL)
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
Die unterstützten JDBC-Datentypen sind Java8 java.sql.types
. Die Standard-Datentyp-Mappings (von JDBC zu AWS Glue) sind:
-
DATUM -> DATUM
-
VARCHAR -> ZEICHENFOLGE
-
CHAR -> ZEICHENFOLGE
-
LONGNVARCHAR -> ZEICHENFOLGE
-
TIMESTAMP -> ZEITSTEMPEL
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLESCHER WERT
-
BOOLEAN -> BOOLESCHER WERT
-
BIGINT -> LANG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> KURZ
-
SMALLINT -> KURZ
-
DOUBLE -> DOPPELT
Wenn Sie ein benutzerdefiniertes Datentyp-Mapping mit der Option
dataTypeMappingverwenden, können Sie ein Standard-Datentyp-Mapping überschreiben. Nur die JDBC-Datentypen, die in der OptiondataTypeMappingbetroffen sind. Das Standardmapping wird für alle anderen JDBC-Datentypen verwendet. Sie können bei Bedarf Mappings für zusätzliche JDBC-Datentypen hinzufügen. Wenn ein JDBC-Datentyp weder im Standard-Mapping noch in einem benutzerdefinierten Mapping enthalten ist, wird der Datentyp standardmäßig in den Datentyp AWS GlueSTRINGumgewandelt. -
In den folgenden Python-Codebeispielen wird gezeigt, wie JDBC-Datenbanken mit benutzerdefinierten AWS Marketplace-JDBC-Treibern gelesen werden. Es demonstriert das Lesen aus einer Datenbank und das Schreiben in einen S3-Speicherort.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Verbindungsoptionen für den Typ custom.athena oder marketplace.athena
-
className– Zeichenfolge, erforderlich, Name der Treiberklasse. Wenn Sie den Athena-CloudWatch-Konnektor verwenden, ist dieser Parameterwert das Präfix des Klassennamens (z. B."com.amazonaws.athena.connectors"). Der Athena-CloudWatch-Konnektor besteht aus zwei Klassen: einem Metadatenhandler und einem Record-Handler. Wenn Sie hier das allgemeine Präfix angeben, lädt die API die richtigen Klassen basierend auf diesem Präfix. -
tableName– Zeichenfolge, erforderlich, der Name des zu lesenden CloudWatch-Protokolldatenstreams. Dieses Codefragment verwendet den speziellen Ansichtsnamenall_log_streams, was bedeutet, dass der zurückgegebene dynamische Datenrahmen Daten aus allen Protokollstreams in der Protokollgruppe enthält. -
schemaName– Zeichenfolge, erforderlich, der Name des zu lesenden CloudWatch-Protokollgruppenstreams. Beispiel,/aws-glue/jobs/output. -
connectionName– Zeichenfolge, erforderlich, Name der Verbindung, die dem Konnektor zugeordnet ist.
Weitere Optionen für diesen Konnektor finden Sie in der Datei Amazon Athena CloudWatch Connector README
Im folgenden Python-Codebeispiel wird gezeigt, wie aus einem Athena-Datenspeicher mithilfe eines AWS Marketplace-Konnektoren gelesen wird. Es demonstriert das Lesen aus Athena und das Schreiben in einen S3-Speicherort.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Verbindungsoptionen für den Typ custom.spark oder marketplace.spark
-
className– Zeichenfolge, erforderlich, Konnektor-Klassenname. -
secretId– Zeichenfolge, optional, wird zum Abrufen der Anmeldeinformationen für die Konnektor-Verbindung verwendet. -
connectionName– Zeichenfolge, erforderlich, Name der Verbindung, die dem Konnektor zugeordnet ist. -
Andere Optionen hängen vom Datenspeicher ab. OpenSearch-Konfigurationsoptionen beginnen beispielsweise mit dem Präfix
es, wie in der Dokumentation Elasticsearch for Apache Hadoop (Elasticsearch für Apache Hadoop)beschrieben. Spark-Verbindungen zu Snowflake verwenden Optionen wie sfUserundsfPassword, wie unter Using the Spark Connector (Verwenden des Spark-Connectors)im Handbuch Connecting to Snowflake (Verbindung mit Snowflake herstellen) beschrieben.
Im folgenden Python-Codebeispiel wird gezeigt, wie aus einem OpenSearch-Datenspeicher mithilfe einer marketplace.spark-Verbindung gelesen wird.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Allgemeine Optionen
Die Optionen in diesem Abschnitt werden als connection_options bereitgestellt, gelten jedoch nicht speziell für einen Konnektor.
Die folgenden Parameter werden im Allgemeinen bei der Konfiguration von Lesezeichen verwendet. Sie können für Amazon-S3- oder JDBC-Workflows gelten. Weitere Informationen finden Sie unter Verwenden von Auftragslesezeichen.
jobBookmarkKeys– Ein Array von Spaltennamen.jobBookmarkKeysSortOrder– Zeichenfolge, die definiert, wie Werte basierend auf der Sortierreihenfolge verglichen werden. Zulässige Werte:"asc","desc".useS3ListImplementation– Wird zur Verwaltung der Speicherleistung beim Auflisten von Amazon-S3-Bucket-Inhalten verwendet. Weitere Informationen finden Sie unter Optimieren der Speicherverwaltung in AWS Glue.