View a markdown version of this page

Créez des pipelines ETL en utilisant AWS Glue - FSx pour ONTAP

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Créez des pipelines ETL en utilisant AWS Glue

Les équipes d'ingénierie des données reçoivent souvent des données brutes sur un volume FSx for ONTAP provenant d'applications, de dépôts quotidiens de fichiers ou d'intégrations de partenaires via NFS ou SMB. La préparation de ces données pour les analyses en aval nécessite de les lire, de les transformer, de les enrichir ou de les repartitionner à grande échelle, et de mettre les résultats sélectionnés à la disposition des analystes et des applications.

Avec un point d'accès Amazon S3 connecté au volume FSx for ONTAP, il AWS Glue lit les données sources, les transforme avec le moteur d'exécution de votre choix (Apache Spark, Python shell ou Ray) et réécrit la sortie sélectionnée sur le même volume. Les ensembles de données bruts et organisés restent sur FSx for ONTAP, de sorte que les politiques de capture d'écran, de sauvegarde et de conservation du volume s'appliquent de manière uniforme sur l'ensemble du pipeline. Comme un volume FSx for ONTAP est accessible simultanément via NFS, SMB et l'API Amazon S3, les données brutes peuvent être produites par des clients NFS ou SMB et les sorties sélectionnées peuvent être utilisées par n'importe lequel de ces protocoles.

Dans ce didacticiel, vous utiliserez le jeu de données NYC Taxi trip du Interrogez des fichiers avec SQL à l'aide d'Amazon Athena didacticiel. Une tâche AWS Glue ETL lit les données brutes de Parquet, ajoute des colonnes calculées, filtre les enregistrements non valides et réécrit la sortie transformée sur le volume partitionné par heure.

Note

Ce didacticiel prend environ 25 à 35 minutes. Les ressources que vous Services AWS utilisez sont facturées pour les ressources que vous créez. Si vous effectuez rapidement toutes les étapes, y compris la section Nettoyage, le coût prévu est inférieur à 1$ dans l'est des États-Unis (Virginie du Nord) Région AWS. Cette estimation n'inclut pas les frais permanents pour le volume FSx for ONTAP lui-même.

Conditions préalables

Avant de commencer, assurez-vous de disposer des éléments suivants :

  • Effectuez les étapes 1 à 3 du Interrogez des fichiers avec SQL à l'aide d'Amazon Athena didacticiel. Cette procédure télécharge le jeu de données NYC Taxi vers le point d'accès, crée la fsxn_taxi_demo base de données dans le AWS Glue Data Catalog et enregistre la taxi_data table. Ce didacticiel s'appuie sur ces ressources. N'exécutez donc pas la section Nettoyage du didacticiel Athena avant d'avoir terminé ce didacticiel.

  • Rôle IAM AWS Glue doté d'une politique intégrée qui accorde un accès en écriture aux CloudWatch journaux, read/write au point d'accès et à la AWS Glue Data Catalog base de données utilisée dans ce didacticiel. Les étapes suivantes permettent de créer un rôle doté des autorisations minimales requises pour ce didacticiel.

    1. Enregistrez la politique de confiance suivante sous le nomglue-trust-policy.json. Cela permet AWS Glue d'assumer le rôle.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }
    2. Enregistrez la politique d'autorisation suivante sous le nomglue-permissions.json. Remplacez regionaccount-id, et access-point-name par vos valeurs.

      { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }
    3. Créez le rôle et associez la politique intégrée.

      $ aws iam create-role \ --role-name fsxn-tutorial-glue-etl-role \ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json

    Ce didacticiel stocke le script ETL sur le point d'accès lui-même, de sorte qu'aucun compartiment Amazon S3 distinct n'est requis. L'AccessPointinstruction couvre à la fois le script et les données du taxi ; elle définit l'DataCatalogaccès au AWS Glue catalogue à la fsxn_taxi_demo base de données mise à jour par le robot à l'étape 4.

Important

Le point d'accès Amazon S3 doit utiliser une origine de réseau Internet. AWS Glue les jobs accèdent à Amazon S3 depuis une infrastructure gérée, et non depuis votre VPC.

Étape 1 : Création du script ETL

Le PySpark script suivant lit les données brutes des trajets en taxi depuis votre volume FSx for ONTAP, applique des transformations et réécrit les résultats sur le volume. Enregistrez ce script soustaxi_transform.py.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()

