Esecuzione di processi ETL su tabelle Amazon S3 con AWS Glue - Amazon Simple Storage Service

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esecuzione di processi ETL su tabelle Amazon S3 con AWS Glue

AWS Glue è un servizio di integrazione dei dati senza server che consente agli utenti di analisi di scoprire, preparare, spostare e integrare facilmente i dati provenienti da più fonti. Puoi utilizzare i AWS Glue processi per eseguire pipeline di estrazione, trasformazione e caricamento (ETL) per caricare i dati nei tuoi data lake. Per ulteriori informazioni su AWS Glue, consulta What is? AWS Glue nella Guida per gli AWS Glue sviluppatori.

Un AWS Glue job incapsula uno script che si connette ai dati di origine, li elabora e quindi li scrive nella destinazione dei dati. Di solito un processo esegue script di estrazione, trasformazione e caricamento (ETL). Jobs può eseguire script progettati per Apache Spark ambienti di runtime. È possibile monitorare le esecuzioni dei processi per comprendere i parametri di runtime come esito positivo, durata e ora di inizio.

Puoi utilizzare i AWS Glue job per elaborare i dati nelle tue tabelle S3 connettendoti alle tabelle tramite l'integrazione con i servizi di AWS analisi oppure connetterti direttamente utilizzando l'endpoint Amazon S3 Iceberg REST Tables o Amazon S3 Tables Catalog for. Apache Iceberg Questa guida illustra i passaggi di base per iniziare a utilizzare S3 Tables, tra AWS Glue cui:

Nota

S3 Tables è supportato dalla AWS Glue versione 5.0 o successiva.

Prerequisiti

Prima di poter interrogare le tabelle da un AWS Glue job, devi configurare un ruolo IAM da AWS Glue utilizzare per eseguire il job e caricare Amazon S3 Tables Catalog for su un bucket S3 Apache Iceberg JAR a cui AWS Glue possa accedere quando esegue il job.

  • Integrare i bucket di tabelle con i servizi di analisi di AWS.

  • Crea un ruolo IAM per. AWS Glue

    • Allega la policy AmazonS3TablesFullAccess gestita al ruolo.

    • Allega la policy AmazonS3FullAccess gestita al ruolo.

  • (Facoltativo) Se utilizzi Amazon S3 Tables Catalog for, Apache Iceberg devi scaricare il catalogo client JAR e caricarlo in un bucket S3.

    Scaricamento del catalogo JAR
    1. Controlla la versione più recente su Maven Central. Puoi scaricarlo JAR da Maven Central usando il tuo browser o usando il seguente comando. Assicurati di sostituirlo version number con la versione più recente.

      wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
    2. Carica i file scaricati JAR in un bucket S3 a cui il tuo ruolo AWS Glue IAM può accedere. Puoi usare il seguente AWS CLI comando per caricare il. JAR Assicurati di sostituirla version number con la versione più recente e la bucket name e path con la tua.

      aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

Crea uno script per connetterti ai bucket da tavolo

Per accedere ai dati della tabella quando esegui un processo AWS Glue ETL, configuri una Spark sessione per la Apache Iceberg connessione al tuo table bucket S3. Puoi modificare uno script esistente per connetterti al tuo table bucket o creare un nuovo script. Per ulteriori informazioni sulla creazione di AWS Glue script, consulta Tutorial: Writing an AWS Glue for Spark script nella Developer Guide.AWS Glue

Puoi configurare la sessione per connetterti ai tuoi table bucket tramite uno dei seguenti metodi di accesso alle tabelle S3 Tables:

  • Integrazione di S3 Tables con i servizi di analisi AWS

  • Endpoint Amazon S3 Tables Iceberg REST

  • Catalogo di tabelle Amazon S3 per Apache Iceberg

Scegli tra i seguenti metodi di accesso per visualizzare le istruzioni di configurazione e gli esempi di configurazione.

AWS analytics services integration

Come prerequisito per eseguire query Spark sulle tabelle AWS Glue utilizzando l'integrazione dei servizi di AWS analisi, è necessario integrare i bucket di tabelle con AWS i servizi di analisi

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue job per la connessione al tuo table bucket utilizzando l'integrazione.

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
Utilizzo di una sessione interattiva AWS Glue

Se state usando una sessione interattiva con notebook AWS Glue 5.0, specificate le stesse configurazioni usando la %%configure magia in una cella prima dell'esecuzione del codice.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
Amazon S3 Tables Iceberg REST endpoint

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue job per la connessione al tuo table bucket utilizzando l'endpoint.

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
Utilizzando una sessione interattiva AWS Glue

Se state usando una sessione interattiva con notebook AWS Glue 5.0, specificate le stesse configurazioni usando la %%configure magia in una cella prima dell'esecuzione del codice. Sostituisci i valori segnaposto con le informazioni per il tuo bucket da tavolo.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
Amazon S3 Tables Catalog for Apache Iceberg

Come prerequisito per la connessione alle tabelle utilizzando Amazon S3 Tables Apache Iceberg Catalog, devi prima scaricare il file jar del catalogo più recente e caricarlo in un bucket S3. Quindi, quando crei il tuo lavoro, aggiungi il percorso al catalogo dei clienti JAR come parametro speciale. Per ulteriori informazioni sui parametri dei job in AWS Glue, consultate Parametri speciali usati nei AWS Glue job nella AWS Glue Developer Guide.

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue job per la connessione al tuo table bucket utilizzando. JAR Sostituisci i valori segnaposto con le informazioni per il tuo table bucket.

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
Utilizzando una sessione interattiva AWS Glue

Se state usando una sessione interattiva con notebook AWS Glue 5.0, specificate le stesse configurazioni usando la %%configure magia in una cella prima dell'esecuzione del codice. Sostituisci i valori segnaposto con le informazioni per il tuo bucket da tavolo.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}

Script di esempio

I seguenti PySpark script di esempio possono essere usati per testare l'interrogazione delle tabelle S3 con un job. AWS Glue Questi script si connettono al tuo table bucket ed eseguono query per: creare un nuovo namespace, creare una tabella di esempio, inserire dati nella tabella e restituire i dati della tabella. Per utilizzare gli script, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Scegli tra i seguenti script in base al tuo metodo di accesso a S3 Tables.

S3 Tables integration with AWS analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Iceberg REST endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

Crea un AWS Glue lavoro che interroga le tabelle

Le seguenti procedure mostrano come configurare i AWS Glue job che si connettono ai bucket di tabella S3. Puoi farlo usando AWS CLI o usando la console con l'editor di AWS Glue Studio script. Per ulteriori informazioni, consulta la sezione Authoring jobs AWS Glue nella Guida per l'AWS Glue utente.

La procedura seguente mostra come utilizzare l'editor di AWS Glue Studio script per creare un job ETL che interroga le tabelle S3.

  1. Apri la console all' AWS Glue indirizzo. https://console.aws.amazon.com/glue/

  2. Dal riquadro di navigazione, scegli ETL jobs.

  3. Scegli Script editor, quindi scegli Carica script e carica lo PySpark script che hai creato per interrogare le tabelle S3.

  4. Seleziona la scheda Dettagli del lavoro e inserisci quanto segue per le proprietà di base.

    • In Nome, inserisci un nome per il lavoro.

    • Per IAM Role, seleziona il ruolo per cui hai creato AWS Glue.

  5. (Facoltativo) Se utilizzi il metodo di accesso Amazon S3 Tables Catalog per il metodo di Apache Iceberg accesso, espandi le proprietà avanzate e per il JARs percorso dipendente, inserisci l'URI S3 del jar del catalogo client che hai caricato in un bucket S3 come prerequisito. Ad esempio, s3:///s3- -runtime- .jar amzn-s3-demo-bucket1 jars tables-catalog-for-iceberg 0.1.5

  6. Scegliete Salva per creare il lavoro.

  7. Scegli Esegui, avvia il processo e controlla lo stato del lavoro nella scheda Esecuzioni.

La procedura seguente mostra come utilizzare per AWS CLI creare un job ETL che interroga le tabelle S3. Per utilizzare i comandi, sostituiscili placeholder values con i tuoi.

Prerequisiti
  1. Crea un AWS Glue lavoro.

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    Nota

    (Facoltativo) Se utilizzi il metodo di accesso Amazon S3 Tables Catalog for Apache Iceberg access, aggiungi il catalogo client JAR all'--default-argumentsutilizzo del --extra-jars parametro. Sostituiscilo input placeholders con il tuo quando aggiungi il parametro.

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. Inizia il tuo lavoro.

    aws glue start-job-run \ --job-name etl-tables-job
  3. Per verificare lo stato del processo, copia l'ID di esecuzione dal comando precedente e inseriscilo nel comando seguente.

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad