使用分布式 Map 处理 Amazon S3 存储桶中的数据 - AWS Step Functions

使用分布式 Map 处理 Amazon S3 存储桶中的数据

此示例项目演示了如何使用分布式 Map 状态 来处理大规模数据,例如,分析历史天气数据并确定每月地球上平均温度最高的气象站。天气数据记录在 1.2 万多个 CSV 文件中,这些文件又存储在 Amazon S3 存储桶中。

此示例项目包括两个分布式 Map 状态,分别名为 Distributed S3 copy NOA DataProcessNOAADataDistributed S3 copy NOA Data 会迭代名为 noaa-gsod-pds 的公共 Amazon S3 存储桶中的 CSV 文件,然后将其复制到 AWS 账户中的Amazon S3 存储桶。ProcessNOAAData 对复制的文件进行迭代,并包含一个用于执行温度分析的 Lambda 函数。

示例项目首先通过调用 ListObjectsV2 API 操作来检查 Amazon S3 存储桶的内容。根据响应此调用返回的密钥数量,示例项目会做出以下决定之一:

  • 如果密钥计数大于或等于 1,则项目将转换到 ProcessNOAAData 状态。此分布式 Map 状态包括一个名为 TemperatureFunction 的 Lambda 函数,用于查找每月平均气温最高的气象站。此函数返回一个以 year-month 为密钥的目录和一个包含将气象站相关信息作为值的目录。

  • 如果返回的密钥计数不超过 1,则 Distributed S3 copy NOA Data 状态将列出公共存储桶 noaa-gsod-pds 中的所有对象,并以 100 个为一批,将单个对象迭代复制到您账户中的另一个存储桶。内联 Map 会执行对象的迭代复制。

    复制完所有对象后,项目将转换到 ProcessNOAAData 状态,处理天气数据。

示例项目最后过渡到一个 reducer Lambda 函数,该函数对 TemperatureFunction 函数返回的结果进行最终聚合,并将结果写入 Amazon DynamoDB 表。

使用分布式 Map,一次最多可运行 1 万个并行子工作流。在此示例项目中,ProcessNOAAData 分布式 Map 的最大并发量设置为 3000,这就限制了它只能并行执行 3000 个子工作流。

本示例项目将创建一台状态机、支持的 AWS 资源,并将配置相关 IAM 权限。探索此示例项目,了解如何使用分布式 Map 编排大规模并行工作负载,或将其作为您自己项目的起点。

重要

此示例项目仅在美国东部(弗吉尼亚州北部)区域可用。

第 1 步:创建状态机

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. 选择从模板创建,然后找到相关的入门模板。选择下一步以继续。

  3. 选择模板使用方式:

    1. 运行演示 — 创建只读状态机。审核后,您可以创建工作流和所有相关资源。

    2. 构建依据 — 提供可编辑的工作流定义,您可借助自有资源对其进行审核、定制并部署。(不会自动创建函数或队列等相关资源。)

  4. 选择使用模板继续进行选择。

    注意

    部署到您的账户的服务将会收取标准费用。

第 2 步:运行演示状态机

如果您选择了运行演示选项,则所有相关资源都将部署并准备好运行。如果您选择了构建依据选项,则可能需要先设置占位符值并创建其他资源,然后才能运行自定义工作流。

  1. 选择部署并运行

  2. 等待 CloudFormation 堆栈部署。这一过程耗时最多 10 分钟。

  3. 出现开始执行选项后,查看输入并选择开始执行

恭喜您!

现在,您应该有了一个正在运行的状态机演示。您可以在图表视图中选择状态来查看输入、输出、变量、定义和事件。