Exécution de tâches ETL sur les tables Amazon S3 avec AWS Glue
AWS Glue est un service d’intégration de données sans serveur qui facilite la découverte, la préparation, le déplacement et l’intégration de données provenant de plusieurs sources pour les utilisateurs de l’analytique. Vous pouvez utiliser des tâches AWS Glue pour exécuter des pipelines d’extraction, transformation et chargement (ETL) pour charger les données dans vos lacs de données. Pour plus d’informations sur AWS Glue, consultez Qu’est-ce que AWS Glue ? dans le Manuel du développeur AWS Glue.
Une tâche AWS Glue encapsule un script qui se connecte à vos données source, les traite, puis les écrit dans votre cible de données. En général, une tâche exécute les scripts d'extraction, de transformation et de chargement (ETL). Les tâches peuvent exécuter des scripts conçus pour les environnements d’exécution Apache Spark. Vous pouvez surveiller les exécutions de tâche pour comprendre les métriques d’exécution telles que le statut d’achèvement, la durée et l’heure de début.
Vous pouvez utiliser des tâches AWS Glue pour traiter les données de vos tables S3 en vous connectant à vos tables via l’intégration avec les services d’analytique AWS, ou en vous connectant directement à l’aide du point de terminaison Iceberg REST d’Amazon S3 Tables ou du catalogue d’Amazon S3 Tables pour Apache Iceberg. Ce guide décrit les étapes de base pour commencer à utiliser AWS Glue avec S3 Tables, notamment :
Choisissez votre méthode d’accès en fonction des exigences spécifiques de la tâche ETL AWS Glue :
-
Intégration des services d’analytique AWS (recommandée) : recommandée lorsque vous avez besoin d’une gestion centralisée des métadonnées entre plusieurs services d’analytique AWS, que vous devez tirer parti des autorisations existantes du catalogue de données AWS Glue et d’un contrôle d’accès précis avec Lake Formation, ou lorsque vous créez des pipelines ETL de production qui s’intègrent à d’autres services AWS tels qu’Athena ou Amazon EMR.
-
Point de terminaison Iceberg REST d’Amazon S3 Tables : recommandé lorsque vous devez vous connecter à des tables S3 à partir de moteurs de requêtes tiers qui prennent en charge Apache Iceberg ou créent des applications ETL personnalisées nécessitant un accès direct à l’API REST, ou lorsque vous souhaitez contrôler les opérations du catalogue sans dépendre du catalogue de données AWS Glue.
-
Catalogue d’Amazon S3 Tables pour Apache Iceberg : à utiliser uniquement pour les applications héritées ou pour des scénarios de programmation spécifiques nécessitant la bibliothèque client Java. Cette méthode n’est pas recommandée pour les nouvelles implémentations de tâches ETL AWS Glue en raison de la complexité et de la gestion des dépendances JAR supplémentaires.
Étape 1 : prérequis
Avant de pouvoir interroger des tables à partir d’une tâche AWS Glue, vous devez configurer un rôle IAM que AWS Glue peut utiliser pour exécuter la tâche. Choisissez votre méthode d’accès pour voir les prérequis spécifiques à cette méthode.
- AWS analytics services integration (Recommended)
-
Conditions préalables requises pour utiliser l’intégration analytique AWS de S3 Tables pour exécuter des tâches AWS Glue.
- Amazon S3 Tables REST Iceberg endpoint
-
Prérequis pour utiliser le catalogue de point de terminaison Iceberg REST d’Amazon S3 Tables pour exécuter des tâches ETL AWS Glue.
- Amazon S3 Tables Catalog for Apache Iceberg
-
Les prérequis utilisent le catalogue d’Amazon S3 Tables Apache Iceberg pour exécuter des tâches ETL AWS Glue.
Étape 2 : création d’un script pour se connecter aux compartiments de table
Pour accéder aux données de votre table lorsque vous exécutez une tâche ETL AWS Glue, vous configurez une session Spark pour Apache Iceberg qui se connecte à votre compartiment de table S3. Vous pouvez modifier un script existant pour vous connecter à votre compartiment de table ou créer un nouveau script. Pour plus d’informations sur la création de scripts AWS Glue, consultez Didacticiel : rédiger un script AWS Glue pour Spark dans le Guide du développeur AWS Glue.
Vous pouvez configurer la session pour vous connecter à vos compartiments de table via l’une des méthodes d’accès à S3 Tables suivantes :
-
Intégration des services d’analytique AWS de S3 Tables (recommandé)
-
Point de terminaison Iceberg REST d’Amazon S3 Tables
-
Catalogue d’Amazon S3 Tables pour Apache Iceberg
Choisissez l’une des méthodes d’accès suivantes pour consulter les instructions de configuration et les exemples de configuration.
- AWS analytics services integration (Recommended)
-
En tant que prérequis pour interroger des tables avec Spark sur AWS Glue en utilisant l’intégration des services d’analytique AWS, vous devez intégrer vos compartiments de table aux services d’analytique AWS
Vous pouvez configurer la connexion à votre compartiment de table par le biais d’une session Spark dans le cadre d’une tâche ou avec la magie AWS Glue Studio dans le cadre d’une session interactive. Pour utiliser les exemples suivants, remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
- Utilisation d’un script PySpark
-
Utilisez l’extrait de code suivant dans un script PySpark pour configurer une tâche AWS Glue afin de vous connecter à votre compartiment de table à l’aide de l’intégration.
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables") \
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/warehouse/") \
.getOrCreate()
- Utilisation d’une session AWS Glue interactive
-
Si vous utilisez une session de bloc-notes interactive avec AWS Glue 5.0, spécifiez les mêmes configurations en utilisant la magie %%configure d’une cellule avant l’exécution du code.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tables --conf spark.sql.catalog.s3tables=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tables.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog --conf spark.sql.catalog.s3tables.glue.id=111122223333:s3tablescatalog/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3tables.warehouse=s3://amzn-s3-demo-table-bucket/warehouse/"}
- Amazon S3 Tables REST Iceberg endpoint
-
Vous pouvez configurer la connexion à votre compartiment de table par le biais d’une session Spark dans le cadre d’une tâche ou avec la magie AWS Glue Studio dans le cadre d’une session interactive. Pour utiliser les exemples suivants, remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
- Utilisation d’un script PySpark
Utilisez l’extrait de code suivant dans un script PySpark pour configurer une tâche AWS Glue afin de vous connecter à votre compartiment de table à l’aide du point de terminaison.
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
- Utilisation d’une session AWS Glue interactive
Si vous utilisez une session de bloc-notes interactive avec AWS Glue 5.0, spécifiez les mêmes configurations en utilisant la magie %%configure d’une cellule avant l’exécution du code. Remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3_rest_catalog --conf spark.sql.catalog.s3_rest_catalog=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3_rest_catalog.type=rest --conf spark.sql.catalog.s3_rest_catalog.uri=https://s3tables.Region.amazonaws.com/iceberg --conf spark.sql.catalog.s3_rest_catalog.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket --conf spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled=true --conf spark.sql.catalog.s3_rest_catalog.rest.signing-name=s3tables --conf spark.sql.catalog.s3_rest_catalog.rest.signing-region=Region --conf spark.sql.catalog.s3_rest_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO --conf spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled=false"}
- Amazon S3 Tables Catalog for Apache Iceberg
-
Pour vous connecter à des tables à l’aide du catalogue d’Amazon S3 Tables pour Apache Iceberg, vous devez d’abord télécharger le dernier fichier jar du catalogue et le charger dans un compartiment S3. Ensuite, lorsque vous créez votre tâche, vous ajoutez le chemin d’accès au catalogue client JAR en tant que paramètre spécial. Pour plus d’informations sur les paramètres des tâches dans AWS Glue, consultez Paramètres spéciaux utilisés dans les tâches AWS Glue du Guide du développeur AWS Glue.
Vous pouvez configurer la connexion à votre compartiment de table par le biais d’une session Spark dans le cadre d’une tâche ou avec la magie AWS Glue Studio dans le cadre d’une session interactive. Pour utiliser les exemples suivants, remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
- Utilisation d’un script PySpark
-
Utilisez l’extrait de code suivant dans un script PySpark pour configurer une tâche AWS Glue afin de vous connecter à votre compartiment de table à l’aide du JAR. Remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
- Utilisation d’une session AWS Glue interactive
-
Si vous utilisez une session de bloc-notes interactive avec AWS Glue 5.0, spécifiez les mêmes configurations en utilisant la magie %%configure d’une cellule avant l’exécution du code. Remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
%%configure
{"conf": "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions --conf spark.sql.defaultCatalog=s3tablesbucket --conf spark.sql.catalog.s3tablesbucket=org.apache.iceberg.spark.SparkCatalog --conf spark.sql.catalog.s3tablesbucket.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog --conf spark.sql.catalog.s3tablesbucket.warehouse=arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket", "extra-jars": "s3://amzn-s3-demo-bucket/jars/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"}
Exemples de scripts
Les exemples de script PySpark suivants peuvent être utilisés pour tester l’interrogation des tables S3 avec une tâche AWS Glue. Ces scripts se connectent à votre compartiment de table et exécutent des requêtes pour : créer un nouvel espace de noms, créer un exemple de table, insérer des données dans la table et renvoyer les données de la table. Pour utiliser les scripts, remplacez les valeurs des espaces réservés par les informations de votre compartiment de table.
Choisissez parmi les scripts suivants en fonction de votre méthode d’accès à S3 Tables.
- S3 Tables integration with AWS analytics services
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("SparkIcebergSQL") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog","s3tables")
.config("spark.sql.catalog.s3tables", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tables.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog") \
.config("spark.sql.catalog.s3tables.glue.id", "111122223333:s3tablescatalog/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3tables.warehouse", "s3://amzn-s3-demo-table-bucket/bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
namespace = "new_namespace"
table = "new_table"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables REST Iceberg endpoint
-
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("glue-s3-tables-rest") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3_rest_catalog") \
.config("spark.sql.catalog.s3_rest_catalog", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3_rest_catalog.type", "rest") \
.config("spark.sql.catalog.s3_rest_catalog.uri", "https://s3tables.Region.amazonaws.com/iceberg") \
.config("spark.sql.catalog.s3_rest_catalog.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.config("spark.sql.catalog.s3_rest_catalog.rest.sigv4-enabled", "true") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-name", "s3tables") \
.config("spark.sql.catalog.s3_rest_catalog.rest.signing-region", "Region") \
.config('spark.sql.catalog.s3_rest_catalog.io-impl','org.apache.iceberg.aws.s3.S3FileIO') \
.config('spark.sql.catalog.s3_rest_catalog.rest-metrics-reporting-enabled','false') \
.getOrCreate()
namespace = "s3_tables_rest_namespace"
table = "new_table_s3_rest"
spark.sql("SHOW DATABASES").show()
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
- Amazon S3 Tables Catalog for Apache Iceberg
-
from pyspark.sql import SparkSession
#Spark session configurations
spark = SparkSession.builder.appName("glue-s3-tables") \
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.4_2.12:1.4.2") \
.config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
.config("spark.sql.defaultCatalog", "s3tablesbucket") \
.config("spark.sql.catalog.s3tablesbucket", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.s3tablesbucket.catalog-impl", "software.amazon.s3tables.iceberg.S3TablesCatalog") \
.config("spark.sql.catalog.s3tablesbucket.warehouse", "arn:aws:s3tables:Region:111122223333:bucket/amzn-s3-demo-table-bucket") \
.getOrCreate()
#Script
namespace = "new_namespace"
table = "new_table"
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS s3tablesbucket.{namespace}")
spark.sql(f"DESCRIBE NAMESPACE {namespace}").show()
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {namespace}.{table} (
id INT,
name STRING,
value INT
)
""")
spark.sql(f"""
INSERT INTO {namespace}.{table}
VALUES
(1, 'ABC', 100),
(2, 'XYZ', 200)
""")
spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 10").show()
Étape 3 : création d’une tâche AWS Glue qui interroge les tables
Les procédures suivantes montrent comment configurer des tâches AWS Glue qui vous connectent à vos compartiments de table S3. Vous pouvez effectuer cette opération à l’aide de la console AWS CLI ou à l’aide de l’éditeur de script AWS Glue Studio. Pour plus d’informations, consultez Création de tâches dans AWS Glue dans le Guide de l’utilisateur AWS Glue.
La procédure suivante montre comment utiliser l’éditeur de script AWS Glue Studio pour créer une tâche ETL qui interroge vos tables S3.
Ouvrez la console AWS Glue, à l’adresse https://console.aws.amazon.com/glue/.
-
Dans le volet de navigation, choisissez Tâches ETL.
-
Choisissez Éditeur de script, puis choisissez Charger un script et chargez le script PySpark que vous avez créé pour interroger les tables S3.
-
Sélectionnez l’onglet Détails de la tâche et saisissez les informations suivantes pour Propriétés de base.
-
Pour Nom, saisissez le nom de la tâche.
-
Pour rôle IAM, sélectionnez le rôle que vous avez créé pour AWS Glue.
-
(Facultatif) Si vous utilisez le catalogue d’Amazon S3 Tables pour la méthode d’accès Apache Iceberg, développez les Propriétés avancées et pour Chemin Jars dépendants, entrez l’URI S3 du fichier jar du catalogue client que vous avez chargé dans un compartiment S3 comme prérequis. Par exemple, s3://amzn-s3-demo-bucket1/jars/s3-tables-catalog-for-iceberg-runtime-0,1.5.jar
-
Choisissez Enregistrer pour créer la tâche.
-
Choisissez Exécuter, lancez la tâche et vérifiez le statut de la tâche sous l’onglet Exécutions.
La procédure suivante montre comment utiliser l’AWS CLI pour créer une tâche ETL qui interroge vos tables S3. Pour utiliser les commandes, remplacez les valeurs de l’espace réservé par les vôtres.
-
Créez une tâche AWS Glue.
aws glue create-job \
--name etl-tables-job \
--role arn:aws:iam::111122223333:role/AWSGlueServiceRole \
--command '{
"Name": "glueetl",
"ScriptLocation": "s3://amzn-s3-demo-bucket1/scripts/glue-etl-query.py",
"PythonVersion": "3"
}' \
--default-arguments '{
"--job-language": "python",
"--class": "GlueApp"
}' \
--glue-version "5.0"
(Facultatif) Si vous utilisez le catalogue d’Amazon S3 Tables pour Apache Iceberg comme méthode d’accès, ajoutez le catalogue client JAR aux --default-arguments avec le paramètre --extra-jars. Remplacez les espaces réservés d’entrée par les vôtres lorsque vous ajoutez le paramètre.
"--extra-jars": "s3://amzn-s3-demo-bucket/jar-path/s3-tables-catalog-for-iceberg-runtime-0.1.5.jar"
-
Démarrez votre tâche.
aws glue start-job-run \
--job-name etl-tables-job
-
Pour vérifier le statut de votre tâche, copiez l’ID d’exécution de la commande précédente et saisissez-le dans la commande suivante.
aws glue get-job-run --job-name etl-tables-job \
--run-id jr_ec9a8a302e71f8483060f87b6c309601ea9ee9c1ffc2db56706dfcceb3d0e1ad