本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 建置 ETL 管道 AWS Glue
資料工程團隊通常具有來自應用程式、每日檔案捨棄或合作夥伴透過 NFS 或 SMB 整合之 ONTAP 磁碟區的 FSx 上的原始資料登陸。準備下游分析的資料需要大規模讀取、轉換、擴充或重新分割資料,並讓分析師和應用程式可以使用精心策劃的輸出。
將 Amazon S3 存取點連接到 FSx for ONTAP 磁碟區, AWS Glue 讀取來源資料,使用您選擇的執行時間 (Apache Spark、Python shell 或 Ray) 進行轉換,並將精選的輸出寫回相同的磁碟區。原始和精選資料集都會保留在 FSx for ONTAP 上,因此磁碟區的快照、備份和保留政策會統一套用至整個管道。由於 FSx for ONTAP 磁碟區可透過 NFS、SMB 和 Amazon S3 API 同時存取,因此原始資料可由 NFS 或 SMB 用戶端產生,而任何這些通訊協定都可以使用精選的輸出。
在本教學課程中,您會使用使用 Amazon Athena 查詢具有 SQL 的檔案教學課程中的 NYC Taxi 行程資料集。 AWS Glue ETL 任務會讀取原始 Parquet 資料、新增運算的資料欄、篩選無效的記錄,以及將轉換後的輸出寫回依當日時間分割的磁碟區。
注意
本教學課程約需 25 到 35 分鐘完成。 AWS 服務 使用的 會針對您建立的資源產生費用。如果您立即完成所有步驟,包括清除區段,美國東部 (維吉尼亞北部) 的預期成本不到 1 美元。 AWS 區域此預估不包含 FSx for ONTAP 磁碟區本身的持續費用。
先決條件
開始前,請確定您具有下列項目:
完成使用 Amazon Athena 查詢具有 SQL 的檔案教學課程的步驟 1 到 3。此程序會將 NYC Taxi 資料集上傳至存取點,在 中建立
fsxn_taxi_demo資料庫 AWS Glue Data Catalog,並註冊taxi_data資料表。本教學課程以這些資源為基礎,因此在完成本教學課程之前,請勿執行 Athena 教學課程的清除區段。的 IAM 角色 AWS Glue 具有內嵌政策,可授予 CloudWatch Logs 的寫入存取權、存取點的讀取/寫入存取權,以及本教學課程使用的 AWS Glue Data Catalog 資料庫存取權。下列步驟會建立具有本教學課程所需最低許可的角色。
將下列信任政策儲存為
glue-trust-policy.json。它允許 AWS Glue 擔任該角色。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }將下列許可政策儲存為
glue-permissions.json。將、region和 取代account-id為您的值。access-point-name{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }建立角色並連接內嵌政策。
$aws iam create-role \ --role-namefsxn-tutorial-glue-etl-role\ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json
本教學課程會將 ETL 指令碼存放在存取點本身,因此不需要單獨的 Amazon S3 儲存貯體。
AccessPoint陳述式涵蓋指令碼和計程車資料;陳述DataCatalog式會 AWS Glue 範圍目錄存取步驟 4 中爬蟲程式更新的fsxn_taxi_demo資料庫。
重要
Amazon S3 存取點必須使用網際網路原始伺服器。 AWS Glue 工作必須從受管基礎設施存取 Amazon S3,而不是從您的 VPC 存取。
步驟 1:建立 ETL 指令碼
下列 PySpark 指令碼會從 FSx 讀取 ONTAP 磁碟區的原始計程車行程資料、套用轉換,並將結果寫入磁碟區。將此指令碼儲存為 taxi_transform.py。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()
指令碼會執行下列轉換:
篩選零或負行程距離、票價或乘客計數的記錄。
新增運算的資料欄:
pickup_hour、cost_per_mile、pickup_day_of_week和time_of_day(上午、下午、晚上或晚上)。選取與分析相關的資料欄子集。
依 分割輸出
time_of_day,這可改善依時段篩選時的查詢效能。
步驟 2:上傳指令碼並建立任務
透過存取點將 ETL 指令碼上傳至 FSx for ONTAP 磁碟區,並建立 AWS Glue 參考的任務。在任務啟動時從存取點 AWS Glue 載入指令碼,就像從標準 Amazon S3 儲存貯體載入指令碼一樣。
$# Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --namefsxn-taxi-transform\ --rolemy-glue-role-arn\ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"
步驟 3:執行任務
$aws glue start-job-run --job-namefsxn-taxi-transform
監控任務狀態。任務通常會在一到兩分鐘內完成,其中包含兩個 G.1X 工作者。
$aws glue get-job-runs --job-namefsxn-taxi-transform\ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
當任務完成時,請驗證 FSx for ONTAP 磁碟區上的轉換輸出。
$aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/
輸出會依一天中的時間分割成四個目錄。每個分割區都包含具有轉換資料的 Parquet 檔案。
步驟 4:查詢轉換的資料
在轉換的輸出上執行 AWS Glue 爬蟲程式以在 中註冊, AWS Glue Data Catalog然後向 Athena 查詢。
$# Create a crawler for the transformed data aws glue create-crawler \ --namefsxn-taxi-transformed-crawler\ --rolemy-glue-role-arn\ --database-namefsxn_taxi_demo\ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --namefsxn-taxi-transformed-crawler
爬蟲程式完成後,在 Athena 中查詢轉換的資料。分割的結構可讓 Athena 僅掃描相關的分割區。
-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10
由於資料由 分割time_of_day,因此第二個查詢只會掃描morning分割區,減少資料讀取量並改善查詢效能。
考量事項
需要網際網路原始伺服器。 AWS Glue 工作會從 VPC 外部的受管基礎設施存取 Amazon S3。您必須使用網際網路來源存取點。
Read and write. AWS Glue ETL 任務可以透過存取點讀取和寫入 FSx for ONTAP 磁碟區。存取點政策和檔案系統使用者必須同時允許
s3:GetObject和s3:PutObject。工作者大小。 AWS Glue 工作者的數量和類型會影響工作效能和成本。對於 48 MB 範例資料集,兩個 G.1X 工作者就足夠了。對於較大的資料集,請增加工作者計數或使用 G.2X 工作者。
分割。寫入分割輸出可改善 Athena 和其他分析服務的下游查詢效能。根據通常查詢資料的方式選擇分割區索引鍵。
指令碼 storage.loads AWS Glue ETL 指令碼會在任務啟動時從 Amazon S3 載入。本教學課程會將指令碼存放在存取點上,讓指令碼與資料一起運作,但您也可以將其託管在標準 Amazon S3 儲存貯體中。如果您使用獨立儲存貯體,請在指令碼儲存貯體 ARN
s3:GetObject上使用 延伸角色的內嵌政策。
清除
若要避免持續收費,請刪除您在本教學課程中建立的資源。
在 Athena 查詢編輯器中,捨棄爬蟲程式建立的資料表:
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$# Delete the Glue job and crawler aws glue delete-job --namefsxn-taxi-transformaws glue delete-crawler --namefsxn-taxi-transformed-crawler# Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access aws iam delete-role --role-namefsxn-tutorial-glue-etl-role