Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Classe GlueContext
Esegue il wrapping dell'oggetto Apache Spark SparkContext
__init__
__init__(sparkContext)
sparkContext: il contesto Apache Spark da usare.
Creazione
getSource
getSource(connection_type, transformation_ctx = "", **options)
Crea un oggetto DataSource che può essere utilizzato per leggere DynamicFrames da fonti esterne.
connection_type: il tipo di connessione da utilizzare, ad esempio Amazon Simple Storage Service (Amazon S3), Amazon Redshift e JDBC. I valori validi includonos3,mysql,postgresql,redshift,sqlserver,oracleedynamodb.transformation_ctx: il contesto di trasformazione da usare (opzionale).options: una raccolta di coppie nome/valore opzionali. Per ulteriori informazioni, consulta Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
Di seguito è riportato un esempio dell'utilizzo di getSource:
>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()
create_dynamic_frame_from_rdd
create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")
Restituisce un DynamicFrame che viene creato da un Apache Spark Resilient Distributed Dataset (RDD).
data: l'origine dati da usare.name: il nome dei dati da usare.schema: lo schema da usare (opzionale).sample_ratio: il rapporto di esempio da usare (opzionale).transformation_ctx: il contesto di trasformazione da usare (opzionale).
create_dynamic_frame_from_catalog
create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)
Restituisce un DynamicFrame creato utilizzando un database del catalogo dati e un nome della tabella. Quando si utilizza questo metodo, si forniscono format_options tramite le proprietà della tabella sulla tabella Catalogo dati AWS Glue specificata e altre opzioni tramite l'argomento additional_options.
Database: il database da cui leggere.table_name: il nome della tabella da cui leggere.redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).transformation_ctx: il contesto di trasformazione da usare (opzionale).push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per informazioni sulle origini supportate e le limitazioni, consultare Optimizing reads with pushdown in AWS Glue ETL. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi e opzioni di connessione per ETL in AWS Glue per Spark ad eccezione diendpointUrl,streamName,bootstrap.servers,security.protocol,topicName,classificationedelimiter. Un'altra opzione supportata ècatalogPartitionPredicate:catalogPartitionPredicate: è possibile passare un'espressione di catalogo per filtrare in base alle colonne di indice. Questo esegue il push down del filtro sul lato server. Per ulteriori informazioni, consultare la pagina relativa agli indici di partizionamento di AWS Glue. Tenere presente chepush_down_predicateecatalogPartitionPredicateusano sintassi diverse. Il primo utilizza la sintassi standard Spark SQL e il secondo utilizza il parser JSQL.catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.
create_dynamic_frame_from_options
create_dynamic_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
Restituisce un DynamicFrame creato con la connessione e il formato specificati.
connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includonos3,mysql,postgresql,redshift,sqlserver,oracleedynamodb.connection_options: opzioni di connessione, come tabella di database e percorsi (opzionale). Per un oggettoconnection_typedis3, viene definito un elenco di percorsi Amazon S3.connection_options = {"paths": ["s3://aws-glue-target/temp"]}Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.
avvertimento
Si consiglia di non archiviare le password nello script. Valutare la possibilità di utilizzare
boto3per recuperarle da Gestione dei segreti AWS o da Catalogo dati AWS Glue.connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}La proprietà
dbtableè il nome della tabella JDBC. Per gli archivi dati JDBC che supportano schemi all'interno di un database, specificareschema.table-name. Se non viene fornito alcuno schema, viene usato lo schema "pubblico" predefinito.Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
format: una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.transformation_ctx: il contesto di trasformazione da usare (opzionale).push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per informazioni sulle origini supportate e le limitazioni, consultare Optimizing reads with pushdown in AWS Glue ETL. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.
create_sample_dynamic_frame_from_catalog
create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)
Restituisce un DynamicFrame di esempio creato utilizzando un database del catalogo dati e un nome della tabella. La DynamicFrame contiene solo i primi num registri da un'origine dati.
-
database: il database da cui leggere. -
table_name: il nome della tabella da cui leggere. -
num: il numero massimo di registri nel frame dinamico di esempio restituito. redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).-
transformation_ctx: il contesto di trasformazione da usare (opzionale). push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consultare Prefiltraggio con i predicati pushdown.-
additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi e opzioni di connessione per ETL in AWS Glue per Spark ad eccezione diendpointUrl,streamName,bootstrap.servers,security.protocol,topicName,classificationedelimiter. -
sample_options: parametri per controllare il comportamento di campionamento (opzionale). Parametri attuali disponibili per le origini Amazon S3:maxSamplePartitions: il numero massimo di partizioni che il campionamento leggerà. Il valore predefinito è 10maxSampleFilesPerPartition: il numero massimo di file che il campionamento leggerà in una partizione. Il valore predefinito è 10.Questi parametri aiutano a ridurre il tempo impiegato dall'elenco dei file. Ad esempio, supponiamo che il set di dati contenga 1000 partizioni e ogni partizione contenga 10 file. Se è impostato
maxSamplePartitions= 10 emaxSampleFilesPerPartition= 10, invece di elencare tutti i 10.000 file, il campionamento elencherà e leggerà solo le prime 10 partizioni con i primi 10 file in ognuna di esse: 10*10 = 100 file in totale.
-
catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato suNoneper default. L'impostazione predefinita diNoneè l'ID catalogo dell'account chiamante nel servizio.
create_sample_dynamic_frame_from_options
create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")
Restituisce un DynamicFrame di esempio creato con la connessione e il formato specificati. La DynamicFrame contiene solo i primi num registri da un'origine dati.
connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includonos3,mysql,postgresql,redshift,sqlserver,oracleedynamodb.connection_options: opzioni di connessione, come tabella di database e percorsi (opzionale). Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.-
num: il numero massimo di registri nel frame dinamico di esempio restituito. -
sample_options: parametri per controllare il comportamento di campionamento (opzionale). Parametri attuali disponibili per le origini Amazon S3:maxSamplePartitions: il numero massimo di partizioni che il campionamento leggerà. Il valore predefinito è 10maxSampleFilesPerPartition: il numero massimo di file che il campionamento leggerà in una partizione. Il valore predefinito è 10.Questi parametri aiutano a ridurre il tempo impiegato dall'elenco dei file. Ad esempio, supponiamo che il set di dati contenga 1000 partizioni e ogni partizione contenga 10 file. Se è impostato
maxSamplePartitions= 10 emaxSampleFilesPerPartition= 10, invece di elencare tutti i 10.000 file, il campionamento elencherà e leggerà solo le prime 10 partizioni con i primi 10 file in ognuna di esse: 10*10 = 100 file in totale.
format: una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.format_options: opzioni di formato per il formato specificato. Consulta Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.-
transformation_ctx: il contesto di trasformazione da usare (opzionale). push_down_predicate: filtra le partizioni senza dover elencare e leggere tutti i file nel set di dati. Per ulteriori informazioni, consulta Prefiltraggio con i predicati pushdown.
add_ingestion_time_columns
add_ingestion_time_columns(dataFrame, timeGranularity = "")
Aggiunge colonne del tempo di importazione dati come ingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute al DataFrame di input. Questa funzione viene generata automaticamente nello script generato da AWS Glue quando si specifica una tabella del catalogo dati con Amazon S3 come destinazione. Questa funzione aggiorna automaticamente la partizione con le colonne del tempo di importazione dati nella tabella di output. Ciò consente ai dati di output di venire partizionati automaticamente nel tempo di importazione dati senza necessitare di colonne di tempo di inserimento esplicite nei dati di input.
-
dataFrame: ildataFrameal quale aggiungere le colonne del tempo di importazione dati. -
timeGranularity: la granularità delle colonne temporali. I valori validi sono “day”, “hour” e “minute”. Ad esempio, se “hour" viene passato alla funzione, ildataFrameoriginale avrà “ingest_year”, “ingest_month”, “ingest_day” e “ingest_hour” colonne temporali aggiunte.
Restituisce il frame di dati dopo l'aggiunta di colonne di granularità di tempo.
Esempio:
dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))
create_data_frame_from_catalog
create_data_frame_from_catalog(database, table_name, transformation_ctx = "",
additional_options = {})
Restituisce un DataFrame creato utilizzando le informazioni da una tabella del catalogo dati.
-
database: il database del catalogo dati da cui leggere. -
table_name: il nome della tabella de catalogo dati da cui leggere. -
transformation_ctx: il contesto di trasformazione da usare (opzionale). -
additional_options: una raccolta di coppie nome/valore opzionali. Le opzioni possibili includono quelle elencate in Tipi e opzioni di connessione per ETL in AWS Glue per Spark per le origini di streaming, ad esempiostartingPosition,maxFetchTimeInMs, estartingOffsets.-
useSparkDataSource: se impostato su true, forza AWS Glue a utilizzare l'API nativa di Spark Data Source per leggere la tabella. L'API Spark Data Source supporta i seguenti formati: AVRO, binario, CSV, JSON, ORC, Parquet e testo. In una tabella del catalogo dati, il formato può essere specificato utilizzando la proprietàclassification. Per ulteriori informazioni sull'API Spark Data Source, consulta la documentazione ufficiale di Apache Spark. L'uso di
create_data_frame_from_catalogconuseSparkDataSourceoffre i seguenti vantaggi:-
Restituisce direttamente un
DataFramee fornisce un'alternativa acreate_dynamic_frame.from_catalog().toDF(). -
Supporta il controllo delle autorizzazioni a livello di tabella AWS Lake Formation per i formati nativi.
-
Supporta la lettura di formati di data lake senza il controllo delle autorizzazioni a livello di tabella AWS Lake Formation. Per ulteriori informazioni, consultare Utilizzo di framework data lake con processi ETL di AWS Glue.
Quando si abilita
useSparkDataSource, è possibile anche aggiungere una qualsiasi delle opzioni Spark Data Sourcein additional_optionsa seconda delle necessità. AWS Glue passa queste opzioni direttamente al lettore Spark. -
-
useCatalogSchema: se impostato su true, AWS Glue applica lo schema del catalogo dati alDataFramerisultante. Altrimenti, il lettore deduce lo schema dai dati. Se si abilita l'opzioneuseCatalogSchema, è necessario impostare ancheuseSparkDataSourcesu true.
-
Limitazioni
Quando si utilizza l'opzione useSparkDataSource considerare le seguenti limitazioni:
-
Se si usa
useSparkDataSource, AWS Glue crea un nuovoDataFramein una sessione Spark separata che è diversa dalla sessione Spark originale. -
Il filtro delle partizioni Spark DataFrame non funziona con le seguenti funzionalità di AWS Glue.
Per utilizzare il filtro delle partizioni con queste funzionalità, è possibile usare il predicato pushdown di AWS Glue. Per ulteriori informazioni, consultare Prefiltraggio con i predicati pushdown. Il filtraggio sulle colonne non partizionate non viene modificato.
Lo script di esempio seguente dimostra il modo errato di eseguire il filtraggio delle partizioni con l'opzione
excludeStorageClasses.// Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")Lo script di esempio seguente dimostra il modo corretto di utilizzare un predicato pushdown in modo da eseguire il filtraggio delle partizioni con l'opzione
excludeStorageClasses.// Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")
Esempio: creazione di una tabella CSV utilizzando il lettore di origini dati Spark
// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )
create_data_frame_from_options
create_data_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
Questa API è obsoleta. Utilizza invece le API getSource(). Restituisce un DataFrame creato con la connessione e il formato specificati. Utilizza questa funzione solo con origini di streaming AWS Glue.
-
connection_type: il tipo di connessione streaming. I valori validi includonokinesisekafka. -
connection_options: opzioni di connessione, che sono diverse per Kinesis e Kafka. È possibile trovare l'elenco di tutte le opzioni di connessione per ogni origine dati di streaming all'indirizzo Tipi e opzioni di connessione per ETL in AWS Glue per Spark. Di seguito vengono illustrate le differenze delle opzioni di connessione di streaming:-
Le origini di streaming di Kinesis richiedono
streamARN,startingPosition,inferSchemaeclassification. -
Le origini di streaming di Kafka richiedono
connectionName,topicName,startingOffsets,inferSchemaeclassification.
-
-
format: una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Per ulteriori informazioni sui formati supportati, consulta Opzioni del formato dati per input e output in AWS Glue per Spark. -
format_options: opzioni di formato per il formato specificato. Per ulteriori informazioni sulle opzioni di formato supportate, consulta Opzioni del formato dati per input e output in AWS Glue per Spark. -
transformation_ctx: il contesto di trasformazione da usare (opzionale).
Esempio per l'origine di streaming Amazon Kinesis:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Esempio per l'origine di streaming Kafka:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
forEachBatch
forEachBatch(frame, batch_function, options)
Applica la batch_function passata a ogni micro batch che viene letto dall'origine di streaming.
-
frame: il frame di dati contenente il micro batch corrente. -
batch_function: una funzione che verrà applicata per ogni micro batch. -
options: una raccolta di coppie chiave-valore che contiene informazioni su come elaborare micro batch. Sono richieste le seguenti opzioni:-
windowSize: la quantità di tempo da dedicare all'elaborazione di ciascun batch. -
checkpointLocation: la posizione in cui sono archiviati i checkpoint per il processo ETL di streaming. -
batchMaxRetries: numero massimo di tentativi per riprovare il processo se il batch ha esito negativo. Il valore predefinito è 3. Questa opzione è configurabile solo per Glue versione 2.0 e successive.
-
Esempio:
glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )
Utilizzo di set di dati in Amazon S3
purge_table
purge_table(catalog_id=None, database="", table_name="", options={},
transformation_ctx="")
Elimina i file da Amazon S3 per il database e la tabella del catalogo specificati. Se tutti i file in una partizione vengono eliminati, anche la partizione viene eliminata dal catalogo. L'operazione purge_table sulle tabelle registrate con Lake Formation non è supportata.
Per poter recuperare gli oggetti eliminati, puoi abilitare la funzione di controllo delle versioni degli oggetti nel bucket Amazon S3. Quando un oggetto viene eliminato da un bucket per il quale non è abilitata la funzione Versioni multiple degli oggetti, l'oggetto non può essere recuperato. Per ulteriori informazioni su come recuperare gli oggetti eliminati in un bucket abilitato per le versioni, consultare In che modo può essere recuperato un oggetto Amazon S3 che è stato eliminato?
-
catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato suNoneper default. L'impostazione predefinita diNoneè l'ID catalogo dell'account chiamante nel servizio. database: il database da usare.table_name: il nome della tabella da usare.options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.partitionPredicate: le partizioni che soddisfano questo predicato vengono eliminate. I file all'interno del periodo di conservazione in queste partizioni non vengono eliminati. Impostato su"": vuoto per impostazione predefinita.excludeStorageClasses: i file con classe di storage nelexcludeStorageClassesnon vengono eliminati. L'impostazione di default èSet(): un set vuoto.manifestFilePath: un percorso opzionale per la generazione di file manifesto. Tutti i file che sono stati eliminati correttamente vengono registrati inSuccess.csve quelli che non sono riusciti inFailed.csv
transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.
glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})purge_s3_path
purge_s3_path(s3_path, options={}, transformation_ctx="")
Elimina i file dal percorso Amazon S3 specificato in modo ricorsivo.
Per poter recuperare gli oggetti eliminati, puoi abilitare la funzione di controllo delle versioni degli oggetti nel bucket Amazon S3. Quando un oggetto viene eliminato da un bucket per il quale non è abilitata la funzione di controllo delle versioni degli oggetti, l'oggetto non può essere recuperato. Per ulteriori informazioni su come recuperare gli oggetti eliminati in un bucket abilitato per il controllo delle versioni, consulta In che modo può essere recuperato un oggetto Amazon S3 che è stato eliminato?
s3_path: il percorso in Amazon S3 dei file da eliminare nel formatos3://<bucket>/<prefix>/options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.excludeStorageClasses: i file con classe di storage nelexcludeStorageClassesnon vengono eliminati. L'impostazione di default èSet(): un set vuoto.manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati eliminati correttamente vengono registrati inSuccess.csve quelli che non sono riusciti inFailed.csv
transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})transition_table
transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)
Esegue la transizione della classe di storage dei file archiviati su Amazon S3 per il database e la tabella del catalogo specificati.
Puoi eseguire la transizione tra due classi di archiviazione qualsiasi. Per le classi di archiviazione GLACIER e DEEP_ARCHIVE, è possibile passare a queste classi. Tuttavia, dovresti utilizzare un S3 RESTORE per eseguire la transizione dalle classi di archiviazione GLACIER a DEEP_ARCHIVE.
Se si eseguono processi ETL AWS Glue che leggono file o partizioni da Amazon S3, è possibile escludere alcuni tipi di classe di archiviazione Amazon S3. Per ulteriori informazioni, consulta Esclusione delle classi di archiviazione Amazon S3.
database: il database da usare.table_name: il nome della tabella da usare.transition_to: la classe di storage Amazon S3 in cui eseguire la transizione.options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.partitionPredicate: le partizioni che soddisfano questo predicato vengono trasferite. I file all'interno del periodo di conservazione in queste partizioni non vengono passati. Impostato su"": vuoto per impostazione predefinita.excludeStorageClasses: i file con classe di storage nel setexcludeStorageClassesnon vengono passati. L'impostazione di default èSet(): un set vuoto.manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati passati correttamente vengono registrati inSuccess.csve quelli che non sono riusciti inFailed.csvaccountId: l'ID account Amazon Web Services per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.roleArn: il ruolo AWS per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.
transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.catalog_id: l'ID catalogo del catalogo dati a cui si accede (l'ID account del catalogo dati). Impostato suNoneper default. L'impostazione predefinita diNoneè l'ID catalogo dell'account chiamante nel servizio.
glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})transition_s3_path
transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")
Esegue la transizione della classe di storage nel percorso Amazon S3 specificato in modo ricorsivo.
È possibile eseguire la transizione tra due classi di archiviazione qualsiasi. Per le classi di archiviazione GLACIER e DEEP_ARCHIVE, è possibile passare a queste classi. Tuttavia, si dovrebbe utilizzare un S3 RESTORE per eseguire la transizione dalle classi di archiviazione GLACIER a DEEP_ARCHIVE.
Se esegui processi ETL AWS Glue che leggono file o partizioni da Amazon S3, puoi escludere alcuni tipi di classe di archiviazione Amazon S3. Per ulteriori informazioni, consultare Esclusione delle classi di archiviazione Amazon S3.
s3_path: il percorso in Amazon S3 dei file da convertire nel formatos3://<bucket>/<prefix>/transition_to: la classe di storage Amazon S3 in cui eseguire la transizione.options: opzioni per filtrare i file da eliminare e per la generazione di file manifesto.retentionPeriod: specifica un periodo in numero di ore per la conservazione dei file. I file più recenti del periodo di conservazione vengono mantenuti. Impostato su 168 ore (7 giorni) per impostazione predefinita.partitionPredicate: le partizioni che soddisfano questo predicato vengono trasferite. I file all'interno del periodo di conservazione in queste partizioni non vengono passati. Impostato su"": vuoto per impostazione predefinita.excludeStorageClasses: i file con classe di storage nel setexcludeStorageClassesnon vengono passati. L'impostazione di default èSet(): un set vuoto.manifestFilePath: un percorso facoltativo per la generazione di file manifesto. Tutti i file che sono stati passati correttamente vengono registrati inSuccess.csve quelli che non sono riusciti inFailed.csvaccountId: l'ID account Amazon Web Services per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.roleArn: il ruolo AWS per eseguire la trasformazione di transizione. Obbligatorio per questa trasformazione.
transformation_ctx: il contesto di trasformazione da usare (opzionale). Utilizzato nel percorso del file manifest.
glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})Estrazione in corso
extract_jdbc_conf
extract_jdbc_conf(connection_name, catalog_id = None)
Restituisce un dict con chiavi con le proprietà di configurazione dall'oggetto di connessione AWS Glue nel catalogo dati.
user: il nome utente del database.password: la password del database.vendor: specifica un fornitore (mysql,postgresql,oracle,sqlservere così via).enforceSSL: una stringa booleana che indica se è necessaria una connessione sicura.customJDBCCert: utilizza un certificato client specifico dal percorso Amazon S3 indicato.skipCustomJDBCCertValidation: una stringa booleana che indica secustomJDBCCertdeve essere convalidato da una CA.customJDBCCertString: informazioni aggiuntive sul certificato personalizzato, specifico per il tipo di driver.url(obsoleto): l'URL JDBC con solo protocollo, server e porta.fullUrl: l'URL JDBC immesso al momento della creazione della connessione (disponibile in AWS Glueversione 3.0 o successive).
Esempio di recupero delle configurazioni JDBC:
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}
Transazioni
start_transaction
start_transaction(read_only)
Avvia una nuova transazione. Chiama internamente l'API Lake Formation startTransaction.
read_only: (booleano) indica se questa transazione debba essere di sola lettura o lettura e scrittura. Le scritture effettuate utilizzando un ID transazione di sola lettura verranno rifiutate. Il commit delle transazioni di sola lettura non deve essere eseguito.
Restituisce l'ID transazione.
commit_transaction
commit_transaction(transaction_id, wait_for_commit = True)
Tenta di eseguire il commit della transazione specificata. commit_transaction può rispondere prima che la transazione abbia terminato il commit. Chiama internamente l'API Lake Formation commitTransaction.
transaction_id: (stringa) la transazione di cui eseguire il commit.wait_for_commit: (booleano) determina se ilcommit_transactionrestituisce immediatamente. Il valore di default è true. Se false,commit_transactioneffettua il polling e aspetta che sia stato eseguito il commit della transazione. Il tempo di attesa è limitato a 1 minuto utilizzando il backoff esponenziale con un massimo di 6 tentativi.
Restituisce un valore booleano per indicare se il commit sia stato eseguito o meno.
cancel_transaction
cancel_transaction(transaction_id)
Tenta di annullare la transazione specificata. Restituisce un'eccezione TransactionCommittedException se è stato precedentemente eseguito il commit della transazione. Chiama internamente l'API Lake Formation CancelTransaction.
-
transaction_id: (stringa) la transazione da annullare.
Scrittura
getSink
getSink(connection_type, format = None, transformation_ctx = "", **options)
Ottiene un oggetto DataSink che può essere utilizzato per scrivere DynamicFrames su fonti esterne. Verifica prima il format SparkSQL per essere certo di ricevere il sink previsto.
connection_type: il tipo di connessione da utilizzare, come Amazon S3, Amazon Redshift e JDBC. I valori validi includonos3,mysql,postgresql,redshift,sqlserver,oracle,kinesisekafka.format: il formato SparkSQL da utilizzare (opzionale).transformation_ctx: il contesto di trasformazione da usare (opzionale).options: raccolta di coppie nome-valore utilizzate per specificare le opzioni di connessione. Alcuni dei valori possibili sono:-
userepassword: per l'autorizzazione -
url: l'endpoint per il archivio dati -
dbtable: il nome della tabella di destinazione -
bulkSize: il grado di parallelismo per le operazioni di inserimento
-
Le opzioni che è possibile specificare dipendono dal tipo di connessione. Per ulteriori valori ed esempi, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
Esempio:
>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)
write_dynamic_frame_from_options
write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None,
format_options={}, transformation_ctx = "")
Legge e restituisce un DynamicFrame usando la connessione e il formato specificati.
frame: ilDynamicFrameda scrivere.connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi includonos3,mysql,postgresql,redshift,sqlserver,oracle,kinesisekafka.connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per unconnection_typedis3è definito un percorso Amazon S3.connection_options = {"path": "s3://aws-glue-target/temp"}Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Puoi opzionalmente essere incluso nelle opzioni di connessione.
avvertimento
Si consiglia di non archiviare le password nello script. Valuta la possibilità di utilizzare
boto3per recuperarle da Gestione dei segreti AWS o da Catalogo dati AWS Glue.connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}La proprietà
dbtableè il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specificaschema.table-name. Se non viene fornito alcuno schema, viene usato lo schema “pubblico” predefinito.Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
format: una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consultare Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.format_options: opzioni di formato per il formato specificato. Consultare Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.transformation_ctx: un contesto di trasformazione da usare (opzionale).
write_from_options
write_from_options(frame_or_dfc, connection_type,
connection_options={}, format={}, format_options={}, transformation_ctx = "")
Scrive e restituisce un DynamicFrame o una DynamicFrameCollection creati con la connessione e le informazioni di formattazione specificati.
frame_or_dfc: ilDynamicFrameo laDynamicFrameCollectionper scrivere.connection_type: il tipo di connessione, come Amazon S3, Amazon Redshift e JDBC. I valori validi sonos3,mysql,postgresql,redshift,sqlservereoracle.connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per unconnection_typedis3è definito un percorso Amazon S3.connection_options = {"path": "s3://aws-glue-target/temp"}Per le connessioni JDBC, diverse proprietà devono essere definite. Il nome del database deve fare parte dell'URL. Può opzionalmente essere incluso nelle opzioni di connessione.
avvertimento
Si consiglia di non archiviare le password nello script. Valutare la possibilità di utilizzare
boto3per recuperarle da Gestione dei segreti AWS o da Catalogo dati AWS Glue.connection_options = {"url": "jdbc-url/database", "user": "username", "password":passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}La proprietà
dbtableè il nome della tabella JDBC. Per i archivi dati JDBC che supportano schemi all'interno di un database, specificareschema.table-name. Se non viene fornito alcuno schema, viene usato lo schema “pubblico” predefinito.Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.
format: una specifica di formato. Viene usata per una connessione Amazon S3 o AWS Glue che supporta più formati. Consultare Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.format_options: opzioni di formato per il formato specificato. Consultare Opzioni del formato dati per input e output in AWS Glue per Spark per informazioni sui formati supportati.transformation_ctx: un contesto di trasformazione da usare (opzionale).
write_dynamic_frame_from_catalog
write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)
Scrive e restituisce un DynamicFrame utilizzando un database del catalogo dati e una tabella.
frame: ilDynamicFrameda scrivere.Database: il database del catalogo dati che contiene la tabella.table_name: il nome della tabella del catalogo dati associata alla destinazione.redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).transformation_ctx: il contesto di trasformazione da usare (opzionale).-
additional_options: una raccolta di coppie nome/valore opzionali. catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.
write_data_frame_from_catalog
write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir,
transformation_ctx = "", additional_options = {}, catalog_id = None)
Scrive e restituisce un DataFrame utilizzando un database del catalogo dati e una tabella. Questo metodo supporta la scrittura nei formati di data lake (Hudi, Iceberg e Delta Lake). Per ulteriori informazioni, consultare Utilizzo di framework data lake con processi ETL di AWS Glue.
frame: ilDataFrameda scrivere.Database: il database del catalogo dati che contiene la tabella.table_name: il nome della tabella del catalogo dati associata alla destinazione.redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).transformation_ctx: il contesto di trasformazione da usare (opzionale).-
additional_options: una raccolta di coppie nome/valore opzionali.-
useSparkDataSink: se impostato su true, forza AWS Glue a utilizzare l'API nativa di Spark Data Sink per scrivere sulla tabella. Quando si abilita questa opzione, è possibile anche aggiungere una qualsiasi delle opzioni Spark Data Sourcea additional_optionsa seconda delle necessità. AWS Glue passa queste opzioni direttamente allo scrittore Spark.
-
catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se non si specifica un valore, verrà utilizzato l'ID account predefinito del chiamante.
Limitazioni
Quando si utilizza l'opzione useSparkDataSink considerare le seguenti limitazioni:
-
L'opzione enableUpdateCatalog non è supportata quando si utilizza l'opzione
useSparkDataSink.
Esempio: scrittura su una tabella Hudi utilizzando lo scrittore Spark Data Source
hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name':<table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name':<table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':<database_name>, 'hoodie.datasource.hive_sync.table':<table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame =<df_product_inserts>, database =<database_name>, table_name =<table_name>, additional_options = hudi_options )
write_dynamic_frame_from_jdbc_conf
write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={},
redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
Legge e restituisce un DynamicFrame usando le informazioni sulla connessione JDBC specificate.
frame: ilDynamicFrameda scrivere.catalog_connection: una connessione del catalogo da utilizzare.connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).transformation_ctx: un contesto di trasformazione da usare (opzionale).catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.
write_from_jdbc_conf
write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
Legge e restituisce un DynamicFrame o una DynamicFrameCollection usando le informazioni sulla connessione JDBC specificate.
frame_or_dfc: ilDynamicFrameo laDynamicFrameCollectionper scrivere.catalog_connection: una connessione del catalogo da utilizzare.connection_options: opzioni di connessione, come tabella di database e percorso (opzionale). Per ulteriori informazioni, consultare Tipi e opzioni di connessione per ETL in AWS Glue per Spark.redshift_tmp_dir: una directory temporanea Amazon Redshift da usare (opzionale).transformation_ctx: un contesto di trasformazione da usare (opzionale).catalog_id: l'ID catalogo (ID account) del catalogo dati a cui si accede. Se Nessuno, viene utilizzato l'ID account predefinito del chiamante.