Esecuzione di processi ETL su tabelle Amazon S3 con AWS Glue - Amazon Simple Storage Service

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esecuzione di processi ETL su tabelle Amazon S3 con AWS Glue

AWS Glue è un servizio di integrazione dei dati senza server che consente agli utenti di analisi di scoprire, preparare, spostare e integrare facilmente i dati provenienti da più fonti. Puoi utilizzare i AWS Glue processi per eseguire pipeline di estrazione, trasformazione e caricamento (ETL) per caricare i dati nei tuoi data lake. Per ulteriori informazioni su AWS Glue, consulta What is? AWS Glue nella Guida per gli AWS Glue sviluppatori.

Un AWS Glue job incapsula uno script che si connette ai dati di origine, li elabora e quindi li scrive nella destinazione dei dati. Di solito un processo esegue script di estrazione, trasformazione e caricamento (ETL). I processi possono eseguire script progettati per gli ambienti di runtime di Apache Spark. È possibile monitorare le esecuzioni dei processi per comprendere i parametri di runtime come esito positivo, durata e ora di inizio.

Puoi utilizzare i AWS Glue job per elaborare i dati nelle tue tabelle S3 connettendoti alle tabelle tramite l'integrazione con i servizi di AWS analisi oppure connetterti direttamente utilizzando l'endpoint Amazon S3 Iceberg REST Tables o Amazon S3 Tables Catalog for. Apache Iceberg Questa guida illustra i passaggi di base per iniziare a utilizzare S3 Tables, tra AWS Glue cui:

Scegliete il metodo di accesso in base ai requisiti specifici del vostro lavoro AWS Glue ETL:

  • AWS integrazione dei servizi di analisi (consigliata): consigliata quando hai bisogno di una gestione centralizzata dei metadati su più servizi di AWS analisi, devi sfruttare le autorizzazioni esistenti di AWS Glue Data Catalog e il controllo granulare degli accessi con Lake Formation o stai costruendo pipeline ETL di produzione che si integrano con altri servizi AWS come Athena o Amazon EMR.

  • Iceberg RESTEndpoint Amazon S3 Tables: consigliato quando è necessario connettersi alle tabelle S3 da motori di query di terze parti che supportanoApache Iceberg, creano applicazioni ETL personalizzate che richiedono l'accesso diretto all'API REST o quando è necessario il controllo delle operazioni del catalogo senza dipendenze da Data Catalog. AWS Glue

  • Catalogo di Tabelle Amazon S3 per Apache Iceberg: da utilizzare solo per applicazioni legacy o scenari di programmazione specifici che richiedono la libreria client Java. Questo metodo non è consigliato per le nuove implementazioni di job AWS Glue ETL a causa della complessità e della gestione delle dipendenze aggiuntive. JAR

Nota

Il servizio Tabelle Amazon S3 è supportato su AWS Glue versione 5.0 o successiva.

Fase 1: prerequisiti

Prima di poter interrogare le tabelle da un AWS Glue job, devi configurare un ruolo IAM da AWS Glue utilizzare per eseguire il job. Scegli il metodo di accesso per visualizzare i prerequisiti specifici relativi a quel metodo.

AWS analytics services integration (Recommended)

Prerequisiti richiesti per utilizzare l'integrazione di AWS analisi di S3 Tables per eseguire AWS Glue i job.

Amazon S3 Tables Iceberg REST endpoint

Prerequisiti per utilizzare l'endpoint Amazon S3 Iceberg REST Tables per AWS Glue eseguire processi ETL.

  • Crea un ruolo IAM per. AWS Glue

    • Collegare la policy gestita AmazonS3TablesFullAccess al ruolo.

    • Collegare la policy gestita AmazonS3FullAccess al ruolo.

Amazon S3 Tables Catalog for Apache Iceberg

I prerequisiti utilizzano Amazon S3 Tables Catalog Apache Iceberg per AWS Glue eseguire processi ETL.

  • Crea un ruolo IAM per. AWS Glue

    • Collegare la policy gestita AmazonS3TablesFullAccess al ruolo.

    • Collegare la policy gestita AmazonS3FullAccess al ruolo.

    • Per utilizzare il catalogo di Tabelle Amazon S3 per Apache Iceberg, è necessario scaricare il catalogo client JAR e caricarlo in un bucket S3.

      Download del file JAR del catalogo
      1. Individua la versione più recente su Maven Central. Puoi scaricare il file JAR da Maven Central tramite il browser oppure utilizzando il seguente comando. Assicurati di sostituirlo version number con la versione più recente.

        wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
      2. Carica il file JAR scaricato in un bucket S3 accessibile dal tuo ruolo IAM di AWS Glue . È possibile utilizzare il seguente AWS CLI comando per caricare ilJAR. Assicurati di sostituirla version number con la versione più recente e la bucket name e path con la tua.

        aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

Fase 2: creazione di uno script per il collegamento ai bucket della tabella

Per accedere ai dati della tabella quando esegui un processo AWS Glue ETL, configuri una Spark sessione per la Apache Iceberg connessione al tuo table bucket S3. Puoi modificare uno script esistente per collegarti al bucket della tabella o creare un nuovo script. Per ulteriori informazioni sulla creazione di AWS Glue script, consulta Tutorial: Writing an AWS Glue for Spark script nella Developer Guide.AWS Glue

Puoi configurare la sessione per connetterti ai bucket della tabella utilizzando uno dei seguenti metodi di accesso di Tabelle Amazon S3:

  • Integrazione dei servizi di AWS analisi di S3 Tables (consigliata)

  • Endpoint Iceberg REST di Tabelle Amazon S3

  • Catalogo di Tabelle Amazon S3 per Apache Iceberg

Scegli tra i seguenti metodi di accesso per visualizzare le istruzioni e gli esempi di configurazione.

AWS analytics services integration (Recommended)

Come prerequisito per eseguire query nelle tabelle AWS Glue utilizzando l'integrazione dei servizi di AWS analisi, è necessario integrare i bucket di tabelle con Spark i servizi di analisi AWS

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue job per la connessione al tuo table bucket utilizzando l'integrazione.

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
Utilizzo di una sessione AWS Glue interattiva

Se utilizzi una sessione notebook interattiva con AWS Glue 5.0, specifica le stesse configurazioni usando il comando magic %%configure in una cella prima dell’esecuzione del codice.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
Amazon S3 Tables Iceberg REST endpoint

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue job per la connessione al tuo table bucket utilizzando l'endpoint.

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
Utilizzo di una sessione AWS Glue interattiva

Se stai usando una sessione interattiva di notebook con AWS Glue 5.0, specifica le stesse configurazioni usando la %%configure magia in una cella prima dell'esecuzione del codice. Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
Amazon S3 Tables Catalog for Apache Iceberg

Come prerequisito per la connessione alle tabelle utilizzando il catalogo di Tabelle Amazon S3 per Apache Iceberg, devi innanzitutto scaricare il file jar del catalogo più recente e caricarlo in un bucket S3. Quindi, quando crei il processo, devi aggiungere il percorso al file JAR del catalogo client come parametro speciale. Per ulteriori informazioni sui parametri dei job in AWS Glue, consultate Parametri speciali usati nei AWS Glue job nella AWS Glue Developer Guide.

Puoi configurare la connessione al tuo table bucket tramite una Spark sessione in un job o con AWS Glue Studio magics in una sessione interattiva. Per utilizzare gli esempi seguenti, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Utilizzo di uno script PySpark

Utilizza il seguente frammento di codice in PySpark uno script per configurare un AWS Glue processo per la connessione al tuo table bucket utilizzando. JAR Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
Utilizzo di una sessione AWS Glue interattiva

Se state utilizzando una sessione interattiva con notebook AWS Glue 5.0, specificate le stesse configurazioni utilizzando la funzione %%configure Magic in a cell prima dell'esecuzione del codice. Sostituisci i valori segnaposto con le informazioni relative al bucket della tabella.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}

Script di esempio

I seguenti PySpark script di esempio possono essere usati per testare l'interrogazione delle tabelle S3 con un job. AWS Glue Questi script si connettono al bucket della tabella ed eseguono query per creare un nuovo namespace, creare una tabella di esempio, inserire dati nella tabella e restituire i dati della tabella. Per utilizzare gli script, sostituiscili placeholder values con le informazioni per il tuo table bucket.

Scegli tra i seguenti script in base al tuo metodo di accesso a Tabelle S3.

S3 Tables integration with AWS analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Iceberg REST endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

Fase 3: Creare un AWS Glue job che interroghi le tabelle

Le seguenti procedure mostrano come configurare i AWS Glue job che si connettono ai bucket di tabella S3. Puoi farlo usando AWS CLI o usando la console con l'editor di AWS Glue Studio script. Per ulteriori informazioni, consulta Creazione di processi in AWS Glue nella Guida per l’utente di AWS Glue .

La procedura seguente mostra come utilizzare l'editor di AWS Glue Studio script per creare un job ETL che interroga le tabelle S3.

  1. Apri la console all' AWS Glue indirizzo. https://console.aws.amazon.com/glue/

  2. Nel riquadro di navigazione, seleziona Processi ETL.

  3. Seleziona Editor di script, quindi scegli Carica script e carica lo script PySpark che hai creato per interrogare le tabelle S3.

  4. Seleziona la scheda Dettagli del processo e inserisci quanto segue per Proprietà di base.

    • In Nome, inserisci un nome per il processo.

    • In Ruolo IAM, seleziona il ruolo creato per AWS Glue.

  5. (Facoltativo) Se utilizzi il metodo di accesso Amazon S3 Tables Catalog per il metodo di Apache Iceberg accesso, espandi le proprietà avanzate e per il JARs percorso dipendente, inserisci l'URI S3 del jar del catalogo client che hai caricato in un bucket S3 come prerequisito. Ad esempio, s3:///s3- -runtime- .jar amzn-s3-demo-bucket1 jars tables-catalog-for-iceberg 0.1.5

  6. Seleziona Salva per creare il processo.

  7. Seleziona Esegui per avviare il processo e verifica lo stato del processo stesso nella scheda Esecuzioni.

La procedura seguente mostra come utilizzare per AWS CLI creare un job ETL che interroga le tabelle S3. Per utilizzare i comandi, sostituiscili placeholder values con i tuoi.

  1. Crea un AWS Glue lavoro.

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    Nota

    (Facoltativo) Se utilizzi il catalogo di Tabelle Amazon S3 per Apache Iceberg come metodo di accesso, aggiungi il file JAR del catalogo client a --default-arguments utilizzando il parametro --extra-jars. Sostituiscilo input placeholders con il tuo quando aggiungi il parametro.

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. Avvia il processo.

    aws glue start-job-run \ --job-name etl-tables-job
  3. Per verificare lo stato del processo, copia l’ID di esecuzione dal comando precedente e inseriscilo nel comando seguente.

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad