View a markdown version of this page

Cree canalizaciones de ETL mediante AWS Glue - FSx para ONTAP

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Cree canalizaciones de ETL mediante AWS Glue

Los equipos de ingeniería de datos suelen tener datos sin procesar que llegan a un volumen de FSx for ONTAP desde aplicaciones, descargas diarias de archivos o integraciones de socios a través de NFS o SMB. Preparar esos datos para el análisis posterior requiere leerlos, transformarlos, enriquecerlos o reparticionarlos a escala y poner los resultados seleccionados a disposición de los analistas y las aplicaciones.

Con un punto de acceso Amazon S3 conectado al volumen FSx for ONTAP, AWS Glue lee los datos de origen, los transforma según el tiempo de ejecución que elija (Apache Spark, Python shell o Ray) y vuelve a escribir la salida seleccionada en el mismo volumen. Tanto los conjuntos de datos sin procesar como los seleccionados permanecen en FSx for ONTAP, por lo que las políticas de instantáneas, copias de seguridad y retención del volumen se aplican de manera uniforme en todo el proceso. Como se puede acceder simultáneamente a un volumen de FSx for ONTAP a través de NFS, SMB y la API de Amazon S3, los clientes de NFS o SMB pueden generar los datos sin procesar y cualquiera de esos protocolos puede consumir el resultado seleccionado.

En este tutorial, utilizará el conjunto de datos de viajes en taxi de Nueva York del tutorial. Consulte archivos con SQL mediante Amazon Athena Un trabajo de AWS Glue ETL lee los datos sin procesar de Parquet, añade columnas calculadas, filtra los registros no válidos y vuelve a escribir la salida transformada en el volumen dividido por hora del día.

nota

Este tutorial tarda aproximadamente de 25 a 35 minutos en completarse. Los Servicios de AWS usuarios incurren en cargos por los recursos que cree. Si completa todos los pasos, incluida la sección de limpieza, con prontitud, el coste previsto será inferior a 1 dólar en la zona este de EE. UU. (Virginia del Norte) Región de AWS. Esta estimación no incluye los cargos continuos del FSx para el propio volumen de ONTAP.

Requisitos previos

Antes de empezar, asegúrese de que tiene lo siguiente:

  • Complete los pasos 1 a 3 del Consulte archivos con SQL mediante Amazon Athena tutorial. Este procedimiento carga el conjunto de datos de NYC Taxi al punto de acceso, crea la fsxn_taxi_demo base de datos en el AWS Glue Data Catalog y registra la taxi_data tabla. Este tutorial se basa en esos recursos, así que no ejecute la sección de limpieza del tutorial de Athena hasta que haya terminado este tutorial.

  • Un rol de IAM AWS Glue con una política en línea que otorga acceso de escritura a CloudWatch los registros, read/write acceso al punto de acceso y acceso a la AWS Glue Data Catalog base de datos utilizada en este tutorial. Los siguientes pasos crean un rol con los permisos mínimos necesarios para este tutorial.

    1. Guarde la siguiente política de confianza comoglue-trust-policy.json. Permite AWS Glue asumir el rol.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }
    2. Guarde la siguiente política de permisos comoglue-permissions.json. Sustituya regionaccount-id, y access-point-name por sus valores.

      { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }
    3. Cree el rol y adjunte la política en línea.

      $ aws iam create-role \ --role-name fsxn-tutorial-glue-etl-role \ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json

    Este tutorial almacena el script ETL en el propio punto de acceso, por lo que no se requiere un bucket de Amazon S3 independiente. La AccessPoint declaración abarca tanto el script como los datos del taxi; la DataCatalog declaración abarca el acceso al AWS Glue catálogo a la fsxn_taxi_demo base de datos que el rastreador actualiza en el paso 4.

importante

El punto de acceso Amazon S3 debe utilizar un origen de red de Internet. AWS Glue los trabajos acceden a Amazon S3 desde una infraestructura gestionada, no desde su VPC.

Paso 1: Cree el script ETL

El siguiente PySpark script lee los datos sin procesar del viaje en taxi del volumen de FSx for ONTAP, aplica transformaciones y vuelve a escribir los resultados en el volumen. Guarde este script como. taxi_transform.py

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()

El script realiza las siguientes transformaciones:

  • Filtra los registros con un número cero o negativo de distancia de viaje, tarifa o número de pasajeros.

  • Agrega las columnas calculadas: pickup_hour pickup_day_of_weekcost_per_mile,, y time_of_day (mañana, tarde, tarde o noche).

  • Selecciona un subconjunto de columnas relevantes para el análisis.

  • Divide la salida portime_of_day, lo que mejora el rendimiento de las consultas al filtrar por período de tiempo.

Paso 2: carga el script y crea el trabajo

Cargue el script ETL en su volumen de FSx for ONTAP a través del punto de acceso y cree un AWS Glue trabajo que haga referencia a él. AWS Glue carga el script desde el punto de acceso al inicio del trabajo, del mismo modo que carga los scripts desde un bucket estándar de Amazon S3.

$ # Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --name fsxn-taxi-transform \ --role my-glue-role-arn \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"

Paso 3: Ejecute el trabajo

$ aws glue start-job-run --job-name fsxn-taxi-transform

Supervise el estado del trabajo. Por lo general, el trabajo se completa en uno o dos minutos con dos G.1X trabajadores.

$ aws glue get-job-runs --job-name fsxn-taxi-transform \ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"

Cuando finalice el trabajo, compruebe la salida transformada en el volumen FSx para ONTAP.

$ aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/

La salida se divide en cuatro directorios por hora del día. Cada partición contiene archivos Parquet con los datos transformados.

Paso 4: consulte los datos transformados

Ejecute un AWS Glue rastreador en la salida transformada para registrarla en y, a continuación AWS Glue Data Catalog, consulte con Athena.

$ # Create a crawler for the transformed data aws glue create-crawler \ --name fsxn-taxi-transformed-crawler \ --role my-glue-role-arn \ --database-name fsxn_taxi_demo \ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --name fsxn-taxi-transformed-crawler

Cuando el rastreador finalice, consulta los datos transformados en Athena. La estructura particionada permite a Athena escanear solo las particiones relevantes.

-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10

Como los datos están divididos en particionestime_of_day, la segunda consulta escanea solo la morning partición, lo que reduce la cantidad de datos leídos y mejora el rendimiento de la consulta.

Consideraciones

  • Se requiere el origen de Internet. AWS Glue los trabajos acceden a Amazon S3 desde una infraestructura gestionada externa a su VPC. Debe utilizar un punto de acceso originado en Internet.

  • Lee y escribe. AWS Glue Los trabajos de ETL pueden leer y escribir en su volumen FSx for ONTAP a través del punto de acceso. La política del punto de acceso y el usuario del sistema de archivos deben permitir tanto comos3:GetObject. s3:PutObject

  • Dimensionamiento de los trabajadores. La cantidad y el tipo de AWS Glue trabajadores afectan el desempeño laboral y el costo. Para el conjunto de datos de muestra de 48 MB, dos G.1X trabajadores son suficientes. Para conjuntos de datos más grandes, aumente el número de trabajadores o utilice G.2X trabajadores.

  • Particionamiento. La escritura de resultados particionados mejora el rendimiento de las consultas posteriores en Athena y otros servicios de análisis. Elija las claves de partición en función de cómo se consultan normalmente los datos.

  • Almacenamiento de scripts. AWS Glue carga los scripts ETL de Amazon S3 al iniciar el trabajo. Este tutorial almacena el script en el punto de acceso para que el script se encuentre junto a los datos, pero también puede alojarlo en un bucket estándar de Amazon S3. Si utiliza un depósito independiente, amplíe la política en línea del rol con s3:GetObject el ARN del depósito de scripts.

Limpieza

Para evitar cargos continuos, elimina los recursos que creaste en este tutorial.

En el editor de consultas de Athena, coloca la tabla creada por el rastreador:

DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$ # Delete the Glue job and crawler aws glue delete-job --name fsxn-taxi-transform aws glue delete-crawler --name fsxn-taxi-transformed-crawler # Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access aws iam delete-role --role-name fsxn-tutorial-glue-etl-role