View a markdown version of this page

Crea pipeline ETL utilizzando AWS Glue - FSx per ONTAP

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Crea pipeline ETL utilizzando AWS Glue

I team di progettazione dei dati spesso utilizzano dati grezzi che arrivano su un volume FSx for ONTAP da applicazioni, perdite giornaliere di file o integrazioni di partner tramite NFS o SMB. La preparazione di tali dati per l'analisi a valle richiede la lettura, la trasformazione, l'arricchimento o il ripartizionamento su larga scala e la messa a disposizione di analisti e applicazioni dell'output curato.

Con un access point Amazon S3 collegato al volume FSx for ONTAP, AWS Glue legge i dati di origine, li trasforma con il tipo di runtime che preferisci (Apache Spark, Python shell o Ray) e riscrive l'output curato sullo stesso volume. Sia i set di dati grezzi che quelli curati rimangono su FSx for ONTAP, quindi le policy di snapshot, backup e conservazione del volume si applicano in modo uniforme a tutta la pipeline. Poiché un volume FSx for ONTAP è accessibile simultaneamente tramite NFS, SMB e l'API Amazon S3, i dati grezzi possono essere prodotti da client NFS o SMB e l'output curato può essere utilizzato da uno qualsiasi di questi protocolli.

In questo tutorial, utilizzi il set di dati di NYC Taxi Trip contenuto nel tutorial. Interroga i file con SQL utilizzando Amazon Athena Un job AWS Glue ETL legge i dati grezzi di Parquet, aggiunge colonne calcolate, filtra i record non validi e riscrive l'output trasformato nel volume partizionato per ora del giorno.

Nota

Il completamento di questo tutorial richiede dai 25 ai 35 minuti circa. Le risorse Servizi AWS utilizzate sono soggette a costi per le risorse create. Se completi tempestivamente tutti i passaggi, inclusa la sezione Pulizia, il costo previsto è inferiore a 1 USD negli Stati Uniti orientali (Virginia settentrionale). Regione AWS Questa stima non include i costi correnti per il volume FSx for ONTAP stesso.

Prerequisiti

Prima di iniziare, assicurati di disporre di:

  • Completa i passaggi da 1 a 3 del Interroga i file con SQL utilizzando Amazon Athena tutorial. Questa procedura carica il set di dati NYC Taxi sul punto di accesso, crea il fsxn_taxi_demo database in e registra la AWS Glue Data Catalog tabella. taxi_data Questo tutorial si basa su queste risorse, quindi non eseguite la sezione Cleanup del tutorial di Athena prima di averlo completato.

  • Un ruolo IAM AWS Glue con una policy in linea che garantisce l'accesso in scrittura ai CloudWatch log, l' read/write accesso al punto di accesso e l'accesso al AWS Glue Data Catalog database utilizzato in questo tutorial. I passaggi seguenti creano un ruolo con le autorizzazioni minime necessarie per questo tutorial.

    1. Salva la seguente politica di attendibilità comeglue-trust-policy.json. Permette AWS Glue di assumere il ruolo.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }
    2. Salva la seguente politica di autorizzazioni comeglue-permissions.json. Sostituisci region account-id e access-point-name con i tuoi valori.

      { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }
    3. Crea il ruolo e allega la politica in linea.

      $ aws iam create-role \ --role-name fsxn-tutorial-glue-etl-role \ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json

    Questo tutorial memorizza lo script ETL sull'access point stesso, quindi non è necessario alcun bucket Amazon S3 separato. L'AccessPointistruzione copre sia lo script che i dati del taxi; l'DataCatalogistruzione riguarda l'accesso al AWS Glue catalogo al fsxn_taxi_demo database aggiornato dal crawler nella Fase 4.

Importante

Il punto di accesso Amazon S3 deve utilizzare un'origine di rete Internet. AWS Glue i job accedono ad Amazon S3 dall'infrastruttura gestita, non dal tuo VPC.

Fase 1: Creare lo script ETL

PySpark Lo script seguente legge i dati grezzi del viaggio in taxi dal volume FSx for ONTAP, applica le trasformazioni e riscrive i risultati nel volume. Salva questo script come. taxi_transform.py

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()

