View a markdown version of this page

使用 Amazon EMR 无服务器运行 Spark 作业 - FSx for ONTAP

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Amazon EMR 无服务器运行 Spark 作业

运行 Spark 工作负载(用于日志处理、功能工程、复杂 ETL 或科学分析)的数据工程团队通常在 FSx for ONTAP 卷上保存源数据,这些数据由本地摄取管道、NFS 或 SMB 数据移动器或直接挂载卷的应用程序编写。

通过将 Amazon S3 接入点连接到卷,Amazon EMR Serverless 通过接入点读取数据,对其运行 Spark 作业,然后将结果写回同一个卷。Amazon EMR Serverless 会自动处理集群生命周期 — 您提交任务并为其运行的秒数付费。

这种模式适合需要完整 Spark 运行时的工作负载(自定义库、迭代算法、长时间运行的转换或通过 Amazon EMR Studio 实现的交互式笔记本),在这些工作负载中,较轻的选项(适用于 AWS Glue SQL 的 Amazon Athena 和托管 ETL)并不合适。有关这些替代方案的信息,请参阅使用 Amazon Athena 使用 SQL 查询文件使用构建 ETL 管道 AWS Glue

在本教程中,您将模拟一个气象小组,汇总了在 FSx for ONTAP 卷上进行的一年的 NOAA 全球表面每日摘要 (GSOD) 观测结果。您提交一份 PySpark 任务,该作业读取原始 CSV 文件,计算每个站点的每月聚合数据(平均温度、总降水量和降水事件发生的天数),并将结果写为 Parquet 按月分区——所有这些都要通过接入点完成。

注意

本教程大约需要 30 到 40 分钟才能完成。 AWS 服务 使用者会对您创建的资源产生费用。如果您及时完成所有步骤,包括清理部分,则美国东部(弗吉尼亚北部)的预期费用将低于 1美元 AWS 区域。该估算值不包括 FSx 对 ONTAP 容量本身的持续费用。

先决条件

  • 连接了 Amazon S3 接入点的 ONTAP 卷的 FSx。接入点必须具有互联网网络来源,这样 Amazon EMR Serverless 服务才能访问该接入点。有关说明,请参阅创建接入点

  • AWS CLI 已安装版本 2,并配置了可以创建 IAM 角色和 Amazon EMR 无服务器资源的证书。

步骤 1:将示例数据集上传到接入点

NOAA GSOD 数据集是每日天气观测的公共数据集,每个站点每年一个 CSV 文件。在本教程中,您将从公共 noaa-gsod-pds Amazon S3 存储桶中下载 100 个站点的子集,然后将其上传到您的接入点。

  1. 下载 2024 年的前 100 个电台文件。

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    该命令可下载大约 100 个 CSV 文件,总大小约为 7—8 MB。

  2. 将文件上传到前gsod/2024/缀下的接入点。access-point-alias替换为您的接入点别名。

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

第 2 步:写作 PySpark 业

该作业读取输入前缀下的所有 CSV 文件,过滤表示缺失数据的哨兵值,解析FRSHTT位域(雾、雨、雪、冰雹、雷霆、龙卷风)以计算降水事件天数、按(station, month)天进行聚合,并将分区的 Parquet 写回接入点。

  1. 将以下脚本保存到名为的文件中gsod_monthly.py

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. 将脚本上传到scripts/前缀下的接入点。

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

步骤 3:创建 Amazon EMR 无服务器工作角色

Amazon EMR Serverless 在运行您的任务时担任 IAM 执行角色。该角色需要权限才能读取和写入接入点以及将日志写入 CloudWatch 日志。展开以下部分了解设置步骤。

  1. 将以下信任策略另存为emr-trust-policy.json。它允许 Amazon EMR Serverless 担任该角色。

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. 将以下权限策略另存为emr-permissions.json。用您的值替换regionaccount-id、和access-point-name

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. 创建角色并附加策略。

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

