

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 建置 ETL 管道 AWS Glue
<a name="tutorial-transform-data-with-glue"></a>

資料工程團隊通常具有來自應用程式、每日檔案捨棄或合作夥伴透過 NFS 或 SMB 整合之 ONTAP 磁碟區的 FSx 上的原始資料登陸。準備下游分析的資料需要大規模讀取、轉換、擴充或重新分割資料，並讓分析師和應用程式可以使用精心策劃的輸出。

將 Amazon S3 存取點連接到 FSx for ONTAP 磁碟區， AWS Glue 讀取來源資料，使用您選擇的執行時間 (Apache Spark、Python shell 或 Ray) 進行轉換，並將精選的輸出寫回相同的磁碟區。原始和精選資料集都會保留在 FSx for ONTAP 上，因此磁碟區的快照、備份和保留政策會統一套用至整個管道。由於 FSx for ONTAP 磁碟區可透過 NFS、SMB 和 Amazon S3 API 同時存取，因此原始資料可由 NFS 或 SMB 用戶端產生，而任何這些通訊協定都可以使用精選的輸出。

在本教學課程中，您會使用[使用 Amazon Athena 查詢具有 SQL 的檔案](tutorial-query-data-with-athena.md)教學課程中的 NYC Taxi 行程資料集。 AWS Glue ETL 任務會讀取原始 Parquet 資料、新增運算的資料欄、篩選無效的記錄，以及將轉換後的輸出寫回依當日時間分割的磁碟區。

**注意**  
本教學課程約需 **25 到 35 分鐘**完成。 AWS 服務 使用的 會針對您建立的資源產生費用。如果您立即完成所有步驟，包括**清除**區段，美國東部 （維吉尼亞北部） 的預期成本不到 **1 美元**。 AWS 區域此預估不包含 FSx for ONTAP 磁碟區本身的持續費用。

## 先決條件
<a name="tutorial-glue-prerequisites"></a>

開始前，請確定您具有下列項目：
+ 完成[使用 Amazon Athena 查詢具有 SQL 的檔案](tutorial-query-data-with-athena.md)教學課程的步驟 1 到 3。此程序會將 NYC Taxi 資料集上傳至存取點，在 中建立`fsxn_taxi_demo`資料庫 AWS Glue Data Catalog，並註冊`taxi_data`資料表。本教學課程以這些資源為基礎，因此在完成本教學課程之前，請勿執行 Athena 教學課程的**清除**區段。
+ 的 IAM 角色 AWS Glue 具有內嵌政策，可授予 CloudWatch Logs 的寫入存取權、存取點的讀取/寫入存取權，以及本教學課程使用的 AWS Glue Data Catalog 資料庫存取權。下列步驟會建立具有本教學課程所需最低許可的角色。

  1. 將下列信任政策儲存為 `glue-trust-policy.json`。它允許 AWS Glue 擔任該角色。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Effect": "Allow",
                 "Principal": {"Service": "glue.amazonaws.com"},
                 "Action": "sts:AssumeRole"
             }
         ]
     }
     ```

  1. 將下列許可政策儲存為 `glue-permissions.json`。將 `{{region}}`、 `{{account-id}}`和 取代`{{access-point-name}}`為您的值。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Sid": "Logs",
                 "Effect": "Allow",
                 "Action": [
                     "logs:CreateLogGroup",
                     "logs:CreateLogStream",
                     "logs:PutLogEvents"
                 ],
                 "Resource": "arn:aws:logs:{{region}}:{{account-id}}:log-group:/aws-glue/*"
             },
             {
                 "Sid": "AccessPoint",
                 "Effect": "Allow",
                 "Action": [
                     "s3:GetObject",
                     "s3:PutObject",
                     "s3:ListBucket",
                     "s3:DeleteObject"
                 ],
                 "Resource": [
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
                 ]
             },
             {
                 "Sid": "DataCatalog",
                 "Effect": "Allow",
                 "Action": [
                     "glue:GetDatabase",
                     "glue:GetTable",
                     "glue:GetTables",
                     "glue:CreateTable",
                     "glue:UpdateTable",
                     "glue:DeleteTable",
                     "glue:BatchCreatePartition",
                     "glue:BatchDeletePartition",
                     "glue:CreatePartition",
                     "glue:UpdatePartition",
                     "glue:GetPartition",
                     "glue:GetPartitions"
                 ],
                 "Resource": [
                     "arn:aws:glue:{{region}}:{{account-id}}:catalog",
                     "arn:aws:glue:{{region}}:{{account-id}}:database/fsxn_taxi_demo",
                     "arn:aws:glue:{{region}}:{{account-id}}:table/fsxn_taxi_demo/*"
                 ]
             }
         ]
     }
     ```

  1. 建立角色並連接內嵌政策。

     ```
     $ aws iam create-role \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --assume-role-policy-document file://glue-trust-policy.json
     
     aws iam put-role-policy \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --policy-name glue-fsxn-access \
         --policy-document file://glue-permissions.json
     ```

  本教學課程會將 ETL 指令碼存放在存取點本身，因此不需要單獨的 Amazon S3 儲存貯體。`AccessPoint` 陳述式涵蓋指令碼和計程車資料；陳述`DataCatalog`式會 AWS Glue 範圍目錄存取步驟 4 中爬蟲程式更新的`fsxn_taxi_demo`資料庫。

**重要**  
Amazon S3 存取點必須使用網際網路原始伺服器。 AWS Glue 工作必須從受管基礎設施存取 Amazon S3，而不是從您的 VPC 存取。

## 步驟 1：建立 ETL 指令碼
<a name="tutorial-glue-create-script"></a>

下列 PySpark 指令碼會從 FSx 讀取 ONTAP 磁碟區的原始計程車行程資料、套用轉換，並將結果寫入磁碟區。將此指令碼儲存為 `taxi_transform.py`。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

ap_alias = args['AP_ALIAS']

# Read raw taxi data from FSx through the access point
df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/")

# Transform: filter invalid records, add computed columns
transformed = df \
    .filter(col("trip_distance") > 0) \
    .filter(col("total_amount") > 0) \
    .filter(col("passenger_count") > 0) \
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \
    .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \
    .withColumn("cost_per_mile",
        spark_round(col("total_amount") / col("trip_distance"), 2)) \
    .withColumn("time_of_day",
        when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning")
        .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon")
        .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening")
        .otherwise("night")
    ) \
    .select(
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "passenger_count", "trip_distance",
        "PULocationID", "DOLocationID",
        "fare_amount", "tip_amount", "total_amount",
        "pickup_hour", "pickup_day_of_week",
        "cost_per_mile", "time_of_day"
    )

# Write transformed data back to FSx, partitioned by time of day
transformed.write \
    .mode("overwrite") \
    .partitionBy("time_of_day") \
    .parquet(f"s3://{ap_alias}/taxi-data-transformed/")

job.commit()
```

指令碼會執行下列轉換：
+ **篩選**零或負行程距離、票價或乘客計數的記錄。
+ **新增運算的資料欄：**`pickup_hour`、`cost_per_mile`、 `pickup_day_of_week`和 `time_of_day`（上午、下午、晚上或晚上）。
+ **選取**與分析相關的資料欄子集。
+ 依 **分割**輸出`time_of_day`，這可改善依時段篩選時的查詢效能。

## 步驟 2：上傳指令碼並建立任務
<a name="tutorial-glue-create-job"></a>

透過存取點將 ETL 指令碼上傳至 FSx for ONTAP 磁碟區，並建立 AWS Glue 參考的任務。在任務啟動時從存取點 AWS Glue 載入指令碼，就像從標準 Amazon S3 儲存貯體載入指令碼一樣。

```
$ # Upload the script to the access point
aws s3 cp taxi_transform.py \
    s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py

# Create the Glue job
aws glue create-job \
    --name {{fsxn-taxi-transform}} \
    --role {{my-glue-role-arn}} \
    --command '{
        "Name": "glueetl",
        "ScriptLocation": "s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py",
        "PythonVersion": "3"
    }' \
    --default-arguments '{
        "--AP_ALIAS": "{{my-ap-alias-ext-s3alias}}",
        "--job-language": "python"
    }' \
    --glue-version "4.0" \
    --number-of-workers 2 \
    --worker-type "G.1X"
```

## 步驟 3：執行任務
<a name="tutorial-glue-run-job"></a>

```
$ aws glue start-job-run --job-name {{fsxn-taxi-transform}}
```

監控任務狀態。任務通常會在一到兩分鐘內完成，其中包含兩個 G.1X 工作者。

```
$ aws glue get-job-runs --job-name {{fsxn-taxi-transform}} \
    --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
```

當任務完成時，請驗證 FSx for ONTAP 磁碟區上的轉換輸出。

```
$ aws s3 ls s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/
                           PRE time_of_day=afternoon/
                           PRE time_of_day=evening/
                           PRE time_of_day=morning/
                           PRE time_of_day=night/
```

輸出會依一天中的時間分割成四個目錄。每個分割區都包含具有轉換資料的 Parquet 檔案。

## 步驟 4：查詢轉換的資料
<a name="tutorial-glue-query-transformed"></a>

在轉換的輸出上執行 AWS Glue 爬蟲程式以在 中註冊， AWS Glue Data Catalog然後向 Athena 查詢。

```
$ # Create a crawler for the transformed data
aws glue create-crawler \
    --name {{fsxn-taxi-transformed-crawler}} \
    --role {{my-glue-role-arn}} \
    --database-name {{fsxn_taxi_demo}} \
    --targets '{"S3Targets": [{"Path": "s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/"}]}'

# Run the crawler
aws glue start-crawler --name {{fsxn-taxi-transformed-crawler}}
```

爬蟲程式完成後，在 Athena 中查詢轉換的資料。分割的結構可讓 Athena 僅掃描相關的分割區。

```
-- Average cost per mile by time of day
SELECT
    time_of_day,
    COUNT(*) AS trip_count,
    ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile,
    ROUND(AVG(tip_amount), 2) AS avg_tip
FROM fsxn_taxi_demo.taxi_data_transformed
GROUP BY time_of_day
ORDER BY trip_count DESC
```

```
-- Busiest pickup locations during morning rush
SELECT
    PULocationID AS pickup_location,
    COUNT(*) AS trip_count,
    ROUND(AVG(trip_distance), 2) AS avg_distance
FROM fsxn_taxi_demo.taxi_data_transformed
WHERE time_of_day = 'morning'
GROUP BY PULocationID
ORDER BY trip_count DESC
LIMIT 10
```

由於資料由 分割`time_of_day`，因此第二個查詢只會掃描`morning`分割區，減少資料讀取量並改善查詢效能。

## 考量事項
<a name="tutorial-glue-considerations"></a>
+ **需要網際網路原始伺服器。** AWS Glue 工作會從 VPC 外部的受管基礎設施存取 Amazon S3。您必須使用網際網路來源存取點。
+ **Read and write.** AWS Glue ETL 任務可以透過存取點讀取和寫入 FSx for ONTAP 磁碟區。存取點政策和檔案系統使用者必須同時允許 `s3:GetObject`和 `s3:PutObject`。
+ **工作者大小。** AWS Glue 工作者的數量和類型會影響工作效能和成本。對於 48 MB 範例資料集，兩個 G.1X 工作者就足夠了。對於較大的資料集，請增加工作者計數或使用 G.2X 工作者。
+ **分割。**寫入分割輸出可改善 Athena 和其他分析服務的下游查詢效能。根據通常查詢資料的方式選擇分割區索引鍵。
+ **指令碼 storage.loads** AWS Glue ETL 指令碼會在任務啟動時從 Amazon S3 載入。本教學課程會將指令碼存放在存取點上，讓指令碼與資料一起運作，但您也可以將其託管在標準 Amazon S3 儲存貯體中。如果您使用獨立儲存貯體，請在指令碼儲存貯體 ARN `s3:GetObject` 上使用 延伸角色的內嵌政策。

## 清除
<a name="tutorial-glue-clean-up"></a>

若要避免持續收費，請刪除您在本教學課程中建立的資源。

在 Athena 查詢編輯器中，捨棄爬蟲程式建立的資料表：

```
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
```

```
$ # Delete the Glue job and crawler
aws glue delete-job --name {{fsxn-taxi-transform}}
aws glue delete-crawler --name {{fsxn-taxi-transformed-crawler}}

# Delete the ETL script and transformed data from the access point
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/ --recursive

# Delete the IAM role
aws iam delete-role-policy \
    --role-name {{fsxn-tutorial-glue-etl-role}} \
    --policy-name glue-fsxn-access
aws iam delete-role --role-name {{fsxn-tutorial-glue-etl-role}}
```