

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# を使用して ETL パイプラインを構築する AWS Glue
<a name="tutorial-transform-data-with-glue"></a>

データエンジニアリングチームは、多くの場合、アプリケーションからの FSx for ONTAP ボリュームへの未加工データランディング、毎日のファイルドロップ、NFS または SMB 経由のパートナー統合を行います。ダウンストリーム分析用にデータを準備するには、データを大規模に読み取り、変換、強化、再パーティショニングし、厳選された出力をアナリストやアプリケーションが利用できるようにする必要があります。

Amazon S3 アクセスポイントを FSx for ONTAP ボリュームにアタッチすると、 はソースデータを AWS Glue 読み取り、ランタイム (Apache Spark、Python シェル、または Ray) を選択して変換し、キュレートされた出力を同じボリュームに書き込みます。raw データセットとキュレートされたデータセットの両方が FSx for ONTAP に保持されるため、ボリュームのスナップショット、バックアップ、保持ポリシーはパイプライン全体に均一に適用されます。FSx for ONTAP ボリュームは NFS、SMB、および Amazon S3 API から同時にアクセスできるため、未加工データは NFS または SMB クライアントによって生成され、キュレートされた出力はこれらのプロトコルのいずれかによって消費されます。

このチュートリアルでは、[Amazon Athena を使用して SQL でファイルをクエリする](tutorial-query-data-with-athena.md)チュートリアルの NYC Taxi トリップデータセットを使用します。 AWS Glue ETL ジョブは、未加工の Parquet データを読み取り、計算された列を追加し、無効なレコードをフィルタリングして、変換された出力を時刻でパーティション分割されたボリュームに書き込みます。

**注記**  
このチュートリアルの所要時間は約 **25～35 分**です。 AWS のサービス 使用する には、作成したリソースの料金が発生します。**クリーンアップ**セクションを含むすべてのステップをすぐに完了すると、米国東部 (バージニア北部) の予想コストは **1 USD** 未満になります。 AWS リージョンこの見積もりには、FSx for ONTAP ボリューム自体の継続的な料金は含まれません。

## 前提条件
<a name="tutorial-glue-prerequisites"></a>

作業を開始する前に、次の項目があることを確認します。
+ [Amazon Athena を使用して SQL でファイルをクエリする](tutorial-query-data-with-athena.md) チュートリアルのステップ 1～3 を完了します。この手順では、NYC Taxi データセットをアクセスポイントにアップロードし、 に`fsxn_taxi_demo`データベースを作成し AWS Glue Data Catalog、`taxi_data`テーブルを登録します。このチュートリアルはこれらのリソースに基づいて構築されているため、このチュートリアルを完了するまで Athena チュートリアルの**クリーンアップ**セクションを実行しないでください。
+ CloudWatch Logs への書き込みアクセス、アクセスポイントへの読み取り/書き込みアクセス、およびこのチュートリアルで使用される AWS Glue Data Catalog データベースへのアクセスを許可するインラインポリシー AWS Glue を持つ の IAM ロール。次の手順では、このチュートリアルに必要な最小限のアクセス許可を持つロールを作成します。

  1. 次の信頼ポリシーを として保存します`glue-trust-policy.json`。これにより、 AWS Glue がロールを引き受けることができます。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Effect": "Allow",
                 "Principal": {"Service": "glue.amazonaws.com"},
                 "Action": "sts:AssumeRole"
             }
         ]
     }
     ```

  1. 次のアクセス許可ポリシーを として保存します`glue-permissions.json`。`{{region}}`、`{{account-id}}`、 を自分の値`{{access-point-name}}`に置き換えます。

     ```
     {
         "Version": "2012-10-17", 		 	 	 
         "Statement": [
             {
                 "Sid": "Logs",
                 "Effect": "Allow",
                 "Action": [
                     "logs:CreateLogGroup",
                     "logs:CreateLogStream",
                     "logs:PutLogEvents"
                 ],
                 "Resource": "arn:aws:logs:{{region}}:{{account-id}}:log-group:/aws-glue/*"
             },
             {
                 "Sid": "AccessPoint",
                 "Effect": "Allow",
                 "Action": [
                     "s3:GetObject",
                     "s3:PutObject",
                     "s3:ListBucket",
                     "s3:DeleteObject"
                 ],
                 "Resource": [
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                     "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
                 ]
             },
             {
                 "Sid": "DataCatalog",
                 "Effect": "Allow",
                 "Action": [
                     "glue:GetDatabase",
                     "glue:GetTable",
                     "glue:GetTables",
                     "glue:CreateTable",
                     "glue:UpdateTable",
                     "glue:DeleteTable",
                     "glue:BatchCreatePartition",
                     "glue:BatchDeletePartition",
                     "glue:CreatePartition",
                     "glue:UpdatePartition",
                     "glue:GetPartition",
                     "glue:GetPartitions"
                 ],
                 "Resource": [
                     "arn:aws:glue:{{region}}:{{account-id}}:catalog",
                     "arn:aws:glue:{{region}}:{{account-id}}:database/fsxn_taxi_demo",
                     "arn:aws:glue:{{region}}:{{account-id}}:table/fsxn_taxi_demo/*"
                 ]
             }
         ]
     }
     ```

  1. ロールを作成し、インラインポリシーをアタッチします。

     ```
     $ aws iam create-role \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --assume-role-policy-document file://glue-trust-policy.json
     
     aws iam put-role-policy \
         --role-name {{fsxn-tutorial-glue-etl-role}} \
         --policy-name glue-fsxn-access \
         --policy-document file://glue-permissions.json
     ```

  このチュートリアルでは、アクセスポイント自体に ETL スクリプトを保存するため、個別の Amazon S3 バケットは必要ありません。`AccessPoint` ステートメントは、スクリプトとタクシーデータの両方を対象としています。`DataCatalog`ステートメントは AWS Glue 、ステップ 4 でクローラーが更新する`fsxn_taxi_demo`データベースへのカタログアクセスをスコープします。

**重要**  
Amazon S3 アクセスポイントは、インターネットネットワークオリジンを使用する必要があります。 AWS Glue ジョブは、VPC からではなく、マネージドインフラストラクチャから Amazon S3 にアクセスします。

## ステップ 1: ETL スクリプトを作成する
<a name="tutorial-glue-create-script"></a>

次の PySpark スクリプトは、FSx for ONTAP ボリュームから未加工のタクシー旅行データを読み取り、変換を適用し、その結果をボリュームに書き込みます。このスクリプトを として保存します`taxi_transform.py`。

```
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

ap_alias = args['AP_ALIAS']

# Read raw taxi data from FSx through the access point
df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/")

# Transform: filter invalid records, add computed columns
transformed = df \
    .filter(col("trip_distance") > 0) \
    .filter(col("total_amount") > 0) \
    .filter(col("passenger_count") > 0) \
    .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \
    .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \
    .withColumn("cost_per_mile",
        spark_round(col("total_amount") / col("trip_distance"), 2)) \
    .withColumn("time_of_day",
        when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning")
        .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon")
        .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening")
        .otherwise("night")
    ) \
    .select(
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "passenger_count", "trip_distance",
        "PULocationID", "DOLocationID",
        "fare_amount", "tip_amount", "total_amount",
        "pickup_hour", "pickup_day_of_week",
        "cost_per_mile", "time_of_day"
    )

# Write transformed data back to FSx, partitioned by time of day
transformed.write \
    .mode("overwrite") \
    .partitionBy("time_of_day") \
    .parquet(f"s3://{ap_alias}/taxi-data-transformed/")

job.commit()
```

スクリプトは次の変換を実行します。
+ ゼロまたは負の旅行距離、運賃、または乗客数を持つレコードを**フィルタリングします**。
+ **計算された列 、、、および (午前、午後、夕方、または夜間) を追加します**。 `pickup_hour` `pickup_day_of_week` `cost_per_mile` `time_of_day`
+ 分析に関連する列のサブセット**を選択します**。
+ 出力を **でパーティション分割**します。これにより`time_of_day`、期間でフィルタリングする際のクエリパフォーマンスが向上します。

## ステップ 2: スクリプトをアップロードしてジョブを作成する
<a name="tutorial-glue-create-job"></a>

アクセスポイントを介して FSx for ONTAP ボリュームに ETL スクリプトをアップロードし、それを参照する AWS Glue ジョブを作成します。 は、標準の Amazon S3 バケットからスクリプトを AWS Glue ロードするのと同じ方法で、ジョブの起動時にアクセスポイントからスクリプトをロードします。

```
$ # Upload the script to the access point
aws s3 cp taxi_transform.py \
    s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py

# Create the Glue job
aws glue create-job \
    --name {{fsxn-taxi-transform}} \
    --role {{my-glue-role-arn}} \
    --command '{
        "Name": "glueetl",
        "ScriptLocation": "s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py",
        "PythonVersion": "3"
    }' \
    --default-arguments '{
        "--AP_ALIAS": "{{my-ap-alias-ext-s3alias}}",
        "--job-language": "python"
    }' \
    --glue-version "4.0" \
    --number-of-workers 2 \
    --worker-type "G.1X"
```

## ステップ 3: ジョブを実行する
<a name="tutorial-glue-run-job"></a>

```
$ aws glue start-job-run --job-name {{fsxn-taxi-transform}}
```

ジョブのステータスをモニタリングします。通常、ジョブは 2 つの G.1X ワーカーを使用して 1～2 分で完了します。

```
$ aws glue get-job-runs --job-name {{fsxn-taxi-transform}} \
    --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