第 4 步:创建并启动 Amazon EMR 无服务器应用程序

Amazon EMR 无服务器应用程序是针对特定版本标签和引擎(Spark 或 Hive)的长寿命计算环境。您向它提交一份或多份工作。应用程序会根据任务需求自动向上和向下扩展计算,并在没有作业运行时闲置。

  1. 使用最新版本的亚马逊 EMR 创建 Spark 应用程序。

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    记下响应中的 applicationId

  2. 启动应用程序。启动会预热一小部分工作人员,因此第一个作业运行时不会出现冷启动延迟。

    $ aws emr-serverless start-application --application-id application-id

    等待状态变成STARTED

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

第 5 步:提交 Spark 任务

使用应用程序 ID 和执行角色提交作业。该作业通过接入点从中读取原始 CSVgsod-monthly/gsod/2024/并将分区的 Parquet 写入其中。

  1. 将作业驱动程序配置另存为job-driver.json。替换占位符。

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. 将以下监视配置另存为job-config.json。它将驱动程序和执行者日志发送到日 CloudWatch 志。

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. 提交作业。

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    记下响应中的 jobRunId

  4. 轮询作业状态。作业从过渡SCHEDULEDRUNNINGSUCCESS

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
注意

如果作业失败,请在日志组下的 “日志” 中查看驱动程序 CloudWatch 日志/aws/emr-serverless/fsxn-emr-app。Amazon EMR Serverless 每次作业运行都会写入一个日志流。

步骤 6:检查输出

验证任务每月写入一个 Parquet 分区以及输出是否可读。

  1. 列出输出分区。

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    您应该看到每个month=YYYY-MM/分区一个 Parquet 文件,并在根部看到一个_SUCCESS标记。

  2. 在本地读取分区以验证内容。

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    输出架构包括station、、station_namelatlonavg_temp_fmin_temp_fmax_temp_ftotal_prcp_inprecip_event_days、和observation_days

扩展模式

  • 使用 Spark SQL 查询输出。将分区输出注册为表, AWS Glue Data Catalog 然后使用 Spark SQL、Athena 或任何其他读取目录表的工具对其进行查询。 AWS Glue 有关注册接入点支持的数据集的说明,请参阅使用 Amazon Athena 使用 SQL 查询文件

  • 使用 Iceberg 进行 ACID 写入。对于更新或合并数据的工作负载,请将作业配置为写入接入点上的 Iceberg 表,而不是普通的 Parquet。Amazon EMR Serverless 默认在最新版本的标签上包含 Iceberg 运行时。

  • 与亚马逊 EMR Studio 以交互方式运行。将 Jupyter 笔记本连接到 Amazon EMR 无服务器应用程序,以交互方式浏览数据。在《亚马逊 EMR 无服务器用户指南》中查看使用 Amazon EMR Serverless 进行交互式工作负载

  • 安排作业。使用 Amazon EventBridge Scheduler 或 AWS Step Functions 按定期计划运行作业(例如,当新一天的数据进入卷时)。

问题排查

接入点AccessDenied上的 Job 失败

验证任务角色策略是否在接入点 ARN(而不是存储桶s3:ListBucket上)上授予s3:GetObject和,以及接入点是否有互联网网络来源,以便 Amazon EMR Serverless 服务可以访问它。

Job 成功但输出为空

检查输入路径。Amazon S3 按字面意思ListObjectsV2处理前缀,因此s3://alias/gsod/2024(没有尾部斜杠)和s3://alias/gsod/2024/(尾部斜杠)的行为可能有所不同。指向文件目录时,请包括尾部的斜杠。

驱动程序日志不在 CloudWatch 日志中

监视配置必须在应用程序--configuration-overrides上传递start-job-run,而不是在应用程序上传递。每个作业运行都会在配置的日志组下写入自己的日志流。

清理

停止并删除应用程序,移除 IAM 角色,并删除您不再需要的所有已上传数据。

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive