View a markdown version of this page

使用 Amazon EMR Serverless 執行 Spark 任務 - FSx for OnTAP

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon EMR Serverless 執行 Spark 任務

執行 Spark 工作負載的資料工程團隊 — 用於日誌處理、功能工程、複雜 ETL 或科學分析 — 通常具有由內部部署擷取管道、NFS 或 SMB 資料移動器或直接掛載磁碟區的應用程式所撰寫的 FSx for ONTAP 磁碟區的來源資料。

將 Amazon S3 存取點連接到磁碟區後,Amazon EMR Serverless 會透過存取點讀取資料、對其執行 Spark 任務,並將結果寫回相同的磁碟區。Amazon EMR Serverless 會自動處理叢集生命週期 — 您提交任務並支付其執行的秒數。

此模式適合需要完整 Spark 執行期 (自訂程式庫、反覆運算演算法、長時間執行轉換或透過 Amazon EMR Studio 互動式筆記本) 的工作負載,其中輕量型選項:適用於 SQL 的 Amazon Athena 和適用於受管 ETL 的 Amazon Athena 不適合。 AWS Glue 如需這些替代方案的資訊,請參閱 使用 Amazon Athena 查詢具有 SQL 的檔案使用 建置 ETL 管道 AWS Glue

在本教學課程中,您會模擬氣象團隊彙總一年的 NOAA Global Surface Summary of the Day (GSOD) 觀察,這些觀察會暫存在 FSx for ONTAP 磁碟區上。您提交 PySpark 任務,該任務會讀取原始 CSV 檔案、運算每月每站彙總 (平均溫度、總降水量,以及具有降水事件的天數),並將結果寫入為按月分割的 Parquet,所有這些都是透過存取點。

注意

本教學課程約需 30 到 40 分鐘完成。 AWS 服務 使用的 會針對您建立的資源產生費用。如果您立即完成所有步驟,包括清除區段,美國東部 (維吉尼亞北部) 的預期成本不到 1 美元。 AWS 區域此預估不包含 FSx for ONTAP 磁碟區本身的持續費用。

先決條件

  • 連接 Amazon S3 存取點的 FSx for ONTAP 磁碟區。存取點必須具有網際網路原始伺服器,以便 Amazon EMR Serverless 服務可以連接它。如需說明,請參閱建立存取點

  • AWS CLI 第 2 版安裝並設定了可建立 IAM 角色和 Amazon EMR Serverless 資源的登入資料。

步驟 1:將範例資料集上傳到存取點

NOAA GSOD 資料集是每日天氣觀察的公有資料集,每個工作站每年一個 CSV 檔案。在本教學課程中,您會從公有 Amazon S3 noaa-gsod-pds 儲存貯體下載 100 個工作站子集,並將其上傳至您的存取點。

  1. 下載 2024 年前 100 個工作站檔案。

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    命令會下載約 100 個 CSV 檔案,總計約 7–8 MB。

  2. 將檔案上傳至字gsod/2024/首下方的存取點。將access-point-alias取代為您的存取點別名。

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

步驟 2:寫入 PySpark 任務

任務會讀取輸入字首下的所有 CSV 檔案、篩選代表遺失資料的 sentinel 值、剖析FRSHTT位元欄位 (Fog、Rain、Snow、Hail、Thunder、Tornado) 以計算降雨事件天數、每個 的彙總(station, month),並將分割的 Parquet 寫入存取點。

  1. 將下列指令碼儲存至名為 的檔案gsod_monthly.py

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. 將指令碼上傳至字scripts/首下方的存取點。

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

步驟 3:建立 Amazon EMR Serverless 任務角色

Amazon EMR Serverless 會在執行任務時擔任 IAM 執行角色。此角色需要許可,才能讀取和寫入存取點,以及將日誌寫入 CloudWatch Logs。展開下一節的設定步驟。

  1. 將下列信任政策儲存為 emr-trust-policy.json。它允許 Amazon EMR Serverless 擔任該角色。

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. 將下列許可政策儲存為 emr-permissions.json。將 regionaccount-idaccess-point-name 取代為您的值。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. 建立角色並連接政策。

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

