View a markdown version of this page

使用 建置 ETL 管道 AWS Glue - FSx for OnTAP

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 建置 ETL 管道 AWS Glue

資料工程團隊通常具有來自應用程式、每日檔案捨棄或合作夥伴透過 NFS 或 SMB 整合之 ONTAP 磁碟區的 FSx 上的原始資料登陸。準備下游分析的資料需要大規模讀取、轉換、擴充或重新分割資料,並讓分析師和應用程式可以使用精心策劃的輸出。

將 Amazon S3 存取點連接到 FSx for ONTAP 磁碟區, AWS Glue 讀取來源資料,使用您選擇的執行時間 (Apache Spark、Python shell 或 Ray) 進行轉換,並將精選的輸出寫回相同的磁碟區。原始和精選資料集都會保留在 FSx for ONTAP 上,因此磁碟區的快照、備份和保留政策會統一套用至整個管道。由於 FSx for ONTAP 磁碟區可透過 NFS、SMB 和 Amazon S3 API 同時存取,因此原始資料可由 NFS 或 SMB 用戶端產生,而任何這些通訊協定都可以使用精選的輸出。

在本教學課程中,您會使用使用 Amazon Athena 查詢具有 SQL 的檔案教學課程中的 NYC Taxi 行程資料集。 AWS Glue ETL 任務會讀取原始 Parquet 資料、新增運算的資料欄、篩選無效的記錄,以及將轉換後的輸出寫回依當日時間分割的磁碟區。

注意

本教學課程約需 25 到 35 分鐘完成。 AWS 服務 使用的 會針對您建立的資源產生費用。如果您立即完成所有步驟,包括清除區段,美國東部 (維吉尼亞北部) 的預期成本不到 1 美元。 AWS 區域此預估不包含 FSx for ONTAP 磁碟區本身的持續費用。

先決條件

開始前,請確定您具有下列項目:

  • 完成使用 Amazon Athena 查詢具有 SQL 的檔案教學課程的步驟 1 到 3。此程序會將 NYC Taxi 資料集上傳至存取點,在 中建立fsxn_taxi_demo資料庫 AWS Glue Data Catalog,並註冊taxi_data資料表。本教學課程以這些資源為基礎,因此在完成本教學課程之前,請勿執行 Athena 教學課程的清除區段。

  • 的 IAM 角色 AWS Glue 具有內嵌政策,可授予 CloudWatch Logs 的寫入存取權、存取點的讀取/寫入存取權,以及本教學課程使用的 AWS Glue Data Catalog 資料庫存取權。下列步驟會建立具有本教學課程所需最低許可的角色。

    1. 將下列信任政策儲存為 glue-trust-policy.json。它允許 AWS Glue 擔任該角色。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }
    2. 將下列許可政策儲存為 glue-permissions.json。將 regionaccount-id和 取代access-point-name為您的值。

      { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }
    3. 建立角色並連接內嵌政策。

      $ aws iam create-role \ --role-name fsxn-tutorial-glue-etl-role \ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json

    本教學課程會將 ETL 指令碼存放在存取點本身,因此不需要單獨的 Amazon S3 儲存貯體。AccessPoint 陳述式涵蓋指令碼和計程車資料;陳述DataCatalog式會 AWS Glue 範圍目錄存取步驟 4 中爬蟲程式更新的fsxn_taxi_demo資料庫。

重要

Amazon S3 存取點必須使用網際網路原始伺服器。 AWS Glue 工作必須從受管基礎設施存取 Amazon S3,而不是從您的 VPC 存取。

步驟 1:建立 ETL 指令碼

下列 PySpark 指令碼會從 FSx 讀取 ONTAP 磁碟區的原始計程車行程資料、套用轉換,並將結果寫入磁碟區。將此指令碼儲存為 taxi_transform.py

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()

指令碼會執行下列轉換:

  • 篩選零或負行程距離、票價或乘客計數的記錄。

  • 新增運算的資料欄:pickup_hourcost_per_milepickup_day_of_weektime_of_day(上午、下午、晚上或晚上)。

  • 選取與分析相關的資料欄子集。

  • 分割輸出time_of_day,這可改善依時段篩選時的查詢效能。

步驟 2:上傳指令碼並建立任務

透過存取點將 ETL 指令碼上傳至 FSx for ONTAP 磁碟區,並建立 AWS Glue 參考的任務。在任務啟動時從存取點 AWS Glue 載入指令碼,就像從標準 Amazon S3 儲存貯體載入指令碼一樣。

$ # Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --name fsxn-taxi-transform \ --role my-glue-role-arn \ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"

步驟 3:執行任務

$ aws glue start-job-run --job-name fsxn-taxi-transform

監控任務狀態。任務通常會在一到兩分鐘內完成,其中包含兩個 G.1X 工作者。

$ aws glue get-job-runs --job-name fsxn-taxi-transform \ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"

當任務完成時,請驗證 FSx for ONTAP 磁碟區上的轉換輸出。

$ aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/

輸出會依一天中的時間分割成四個目錄。每個分割區都包含具有轉換資料的 Parquet 檔案。

步驟 4:查詢轉換的資料

在轉換的輸出上執行 AWS Glue 爬蟲程式以在 中註冊, AWS Glue Data Catalog然後向 Athena 查詢。

$ # Create a crawler for the transformed data aws glue create-crawler \ --name fsxn-taxi-transformed-crawler \ --role my-glue-role-arn \ --database-name fsxn_taxi_demo \ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --name fsxn-taxi-transformed-crawler

爬蟲程式完成後,在 Athena 中查詢轉換的資料。分割的結構可讓 Athena 僅掃描相關的分割區。

-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10

由於資料由 分割time_of_day,因此第二個查詢只會掃描morning分割區,減少資料讀取量並改善查詢效能。

考量事項

  • 需要網際網路原始伺服器。 AWS Glue 工作會從 VPC 外部的受管基礎設施存取 Amazon S3。您必須使用網際網路來源存取點。

  • Read and write. AWS Glue ETL 任務可以透過存取點讀取和寫入 FSx for ONTAP 磁碟區。存取點政策和檔案系統使用者必須同時允許 s3:GetObjects3:PutObject

  • 工作者大小。 AWS Glue 工作者的數量和類型會影響工作效能和成本。對於 48 MB 範例資料集,兩個 G.1X 工作者就足夠了。對於較大的資料集,請增加工作者計數或使用 G.2X 工作者。

  • 分割。寫入分割輸出可改善 Athena 和其他分析服務的下游查詢效能。根據通常查詢資料的方式選擇分割區索引鍵。

  • 指令碼 storage.loads AWS Glue ETL 指令碼會在任務啟動時從 Amazon S3 載入。本教學課程會將指令碼存放在存取點上,讓指令碼與資料一起運作,但您也可以將其託管在標準 Amazon S3 儲存貯體中。如果您使用獨立儲存貯體,請在指令碼儲存貯體 ARN s3:GetObject 上使用 延伸角色的內嵌政策。

清除

若要避免持續收費,請刪除您在本教學課程中建立的資源。

在 Athena 查詢編輯器中,捨棄爬蟲程式建立的資料表:

DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$ # Delete the Glue job and crawler aws glue delete-job --name fsxn-taxi-transform aws glue delete-crawler --name fsxn-taxi-transformed-crawler # Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-name fsxn-tutorial-glue-etl-role \ --policy-name glue-fsxn-access aws iam delete-role --role-name fsxn-tutorial-glue-etl-role