Ejecución de trabajos de ETL en tablas de Amazon S3 con AWS Glue - Amazon Simple Storage Service

Ejecución de trabajos de ETL en tablas de Amazon S3 con AWS Glue

AWS Glue es un servicio de integración de datos sin servidor que facilita a los usuarios de análisis descubrir, preparar, migrar e integrar datos de varios orígenes. Puede utilizar trabajos de AWS Glue con el fin de ejecutar canalizaciones de extracción, transformación y carga (ETL) para cargar datos en los lagos de datos. Para obtener más información sobre AWS Glue, consulte ¿Qué es AWS Glue? en la Guía para desarrolladores de AWS Glue.

Un trabajo de AWS Glue encapsula un script que se conecta a los datos de origen, los procesa y, a continuación, los escribe en el destino de datos. Normalmente, un trabajo ejecuta scripts de extracción, transformación y carga (ETL). Los trabajos pueden ejecutar scripts diseñados para los entornos de tiempo de ejecución de Apache Spark. Puede monitorear las ejecuciones de trabajos para comprender las métricas de tiempo de ejecución como el estado de realización, la duración y la hora de inicio.

Puede utilizar los trabajos de AWS Glue para procesar los datos de las tablas de S3 conectándose a las tablas mediante la integración con los servicios de análisis de AWS, o bien, conectarse directamente mediante el punto de conexión de tablas de Amazon S3 de Iceberg REST o el catálogo de tablas de Amazon S3 para Apache Iceberg. Esta guía cubre los pasos básicos para empezar a utilizar AWS Glue con tablas de S3, como:

nota

Tablas de S3 es compatible con AWS Glue versión 5.0 o superior.

Requisitos previos

Para poder consultar tablas desde un trabajo de AWS Glue, debe configurar un rol de IAM que AWS Glue pueda utilizar para ejecutar el trabajo y cargar el archivo JAR del catálogo de Tablas de Amazon S3 para Apache Iceberg en un bucket de S3 al que AWS Glue pueda acceder cuando ejecute el trabajo.

  • Integre los buckets de tablas con los servicios de análisis de AWS.

  • Cree un rol de IAM para AWS Glue.

    • Asocie la política administrada por AmazonS3TablesFullAccess al rol.

    • Asocie la política administrada por AmazonS3FullAccess al rol.

  • (Opcional) Si utiliza el catálogo de tablas de Amazon S3 para Apache Iceberg, debe descargar el catálogo del cliente JAR y cargarlo en un bucket de S3.

    Descarga del catálogo JAR
    1. Busque la última versión en Maven Central. Puede descargar el archivo JAR desde Maven Central con el navegador o con el siguiente comando. Asegúrese de reemplazar el número de versión por la última versión.

      wget https://repo1.maven.org/maven2/software/amazon/s3tables/s3-tables-catalog-for-iceberg-runtime/0.1.5/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar
    2. Cargue el archivo JAR descargado en un bucket de S3 al que pueda acceder el rol de IAM de AWS Glue. Puede utilizar el siguiente comando de la AWS CLI para cargar el archivo JAR. Asegúrese de reemplazar el número de versión por la versión más reciente y el nombre del bucket y la ruta por los suyos.

      aws s3 cp s3-tables-catalog-for-iceberg-runtime-0.1.5.jar s3://amzn-s3-demo-bucket/jars/

Creación de un script para conectarse a los buckets de tabla

Para acceder a los datos de la tabla cuando ejecuta un trabajo de ETL de AWS Glue, se configura una sesión de Spark para Apache Iceberg que se conecta al bucket de tablas de S3. Puede modificar un script existente para conectarse al bucket de tablas o crear un nuevo script. Para obtener más información sobre cómo crear scripts de AWS Glue, consulte Tutorial: Writing an AWS Glue for Spark script en la Guía para desarrolladores de AWS Glue.

Puede configurar la sesión para conectarse a los buckets de tablas mediante cualquiera de los siguientes métodos de acceso de las tablas de S3:

  • Integración de tablas de S3 con servicios de análisis de AWS

  • Punto de conexión de Iceberg REST de tablas de Amazon S3

  • Catálogo de tablas de Amazon S3 para Apache Iceberg

Elija uno de los siguientes métodos de acceso para ver las instrucciones de configuración y los ejemplos de configuración.

AWS analytics services integration

Como requisito previo para consultar tablas con Spark en AWS Glue mediante la integración de servicios de análisis de AWS, debe Integrar los buckets de tabla con los servicios de análisis de AWS

Puede configurar la conexión al bucket de tabla mediante una sesión de Spark en un trabajo o con comandos mágicos de AWS Glue Studio en una sesión interactiva. Para utilizar los ejemplos siguientes, sustituya los valores de marcador de posición por la información del bucket de tablas propio.

Uso de un script de PySpark

Utilice el siguiente fragmento de código en un script de PySpark para configurar un trabajo de AWS Glue que se conecte al bucket de tabla mediante la integración.

spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") \ .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \ .getOrCreate()
Uso de una sesión de AWS Glue interactiva

Si utiliza una sesión de cuaderno interactivo con AWS Glue 5.0, especifique las mismas configuraciones mediante el comando mágico %%configure en una celda antes de ejecutar el código.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
Amazon S3 Tables REST de Iceberg endpoint

Puede configurar la conexión al bucket de tabla mediante una sesión de Spark en un trabajo o con comandos mágicos de AWS Glue Studio en una sesión interactiva. Para utilizar los ejemplos siguientes, sustituya los valores de marcador de posición por la información del bucket de tablas propio.

Uso de un script de PySpark

Utilice el siguiente fragmento de código en un script de PySpark para configurar un trabajo de AWS Glue para conectarse al bucket de tabla mediante el punto de conexión.

spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate()
Uso de una sesión de AWS Glue interactiva

Si utiliza una sesión de cuaderno interactivo con AWS Glue 5.0, especifique las mismas configuraciones mediante el comando mágico %%configure en una celda antes de ejecutar el código. Reemplace los valores de marcador de posición con la información del bucket de tablas propio.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
Amazon S3 Tables Catalog for Apache Iceberg

Como requisito previo para conectarse a las tablas mediante el catálogo de tablas de Amazon S3 para Apache Iceberg, primero debe descargar el último archivo JAR del catálogo y cargarlo en un bucket de S3. Luego, cuando cree su trabajo, agregue la ruta al catálogo del cliente JAR como parámetro especial. Para obtener más información sobre los parámetros de los trabajos en AWS Glue, consulte Parámetros especiales utilizados en los trabajos de AWS Glue en la Guía para desarrolladores de AWS Glue.

Puede configurar la conexión al bucket de tabla mediante una sesión de Spark en un trabajo o con comandos mágicos de AWS Glue Studio en una sesión interactiva. Para utilizar los ejemplos siguientes, sustituya los valores de marcador de posición por la información del bucket de tablas propio.

Uso de un script de PySpark

Utilice el siguiente fragmento de código en un script de PySpark para configurar un trabajo de AWS Glue y conectarse al bucket de tabla mediante JAR. Reemplace los valores de marcador de posición con la información del bucket de tablas propio.

spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate()
Uso de una sesión de AWS Glue interactiva

Si utiliza una sesión de cuaderno interactivo con AWS Glue 5.0, especifique las mismas configuraciones mediante el comando mágico %%configure en una celda antes de ejecutar el código. Reemplace los valores de marcador de posición con la información del bucket de tablas propio.

%%configure {"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}

Scripts de muestra

Los scripts de PySpark de ejemplo siguientes se pueden utilizar para probar la consulta de tablas de S3 con un trabajo de AWS Glue. Estos scripts se conectan al bucket de tablas y ejecutan consultas para: crear un nuevo espacio de nombres, crear una tabla de ejemplo, insertar datos en la tabla y devolver los datos de tabla. Para utilizar los scripts, sustituya los valores de marcador de posición por la información del bucket de tablas propio.

Elija uno de los siguientes scripts según el método de acceso de tablas de S3.

S3 Tables integration with AWS analytics services
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("SparkIcebergSQL") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog","s3tables") .config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \ .config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() namespace = "new_namespace" table = "new_table" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables REST de Iceberg endpoint
from pyspark.sql import SparkSession spark = SparkSession.builder.appName("glue-s3-tables-rest") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3_rest_catalog") \ .config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3_rest_catalog.type", "rest") \ .config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \ .config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \ .config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \ .config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \ .config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \ .getOrCreate() namespace = "s3_tables_rest_namespace" table = "new_table_s3_rest" spark.sql("SHOW DATABASES").show() spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Amazon S3 Tables Catalog for Apache Iceberg
from pyspark.sql import SparkSession #Spark session configurations spark = SparkSession.builder.appName("glue-s3-tables") \ .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \ .config("spark.sql.defaultCatalog", "s3tablesbucket") \ .config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \ .config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \ .getOrCreate() #Script namespace = "new_namespace" table = "new_table" spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}") spark.sql(f"DESCRIBE NAMESPACE {namespace}").show() spark.sql(f""" CREATE TABLE IF NOT EXISTS {namespace}.{table} ( id INT, name STRING, value INT ) """) spark.sql(f""" INSERT INTO {namespace}.{table} VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()

Creación de un trabajo de AWS Glue que consulta las tablas

Los siguientes procedimientos muestran cómo configurar los trabajos de AWS Glue que se conectan a los buckets de tablas de S3. Puede hacerlo con la AWS CLI o mediante la consola con el editor de scripts de AWS Glue Studio. Para obtener más información, consulte Creación de trabajos en AWS Glue en la Guía del usuario de AWS Glue.

El siguiente procedimiento muestra cómo utilizar el editor de scripts de AWS Glue Studio para crear un trabajo ETL que consulte las tablas de S3.

  1. Abra la consola de AWS Glue en https://console.aws.amazon.com/glue/.

  2. En el panel de navegación, seleccione Trabajos de ETL.

  3. Elija Editor de script, seleccione Cargar script y cargue el script de PySpark que ha creado para consultar las tablas de S3.

  4. Seleccione la pestaña Detalles del trabajo e introduzca lo siguiente en Propiedades básicas.

    • En Nombre, introduzca el nombre del trabajo.

    • En Rol de IAM, seleccione el rol que ha creado para AWS Glue.

  5. (Opcional) Si utiliza el catálogo de tablas de Amazon S3 para el método de acceso de Apache Iceberg, expanda Propiedades avanzadas y para Ruta de archivos JAR dependientes, ingrese el URI de S3 del archivo jar del catálogo de clientes que ha cargado en un bucket de S3 como requisito previo. Por ejemplo, s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar

  6. Seleccione Guardar para crear el trabajo.

  7. Elija Ejecutar para iniciar el trabajo y consulte el estado del trabajo en la pestaña Ejecuciones.

El siguiente procedimiento muestra cómo utilizar la AWS CLI para crear un trabajo de ETL que consulte las tablas de S3. Para utilizar los comandos, reemplace los valores de marcador de posición por los suyos.

Requisitos previos
  1. Cree un trabajo AWS Glue.

    aws glue create-job \ --name etl-tables-job \ --role arn:aws:iam::111122223333:role/AWSGlueServiceRole \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py", "PythonVersion": "3" }' \ --default-arguments '{ "--job-language": "python", "--class": "GlueApp" }' \ --glue-version "5.0"
    nota

    (Opcional) Si utiliza el catálogo de tablas de Amazon S3 para el método de acceso de Apache Iceberg, agregue el catálogo de clientes JAR al --default-arguments mediante el parámetro --extra-jars. Sustituya los marcadores de posición de entrada por los suyos propios cuando agrega el parámetro.

    "--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
  2. Inicie el trabajo.

    aws glue start-job-run \ --job-name etl-tables-job
  3. Para revisar el estado del trabajo, copie el ID de ejecución del comando anterior e introdúzcalo en el siguiente comando.

    aws glue get-job-run --job-name etl-tables-job \ --run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad