Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Liste des API GlueContext Scala AWS Glue
Package : com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )GlueContext Est le point d'entrée pour lire et écrire un DynamicFrame depuis et vers Amazon Simple Storage Service (Amazon S3), le catalogue de données AWS Glue, JDBC, etc. Cette classe fournit des fonctions d'utilitaire pour créer des objets Caractéristique DataSource et DataSink qui peuvent ensuite être utilisés pour lire et écrire les DynamicFrame.
Vous pouvez également utiliser GlueContext pour définir un nombre cible de partitions (par défaut 20) dans le DynamicFrame si le nombre de partitions créées à partir de la source est inférieure à un seuil minimal pour les partitions (par défaut 10).
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
Ajoute des colonnes de temps d'ingestion, telles que ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute à l'entrée DataFrame. Cette fonction est automatiquement générée dans le script généré par AWS Glue lorsque vous spécifiez une table Data Catalog avec Amazon S3 comme cible. Cette fonction met automatiquement à jour la partition avec les colonnes de temps d'ingestion sur la table de sortie. Cela permet aux données de sortie d'être automatiquement partitionnées à l'heure d'ingestion sans nécessiter de colonnes d'heure d'ingestion explicites dans les données d'entrée.
-
dataFrame–dataFrameauquel ajouter les colonnes de temps d'ingestion. -
timeGranularity— granularité des colonnes de temps. Les valeurs valides sont «day», «hour» et «minute». Par exemple, si «hour» est transmis à la fonction, les colonnes de temps «ingest_year», «ingest_month», «ingest_day» et «ingest_hour» seront ajoutées à l'original «dataFrame».
Renvoie le bloc de données après l'ajout des colonnes de granularité temporelle.
Exemple :
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Renvoie un DataFrame créé avec la connexion et le format spécifiés. Utilisez cette fonction uniquement avec les sources de streaming AWS Glue.
connectionType: type de connexion en streaming. Les valeurs valides sontkinesisetkafka.-
connectionOptions: options de connexion, qui sont différentes pour Kinesis et Kafka. Vous trouverez la liste de toutes les options de connexion pour chaque source de données de streaming sur la page Types et options de connexion pour ETL dans AWS Glue pour Spark. Notez les différences suivantes dans les options de connexion en streaming :-
Les sources de streaming Kinesis nécessitent
streamARN,startingPosition,inferSchemaetclassification. -
Les sources de streaming Kafka nécessitent
connectionName,topicName,startingOffsets,inferSchemaetclassification.
-
transformationContext: contexte de transformation à utiliser (facultatif).format: spécification de format (facultatif). Utilisée pour une connexion Amazon S3 ou AWS Glue prenant en charge plusieurs formats. Pour plus d'informations sur les formats pris en charge, veuillez consulter Options de format pour les entrées et sorties dans AWS Glue pour Spark.formatOptions: options de format pour le format spécifié. Pour de plus amples informations sur les options de formats pris en charge, veuillez consulter Options de format de données.
Exemple pour la source de streaming Amazon Kinesis :
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Exemple pour la source de streaming Kafka :
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
S'applique à batch_function transmis à chaque micro-lot lu à partir de la source de streaming.
-
frame— DataFrame contenant le micro-lot actuel. -
batch_function— fonction qui sera appliquée à chaque micro-lot. -
options— collection de paires clé-valeur qui contient des informations sur le traitement de micro-lots. Les options suivantes sont requises :-
windowSize— durée de traitement de chaque lot. -
checkpointLocation— emplacement dans lequel les points de contrôle sont stockés pour la tâche ETL en streaming. -
batchMaxRetries– nombre maximum de nouvelles tentatives pour ce lot en cas d'échec. La valeur par défaut est 3. Cette option n'est configurable que pour Glue version 2.0 et ultérieure.
-
Exemple :
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Crée un DataSink qui écrit dans un emplacement spécifié dans une table définie dans Data Catalog.
database– Nom de base de données dans le catalogue de données.tableName– Nom de la table dans le catalogue de données.redshiftTmpDir– Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.additionalOptions— Options supplémentaires fournies à AWS Glue.catalogId— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Renvoie le DataSink.
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Crée un Caractéristique DataSource qui lit les données à partir d'une définition de table dans Data Catalog.
database— nom de la base de données dans Data Catalog.tableName– Nom de la table dans le catalogue de données.redshiftTmpDir– Répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.pushDownPredicate– Filtre les partitions sans avoir à répertorier ni lire tous les fichiers de votre jeu de données. Pour plus d’informations, consultez Préfiltrage à l'aide des prédicats pushdown.additionalOptions— Ensemble de paires nom-valeur facultatives. Les options possibles comprennent celles répertoriées dans Types et options de connexion pour ETL dans AWS Glue pour Spark, sauf pourendpointUrl,streamName,bootstrap.servers,security.protocol,topicName,classificationetdelimiter. Est une autre option prise en chargecatalogPartitionPredicate:catalogPartitionPredicate– vous pouvez passer une expression de catalogue à filtrer en fonction des colonnes d’index. Cela envoie le filtrage du côté serveur. Pour en savoir plus, consultez AWS Glue Indexes de partition. Notez quepush_down_predicateetcatalogPartitionPredicateutilisent des syntaxes différentes. Le premier utilise la syntaxe standard SQL Spark et le dernier utilise l’analyseur JSQL.catalogId— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Renvoie le DataSource.
Exemple pour la source de streaming
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Crée un DataSink qui écrit dans une base de données JDBC spécifiée d'un objet Connection dans Data Catalog. L'objet Connection comporte des informations de connexion pour se connecter à un récepteur JDBC incluant l'URL, le nom d'utilisateur, le mot de passe, le VPC, le sous-réseau et les groupes de sécurité.
catalogConnection– Nom de la connexion dans le catalogue de données qui contient l'URL JDBC sur laquelle écrire.options– Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires nécessaires pour écrire dans un magasin de données JDBC. Cela consiste notamment à :dbtable (obligatoire) — Nom de la table JDBC. Pour les magasins de données JDBC qui prennent en charge les schémas dans une base de données, spécifiez
schema.table-name. Si aucun schéma n’est fourni, c’est le schéma « public » par défaut qui est utilisé. L'exemple suivant illustre un paramètre d'options qui pointe sur un schéma nommétestet une table nomméetest_tabledans la base de donnéestest_db.options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")database (obligatoire) — Nom de la base de données JDBC.
Toutes les options supplémentaires transmises directement à l'enregistreur JDBC SparkSQL. Pour plus d'informations, consultez Redshift data source for Spark
.
redshiftTmpDir— répertoire intermédiaire temporaire à utiliser avec certains récepteurs de données. Valeur définie sur vide par défaut.transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.catalogId— ID du catalogue (ID du compte) Data Catalog auquel vous accédez. Lorsque la valeur est null, l'ID de compte par défaut de l'appelant est utilisé.
Exemple de code :
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
Renvoie le DataSink.
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Crée un DataSink qui écrit des données dans une destination comme Amazon Simple Storage Service (Amazon S3), JDBC ou le Catalogue de données AWS Glue, Apache Kafka ou un flux de données Apache Kafka ou Amazon Kinesis.
-
connectionType— Type de connexion. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark. -
connectionOptions— Chaîne JSON de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec le récepteur de données. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark. -
transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.
Renvoie le DataSink.
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Crée un DataSink qui écrit des données dans une destination comme Amazon S3, JDBC ou le catalogue de données, ou un flux de données Apache Kafka ou Amazon Kinesis. Définit également le format des données à écrire dans la destination.
connectionType— Type de connexion. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.-
options— Chaîne JSON de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir une connexion avec le récepteur de données. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark. transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.format— Format des données à écrire dans la destination.formatOptions– Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour le formatage des données à la destination. Consultez Options de format de données.
Renvoie le DataSink.
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Crée un Caractéristique DataSource qui lit les données à partir d'une source telle qu'Amazon S3, JDBC ou AWS Glue Data Catalog. Prend également en charge les sources de données de streaming Kafka et Kinesis.
connectionType– Type de données de la source de données. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.-
connectionOptions— Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. Pour plus d’informations, consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.Une source de streaming Kinesis nécessite les options de connexion suivantes :
streamARN,startingPosition,inferSchemaetclassification.Une source de streaming Kafka nécessite les options de connexion suivantes :
connectionName,topicName,startingOffsets,inferSchemaetclassification. transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.pushDownPredicate– Prédicat sur les colonnes de partition.
Renvoie le DataSource.
Exemple pour la source de streaming Amazon Kinesis :
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Exemple pour la source de streaming Kafka :
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Crée un Caractéristique DataSource qui lit les données à partir d'une source comme Amazon S3, JDBC ou AWS Glue Data Catalog et définit également le format des données stockées dans la source.
connectionType– Type de données de la source de données. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark.-
options– Chaîne de paires nom-valeur JSON qui fournissent des informations supplémentaires pour établir la connexion avec la source de données. Consultez Types et options de connexion pour ETL dans AWS Glue pour Spark. transformationContext– Contexte de transformation associé au récepteur à utiliser par les signets de la tâche. Valeur définie sur vide par défaut.format– Format des données stockées à la source. Lorsque leconnectionTypeest « s3 », vous pouvez également spécifierformat. Peut être « avro », « csv », « grokLog », « ion », « json », « xml », « parquet » ou « orc ».formatOptions– Chaîne JSON de paires nom-valeur qui fournissent des options supplémentaires pour l'analyse des données à la source. Consultez Options de format de données.
Renvoie le DataSource.
Exemples
Créez un DynamicFrame à partir d'une source de données qui est un fichier de valeurs séparées par des virgules (CSV) sur Amazon S3 :
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
Créez un DynamicFrame à partir d'une source de données qui est un PostgreSQL à l'aide d'une connexion JDBC :
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
Créez un DynamicFrame à partir d'une source de données MySQL à l'aide d'une connexion JDBC :
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
Permet d'obtenir l'objet SparkSession associé au GlueContext. Utilisez cet objet SparkSession pour enregistrer les tables et les fonctions définies par l'utilisateur à utiliser avec l'objet DataFrame créé à partir de DynamicFrames.
Renvoie le SparkSession.
def startTransaction
def startTransaction(readOnly: Boolean):String
Démarrer une nouvelle transaction. Appelle en interne l'API Démarrer la transaction Lake Formation.
readOnly– Valeur booléenne indiquant si cette transaction doit être en lecture seule ou en lecture et en écriture. Les écritures effectuées à l'aide d'un ID de transaction en lecture seule seront rejetées. Les transactions en lecture seule n'ont pas besoin d'être validées.
Retourne l'ID de transaction.
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
Tentative de validation de la transaction spécifiée. commitTransaction peut être renvoyé avant la fin de la validation de la transaction. Appelle en interne la Lake Formation commitTransaction API.
transactionId– (Chaîne) La transaction à valider.waitForCommit– (Booléen) Détermine si lecommitTransactionretourne immédiatement. La valeur par défaut est True. Si elle est false,commitTransactioninterroge et attend que la transaction soit validée. Le temps d'attente est limité à 1 minute en utilisant le backoff exponentiel avec un maximum de 6 tentatives de nouvelle tentative.
Renvoie une valeur de type Booléen pour indiquer si la validation a été effectuée ou non.
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
Tentative d'annulation de la transaction spécifiée. Appelle en interne la Lake Formation CancelTransaction API.
transactionId– (Chaîne) La transaction à annuler.
Retourne une exception TransactionCommittedException si la transaction a déjà été validée.
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
Crée un objet GlueContext utilisant le SparkContext spécifié, les partitions minimales et les partitions cible.
sc— LeSparkContext.minPartitions— Nombre minimal de partitions.targetPartitions— Nombre cible de partitions.
Renvoie le GlueContext.
def this
def this( sc : SparkContext )
Crée un objet GlueContext avec le SparkContext fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.
sc— LeSparkContext.
Renvoie le GlueContext.
def this
def this( sparkContext : JavaSparkContext )
Crée un objet GlueContext avec le JavaSparkContext fourni. Définit le nombre minimal de partitions à 10 et de partitions cibles à 20.
sparkContext— LeJavaSparkContext.
Renvoie le GlueContext.