Lo script esegue le seguenti trasformazioni:

  • Filtra i record con distanza di viaggio, tariffa o numero di passeggeri pari a zero o negativi.

  • Aggiunge le colonne calcolate: pickup_hour pickup_day_of_weekcost_per_mile,, e time_of_day (mattina, pomeriggio, sera o notte).

  • Seleziona un sottoinsieme di colonne rilevanti per l'analisi.

  • Partiziona l'output pertime_of_day, il che migliora le prestazioni delle query durante il filtraggio per periodo di tempo.

Passaggio 2: carica lo script e crea il lavoro

Carica lo script ETL sul tuo volume FSx for ONTAP tramite il punto di accesso e crea AWS Glue un job che vi faccia riferimento. AWS Glue carica lo script dal punto di accesso all'avvio del processo, allo stesso modo in cui carica gli script da un bucket Amazon S3 standard.

$ # Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --name fsxn-taxi-transform \ --role my-glue-role-arn \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"

Fase 3: Esegui il processo

$ aws glue start-job-run --job-name fsxn-taxi-transform

Monitora lo stato del lavoro. Il lavoro viene in genere completato in uno o due minuti con due G.1X lavoratori.

$ aws glue get-job-runs --job-name fsxn-taxi-transform \ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"

Al termine del processo, verificate l'output trasformato sul volume FSx for ONTAP.

$ aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/

L'output è suddiviso in quattro directory in base all'ora del giorno. Ogni partizione contiene file Parquet con i dati trasformati.

Fase 4: Interroga i dati trasformati

Esegui un AWS Glue crawler sull'output trasformato per registrarlo in AWS Glue Data Catalog, quindi interrogalo con Athena.

$ # Create a crawler for the transformed data aws glue create-crawler \ --name fsxn-taxi-transformed-crawler \ --role my-glue-role-arn \ --database-name fsxn_taxi_demo \ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --name fsxn-taxi-transformed-crawler

Una volta completato il crawler, interroga i dati trasformati in Athena. La struttura partizionata consente ad Athena di scansionare solo le partizioni pertinenti.

-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10

Poiché i dati sono partizionati datime_of_day, la seconda query analizza solo la morning partizione, riducendo la quantità di dati letti e migliorando le prestazioni delle query.

Considerazioni

  • È richiesta l'origine Internet. AWS Glue i job accedono ad Amazon S3 da un'infrastruttura gestita esterna al tuo VPC. È necessario utilizzare un punto di accesso di origine Internet.

  • Leggi e scrivi. AWS Glue I job ETL possono sia leggere che scrivere sul volume FSx for ONTAP tramite il punto di accesso. La policy del punto di accesso e l'utente del file system devono consentire entrambi e. s3:GetObject s3:PutObject

  • Dimensionamento del lavoratore. Il numero e il tipo di AWS Glue lavoratori influiscono sulle prestazioni lavorative e sui costi. Per il set di dati di esempio da 48 MB, sono sufficienti due G.1X lavoratori. Per set di dati più grandi, aumenta il numero di lavoratori o utilizza G.2X i lavoratori.

  • Partizionamento. La scrittura dell'output partizionato migliora le prestazioni delle query a valle in Athena e in altri servizi di analisi. Scegliete le chiavi di partizione in base al modo in cui i dati vengono normalmente interrogati.

  • Archiviazione degli script. AWS Glue carica gli script ETL da Amazon S3 all'avvio del processo. Questo tutorial memorizza lo script sul punto di accesso in modo che risieda insieme ai dati, ma puoi anche ospitarlo in un bucket Amazon S3 standard. Se utilizzi un bucket autonomo, estendi la policy in linea del ruolo con s3:GetObject l'ARN del bucket di script.

Eliminazione

Per evitare addebiti continui, elimina le risorse che hai creato in questo tutorial.

Nell'editor di query Athena, rilasciate la tabella creata dal crawler:

DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$ # Delete the Glue job and crawler aws glue delete-job --name fsxn-taxi-transform aws glue delete-crawler --name fsxn-taxi-transformed-crawler # Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access aws iam delete-role --role-name fsxn-tutorial-glue-etl-role