本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用构建 ETL 管道 AWS Glue
数据工程团队通常会通过 NFS 或 SMB 将来自应用程序、每日文件丢弃或合作伙伴集成的 ONTAP 量的原始数据存放到 FSx 上。为下游分析准备这些数据需要大规模读取、转换、丰富或重新分区,并将精选的输出提供给分析师和应用程序。
通过连接到 FSx for ONTAP 卷的 Amazon S3 接入点, AWS Glue 读取源数据,根据你选择的运行时间(Apache Spark、Python shell 或 Ray)对其进行转换,然后将精选的输出写回同一个卷。原始数据集和精选数据集都保留在 FSx for ONTAP 上,因此卷的快照、备份和保留策略在整个管道中统一适用。由于可以通过 NFS、SMB 和 Amazon S3 API 同时访问适用于 ONTAP 卷的 FSx,因此原始数据可以由 NFS 或 SMB 客户端生成,精选的输出可以由这些协议中的任何一个使用。
在本教程中,您将使用教程中的纽约出租车行使用 Amazon Athena 使用 SQL 查询文件程数据集。 AWS Glue ETL 作业读取原始 Parquet 数据,添加计算列,筛选无效记录,并将转换后的输出写回按时间分区的卷中。
注意
本教程大约需要 25 到 35 分钟才能完成。 AWS 服务 使用者会对您创建的资源产生费用。如果您及时完成所有步骤,包括清理部分,则美国东部(弗吉尼亚北部)的预期费用将低于 1美元 AWS 区域。该估算值不包括 FSx 对 ONTAP 容量本身的持续费用。
先决条件
在开始之前,请确保您具有以下各项:
完成使用 Amazon Athena 使用 SQL 查询文件本教程的步骤 1 到 3。该过程将 NYC Taxi 数据集上传到接入点,在中 AWS Glue Data Catalog创建
fsxn_taxi_demo数据库并注册taxi_data表。本教程以这些资源为基础,因此在完成本教程之前,请勿运行 Athena 教程的 “清理” 部分。的 IAM 角色 AWS Glue 具有内联策略,该策略授予对 CloudWatch 日志的写 read/write 入权限、访问点的访问权限和对本教程使用的 AWS Glue Data Catalog 数据库的访问权限。以下步骤创建具有本教程所需最低权限的角色。
将以下信任策略另存为
glue-trust-policy.json。它 AWS Glue 允许扮演这个角色。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }将以下权限策略另存为
glue-permissions.json。用您的值替换region、和account-id。access-point-name{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }创建角色并附加内联策略。
$aws iam create-role \ --role-namefsxn-tutorial-glue-etl-role\ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json
本教程将 ETL 脚本存储在接入点本身,因此不需要单独的 Amazon S3 存储桶。该
AccessPoint语句涵盖脚本和出租车数据;该DataCatalog语句的作用域是对第 4 步中爬虫更新的fsxn_taxi_demo数据库的 AWS Glue 目录访问权限。
重要
Amazon S3 接入点必须使用互联网网络来源。 AWS Glue 任务通过托管基础设施访问 Amazon S3,而不是从您的 VPC。
步骤 1:创建 ETL 脚本
以下 PySpark 脚本从 FSx 中读取 ONTAP 音量的原始出租车行程数据,应用转换并将结果写回音量。将此脚本另存为taxi_transform.py。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()
该脚本执行以下转换:
筛选行程距离、票价或乘客人数为零或负的记录。
添加计算列:
pickup_hour、pickup_day_of_weekcost_per_mile、和time_of_day(上午、下午、晚上或晚上)。选择与分析相关的列子集。
对输出进行@@ 分区
time_of_day,这样可以提高按时间段筛选时的查询性能。
步骤 2:上传脚本并创建作业
通过接入点将 ETL 脚本上传到您的 FSx for ONTAP 卷,然后 AWS Glue 创建一个引用该脚本的作业。 AWS Glue 在任务启动时从接入点加载脚本,就像从标准 Amazon S3 存储桶加载脚本一样。
$# Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --namefsxn-taxi-transform\ --rolemy-glue-role-arn\ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"
步骤 3:运行作业
$aws glue start-job-run --job-namefsxn-taxi-transform
监控作业状态。如果有两名工作 G.1X 人员,工作通常会在一到两分钟内完成。
$aws glue get-job-runs --job-namefsxn-taxi-transform\ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
任务完成后,在 FSx 上验证 ONTAP 卷的转换后的输出。
$aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/
输出按一天中的时间分为四个目录。每个分区都包含带有转换数据的 Parquet 文件。
步骤 4:查询转换后的数据
对转换后的输出运行 AWS Glue 爬虫以将其注册到中 AWS Glue Data Catalog,然后使用 Athena 进行查询。
$# Create a crawler for the transformed data aws glue create-crawler \ --namefsxn-taxi-transformed-crawler\ --rolemy-glue-role-arn\ --database-namefsxn_taxi_demo\ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --namefsxn-taxi-transformed-crawler
爬虫完成后,在 Athena 中查询转换后的数据。分区结构允许 Athena 仅扫描相关的分区。
-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10
由于数据是按分区的time_of_day,因此第二个查询仅扫描该morning分区,从而减少了读取的数据量并提高了查询性能。
注意事项
需要互联网来源。 AWS Glue 任务可从您的 VPC 外部的托管基础设施访问 Amazon S3。您必须使用源自互联网的接入点。
读和写。 AWS Glue ETL 作业可以通过接入点读取和写入您的 FSx for ONTAP 卷。接入点策略和文件系统用户必须同时允许
s3:GetObject和s3:PutObject。工人尺码。 AWS Glue 工人的数量和类型会影响工作绩效和成本。对于 48 MB 的样本数据集,两个 G.1X 工作人员就足够了。对于较大的数据集,请增加工作人员数量或使用 G.2X 工作线程。
分区。编写分区输出可提高 Athena 和其他分析服务的下游查询性能。根据通常的数据查询方式选择分区键。
脚本存储。 AWS Glue 在任务启动时从 Amazon S3 加载 ETL 脚本。本教程将脚本存储在接入点上,以便脚本与数据并存,但您也可以将其托管在标准的 Amazon S3 存储桶中。如果您使用独立存储桶,请在脚本存储桶 ARN
s3:GetObject上使用该角色的内联策略进行扩展。
清理
为避免持续收费,请删除您在本教程中创建的资源。
在 Athena 查询编辑器中,删除爬虫创建的表:
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$# Delete the Glue job and crawler aws glue delete-job --namefsxn-taxi-transformaws glue delete-crawler --namefsxn-taxi-transformed-crawler# Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access aws iam delete-role --role-namefsxn-tutorial-glue-etl-role