

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 在 Step Functions 中使用 Lambda 函数处理批量数据
<a name="tutorial-itembatcher-param-task"></a>

在本教程中，您将使用*分布式 Map 状态*的 [ItemBatcher（Map）](input-output-itembatcher.md) 字段来处理 Lambda 函数中的整批项目。每批最多包含三个项目。*分布式 Map 状态*启动四个子工作流执行，其中每个执行处理三个项目，而一个执行处理单个项目。每个子工作流执行都会调用一个 Lambda 函数，该函数对批次中存在的各个项目进行迭代。

您将创建一个对整数数组执行乘法的状态机。假设输入的整数数组是 `[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]`，乘法因子是 `7`。这些整数与乘法因子 7 相乘后形成的数组将是 `[7, 14, 21, 28, 35, 42, 49, 56, 63, 70]`。

## 第 1 步：创建状态机
<a name="itembatcher-param-task-create-state-machine"></a>

在此步骤中，您将创建状态机的工作流原型，该原型将整批数据传递给将在[第 2 步](#itembatcher-param-task-config-resource)中创建的 Lambda 函数。
+ 使用以下定义通过 [Step Functions 控制台](https://console.aws.amazon.com/states/home?region=us-east-1#/)创建状态机。有关创建状态机的信息，请参阅[开始使用分布式 Map 状态教程](tutorial-map-distributed.md)中的[第 1 步：创建工作流原型](tutorial-map-distributed.md#use-dist-map-create-workflow)。

  在此状态机中，您可以定义一个*分布式 Map 状态*，该状态接受 10 个整数的数组作为输入，并以 `3` 个整数为一批将该数组传递给一个 Lambda 函数。Lambda 函数迭代批次中在的各个项目，并返回名为 `multiplied` 的输出数组。输出数组包含对输入数组中传递的项目执行乘法的结果。
**重要**  
[确保将以下代码中 Lambda 函数的 Amazon 资源名称 (ARN) 替换为您将在第 2 步](#itembatcher-param-task-config-resource)中创建的函数的 ARN。

  ```
  {
    "StartAt": "Pass",
    "States": {
      "Pass": {
        "Type": "Pass",
        "Next": "Map",
        "Result": {
          "MyMultiplicationFactor": 7,
          "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        }
      },
      "Map": {
        "Type": "Map",
        "ItemProcessor": {
          "ProcessorConfig": {
            "Mode": "DISTRIBUTED",
            "ExecutionType": "STANDARD"
          },
          "StartAt": "Lambda Invoke",
          "States": {
            "Lambda Invoke": {
              "Type": "Task",
              "Resource": "arn:aws:states:::lambda:invoke",
              "OutputPath": "$.Payload",
              "Parameters": {
                "Payload.$": "$",
                "FunctionName": "arn:aws:lambda:region:account-id:function:functionName"
              },
              "Retry": [
                {
                  "ErrorEquals": [
                    "Lambda.ServiceException",
                    "Lambda.AWSLambdaException",
                    "Lambda.SdkClientException",
                    "Lambda.TooManyRequestsException"
                  ],
                  "IntervalSeconds": 2,
                  "MaxAttempts": 6,
                  "BackoffRate": 2
                }
              ],
              "End": true
            }
          }
        },
        "End": true,
        "Label": "Map",
        "MaxConcurrency": 1000,
        "ItemBatcher": {
          "MaxItemsPerBatch": 3,
          "BatchInput": {
            "MyMultiplicationFactor.$": "$.MyMultiplicationFactor"
          }
        },
        "ItemsPath": "$.MyItems"
      }
    }
  }
  ```

## 第 2 步：创建 Lambda 函数
<a name="itembatcher-param-task-config-resource"></a>

在此步骤中，您将创建 Lambda 函数，该函数可处理批次传递的所有项目。

**重要**  
确保您的 Lambda 函数与状态机处于同一 AWS 区域。

**创建 Lambda 函数**

1. 使用 [Lambda 控制台](https://console.aws.amazon.com/lambda/home)创建一个 **Python** Lambda 函数，命名为 **ProcessEntireBatch**。有关创建 Lambda 函数的信息，请参阅[开始使用分布式 Map 状态](tutorial-map-distributed.md)教程中的[第 4 步：配置 Lambda 函数](tutorial-map-distributed.md#use-dist-map-config-resource)。

1. 复制以下 Lambda 函数代码，并将其粘贴到 Lambda 函数的**代码源**部分。

   ```
   import json
   
   def lambda_handler(event, context):
       multiplication_factor = event['BatchInput']['MyMultiplicationFactor']
       items = event['Items']
       
       results = [multiplication_factor * item for item in items]
       
       return {
           'statusCode': 200,
           'multiplied': results
       }
   ```

1. 创建 Lambda 函数后，复制显示在页面右上角的函数 ARN。以下是一个 ARN 示例，其中 *`function-name`* 是 Lambda 函数的名称（在本例中为 `ProcessEntireBatch`）：

   ```
   arn:aws:lambda:region:123456789012:function:function-name
   ```

   您需要在[第 1 步](#itembatcher-param-task-create-state-machine)中创建的状态机中提供函数 ARN。

1. 选择**部署**，以部署更改。

## 第 3 步：运行状态机
<a name="itembatcher-param-task-run-state-machine"></a>

运行[状态机](#itembatcher-param-task-create-state-machine)时，*分布式 Map 状态*会启动四个子工作流程执行，其中每个执行处理三个项目，而一个执行处理单个项目。

以下示例显示了由其中一个子工作流执行传递给 [`ProcessEntireBatch`](#itembatcher-param-task-config-resource) 函数的数据。

```
{
  "BatchInput": {
    "MyMultiplicationFactor": 7
  },
  "Items": [1, 2, 3]
}
```

给定此输入，以下示例显示了 Lambda 函数返回的名为 `multiplied` 的输出数组。

```
{
  "statusCode": 200,
  "multiplied": [7, 14, 21]
}
```

状态机返回以下输出，其中包含为四个子工作流执行四个数组，名为 `multiplied`。这些数组包含各个输入项的乘法结果。

```
[
  {
    "statusCode": 200,
    "multiplied": [7, 14, 21]
  },
  {
    "statusCode": 200,
    "multiplied": [28, 35, 42]
  },
  {
    "statusCode": 200,
    "multiplied": [49, 56, 63]
  },
  {
    "statusCode": 200,
    "multiplied": [70]
  }
]
```

要将返回的所有数组项合并到一个输出数组中，可以使用 [ResultSelector](input-output-inputpath-params.md#input-output-resultselector) 字段。在*分布式 Map 状态*中定义此字段，用于查找所有 `multiplied` 数组，提取这些数组中的所有项目，然后将它们组合成一个输出数组。

要使用 `ResultSelector` 字段，请更新您的状态机定义，如以下示例所示。

```
{
  "StartAt": "Pass",
  "States": {
    ...
    ...
    "Map": {
      "Type": "Map",
      ...
      ...
      "ItemsPath": "$.MyItems",
      "ResultSelector": {
        "multiplied.$": "$..multiplied[*]"
      }
    }
  }
}
```

更新的状态机返回统一的输出数组，如以下示例所示。

```
{
  "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]
}
```