```

ジョブが完了したら、FSx for ONTAP ボリュームで変換された出力を確認します。

```
$ aws s3 ls s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/
                           PRE time_of_day=afternoon/
                           PRE time_of_day=evening/
                           PRE time_of_day=morning/
                           PRE time_of_day=night/
```

出力は、時刻ごとに 4 つのディレクトリに分割されます。各パーティションには、変換されたデータを含む Parquet ファイルが含まれています。

## ステップ 4: 変換されたデータをクエリする
<a name="tutorial-glue-query-transformed"></a>

変換された出力で AWS Glue クローラを実行して に登録し AWS Glue Data Catalog、Athena でクエリを実行します。

```
$ # Create a crawler for the transformed data
aws glue create-crawler \
    --name {{fsxn-taxi-transformed-crawler}} \
    --role {{my-glue-role-arn}} \
    --database-name {{fsxn_taxi_demo}} \
    --targets '{"S3Targets": [{"Path": "s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/"}]}'

# Run the crawler
aws glue start-crawler --name {{fsxn-taxi-transformed-crawler}}
```

クローラが完了したら、Athena で変換されたデータをクエリします。パーティション構造により、Athena は関連するパーティションのみをスキャンできます。

```
-- Average cost per mile by time of day
SELECT
    time_of_day,
    COUNT(*) AS trip_count,
    ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile,
    ROUND(AVG(tip_amount), 2) AS avg_tip
FROM fsxn_taxi_demo.taxi_data_transformed
GROUP BY time_of_day
ORDER BY trip_count DESC
```

```
-- Busiest pickup locations during morning rush
SELECT
    PULocationID AS pickup_location,
    COUNT(*) AS trip_count,
    ROUND(AVG(trip_distance), 2) AS avg_distance
FROM fsxn_taxi_demo.taxi_data_transformed
WHERE time_of_day = 'morning'
GROUP BY PULocationID
ORDER BY trip_count DESC
LIMIT 10
```

データは によってパーティション化されるため`time_of_day`、2 番目のクエリは`morning`パーティションのみをスキャンするため、読み取りデータの量が減少し、クエリのパフォーマンスが向上します。

## 考慮事項
<a name="tutorial-glue-considerations"></a>
+ **インターネットオリジンが必須。** AWS Glue ジョブは VPC 外のマネージドインフラストラクチャから Amazon S3 にアクセスします。インターネットオリジンアクセスポイントを使用する必要があります。
+ **読み取りと書き込み.** AWS Glue ETL ジョブは、アクセスポイントを介して FSx for ONTAP ボリュームとの間で読み取りと書き込みの両方を行うことができます。アクセスポイントポリシーとファイルシステムユーザーは、 `s3:GetObject`と の両方を許可する必要があります`s3:PutObject`。
+ **ワーカーのサイズ設定。** AWS Glue ワーカーの数とタイプは、ジョブのパフォーマンスとコストに影響します。48 MB のサンプルデータセットでは、2 つの G.1X ワーカーで十分です。データセットが大きい場合は、ワーカー数を増やすか、G.2X ワーカーを使用します。
+ **パーティション分割。**パーティション化された出力を記述すると、Athena やその他の分析サービスのダウンストリームクエリパフォーマンスが向上します。データの通常のクエリ方法に基づいてパーティションキーを選択します。
+ **Script storage.** AWS Glue loads ジョブの起動時に Amazon S3 から ETL スクリプトをロードします。このチュートリアルでは、スクリプトをアクセスポイントに保存して、スクリプトがデータと一緒に保存されるようにしますが、標準の Amazon S3 バケットでホストすることもできます。スタンドアロンバケットを使用する場合は、スクリプトバケット ARN `s3:GetObject`の を使用してロールのインラインポリシーを拡張します。

## クリーンアップ
<a name="tutorial-glue-clean-up"></a>

継続的な課金を回避するには、このチュートリアルで作成したリソースを削除します。

Athena クエリエディタで、クローラによって作成されたテーブルを削除します。

```
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
```

```
$ # Delete the Glue job and crawler
aws glue delete-job --name {{fsxn-taxi-transform}}
aws glue delete-crawler --name {{fsxn-taxi-transformed-crawler}}

# Delete the ETL script and transformed data from the access point
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/glue-scripts/taxi_transform.py
aws s3 rm s3://{{my-ap-alias-ext-s3alias}}/taxi-data-transformed/ --recursive

# Delete the IAM role
aws iam delete-role-policy \
    --role-name {{fsxn-tutorial-glue-etl-role}} \
    --policy-name glue-fsxn-access
aws iam delete-role --role-name {{fsxn-tutorial-glue-etl-role}}
```