View a markdown version of this page

を使用して ETL パイプラインを構築する AWS Glue - FSx for ONTAP

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

を使用して ETL パイプラインを構築する AWS Glue

データエンジニアリングチームは、多くの場合、アプリケーションからの FSx for ONTAP ボリュームへの未加工データランディング、毎日のファイルドロップ、NFS または SMB 経由のパートナー統合を行います。ダウンストリーム分析用にデータを準備するには、データを大規模に読み取り、変換、強化、再パーティショニングし、厳選された出力をアナリストやアプリケーションが利用できるようにする必要があります。

Amazon S3 アクセスポイントを FSx for ONTAP ボリュームにアタッチすると、 はソースデータを AWS Glue 読み取り、ランタイム (Apache Spark、Python シェル、または Ray) を選択して変換し、キュレートされた出力を同じボリュームに書き込みます。raw データセットとキュレートされたデータセットの両方が FSx for ONTAP に保持されるため、ボリュームのスナップショット、バックアップ、保持ポリシーはパイプライン全体に均一に適用されます。FSx for ONTAP ボリュームは NFS、SMB、および Amazon S3 API から同時にアクセスできるため、未加工データは NFS または SMB クライアントによって生成され、キュレートされた出力はこれらのプロトコルのいずれかによって消費されます。

このチュートリアルでは、Amazon Athena を使用して SQL でファイルをクエリするチュートリアルの NYC Taxi トリップデータセットを使用します。 AWS Glue ETL ジョブは、未加工の Parquet データを読み取り、計算された列を追加し、無効なレコードをフィルタリングして、変換された出力を時刻でパーティション分割されたボリュームに書き込みます。

注記

このチュートリアルの所要時間は約 25~35 分です。 AWS のサービス 使用する には、作成したリソースの料金が発生します。クリーンアップセクションを含むすべてのステップをすぐに完了すると、米国東部 (バージニア北部) の予想コストは 1 USD 未満になります。 AWS リージョンこの見積もりには、FSx for ONTAP ボリューム自体の継続的な料金は含まれません。

前提条件

作業を開始する前に、次の項目があることを確認します。

  • Amazon Athena を使用して SQL でファイルをクエリする チュートリアルのステップ 1~3 を完了します。この手順では、NYC Taxi データセットをアクセスポイントにアップロードし、 にfsxn_taxi_demoデータベースを作成し AWS Glue Data Catalog、taxi_dataテーブルを登録します。このチュートリアルはこれらのリソースに基づいて構築されているため、このチュートリアルを完了するまで Athena チュートリアルのクリーンアップセクションを実行しないでください。

  • CloudWatch Logs への書き込みアクセス、アクセスポイントへの読み取り/書き込みアクセス、およびこのチュートリアルで使用される AWS Glue Data Catalog データベースへのアクセスを許可するインラインポリシー AWS Glue を持つ の IAM ロール。次の手順では、このチュートリアルに必要な最小限のアクセス許可を持つロールを作成します。

    1. 次の信頼ポリシーを として保存しますglue-trust-policy.json。これにより、 AWS Glue がロールを引き受けることができます。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }
    2. 次のアクセス許可ポリシーを として保存しますglue-permissions.jsonregionaccount-id、 を自分の値access-point-nameに置き換えます。

      { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }
    3. ロールを作成し、インラインポリシーをアタッチします。

      $ aws iam create-role \ --role-name fsxn-tutorial-glue-etl-role \ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json

    このチュートリアルでは、アクセスポイント自体に ETL スクリプトを保存するため、個別の Amazon S3 バケットは必要ありません。AccessPoint ステートメントは、スクリプトとタクシーデータの両方を対象としています。DataCatalogステートメントは AWS Glue 、ステップ 4 でクローラーが更新するfsxn_taxi_demoデータベースへのカタログアクセスをスコープします。

重要

Amazon S3 アクセスポイントは、インターネットネットワークオリジンを使用する必要があります。 AWS Glue ジョブは、VPC からではなく、マネージドインフラストラクチャから Amazon S3 にアクセスします。

ステップ 1: ETL スクリプトを作成する

次の PySpark スクリプトは、FSx for ONTAP ボリュームから未加工のタクシー旅行データを読み取り、変換を適用し、その結果をボリュームに書き込みます。このスクリプトを として保存しますtaxi_transform.py

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()

スクリプトは次の変換を実行します。

  • ゼロまたは負の旅行距離、運賃、または乗客数を持つレコードをフィルタリングします

  • 計算された列 、、、および (午前、午後、夕方、または夜間) を追加しますpickup_hour pickup_day_of_week cost_per_mile time_of_day

  • 分析に関連する列のサブセットを選択します

  • 出力を でパーティション分割します。これによりtime_of_day、期間でフィルタリングする際のクエリパフォーマンスが向上します。

ステップ 2: スクリプトをアップロードしてジョブを作成する

アクセスポイントを介して FSx for ONTAP ボリュームに ETL スクリプトをアップロードし、それを参照する AWS Glue ジョブを作成します。 は、標準の Amazon S3 バケットからスクリプトを AWS Glue ロードするのと同じ方法で、ジョブの起動時にアクセスポイントからスクリプトをロードします。

$ # Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --name fsxn-taxi-transform \ --role my-glue-role-arn \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"

ステップ 3: ジョブを実行する

$ aws glue start-job-run --job-name fsxn-taxi-transform

ジョブのステータスをモニタリングします。通常、ジョブは 2 つの G.1X ワーカーを使用して 1~2 分で完了します。

$ aws glue get-job-runs --job-name fsxn-taxi-transform \ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"

ジョブが完了したら、FSx for ONTAP ボリュームで変換された出力を確認します。

$ aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/

出力は、時刻ごとに 4 つのディレクトリに分割されます。各パーティションには、変換されたデータを含む Parquet ファイルが含まれています。

ステップ 4: 変換されたデータをクエリする

変換された出力で AWS Glue クローラを実行して に登録し AWS Glue Data Catalog、Athena でクエリを実行します。

$ # Create a crawler for the transformed data aws glue create-crawler \ --name fsxn-taxi-transformed-crawler \ --role my-glue-role-arn \ --database-name fsxn_taxi_demo \ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --name fsxn-taxi-transformed-crawler

クローラが完了したら、Athena で変換されたデータをクエリします。パーティション構造により、Athena は関連するパーティションのみをスキャンできます。

-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10

データは によってパーティション化されるためtime_of_day、2 番目のクエリはmorningパーティションのみをスキャンするため、読み取りデータの量が減少し、クエリのパフォーマンスが向上します。

考慮事項

  • インターネットオリジンが必須。 AWS Glue ジョブは VPC 外のマネージドインフラストラクチャから Amazon S3 にアクセスします。インターネットオリジンアクセスポイントを使用する必要があります。

  • 読み取りと書き込み. AWS Glue ETL ジョブは、アクセスポイントを介して FSx for ONTAP ボリュームとの間で読み取りと書き込みの両方を行うことができます。アクセスポイントポリシーとファイルシステムユーザーは、 s3:GetObjectと の両方を許可する必要がありますs3:PutObject

  • ワーカーのサイズ設定。 AWS Glue ワーカーの数とタイプは、ジョブのパフォーマンスとコストに影響します。48 MB のサンプルデータセットでは、2 つの G.1X ワーカーで十分です。データセットが大きい場合は、ワーカー数を増やすか、G.2X ワーカーを使用します。

  • パーティション分割。パーティション化された出力を記述すると、Athena やその他の分析サービスのダウンストリームクエリパフォーマンスが向上します。データの通常のクエリ方法に基づいてパーティションキーを選択します。

  • Script storage. AWS Glue loads ジョブの起動時に Amazon S3 から ETL スクリプトをロードします。このチュートリアルでは、スクリプトをアクセスポイントに保存して、スクリプトがデータと一緒に保存されるようにしますが、標準の Amazon S3 バケットでホストすることもできます。スタンドアロンバケットを使用する場合は、スクリプトバケット ARN s3:GetObjectの を使用してロールのインラインポリシーを拡張します。

クリーンアップ

継続的な課金を回避するには、このチュートリアルで作成したリソースを削除します。

Athena クエリエディタで、クローラによって作成されたテーブルを削除します。

DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$ # Delete the Glue job and crawler aws glue delete-job --name fsxn-taxi-transform aws glue delete-crawler --name fsxn-taxi-transformed-crawler # Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access aws iam delete-role --role-name fsxn-tutorial-glue-etl-role