Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Cree canalizaciones de ETL mediante AWS Glue
Los equipos de ingeniería de datos suelen tener datos sin procesar que llegan a un volumen de FSx for ONTAP desde aplicaciones, descargas diarias de archivos o integraciones de socios a través de NFS o SMB. Preparar esos datos para el análisis posterior requiere leerlos, transformarlos, enriquecerlos o reparticionarlos a escala y poner los resultados seleccionados a disposición de los analistas y las aplicaciones.
Con un punto de acceso Amazon S3 conectado al volumen FSx for ONTAP, AWS Glue lee los datos de origen, los transforma según el tiempo de ejecución que elija (Apache Spark, Python shell o Ray) y vuelve a escribir la salida seleccionada en el mismo volumen. Tanto los conjuntos de datos sin procesar como los seleccionados permanecen en FSx for ONTAP, por lo que las políticas de instantáneas, copias de seguridad y retención del volumen se aplican de manera uniforme en todo el proceso. Como se puede acceder simultáneamente a un volumen de FSx for ONTAP a través de NFS, SMB y la API de Amazon S3, los clientes de NFS o SMB pueden generar los datos sin procesar y cualquiera de esos protocolos puede consumir el resultado seleccionado.
En este tutorial, utilizará el conjunto de datos de viajes en taxi de Nueva York del tutorial. Consulte archivos con SQL mediante Amazon Athena Un trabajo de AWS Glue ETL lee los datos sin procesar de Parquet, añade columnas calculadas, filtra los registros no válidos y vuelve a escribir la salida transformada en el volumen dividido por hora del día.
nota
Este tutorial tarda aproximadamente de 25 a 35 minutos en completarse. Los Servicios de AWS usuarios incurren en cargos por los recursos que cree. Si completa todos los pasos, incluida la sección de limpieza, con prontitud, el coste previsto será inferior a 1 dólar en la zona este de EE. UU. (Virginia del Norte) Región de AWS. Esta estimación no incluye los cargos continuos del FSx para el propio volumen de ONTAP.
Requisitos previos
Antes de empezar, asegúrese de que tiene lo siguiente:
Complete los pasos 1 a 3 del Consulte archivos con SQL mediante Amazon Athena tutorial. Este procedimiento carga el conjunto de datos de NYC Taxi al punto de acceso, crea la
fsxn_taxi_demobase de datos en el AWS Glue Data Catalog y registra lataxi_datatabla. Este tutorial se basa en esos recursos, así que no ejecute la sección de limpieza del tutorial de Athena hasta que haya terminado este tutorial.Un rol de IAM AWS Glue con una política en línea que otorga acceso de escritura a CloudWatch los registros, read/write acceso al punto de acceso y acceso a la AWS Glue Data Catalog base de datos utilizada en este tutorial. Los siguientes pasos crean un rol con los permisos mínimos necesarios para este tutorial.
Guarde la siguiente política de confianza como
glue-trust-policy.json. Permite AWS Glue asumir el rol.{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }Guarde la siguiente política de permisos como
glue-permissions.json. Sustituyaregion, yaccount-idpor sus valores.access-point-name{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }Cree el rol y adjunte la política en línea.
$aws iam create-role \ --role-namefsxn-tutorial-glue-etl-role\ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json
Este tutorial almacena el script ETL en el propio punto de acceso, por lo que no se requiere un bucket de Amazon S3 independiente. La
AccessPointdeclaración abarca tanto el script como los datos del taxi; laDataCatalogdeclaración abarca el acceso al AWS Glue catálogo a lafsxn_taxi_demobase de datos que el rastreador actualiza en el paso 4.
importante
El punto de acceso Amazon S3 debe utilizar un origen de red de Internet. AWS Glue los trabajos acceden a Amazon S3 desde una infraestructura gestionada, no desde su VPC.
Paso 1: Cree el script ETL
El siguiente PySpark script lee los datos sin procesar del viaje en taxi del volumen de FSx for ONTAP, aplica transformaciones y vuelve a escribir los resultados en el volumen. Guarde este script como. taxi_transform.py
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()
El script realiza las siguientes transformaciones:
Filtra los registros con un número cero o negativo de distancia de viaje, tarifa o número de pasajeros.
Agrega las columnas calculadas:
pickup_hourpickup_day_of_weekcost_per_mile,, ytime_of_day(mañana, tarde, tarde o noche).Selecciona un subconjunto de columnas relevantes para el análisis.
Divide la salida por
time_of_day, lo que mejora el rendimiento de las consultas al filtrar por período de tiempo.
Paso 2: carga el script y crea el trabajo
Cargue el script ETL en su volumen de FSx for ONTAP a través del punto de acceso y cree un AWS Glue trabajo que haga referencia a él. AWS Glue carga el script desde el punto de acceso al inicio del trabajo, del mismo modo que carga los scripts desde un bucket estándar de Amazon S3.
$# Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --namefsxn-taxi-transform\ --rolemy-glue-role-arn\ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"
Paso 3: Ejecute el trabajo
$aws glue start-job-run --job-namefsxn-taxi-transform
Supervise el estado del trabajo. Por lo general, el trabajo se completa en uno o dos minutos con dos G.1X trabajadores.
$aws glue get-job-runs --job-namefsxn-taxi-transform\ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
Cuando finalice el trabajo, compruebe la salida transformada en el volumen FSx para ONTAP.
$aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/
La salida se divide en cuatro directorios por hora del día. Cada partición contiene archivos Parquet con los datos transformados.
Paso 4: consulte los datos transformados
Ejecute un AWS Glue rastreador en la salida transformada para registrarla en y, a continuación AWS Glue Data Catalog, consulte con Athena.
$# Create a crawler for the transformed data aws glue create-crawler \ --namefsxn-taxi-transformed-crawler\ --rolemy-glue-role-arn\ --database-namefsxn_taxi_demo\ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --namefsxn-taxi-transformed-crawler
Cuando el rastreador finalice, consulta los datos transformados en Athena. La estructura particionada permite a Athena escanear solo las particiones relevantes.
-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10
Como los datos están divididos en particionestime_of_day, la segunda consulta escanea solo la morning partición, lo que reduce la cantidad de datos leídos y mejora el rendimiento de la consulta.
Consideraciones
Se requiere el origen de Internet. AWS Glue los trabajos acceden a Amazon S3 desde una infraestructura gestionada externa a su VPC. Debe utilizar un punto de acceso originado en Internet.
Lee y escribe. AWS Glue Los trabajos de ETL pueden leer y escribir en su volumen FSx for ONTAP a través del punto de acceso. La política del punto de acceso y el usuario del sistema de archivos deben permitir tanto como
s3:GetObject.s3:PutObjectDimensionamiento de los trabajadores. La cantidad y el tipo de AWS Glue trabajadores afectan el desempeño laboral y el costo. Para el conjunto de datos de muestra de 48 MB, dos G.1X trabajadores son suficientes. Para conjuntos de datos más grandes, aumente el número de trabajadores o utilice G.2X trabajadores.
Particionamiento. La escritura de resultados particionados mejora el rendimiento de las consultas posteriores en Athena y otros servicios de análisis. Elija las claves de partición en función de cómo se consultan normalmente los datos.
Almacenamiento de scripts. AWS Glue carga los scripts ETL de Amazon S3 al iniciar el trabajo. Este tutorial almacena el script en el punto de acceso para que el script se encuentre junto a los datos, pero también puede alojarlo en un bucket estándar de Amazon S3. Si utiliza un depósito independiente, amplíe la política en línea del rol con
s3:GetObjectel ARN del depósito de scripts.
Limpieza
Para evitar cargos continuos, elimina los recursos que creaste en este tutorial.
En el editor de consultas de Athena, coloca la tabla creada por el rastreador:
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$# Delete the Glue job and crawler aws glue delete-job --namefsxn-taxi-transformaws glue delete-crawler --namefsxn-taxi-transformed-crawler# Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access aws iam delete-role --role-namefsxn-tutorial-glue-etl-role