

# 将 AWS Lambda 与 Amazon DynamoDB 结合使用
<a name="with-ddb"></a>

**注意**  
如果想要将数据发送到 Lambda 函数以外的目标，或要在发送数据之前丰富数据，请参阅 [Amazon EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)（Amazon EventBridge 管道）。

您可以使用 AWS Lambda 函数来处理 [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)中的记录。使用 DynamoDB Streams，每次更新 DynamoDB 表时，您都可以触发 Lambda 函数以执行其他工作。

在处理 DynamoDB 流时，您需要实施部分批处理响应逻辑，以防止在批处理中的某些记录失败时重试成功处理的记录。Powertools for AWS Lambda 的[批处理器实用程序](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/)适用于 Python、TypeScript、.NET 和 Java，并通过自动处理部分批处理响应逻辑简化了此实现，从而减少了开发时间并提高了可靠性。

**Topics**
+ [轮询和批处理流](#dynamodb-polling-and-batching)
+ [轮询和流的起始位置](#dyanmo-db-stream-poll)
+ [DynamoDB Streams 中的分片同时读取器](#events-dynamodb-simultaneous-readers)
+ [事件示例](#events-sample-dynamodb)
+ [使用 Lambda 处理 DynamoDB 记录](services-dynamodb-eventsourcemapping.md)
+ [使用 DynamoDB 和 Lambda 配置部分批处理响应](services-ddb-batchfailurereporting.md)
+ [在 Lambda 中保留 DynamoDB 事件源的丢弃记录](services-dynamodb-errors.md)
+ [在 Lambda 中实现有状态的 DynamoDB 流处理](services-ddb-windows.md)
+ [Amazon DynamoDB 事件源映射的 Lambda 参数](services-ddb-params.md)
+ [对 DynamoDB 事件源使用事件筛选](with-ddb-filtering.md)
+ [教程：将 AWS Lambda 与 Amazon DynamoDB Streams 结合使用](with-ddb-example.md)

## 轮询和批处理流
<a name="dynamodb-polling-and-batching"></a>

Lambda 以每秒 4 次的基本频率轮询 DynamoDB 流中的分区来获取记录。如果记录可用，Lambda 会调用函数并等待结果。如果处理成功，Lambda 会恢复轮询，直到其收到更多记录。

默认情况下，Lambda 会在记录可用时尽快调用您的函数。如果 Lambda 从事件源中读取的批处理只有一条记录，则 Lambda 将会只向该函数发送一条记录。为避免在记录数量较少的情况下调用该函数，您可以配置 *batching window*（批处理时段），让事件源缓冲最多五分钟的记录。调用函数前，Lambda 会持续从事件源中读取记录，直到收集完整批处理、批处理时段到期或批处理达到 6MB 的有效负载时为止。有关更多信息，请参阅 [批处理行为](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching)。

**警告**  
Lambda 事件源映射至少处理每个事件一次，有可能出现重复处理记录的情况。为避免与重复事件相关的潜在问题，我们强烈建议您将函数代码设为幂等性。要了解更多信息，请参阅 AWS 知识中心的[如何让我的 Lambda 函数保持幂等性](https://repost.aws/knowledge-center/lambda-function-idempotent)。

Lambda 在发送下次批处理之前不会等待任何配置的[扩展](lambda-extensions.md)完成。换句话说，扩展可能会在 Lambda 处理下一批记录时继续运行。如果您违反了账户的任何[并发](lambda-concurrency.md)设置或限制，可能会导致节流问题。要检测这是否是潜在问题，请监控函数并检查所显示的[并发指标](monitoring-concurrency.md#general-concurrency-metrics)是否高于事件源映射的预期。由于调用间隔时间较短，Lambda 可能会短暂报告高于分片数量的并发使用量。即使对于没有扩展名的 Lambda 函数也是如此。

配置 [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor) 设置以同时使用多个 Lambda 调用处理 DynamoDB 流的一个分片。您可以指定 Lambda 通过从 1（默认值）到 10 的并行化因子从分区中轮询的并发批次数。例如，假设您将 `ParallelizationFactor` 设置为 2，则最多可以有 200 个并发 Lambda 调用来处理 100 个 DynamoDB 流分片（但您可能实际上会看到不同的 `ConcurrentExecutions` 指标值）。这有助于在数据量不稳定并且 [IteratorAge](monitoring-metrics-types.md#performance-metrics) 较高时纵向扩展处理吞吐量。增加每个分片的并发批次数后，Lambda 仍然可以确保项目（分区和排序键）级别的顺序处理。

## 轮询和流的起始位置
<a name="dyanmo-db-stream-poll"></a>

请注意，事件源映射创建和更新期间的流轮询最终是一致的。
+ 在事件源映射创建期间，可能需要几分钟才能开始轮询来自流的事件。
+ 在事件源映射更新期间，可能需要几分钟才能停止和重新开始轮询来自流的事件。

此行为意味着，如果你指定 `LATEST` 作为流的起始位置，事件源映射可能会在创建或更新期间错过事件。为确保不会错过任何事件，请将流的起始位置指定为 `TRIM_HORIZON`。

## DynamoDB Streams 中的分片同时读取器
<a name="events-dynamodb-simultaneous-readers"></a>

对于作为非全局表的单区域表，您可以设计最多两个 Lambda 函数来同时从同一个 DynamoDB Streams 分片读取数据。超过此限制会导致请求被拒。对于全局表，我们建议您将并行函数的数量限制为一个，以避免请求节流。

## 事件示例
<a name="events-sample-dynamodb"></a>

**Example**  

```
{
  "Records": [
    {
      "eventID": "1",
      "eventVersion": "1.0",
      "dynamodb": {
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "NewImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES",
        "SequenceNumber": "111",
        "SizeBytes": 26
      },
      "awsRegion": "us-west-2",
      "eventName": "INSERT",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    },
    {
      "eventID": "2",
      "eventVersion": "1.0",
      "dynamodb": {
        "OldImage": {
          "Message": {
            "S": "New item!"
          },
          "Id": {
            "N": "101"
          }
        },
        "SequenceNumber": "222",
        "Keys": {
          "Id": {
            "N": "101"
          }
        },
        "SizeBytes": 59,
        "NewImage": {
          "Message": {
            "S": "This item has changed"
          },
          "Id": {
            "N": "101"
          }
        },
        "StreamViewType": "NEW_AND_OLD_IMAGES"
      },
      "awsRegion": "us-west-2",
      "eventName": "MODIFY",
      "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525",
      "eventSource": "aws:dynamodb"
    }
  ]}
```