Le script effectue les transformations suivantes :

  • Filtre les enregistrements dont la distance parcourue, le tarif ou le nombre de passagers sont nuls ou négatifs.

  • Ajoute les colonnes calculées : pickup_hour pickup_day_of_weekcost_per_mile,, et time_of_day (matin, après-midi, soir ou nuit).

  • Sélectionne un sous-ensemble de colonnes pertinentes pour l'analyse.

  • Partitionne la sortie partime_of_day, ce qui améliore les performances des requêtes lors du filtrage par période.

Étape 2 : télécharger le script et créer la tâche

Téléchargez le script ETL sur votre volume FSx for ONTAP via le point d'accès et créez une AWS Glue tâche qui le référence. AWS Glue charge le script depuis le point d'accès au démarrage de la tâche, de la même manière qu'il charge les scripts depuis un compartiment Amazon S3 standard.

$ # Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --name fsxn-taxi-transform \ --role my-glue-role-arn \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"

Étape 3 : Exécuter le job

$ aws glue start-job-run --job-name fsxn-taxi-transform

Surveillez l'état du travail. Le travail est généralement terminé en une à deux minutes avec deux G.1X travailleurs.

$ aws glue get-job-runs --job-name fsxn-taxi-transform \ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"

Lorsque le travail est terminé, vérifiez la sortie transformée sur votre volume FSx for ONTAP.

$ aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/

La sortie est partitionnée en quatre répertoires selon l'heure de la journée. Chaque partition contient des fichiers Parquet contenant les données transformées.

Étape 4 : Interrogez les données transformées

Lancez un AWS Glue robot d'exploration sur la sortie transformée pour l'enregistrer dans le AWS Glue Data Catalog, puis interrogez-la auprès d'Athena.

$ # Create a crawler for the transformed data aws glue create-crawler \ --name fsxn-taxi-transformed-crawler \ --role my-glue-role-arn \ --database-name fsxn_taxi_demo \ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --name fsxn-taxi-transformed-crawler

Une fois le robot d'exploration terminé, interrogez les données transformées dans Athena. La structure partitionnée permet à Athena de scanner uniquement les partitions pertinentes.

-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10

Les données étant partitionnées partime_of_day, la deuxième requête analyse uniquement la morning partition, ce qui réduit la quantité de données lues et améliore les performances des requêtes.

Considérations

  • Origine Internet requise. AWS Glue les jobs accèdent à Amazon S3 depuis une infrastructure gérée extérieure à votre VPC. Vous devez utiliser un point d'accès d'origine Internet.

  • Lisez et écrivez. AWS Glue Les tâches ETL peuvent à la fois lire et écrire sur votre volume FSx for ONTAP via le point d'accès. La politique du point d'accès et l'utilisateur du système de fichiers doivent autoriser les deux s3:GetObject ets3:PutObject.

  • Dimensionnement des travailleurs. Le nombre et le type de AWS Glue travailleurs ont une incidence sur le rendement au travail et sur les coûts. Pour l'exemple de jeu de données de 48 Mo, deux G.1X travailleurs suffisent. Pour les ensembles de données plus volumineux, augmentez le nombre de travailleurs ou utilisez G.2X des travailleurs.

  • Partitionnement. L'écriture d'une sortie partitionnée améliore les performances des requêtes en aval dans Athena et dans d'autres services d'analyse. Choisissez les clés de partition en fonction de la manière dont les données sont généralement demandées.

  • Stockage de scripts. AWS Glue charge les scripts ETL depuis Amazon S3 au démarrage de la tâche. Ce didacticiel stocke le script sur le point d'accès afin qu'il cohabite avec les données, mais vous pouvez également l'héberger dans un compartiment Amazon S3 standard. Si vous utilisez un bucket autonome, étendez la politique intégrée du rôle à l'aide de l'ARN s3:GetObject du bucket de script.

Nettoyage

Pour éviter des frais récurrents, supprimez les ressources que vous avez créées dans ce didacticiel.

Dans l'éditeur de requêtes Athena, supprimez la table créée par le robot d'exploration :

DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$ # Delete the Glue job and crawler aws glue delete-job --name fsxn-taxi-transform aws glue delete-crawler --name fsxn-taxi-transformed-crawler # Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access aws iam delete-role --role-name fsxn-tutorial-glue-etl-role