AWS Glue Scala GlueContext-APIs
Paket: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )GlueContext ist der Eintrittspunkt für das Lesen und Schreiben eines DynamicFrame von und zu Amazon Simple Storage Service (Amazon S3), den AWS Glue Data Catalog, JDBC und so weiter. Diese Klasse bietet Hilfsfunktionen zum Erstellen von DataSource-Trait- und DataSink-Objekten, die wiederum zum Lesen und Schreiben von DynamicFrames verwendet werden können.
Sie können GlueContext auch verwenden, um eine Zielanzahl an Partitionen (Standard 20) im DynamicFrame festzulegen, wenn die Anzahl der von der Quelle erstellten Partitionen kleiner ist als ein Mindestwert für Partitionen (Standard 10).
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Hängt die Erfassungszeitspalten wie ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute an die Eingabe DataFrame an. Diese Funktion wird automatisch in dem von AWS Glue erzeugten Skript generiert, wenn Sie eine Data-Catalog-Tabelle mit Amazon S3 als Ziel angeben. Diese Funktion aktualisiert automatisch die Partition mit Erfassungszeitspalten in der Ausgabetabelle. So können die Ausgabedaten bei der Erfassung automatisch partitioniert werden, ohne dass explizite Erfassungszeitspalten in den Eingabedaten erforderlich sind.
-
dataFrame– DerdataFrame, um die Erfassungszeitspalten anzuhängen. -
timeGranularity– Die Granularität der Zeitspalten. Gültige Werte sind „day“, „hour“ und „minute“. Wenn zum Beispiel „hour“ an die Funktion übergeben wird, werden im OriginaldataFramedie Zeiten in den Spalten „ingest_year“, „ingest_month“, „ingest_day“, und „ingest_hour“ aktualisiert.
Gibt den Datenrahmen nach dem Anhängen der Zeitgranularitätsspalten zurück.
Beispiel:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Gibt einen DataFrame zurück, der mit der angegebenen Verbindung und dem Format erstellt wurde. Verwenden Sie diese Funktion nur mit AWS-Glue-Streaming-Quellen.
connectionType– Die Art der Streaming-Verbindung. Gültige Werte sindkinesisundkafka.-
connectionOptions– Verbindungsoptionen, die für Kinesis und Kafka unterschiedlich sind. Die Liste aller Verbindungsoptionen für jede Streaming-Datenquelle finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark. Beachten Sie die folgenden Unterschiede bei den Streaming-Verbindungsoptionen:-
Kinesis-Streaming-Quellen erfordern
streamARN,startingPosition,inferSchemaundclassification. -
Kafka-Streaming-Quellen erfordern
connectionName,topicName,startingOffsets,inferSchemaundclassification.
-
transformationContext– Der zu verwendende Transformationskontext (optional).format– Eine Formatspezifikation (optional). Diese wird für eine Amazon-S3- oder eine AWS Glue-Verbindung verwendet, die mehrere Formate unterstützt. Informationen zu den unterstützten Formaten finden Sie unter Mögliche Formate für Eingaben und Ausgaben in AWS Glue für SparkformatOptions– Formatierungsoptionen für das angegebene Format. Weitere Informationen zu unterstützten Formatoptionen finden Sie unter Pfad-Formatoptionen.
Beispiel für Amazon-Kinesis-Streaming-Quelle:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Beispiel für die Kafka-Streaming-Quelle:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
Wendet die batch_function auf jeden Mikrobatch an, der von der Streaming-Quelle gelesen wird.
-
frame– Der DataFrame, der den aktuellen Mikrobatch enthält. -
batch_function– Eine Funktion, die für jeden Mikrobatch angewendet wird. -
options– Eine Sammlung von Schlüssel-Wert-Paaren, die Informationen zur Verarbeitung von Mikrobatches enthält. Die folgenden Optionen sind erforderlich:-
windowSize– Die Zeitspanne für die Verarbeitung der einzelnen Batches. -
checkpointLocation– Der Ort, an dem Checkpoints für den Streaming-ETL-Auftrag gespeichert werden. -
batchMaxRetries– Die maximale Anzahl der Wiederholungsversuche für diesen Batch, wenn er fehlschlägt. Der Standardwert ist 3. Diese Option ist nur für Glue 2.0 und höher konfigurierbar.
-
Beispiel:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Erstellt eine DataSink, die in einen Speicherort schreibt, der in einer Tabelle angegeben ist, die im Data Catalog definiert ist.
database– Der Datenbankname im Data Catalog.tableName– Der Tabellenname im Data Catalog.redshiftTmpDir– Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.additionalOptions– Zusätzliche Optionen für AWS Glue.catalogId– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Gibt den DataSink zurück.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Erstellt einen DataSource-Trait, der Daten aus einer Tabellendefinition im Data Catalog liest.
database– Der Datenbankname im Data Catalog.tableName– Der Tabellenname im Data Catalog.redshiftTmpDir– Das vorläufige Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.pushDownPredicate– Filtert Partitionen, ohne alle Dateien in Ihrem Datensatz auflisten und lesen zu müssen. Weitere Informationen finden Sie unter Vorabfilterung mit Pushdown-Prädikaten.additionalOptions– Eine Sammlung optionaler Name/Wert-Paare. Zu den möglichen Optionen gehören die unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark aufgeführten, außerendpointUrl,streamName,bootstrap.servers,security.protocol,topicName,classificationunddelimiter. Eine weitere unterstützte Option istcatalogPartitionPredicate:catalogPartitionPredicate– Sie können einen Katalogausdruck basierend auf den Indexspalten an den Filter übergeben. Dies verlagert die Filterung auf die Serverseite. Weitere Informationen finden Sie unter AWS Glue-Partition-Indizes. Beachten Sie, dasspush_down_predicateundcatalogPartitionPredicateverschiedene Syntaxen verwenden. Erstere verwendet die Spark-SQL-Standardsyntax und letztere verwendet den JSQL-Parser.catalogId– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Gibt den DataSource zurück.
Beispiel für eine Streaming-Quelle
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Erstellt eine DataSink, die in eine JDBC-Datenbank schreibt, die im Data Catalog in einem Connection-Objekt angegeben ist. Das Connection-Objekt enthält Informationen zum Herstellen einer Verbindung zu einer JDBC-Senke, einschließlich URL, Benutzername, Passwort, VPC, Subnetz und Sicherheitsgruppen.
catalogConnection– Der Name der Verbindung im Data Catalog, die die JDBC-URL zum Schreiben enthält.options– Eine Reihe von JSON-Namen-Wert-Paaren, die zusätzliche Informationen liefern, die zum Schreiben in einen JDBC-Datenspeicher erforderlich sind. Dies umfasst:dbtable (erforderlich) – Der Name der JDBC-Tabelle. Bei JDBC-Datenspeichern, die von Schemata innerhalb einer Datenbank unterstützen, geben Sie
schema.table-namean. Wenn kein Schema angegeben ist, wird der Standardwert "öffentliches" Schema verwendet. Das folgende Beispiel zeigt einen Optionsparameter, der auf ein Schema mit Namentestund einer Tabelle mit Namentest_tablein der Datenbanktest_dbverweist.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")database (erforderlich) – Der Name der JDBC-Datenbank.
Alle zusätzlichen Optionen werden direkt an den SparkSQL JDBC-Writer übergeben. Weitere Informationen finden Sie unter Redshift-Datenquelle für Spark
.
redshiftTmpDir– Ein vorläufiges Staging-Verzeichnis für die Verwendung mit bestimmten Datensenken. Standardmäßig auf „leer“ festgelegt.transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.catalogId– Die Katalog-ID (Konto-ID) des Data Catalogs, auf den zugegriffen wird. Bei null wird die Standard-Konto-ID des Aufrufers verwendet.
Beispiel-Code:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Gibt den DataSink zurück.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Erstellt eine DataSink, die Daten in ein Ziel wie Amazon Simple Storage Service (Amazon S3), JDBC, den AWS-Glue-Datenkatalog oder einen Apache-Kafka- oder Amazon-Kinesis-Datenstrom schreibt.
-
connectionType– Typ der Verbindung. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. -
connectionOptions– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datensenke bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. -
transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.
Gibt den DataSink zurück.
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Erstellt eine DataSink, die Daten in ein Ziel wie Amazon S3, JDBC, den Datenkatalog oder einen Apache-Kafka- oder Amazon-Kinesis-Datenstrom schreibt. Legt auch das Format der Daten fest, die in das Ziel geschrieben werden.
connectionType– Typ der Verbindung. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
options– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datensenke bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.format– Das Format der Daten, die in das Ziel geschrieben werden.formatOptions– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Formatieren der Daten am Ziel bereitstellt. Siehe Pfad-Formatoptionen.
Gibt den DataSink zurück.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Erstellt einen DataSource-Trait, der Daten aus einer Quelle wie Amazon S3, JDBC oder dem AWS Glue Data Catalog liest. Unterstützt auch Kafka- und Kinesis-Streaming-Datenquellen.
connectionType– Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
connectionOptions– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Weitere Informationen finden Sie unter Verbindungstypen und Optionen für ETL in AWS Glue für Spark.Für eine Kinesis-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich:
streamARN,startingPosition,inferSchemaundclassification.Für eine Kafka-Streaming-Quelle sind die folgenden Verbindungsoptionen erforderlich:
connectionName,topicName,startingOffsets,inferSchemaundclassification. transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.pushDownPredicate– Prädikat für Partitionsspalten.
Gibt den DataSource zurück.
Beispiel für Amazon-Kinesis-Streaming-Quelle:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Beispiel für die Kafka-Streaming-Quelle:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Erstellt einen DataSource-Trait, der Daten aus Quellen wie Amazon S3, JDBC oder dem AWS Glue Data Catalog liest und zudem das Format für in der Quelle gespeicherte Daten festlegt.
connectionType– Der Typ der Datenquelle. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark.-
options– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Informationen zum Herstellen einer Verbindung mit der Datenquelle bereitstellt. Siehe Verbindungstypen und Optionen für ETL in AWS Glue für Spark. transformationContext– Der Transformationskontext, der mit der Senke verbunden ist, die von Auftrags-Lesezeichen zu verwenden ist. Standardmäßig auf „leer“ festgelegt.format– Das Format der in der Quelle gespeicherten Daten. Wenn derconnectionType„s3“ ist, können Sie auchformatangeben. Mögliche Werte sind: „avro“, „csv“, „grokLog“, „ion“, „json“, „xml“, „parquet“ oder „orc“.formatOptions– Eine Zeichenfolge eines JSON-Name-Wert-Paares, die zusätzliche Optionen zum Analysieren von Daten an der Quelle bereitstellt. Siehe Pfad-Formatoptionen.
Gibt den DataSource zurück.
Beispiele
Erstellen Sie einen DynamicFrame mit einer CSV-Datei als Datenquelle in Amazon S3:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Erstellen Sie einen DynamicFrame aus einer PostgreSQL-Datenquelle mit einer JDBC-Verbindung:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
Erstellen Sie einen DynamicFrame aus einer MySQL-Datenquelle mit einer JDBC-Verbindung:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Ruft das SparkSession-Objekt ab, das mit diesem GlueContext verknüpft ist. Verwenden Sie dieses SparkSession-Objekt zum Registrieren von Tabellen und UDFs für die Verwendung eines von DynamicFrames erstellten DataFrame.
Gibt die SparkSession zurück.
def startTransaction
def startTransaction(readOnly: Boolean):String
Starten Sie eine neue Transaktion. Ruft intern die startTransaction-API von Lake Formation auf.
readOnly– (Boolean) Gibt an, ob diese Transaktion schreibgeschützt oder gelesen und geschrieben werden soll. Schreibvorgänge mit einer schreibgeschützten Transaktions-ID werden abgelehnt. Schreibgeschützte Transaktionen müssen nicht festgeschrieben werden.
Gibt die Transaktions-ID zurück.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Versucht, die angegebene Transaktion zu übernehmen. commitTransaction kann zurückkehren, bevor die Transaktion abgeschlossen ist. Ruft intern die CommitTransaction-API von Lake Formation auf.
transactionId– (String) Die zu verbindende Transaktion.waitForCommit– (Boolean) Bestimmt, ob die Rückgabe voncommitTransactionsofort erfolgt. Der Standardwert ist "True". Wenn false (falsch), befragtcommitTransactionund wartet, bis die Transaktion übergeben wurde. Die Dauer der Wartezeit ist mit einem exponentiellen Backoff mit maximal 6 Wiederholungsversuchen auf 1 Minute beschränkt.
Gibt einen booleschen Wert zurück, um anzugeben, ob das Commit abgeschlossen ist oder nicht.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Versucht, die angegebene Transaktion abzubrechen. Ruft intern die CancelTransaction-API von Lake Formation auf.
transactionId– (String) Die abzubrechende Transaktion.
Gibt eine TransactionCommittedException-Ausnahme zurück, wenn die Transaktion zuvor festgeschrieben wurde.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Erzeugt ein GlueContext-Objekt mit dem angegebenen SparkContext, minimalen Partitionen und Zielpartitionen.
sc— Das ToolSparkContext.minPartitions– Die Mindestanzahl an Partitionen.targetPartitions– Die Zielanzahl an Partitionen.
Gibt den GlueContext zurück.
def this
def this( sc : SparkContext )
Erstellt ein GlueContext-Objekt mit dem mitgelieferten SparkContext. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.
sc— Das ToolSparkContext.
Gibt den GlueContext zurück.
def this
def this( sparkContext : JavaSparkContext )
Erstellt ein GlueContext-Objekt mit dem mitgelieferten JavaSparkContext. Legt die Mindestpartitionen auf 10 und die Zielpartitionen auf 20 fest.
sparkContext— Das ToolJavaSparkContext.
Gibt den GlueContext zurück.