

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Amazon EMR Serverless 執行 Spark 任務
<a name="tutorial-run-spark-with-emr-serverless"></a>

執行 Spark 工作負載的資料工程團隊 — 用於日誌處理、功能工程、複雜 ETL 或科學分析 — 通常具有由內部部署擷取管道、NFS 或 SMB 資料移動器或直接掛載磁碟區的應用程式所撰寫的 FSx for ONTAP 磁碟區的來源資料。

將 Amazon S3 存取點連接到磁碟區後，Amazon EMR Serverless 會透過存取點讀取資料、對其執行 Spark 任務，並將結果寫回相同的磁碟區。Amazon EMR Serverless 會自動處理叢集生命週期 — 您提交任務並支付其執行的秒數。

此模式適合需要完整 Spark 執行期 （自訂程式庫、反覆運算演算法、長時間執行轉換或透過 Amazon EMR Studio 互動式筆記本） 的工作負載，其中輕量型選項：適用於 SQL 的 Amazon Athena 和適用於受管 ETL 的 Amazon Athena 不適合。 AWS Glue 如需這些替代方案的資訊，請參閱 [使用 Amazon Athena 查詢具有 SQL 的檔案](tutorial-query-data-with-athena.md)和 [使用 建置 ETL 管道 AWS Glue](tutorial-transform-data-with-glue.md)。

在本教學課程中，您會模擬氣象團隊彙總一年的 NOAA Global Surface Summary of the Day (GSOD) 觀察，這些觀察會暫存在 FSx for ONTAP 磁碟區上。您提交 PySpark 任務，該任務會讀取原始 CSV 檔案、運算每月每站彙總 （平均溫度、總降水量，以及具有降水事件的天數），並將結果寫入為按月分割的 Parquet，所有這些都是透過存取點。

**注意**  
本教學課程約需 **30 到 40 分鐘**完成。 AWS 服務 使用的 會針對您建立的資源產生費用。如果您立即完成所有步驟，包括**清除**區段，美國東部 （維吉尼亞北部） 的預期成本不到 **1 美元**。 AWS 區域此預估不包含 FSx for ONTAP 磁碟區本身的持續費用。

## 先決條件
<a name="tutorial-emr-prerequisites"></a>
+ 連接 Amazon S3 存取點的 FSx for ONTAP 磁碟區。存取點必須具有**網際網路**原始伺服器，以便 Amazon EMR Serverless 服務可以連接它。如需說明，請參閱[建立存取點](fsxn-creating-access-points.md)。
+ AWS CLI 第 2 版安裝並設定了可建立 IAM 角色和 Amazon EMR Serverless 資源的登入資料。

## 步驟 1：將範例資料集上傳到存取點
<a name="tutorial-emr-upload-data"></a>

NOAA GSOD 資料集是每日天氣觀察的公有資料集，每個工作站每年一個 CSV 檔案。在本教學課程中，您會從公有 Amazon S3 `noaa-gsod-pds` 儲存貯體下載 100 個工作站子集，並將其上傳至您的存取點。

1. 下載 2024 年前 100 個工作站檔案。

   ```
   $ mkdir -p ~/gsod && cd ~/gsod
   aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt
   while read f; do
       aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors
   done < files.txt
   ls | wc -l
   ```

   命令會下載約 100 個 CSV 檔案，總計約 7–8 MB。

1. 將檔案上傳至字`gsod/2024/`首下方的存取點。將{{access-point-alias}}取代為您的存取點別名。

   ```
   $ aws s3 cp ~/gsod/ "s3://{{access-point-alias}}/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
   ```

## 步驟 2：寫入 PySpark 任務
<a name="tutorial-emr-write-script"></a>

任務會讀取輸入字首下的所有 CSV 檔案、篩選代表遺失資料的 sentinel 值、剖析`FRSHTT`位元欄位 (Fog、Rain、Snow、Hail、Thunder、Tornado) 以計算降雨事件天數、每個 的彙總`(station, month)`，並將分割的 Parquet 寫入存取點。

1. 將下列指令碼儲存至名為 的檔案`gsod_monthly.py`。

   ```
   # gsod_monthly.py
   import sys
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   
   INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2]
   
   # GSOD sentinels for missing data
   TEMP_SENTINEL = 9999.9
   PRCP_SENTINEL = 99.99
   
   spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate()
   
   raw = spark.read.option("header", True).csv(INPUT_PATH)
   
   cleaned = (raw
       .select(
           F.col("STATION").alias("station"),
           F.col("NAME").alias("station_name"),
           F.col("LATITUDE").cast("double").alias("lat"),
           F.col("LONGITUDE").cast("double").alias("lon"),
           F.to_date("DATE", "yyyy-MM-dd").alias("date"),
           F.col("TEMP").cast("double").alias("temp_f"),
           F.col("PRCP").cast("double").alias("prcp_in"),
           F.col("FRSHTT").alias("frshtt"),
       )
       .filter(F.col("temp_f") != TEMP_SENTINEL)
       .withColumn("month", F.date_format("date", "yyyy-MM"))
       .withColumn(
           "prcp_in",
           F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")),
       )
       # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado.
       # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events.
       .withColumn(
           "had_precip_event",
           F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0),
       )
   )
   
   monthly = (cleaned
       .groupBy("station", "station_name", "lat", "lon", "month")
       .agg(
           F.avg("temp_f").alias("avg_temp_f"),
           F.min("temp_f").alias("min_temp_f"),
           F.max("temp_f").alias("max_temp_f"),
           F.sum("prcp_in").alias("total_prcp_in"),
           F.sum("had_precip_event").alias("precip_event_days"),
           F.count("*").alias("observation_days"),
       )
   )
   
   (monthly.write
       .mode("overwrite")
       .partitionBy("month")
       .parquet(OUTPUT_PATH))
   
   spark.stop()
   ```

1. 將指令碼上傳至字`scripts/`首下方的存取點。

   ```
   $ aws s3 cp gsod_monthly.py "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
   ```

## 步驟 3：建立 Amazon EMR Serverless 任務角色
<a name="tutorial-emr-iam-role"></a>

Amazon EMR Serverless 會在執行任務時擔任 IAM 執行角色。此角色需要許可，才能讀取和寫入存取點，以及將日誌寫入 CloudWatch Logs。展開下一節的設定步驟。

### 建立 Amazon EMR Serverless 任務角色
<a name="tutorial-emr-iam-role-steps"></a>

1. 將下列信任政策儲存為 `emr-trust-policy.json`。它允許 Amazon EMR Serverless 擔任該角色。

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [{
           "Effect": "Allow",
           "Principal": {"Service": "emr-serverless.amazonaws.com"},
           "Action": "sts:AssumeRole"
       }]
   }
   ```

1. 將下列許可政策儲存為 `emr-permissions.json`。將 {{region}}、{{account-id}} 和 {{access-point-name}} 取代為您的值。

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [
           {
               "Sid": "Logs",
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents",
                   "logs:DescribeLogGroups",
                   "logs:DescribeLogStreams"
               ],
               "Resource": "*"
           },
           {
               "Sid": "APRead",
               "Effect": "Allow",
               "Action": ["s3:GetObject", "s3:ListBucket"],
               "Resource": [
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
               ]
           },
           {
               "Sid": "APWrite",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject", "s3:DeleteObject",
                   "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"
               ],
               "Resource": "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
           }
       ]
   }
   ```

1. 建立角色並連接政策。

   ```
   $ aws iam create-role --role-name fsxn-emr-job-role \
       --assume-role-policy-document file://emr-trust-policy.json
   aws iam put-role-policy --role-name fsxn-emr-job-role \
       --policy-name emr-access --policy-document file://emr-permissions.json
   ```

## 步驟 4：建立和啟動 Amazon EMR Serverless 應用程式
<a name="tutorial-emr-create-app"></a>

Amazon EMR Serverless 應用程式是特定發行標籤和引擎 (Spark 或 Hive) 的長期運算環境。您向其提交一或多個任務。應用程式會根據任務需求自動擴展和縮減運算，並在沒有任務執行時閒置。

1. 使用最新的 Amazon EMR 版本建立 Spark 應用程式。

   ```
   $ aws emr-serverless create-application \
       --name fsxn-emr-app --type SPARK --release-label emr-7.0.0
   ```

   記下回應中的 `applicationId`。

1. 啟動應用程式。開始預熱一小群工作者，讓第一個任務在沒有冷啟動延遲的情況下執行。

   ```
   $ aws emr-serverless start-application --application-id {{application-id}}
   ```

   等待狀態變成 `STARTED`。

   ```
   $ aws emr-serverless get-application --application-id {{application-id}} \
       --query 'application.state'
   ```

## 步驟 5：提交 Spark 任務
<a name="tutorial-emr-submit-job"></a>

使用應用程式 ID 和執行角色提交任務。任務會從 讀取原始 CSVs`gsod-monthly/`，`gsod/2024/`並透過存取點將分割的 Parquet 寫入 。

1. 將任務驅動程式組態儲存為 `job-driver.json`。取代預留位置。

   ```
   {
       "sparkSubmit": {
           "entryPoint": "s3://{{access-point-alias}}/scripts/gsod_monthly.py",
           "entryPointArguments": [
               "s3://{{access-point-alias}}/gsod/2024/",
               "s3://{{access-point-alias}}/gsod-monthly/"
           ],
           "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2"
       }
   }
   ```

1. 將下列監控組態儲存為 `job-config.json`。它會將驅動程式和執行器日誌傳送至 CloudWatch Logs。

   ```
   {
       "monitoringConfiguration": {
           "cloudWatchLoggingConfiguration": {
               "enabled": true,
               "logGroupName": "/aws/emr-serverless/fsxn-emr-app"
           }
       }
   }
   ```

1. 提交任務。

   ```
   $ aws emr-serverless start-job-run \
       --application-id {{application-id}} \
       --execution-role-arn arn:aws:iam::{{account-id}}:role/fsxn-emr-job-role \
       --name gsod-monthly \
       --job-driver file://job-driver.json \
       --configuration-overrides file://job-config.json
   ```

   記下回應中的 `jobRunId`。

1. 輪詢任務狀態。任務會從 轉換為 `SCHEDULED` `RUNNING` `SUCCESS`。

   ```
   $ aws emr-serverless get-job-run \
       --application-id {{application-id}} \
       --job-run-id {{job-run-id}} \
       --query 'jobRun.state'
   ```

**注意**  
如果任務失敗，請檢查日誌群組 下 CloudWatch Logs 中的驅動程式日誌`/aws/emr-serverless/fsxn-emr-app`。Amazon EMR Serverless 會在每次任務執行時寫入一個日誌串流。

## 步驟 6：檢查輸出
<a name="tutorial-emr-inspect-output"></a>

確認任務每個月撰寫一個 Parquet 分割區，而且輸出是可讀取的。

1. 列出輸出分割區。

   ```
   $ aws s3 ls "s3://{{access-point-alias}}/gsod-monthly/" --recursive
   ```

   每個`month=YYYY-MM/`分割區應該會看到一個 Parquet 檔案，加上根目錄的`_SUCCESS`標記。

1. 在本機讀取分割區以驗證內容。

   ```
   $ aws s3 cp "s3://{{access-point-alias}}/gsod-monthly/month=2024-06/" . \
       --recursive --exclude "_SUCCESS"
   python3 -c "import pyarrow.parquet as pq; \
       t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \
       print(t.schema); print(t.to_pandas().head())"
   ```

   輸出結構描述包括 `station`、`station_name`、`lat`、`lon`、`avg_temp_f``min_temp_f`、`max_temp_f`、、`precip_event_days`、 `total_prcp_in`和 `observation_days`。

## 擴展模式
<a name="tutorial-emr-extending"></a>
+ **使用 Spark SQL 查詢輸出。**向 將分割的輸出註冊為資料表， AWS Glue Data Catalog 並使用 Spark SQL、Athena 或任何其他讀取 AWS Glue 目錄資料表的工具進行查詢。如需註冊存取點後端資料集的說明，請參閱 [使用 Amazon Athena 查詢具有 SQL 的檔案](tutorial-query-data-with-athena.md)。
+ **使用 Iceberg 進行 ACID 寫入。**對於更新或合併資料的工作負載，請將任務設定為寫入存取點上的 Iceberg 資料表，而非純 Parquet。Amazon EMR Serverless 預設會在最近的版本標籤上包含 Iceberg 執行時間。
+ **使用 Amazon EMR Studio 以互動方式執行 。**將 Jupyter 筆記本連接至 Amazon EMR Serverless 應用程式，以互動方式探索資料。請參閱《[Amazon EMR Serverless 使用者指南》中的使用 Amazon EMR Serverless 的互動式工作負載](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/interactive-workloads.html)。 **
+ **排程任務。**使用 Amazon EventBridge 排程器或 AWS Step Functions 以週期性排程執行任務 （例如，當新的一天資料落入磁碟區時）。

## 疑難排解
<a name="tutorial-emr-troubleshooting"></a>

存取點`AccessDenied`上的 任務失敗  
確認任務角色政策在存取點 ARN `s3:ListBucket` （不在儲存貯體上） 上授予 `s3:GetObject`和 ，且存取點具有網際網路原始伺服器，以便 Amazon EMR Serverless 服務可以連接它。

任務成功，但輸出空白  
檢查輸入路徑。Amazon S3 會在字面上`ListObjectsV2`處理字首，因此 `s3://alias/gsod/2024`（沒有尾斜線） 和 `s3://alias/gsod/2024/`（斜線） 的行為可能不同。指向檔案目錄時，請包含結尾斜線。

驅動程式日誌不在 CloudWatch Logs 中  
監控組態必須在 `--configuration-overrides`上傳遞`start-job-run`，而不是在應用程式上傳遞。每個任務執行會在設定的日誌群組下寫入自己的日誌串流。

## 清除
<a name="tutorial-emr-clean-up"></a>

停止並刪除應用程式、移除 IAM 角色，以及刪除您不再需要的任何上傳資料。

```
$ aws emr-serverless stop-application --application-id {{application-id}}
aws emr-serverless delete-application --application-id {{application-id}}
aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access
aws iam delete-role --role-name fsxn-emr-job-role
aws s3 rm "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
aws s3 rm "s3://{{access-point-alias}}/gsod/" --recursive
aws s3 rm "s3://{{access-point-alias}}/gsod-monthly/" --recursive
```