

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用构建 ETL 管道 AWS Glue
<a name="tutorial-transform-data-with-glue"></a>

数据工程团队通常会通过 NFS 或 SMB 将来自应用程序、每日文件丢弃或合作伙伴集成的 ONTAP 量的原始数据存放到 FSx 上。为下游分析准备这些数据需要大规模读取、转换、丰富或重新分区，并将精选的输出提供给分析师和应用程序。

通过连接到 FSx for ONTAP 卷的 Amazon S3 接入点， AWS Glue 读取源数据，根据你选择的运行时间（Apache Spark、Python shell 或 Ray）对其进行转换，然后将精选的输出写回同一个卷。原始数据集和精选数据集都保留在 FSx for ONTAP 上，因此卷的快照、备份和保留策略在整个管道中统一适用。由于可以通过 NFS、SMB 和 Amazon S3 API 同时访问适用于 ONTAP 卷的 FSx，因此原始数据可以由 NFS 或 SMB 客户端生成，精选的输出可以由这些协议中的任何一个使用。

在本教程中，您将使用教程中的纽约出租车行[使用 Amazon Athena 使用 SQL 查询文件](tutorial-query-data-with-athena.md)程数据集。 AWS Glue ETL 作业读取原始 Parquet 数据，添加计算列，筛选无效记录，并将转换后的输出写回按时间分区的卷中。

**注意**  
本教程大约需要 **25 到 35 分钟**才能完成。 AWS 服务 使用者会对您创建的资源产生费用。如果您及时完成所有步骤，包括**清理**部分，则美国东部（弗吉尼亚北部）的预期费用将低于 **1美元** AWS 区域。该估算值不包括 FSx 对 ONTAP 容量本身的持续费用。

## 先决条件
<a name="tutorial-glue-prerequisites"></a>

在开始之前，请确保您具有以下各项：
+ 完成[使用 Amazon Athena 使用 SQL 查询文件](tutorial-query-data-with-athena.md)本教程的步骤 1 到 3。该过程将 NYC Taxi 数据集上传到接入点，在中 AWS Glue Data Catalog创建`fsxn_taxi_demo`数据库并注册`taxi_data`表。本教程以这些资源为基础，因此在完成本教程之前，请勿运行 Athena 教程**的 “清理**” 部分。
+ 的 IAM 角色 AWS Glue 具有内联策略，该策略授予对 CloudWatch 日志的写 read/write 入权限、访问点的访问权限和对本教程使用的 AWS Glue Data Catalog 数据库的访问权限。以下步骤创建具有本教程所需最低权限的角色。

  1. 将以下信任策略另存为`glue-trust-policy.json`。它 AWS Glue 允许扮演这个角色。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Effect": "Allow",
                 "Principal": {"Service": "glue.amazonaws.com"},
                 "Action": "sts:AssumeRole"
             }
         ]
     }
     ```

  1. 将以下权限策略另存为`glue-permissions.json`。用您的值替换`{{region}}``{{account-id}}`、和`{{access-point-name}}`。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Sid": "Logs",
                 "Effect": "Allow",
                 "Action": [
                     "logs:CreateLogGroup",
                     "logs:CreateLogStream",
                     "logs:PutLogEvents"
                 ],
                 "Resource": "arn:aws:logs:{{region}}:{{account-id}}:log-group:/aws-glue/*"
             },
             {
                 "Sid": "AccessPoint",
                 "Effect": "Allow",
                 "Action": [
                     "s3:GetObject",
                     "s3:PutObject",
                     "s3:ListBucket",
                     "s3:DeleteObject"
                 ],
                 "Resource": [
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
                 ]
             },
             {
                 "Sid": "DataCatalog",
                 "Effect": "Allow",
                 "Action": [
                     "glue:GetDatabase",
                     "glue:GetTable",
                     "glue:GetTables",
                     "glue:CreateTable",
                     "glue:UpdateTable",
                     "glue:DeleteTable",
                     "glue:BatchCreatePartition",
                     "glue:BatchDeletePartition",
                     "glue:CreatePartition",
                     "glue:UpdatePartition",
                     "glue:GetPartition",
                     "glue:GetPartitions"
                 ],
                 "Resource": [
                     "arn:aws:glue:{{region}}:{{account-id}}:catalog",
                     "arn:aws:glue:{{region}}:{{account-id}}:database/fsxn_taxi_demo",
                     "arn:aws:glue:{{region}}:{{account-id}}:table/fsxn_taxi_demo/*"
                 ]
             }
         ]
     }
     ```

  1. 创建角色并附加内联策略。

     ```
     $ aws iam create-role \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --assume-role-policy-document file://glue-trust-policy.json
     
     aws iam put-role-policy \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --policy-name glue-fsxn-access \
         --policy-document file://glue-permissions.json
     ```

  本教程将 ETL 脚本存储在接入点本身，因此不需要单独的 Amazon S3 存储桶。该`AccessPoint`语句涵盖脚本和出租车数据；该`DataCatalog`语句的作用域是对第 4 步中爬虫更新的`fsxn_taxi_demo`数据库的 AWS Glue 目录访问权限。

**重要**  
Amazon S3 接入点必须使用互联网网络来源。 AWS Glue 任务通过托管基础设施访问 Amazon S3，而不是从您的 VPC。

## 步骤 1：创建 ETL 脚本
<a name="tutorial-glue-create-script"></a>

以下 PySpark 脚本从 FSx 中读取 ONTAP 音量的原始出租车行程数据，应用转换并将结果写回音量。将此脚本另存为`taxi_transform.py`。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

ap_alias = args['AP_ALIAS']

# Read raw taxi data from FSx through the access point
df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/")

# Transform: filter invalid records, add computed columns
transformed = df \
    .filter(col("trip_distance") > 0) \
    .filter(col("total_amount") > 0) \
    .filter(col("passenger_count") > 0) \
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \
    .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \
    .withColumn("cost_per_mile",
        spark_round(col("total_amount") / col("trip_distance"), 2)) \
    .withColumn("time_of_day",
        when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning")
        .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon")
        .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening")
        .otherwise("night")
    ) \
    .select(
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "passenger_count", "trip_distance",
        "PULocationID", "DOLocationID",
        "fare_amount", "tip_amount", "total_amount",
        "pickup_hour", "pickup_day_of_week",
        "cost_per_mile", "time_of_day"
    )

# Write transformed data back to FSx, partitioned by time of day
transformed.write \
    .mode("overwrite") \
    .partitionBy("time_of_day") \
    .parquet(f"s3://{ap_alias}/taxi-data-transformed/")

job.commit()
```

该脚本执行以下转换：
+ **筛选**行程距离、票价或乘客人数为零或负的记录。
+ **添加计算列：**`pickup_hour`、`pickup_day_of_week``cost_per_mile`、和`time_of_day`（上午、下午、晚上或晚上）。
+ **选择**与分析相关的列子集。
+ 对输出进行@@ **分区**`time_of_day`，这样可以提高按时间段筛选时的查询性能。

## 步骤 2：上传脚本并创建作业
<a name="tutorial-glue-create-job"></a>

通过接入点将 ETL 脚本上传到您的 FSx for ONTAP 卷，然后 AWS Glue 创建一个引用该脚本的作业。 AWS Glue 在任务启动时从接入点加载脚本，就像从标准 Amazon S3 存储桶加载脚本一样。

```
$ # Upload the script to the access point
aws s3 cp taxi_transform.py \
    s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py

# Create the Glue job
aws glue create-job \
    --name {{fsxn-taxi-transform}} \
    --role {{my-glue-role-arn}} \
    --command '{
        "Name": "glueetl",
        "ScriptLocation": "s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py",
        "PythonVersion": "3"
    }' \
    --default-arguments '{
        "--AP_ALIAS": "{{my-ap-alias-ext-s3alias}}",
        "--job-language": "python"
    }' \
    --glue-version "4.0" \
    --number-of-workers 2 \
    --worker-type "G.1X"
```

## 步骤 3：运行作业
<a name="tutorial-glue-run-job"></a>

```
$ aws glue start-job-run --job-name {{fsxn-taxi-transform}}
```

监控作业状态。如果有两名工作 G.1X 人员，工作通常会在一到两分钟内完成。

```
$ aws glue get-job-runs --job-name {{fsxn-taxi-transform}} \
    --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
```

任务完成后，在 FSx 上验证 ONTAP 卷的转换后的输出。

```
$ aws s3 ls s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/
                           PRE time_of_day=afternoon/
                           PRE time_of_day=evening/
                           PRE time_of_day=morning/
                           PRE time_of_day=night/
```

输出按一天中的时间分为四个目录。每个分区都包含带有转换数据的 Parquet 文件。

## 步骤 4：查询转换后的数据
<a name="tutorial-glue-query-transformed"></a>

对转换后的输出运行 AWS Glue 爬虫以将其注册到中 AWS Glue Data Catalog，然后使用 Athena 进行查询。

```
$ # Create a crawler for the transformed data
aws glue create-crawler \
    --name {{fsxn-taxi-transformed-crawler}} \
    --role {{my-glue-role-arn}} \
    --database-name {{fsxn_taxi_demo}} \
    --targets '{"S3Targets": [{"Path": "s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/"}]}'

# Run the crawler
aws glue start-crawler --name {{fsxn-taxi-transformed-crawler}}
```

爬虫完成后，在 Athena 中查询转换后的数据。分区结构允许 Athena 仅扫描相关的分区。

```
-- Average cost per mile by time of day
SELECT
    time_of_day,
    COUNT(*) AS trip_count,
    ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile,
    ROUND(AVG(tip_amount), 2) AS avg_tip
FROM fsxn_taxi_demo.taxi_data_transformed
GROUP BY time_of_day
ORDER BY trip_count DESC
```

```
-- Busiest pickup locations during morning rush
SELECT
    PULocationID AS pickup_location,
    COUNT(*) AS trip_count,
    ROUND(AVG(trip_distance), 2) AS avg_distance
FROM fsxn_taxi_demo.taxi_data_transformed
WHERE time_of_day = 'morning'
GROUP BY PULocationID
ORDER BY trip_count DESC
LIMIT 10
```

由于数据是按分区的`time_of_day`，因此第二个查询仅扫描该`morning`分区，从而减少了读取的数据量并提高了查询性能。

## 注意事项
<a name="tutorial-glue-considerations"></a>
+ **需要互联网来源。** AWS Glue 任务可从您的 VPC 外部的托管基础设施访问 Amazon S3。您必须使用源自互联网的接入点。
+ **读和写。** AWS Glue ETL 作业可以通过接入点读取和写入您的 FSx for ONTAP 卷。接入点策略和文件系统用户必须同时允许`s3:GetObject`和`s3:PutObject`。
+ **工人尺码。** AWS Glue 工人的数量和类型会影响工作绩效和成本。对于 48 MB 的样本数据集，两个 G.1X 工作人员就足够了。对于较大的数据集，请增加工作人员数量或使用 G.2X 工作线程。
+ **分区。**编写分区输出可提高 Athena 和其他分析服务的下游查询性能。根据通常的数据查询方式选择分区键。
+ **脚本存储。** AWS Glue 在任务启动时从 Amazon S3 加载 ETL 脚本。本教程将脚本存储在接入点上，以便脚本与数据并存，但您也可以将其托管在标准的 Amazon S3 存储桶中。如果您使用独立存储桶，请在脚本存储桶 ARN `s3:GetObject` 上使用该角色的内联策略进行扩展。

## 清理
<a name="tutorial-glue-clean-up"></a>

为避免持续收费，请删除您在本教程中创建的资源。

在 Athena 查询编辑器中，删除爬虫创建的表：

```
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
```

```
$ # Delete the Glue job and crawler
aws glue delete-job --name {{fsxn-taxi-transform}}
aws glue delete-crawler --name {{fsxn-taxi-transformed-crawler}}

# Delete the ETL script and transformed data from the access point
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/ --recursive

# Delete the IAM role
aws iam delete-role-policy \
    --role-name {{fsxn-tutorial-glue-etl-role}} \
    --policy-name glue-fsxn-access
aws iam delete-role --role-name {{fsxn-tutorial-glue-etl-role}}
```