步驟 4:建立和啟動 Amazon EMR Serverless 應用程式

Amazon EMR Serverless 應用程式是特定發行標籤和引擎 (Spark 或 Hive) 的長期運算環境。您向其提交一或多個任務。應用程式會根據任務需求自動擴展和縮減運算,並在沒有任務執行時閒置。

  1. 使用最新的 Amazon EMR 版本建立 Spark 應用程式。

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    記下回應中的 applicationId

  2. 啟動應用程式。開始預熱一小群工作者,讓第一個任務在沒有冷啟動延遲的情況下執行。

    $ aws emr-serverless start-application --application-id application-id

    等待狀態變成 STARTED

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

步驟 5:提交 Spark 任務

使用應用程式 ID 和執行角色提交任務。任務會從 讀取原始 CSVsgsod-monthly/gsod/2024/並透過存取點將分割的 Parquet 寫入 。

  1. 將任務驅動程式組態儲存為 job-driver.json。取代預留位置。

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. 將下列監控組態儲存為 job-config.json。它會將驅動程式和執行器日誌傳送至 CloudWatch Logs。

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. 提交任務。

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    記下回應中的 jobRunId

  4. 輪詢任務狀態。任務會從 轉換為 SCHEDULED RUNNING SUCCESS

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
注意

如果任務失敗,請檢查日誌群組 下 CloudWatch Logs 中的驅動程式日誌/aws/emr-serverless/fsxn-emr-app。Amazon EMR Serverless 會在每次任務執行時寫入一個日誌串流。

步驟 6:檢查輸出

確認任務每個月撰寫一個 Parquet 分割區,而且輸出是可讀取的。

  1. 列出輸出分割區。

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    每個month=YYYY-MM/分割區應該會看到一個 Parquet 檔案,加上根目錄的_SUCCESS標記。

  2. 在本機讀取分割區以驗證內容。

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    輸出結構描述包括 stationstation_namelatlonavg_temp_fmin_temp_fmax_temp_f、、precip_event_daystotal_prcp_inobservation_days

擴展模式

  • 使用 Spark SQL 查詢輸出。向 將分割的輸出註冊為資料表, AWS Glue Data Catalog 並使用 Spark SQL、Athena 或任何其他讀取 AWS Glue 目錄資料表的工具進行查詢。如需註冊存取點後端資料集的說明,請參閱 使用 Amazon Athena 查詢具有 SQL 的檔案

  • 使用 Iceberg 進行 ACID 寫入。對於更新或合併資料的工作負載,請將任務設定為寫入存取點上的 Iceberg 資料表,而非純 Parquet。Amazon EMR Serverless 預設會在最近的版本標籤上包含 Iceberg 執行時間。

  • 使用 Amazon EMR Studio 以互動方式執行 。將 Jupyter 筆記本連接至 Amazon EMR Serverless 應用程式,以互動方式探索資料。請參閱《Amazon EMR Serverless 使用者指南》中的使用 Amazon EMR Serverless 的互動式工作負載

  • 排程任務。使用 Amazon EventBridge 排程器或 AWS Step Functions 以週期性排程執行任務 (例如,當新的一天資料落入磁碟區時)。

疑難排解

存取點AccessDenied上的 任務失敗

確認任務角色政策在存取點 ARN s3:ListBucket (不在儲存貯體上) 上授予 s3:GetObject和 ,且存取點具有網際網路原始伺服器,以便 Amazon EMR Serverless 服務可以連接它。

任務成功,但輸出空白

檢查輸入路徑。Amazon S3 會在字面上ListObjectsV2處理字首,因此 s3://alias/gsod/2024(沒有尾斜線) 和 s3://alias/gsod/2024/(斜線) 的行為可能不同。指向檔案目錄時,請包含結尾斜線。

驅動程式日誌不在 CloudWatch Logs 中

監控組態必須在 --configuration-overrides上傳遞start-job-run,而不是在應用程式上傳遞。每個任務執行會在設定的日誌群組下寫入自己的日誌串流。

清除

停止並刪除應用程式、移除 IAM 角色,以及刪除您不再需要的任何上傳資料。

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive