

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Amazon EMR 无服务器运行 Spark 作业
<a name="tutorial-run-spark-with-emr-serverless"></a>

运行 Spark 工作负载（用于日志处理、功能工程、复杂 ETL 或科学分析）的数据工程团队通常在 FSx for ONTAP 卷上保存源数据，这些数据由本地摄取管道、NFS 或 SMB 数据移动器或直接挂载卷的应用程序编写。

通过将 Amazon S3 接入点连接到卷，Amazon EMR Serverless 通过接入点读取数据，对其运行 Spark 作业，然后将结果写回同一个卷。Amazon EMR Serverless 会自动处理集群生命周期 — 您提交任务并为其运行的秒数付费。

这种模式适合需要完整 Spark 运行时的工作负载（自定义库、迭代算法、长时间运行的转换或通过 Amazon EMR Studio 实现的交互式笔记本），在这些工作负载中，较轻的选项（适用于 AWS Glue SQL 的 Amazon Athena 和托管 ETL）并不合适。有关这些替代方案的信息，请参阅[使用 Amazon Athena 使用 SQL 查询文件](tutorial-query-data-with-athena.md)和[使用构建 ETL 管道 AWS Glue](tutorial-transform-data-with-glue.md)。

在本教程中，您将模拟一个气象小组，汇总了在 FSx for ONTAP 卷上进行的一年的 NOAA 全球表面每日摘要 (GSOD) 观测结果。您提交一份 PySpark 任务，该作业读取原始 CSV 文件，计算每个站点的每月聚合数据（平均温度、总降水量和降水事件发生的天数），并将结果写为 Parquet 按月分区——所有这些都要通过接入点完成。

**注意**  
本教程大约需要 **30 到 40 分钟**才能完成。 AWS 服务 使用者会对您创建的资源产生费用。如果您及时完成所有步骤，包括**清理**部分，则美国东部（弗吉尼亚北部）的预期费用将低于 **1美元** AWS 区域。该估算值不包括 FSx 对 ONTAP 容量本身的持续费用。

## 先决条件
<a name="tutorial-emr-prerequisites"></a>
+ 连接了 Amazon S3 接入点的 ONTAP 卷的 FSx。接入点必须具有**互联网**网络来源，这样 Amazon EMR Serverless 服务才能访问该接入点。有关说明，请参阅[创建接入点](fsxn-creating-access-points.md)。
+ AWS CLI 已安装版本 2，并配置了可以创建 IAM 角色和 Amazon EMR 无服务器资源的证书。

## 步骤 1：将示例数据集上传到接入点
<a name="tutorial-emr-upload-data"></a>

NOAA GSOD 数据集是每日天气观测的公共数据集，每个站点每年一个 CSV 文件。在本教程中，您将从公共 `noaa-gsod-pds` Amazon S3 存储桶中下载 100 个站点的子集，然后将其上传到您的接入点。

1. 下载 2024 年的前 100 个电台文件。

   ```
   $ mkdir -p ~/gsod && cd ~/gsod
   aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt
   while read f; do
       aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors
   done < files.txt
   ls | wc -l
   ```

   该命令可下载大约 100 个 CSV 文件，总大小约为 7—8 MB。

1. 将文件上传到前`gsod/2024/`缀下的接入点。{{access-point-alias}}替换为您的接入点别名。

   ```
   $ aws s3 cp ~/gsod/ "s3://{{access-point-alias}}/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
   ```

## 第 2 步：写作 PySpark 业
<a name="tutorial-emr-write-script"></a>

该作业读取输入前缀下的所有 CSV 文件，过滤表示缺失数据的哨兵值，解析`FRSHTT`位域（雾、雨、雪、冰雹、雷霆、龙卷风）以计算降水事件天数、按`(station, month)`天进行聚合，并将分区的 Parquet 写回接入点。

1. 将以下脚本保存到名为的文件中`gsod_monthly.py`。

   ```
   # gsod_monthly.py
   import sys
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   
   INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2]
   
   # GSOD sentinels for missing data
   TEMP_SENTINEL = 9999.9
   PRCP_SENTINEL = 99.99
   
   spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate()
   
   raw = spark.read.option("header", True).csv(INPUT_PATH)
   
   cleaned = (raw
       .select(
           F.col("STATION").alias("station"),
           F.col("NAME").alias("station_name"),
           F.col("LATITUDE").cast("double").alias("lat"),
           F.col("LONGITUDE").cast("double").alias("lon"),
           F.to_date("DATE", "yyyy-MM-dd").alias("date"),
           F.col("TEMP").cast("double").alias("temp_f"),
           F.col("PRCP").cast("double").alias("prcp_in"),
           F.col("FRSHTT").alias("frshtt"),
       )
       .filter(F.col("temp_f") != TEMP_SENTINEL)
       .withColumn("month", F.date_format("date", "yyyy-MM"))
       .withColumn(
           "prcp_in",
           F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")),
       )
       # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado.
       # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events.
       .withColumn(
           "had_precip_event",
           F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0),
       )
   )
   
   monthly = (cleaned
       .groupBy("station", "station_name", "lat", "lon", "month")
       .agg(
           F.avg("temp_f").alias("avg_temp_f"),
           F.min("temp_f").alias("min_temp_f"),
           F.max("temp_f").alias("max_temp_f"),
           F.sum("prcp_in").alias("total_prcp_in"),
           F.sum("had_precip_event").alias("precip_event_days"),
           F.count("*").alias("observation_days"),
       )
   )
   
   (monthly.write
       .mode("overwrite")
       .partitionBy("month")
       .parquet(OUTPUT_PATH))
   
   spark.stop()
   ```

1. 将脚本上传到`scripts/`前缀下的接入点。

   ```
   $ aws s3 cp gsod_monthly.py "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
   ```

## 步骤 3：创建 Amazon EMR 无服务器工作角色
<a name="tutorial-emr-iam-role"></a>

Amazon EMR Serverless 在运行您的任务时担任 IAM 执行角色。该角色需要权限才能读取和写入接入点以及将日志写入 CloudWatch 日志。展开以下部分了解设置步骤。

### 创建 Amazon EMR 无服务器工作角色
<a name="tutorial-emr-iam-role-steps"></a>

1. 将以下信任策略另存为`emr-trust-policy.json`。它允许 Amazon EMR Serverless 担任该角色。

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [{
           "Effect": "Allow",
           "Principal": {"Service": "emr-serverless.amazonaws.com"},
           "Action": "sts:AssumeRole"
       }]
   }
   ```

1. 将以下权限策略另存为`emr-permissions.json`。用您的值替换{{region}}{{account-id}}、和{{access-point-name}}。

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [
           {
               "Sid": "Logs",
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents",
                   "logs:DescribeLogGroups",
                   "logs:DescribeLogStreams"
               ],
               "Resource": "*"
           },
           {
               "Sid": "APRead",
               "Effect": "Allow",
               "Action": ["s3:GetObject", "s3:ListBucket"],
               "Resource": [
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
               ]
           },
           {
               "Sid": "APWrite",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject", "s3:DeleteObject",
                   "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"
               ],
               "Resource": "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
           }
       ]
   }
   ```

1. 创建角色并附加策略。

   ```
   $ aws iam create-role --role-name fsxn-emr-job-role \
       --assume-role-policy-document file://emr-trust-policy.json
   aws iam put-role-policy --role-name fsxn-emr-job-role \
       --policy-name emr-access --policy-document file://emr-permissions.json
   ```

## 第 4 步：创建并启动 Amazon EMR 无服务器应用程序
<a name="tutorial-emr-create-app"></a>

Amazon EMR 无服务器应用程序是针对特定版本标签和引擎（Spark 或 Hive）的长寿命计算环境。您向它提交一份或多份工作。应用程序会根据任务需求自动向上和向下扩展计算，并在没有作业运行时闲置。

1. 使用最新版本的亚马逊 EMR 创建 Spark 应用程序。

   ```
   $ aws emr-serverless create-application \
       --name fsxn-emr-app --type SPARK --release-label emr-7.0.0
   ```

   记下响应中的 `applicationId`。

1. 启动应用程序。启动会预热一小部分工作人员，因此第一个作业运行时不会出现冷启动延迟。

   ```
   $ aws emr-serverless start-application --application-id {{application-id}}
   ```

   等待状态变成`STARTED`。

   ```
   $ aws emr-serverless get-application --application-id {{application-id}} \
       --query 'application.state'
   ```

## 第 5 步：提交 Spark 任务
<a name="tutorial-emr-submit-job"></a>

使用应用程序 ID 和执行角色提交作业。该作业通过接入点从中读取原始 CSV`gsod-monthly/`，`gsod/2024/`并将分区的 Parquet 写入其中。

1. 将作业驱动程序配置另存为`job-driver.json`。替换占位符。

   ```
   {
       "sparkSubmit": {
           "entryPoint": "s3://{{access-point-alias}}/scripts/gsod_monthly.py",
           "entryPointArguments": [
               "s3://{{access-point-alias}}/gsod/2024/",
               "s3://{{access-point-alias}}/gsod-monthly/"
           ],
           "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2"
       }
   }
   ```

1. 将以下监视配置另存为`job-config.json`。它将驱动程序和执行者日志发送到日 CloudWatch 志。

   ```
   {
       "monitoringConfiguration": {
           "cloudWatchLoggingConfiguration": {
               "enabled": true,
               "logGroupName": "/aws/emr-serverless/fsxn-emr-app"
           }
       }
   }
   ```

1. 提交作业。

   ```
   $ aws emr-serverless start-job-run \
       --application-id {{application-id}} \
       --execution-role-arn arn:aws:iam::{{account-id}}:role/fsxn-emr-job-role \
       --name gsod-monthly \
       --job-driver file://job-driver.json \
       --configuration-overrides file://job-config.json
   ```

   记下响应中的 `jobRunId`。

1. 轮询作业状态。作业从过渡`SCHEDULED`到`RUNNING`到`SUCCESS`。

   ```
   $ aws emr-serverless get-job-run \
       --application-id {{application-id}} \
       --job-run-id {{job-run-id}} \
       --query 'jobRun.state'
   ```

**注意**  
如果作业失败，请在日志组下的 “日志” 中查看驱动程序 CloudWatch 日志`/aws/emr-serverless/fsxn-emr-app`。Amazon EMR Serverless 每次作业运行都会写入一个日志流。

## 步骤 6：检查输出
<a name="tutorial-emr-inspect-output"></a>

验证任务每月写入一个 Parquet 分区以及输出是否可读。

1. 列出输出分区。

   ```
   $ aws s3 ls "s3://{{access-point-alias}}/gsod-monthly/" --recursive
   ```

   您应该看到每个`month=YYYY-MM/`分区一个 Parquet 文件，并在根部看到一个`_SUCCESS`标记。

1. 在本地读取分区以验证内容。

   ```
   $ aws s3 cp "s3://{{access-point-alias}}/gsod-monthly/month=2024-06/" . \
       --recursive --exclude "_SUCCESS"
   python3 -c "import pyarrow.parquet as pq; \
       t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \
       print(t.schema); print(t.to_pandas().head())"
   ```

   输出架构包括`station`、、`station_name`、`lat``lon`、`avg_temp_f`、`min_temp_f`、`max_temp_f`、`total_prcp_in`、`precip_event_days`、和`observation_days`。

## 扩展模式
<a name="tutorial-emr-extending"></a>
+ **使用 Spark SQL 查询输出。**将分区输出注册为表， AWS Glue Data Catalog 然后使用 Spark SQL、Athena 或任何其他读取目录表的工具对其进行查询。 AWS Glue 有关注册接入点支持的数据集的说明，请参阅[使用 Amazon Athena 使用 SQL 查询文件](tutorial-query-data-with-athena.md)。
+ **使用 Iceberg 进行 ACID 写入。**对于更新或合并数据的工作负载，请将作业配置为写入接入点上的 Iceberg 表，而不是普通的 Parquet。Amazon EMR Serverless 默认在最新版本的标签上包含 Iceberg 运行时。
+ **与亚马逊 EMR Studio 以交互方式运行。**将 Jupyter 笔记本连接到 Amazon EMR 无服务器应用程序，以交互方式浏览数据。*在《亚马逊 EMR 无服务器用户指南》中查看[使用 Amazon EMR Serverless 进行交互式工作负载](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/interactive-workloads.html)。*
+ **安排作业。**使用 Amazon EventBridge Scheduler 或 AWS Step Functions 按定期计划运行作业（例如，当新一天的数据进入卷时）。

## 问题排查
<a name="tutorial-emr-troubleshooting"></a>

接入点`AccessDenied`上的 Job 失败  
验证任务角色策略是否在接入点 ARN（而不是存储桶`s3:ListBucket`上）上授予`s3:GetObject`和，以及接入点是否有互联网网络来源，以便 Amazon EMR Serverless 服务可以访问它。

Job 成功但输出为空  
检查输入路径。Amazon S3 按字面意思`ListObjectsV2`处理前缀，因此`s3://alias/gsod/2024`（没有尾部斜杠）和`s3://alias/gsod/2024/`（尾部斜杠）的行为可能有所不同。指向文件目录时，请包括尾部的斜杠。

驱动程序日志不在 CloudWatch 日志中  
监视配置必须在应用程序`--configuration-overrides`上传递`start-job-run`，而不是在应用程序上传递。每个作业运行都会在配置的日志组下写入自己的日志流。

## 清理
<a name="tutorial-emr-clean-up"></a>

停止并删除应用程序，移除 IAM 角色，并删除您不再需要的所有已上传数据。

```
$ aws emr-serverless stop-application --application-id {{application-id}}
aws emr-serverless delete-application --application-id {{application-id}}
aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access
aws iam delete-role --role-name fsxn-emr-job-role
aws s3 rm "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
aws s3 rm "s3://{{access-point-alias}}/gsod/" --recursive
aws s3 rm "s3://{{access-point-alias}}/gsod-monthly/" --recursive
```