

# 将更改数据捕获用于 DynamoDB Streams
<a name="Streams"></a>

 DynamoDB Streams 可在任何 DynamoDB 表中捕获按时间排序的项目级修改序列，并将这类信息存储在日志中长达 24 个小时。应用程序可访问此日志，并在数据项目修改前后近乎实时地查看所显示的数据项目。

 静态加密会加密 DynamoDB 流中的数据。有关更多信息，请参阅 [静态 DynamoDB 加密](EncryptionAtRest.md)。

*DynamoDB 流*是一种有关 DynamoDB 表中的项目更改的有序信息流。当您对表启用流时，DynamoDB 将捕获有关对表中的数据项目进行的每项修改的信息。

每当应用程序在表中创建、更新或删除项目时，DynamoDB Streams 都将编写一条具有已修改项目的主键属性的流记录。*流记录*包含有关对 DynamoDB 表中的单个项目所做的数据修改的信息。您可以配置流，以便流记录捕获其他信息，例如已修改项目的“前”和“后”图像。

DynamoDB Streams 可以帮助确保：
+ 每个流记录仅在流中显示一次。
+ 对于 DynamoDB 表中修改的每个项目，流记录将按照对该项目进行的实际修改的顺序显示。

DynamoDB Streams 近乎实时编写流记录，以便您能构建使用这些流并根据内容采取操作的应用程序。

**Topics**
+ [DynamoDB Streams 的端点](#Streams.Endpoints)
+ [启用流](#Streams.Enabling)
+ [读取和处理流](#Streams.Processing)
+ [DynamoDB Streams 和生存时间](time-to-live-ttl-streams.md)
+ [使用 DynamoDB Streams Kinesis Adapter 处理流记录](Streams.KCLAdapter.md)
+ [DynamoDB Streams 低级 API：Java 示例](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams 和 AWS Lambda 触发器](Streams.Lambda.md)
+ [DynamoDB Streams 和 Apache Flink](StreamsApacheFlink.xml.md)

## DynamoDB Streams 的端点
<a name="Streams.Endpoints"></a>

AWS 为 DynamoDB 和 DynamoDB Streams 维护单独的端点。要使用数据库表和索引，您的应用程序必须访问 DynamoDB 端点。要读取和处理 DynamoDB Streams 记录，您的应用程序必须访问相同区域内的 DynamoDB Streams 端点。

DynamoDB Streams 提供两组端点。它们是：
+ **仅 IPv4 的端点**：使用 `streams.dynamodb.<region>.amazonaws.com` 命名约定的端点。
+ **双栈端点**：兼容 IPv4 和 IPv6 并遵循 `streams-dynamodb.<region>.api.aws` 命名约定的新端点。

**注意**  
有关 DynamoDB 和 DynamoDB Streams 区域和端点的完整列表，请参阅《AWS 一般参考》**中的[区域和端点](https://docs.aws.amazon.com/general/latest/gr/rande.html)。

AWS SDK 为 DynamoDB 和 DynamoDB Streams 提供单独的客户端。根据您的要求，您的应用程序可以访问 DynamoDB 端点，DynamoDB Streams 端点或同时访问二者。要连接到两个端点，您的应用程序必须实例化两个客户端 — 一个用于 DynamoDB，另一个用于 DynamoDB Streams。

## 启用流
<a name="Streams.Enabling"></a>

使用 AWS CLI 或某个 AWS SDK 创建新表时，可以在新表上启用流。还可以对现有表启用或禁用流，或更改流设置。DynamoDB Streams 可异步执行操作，因此在启用流的情况下不会影响表的性能。

管理 DynamoDB Streams 最简单的方法是使用 AWS 管理控制台。

1. 登录 AWS 管理控制台，并打开 DynamoDB 控制台：[https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)。

1. 在 DynamoDB 控制台控制面板上，选择**表**，然后选择现有表。

1. 选择**导出和流式传输**选项卡。

1. 在 **DynamoDB 流详细信息**部分，选择**开启**。

1. 在**开启 DynamoDB 流**页面中，选择在修改表中的数据时将写入流中的信息：
   + **仅键** — 仅所修改项目的键属性。
   + **新映像** — 修改后的整个项目。
   + **旧映像** — 修改前的整个项目。
   + **新旧映像** — 项目的新旧映像。

   根据需要进行设置后，选择**开启流**。

1. （可选）要禁用现有流，请选择 **DynamoDB 流详细信息**下的**关闭**。

您还可以使用 `CreateTable` 或 `UpdateTable` API 操作来启用或修改流。`StreamSpecification` 参数确定如何配置流：
+ `StreamEnabled` — 指定对表启用 (`true`) 或禁用 (`false`) 流。
+ `StreamViewType` — 指定在修改表中的数据时将写入流中的信息：
  + `KEYS_ONLY` — 仅所修改项目的键属性。
  + `NEW_IMAGE` — 整个项目在修改后的显示。
  + `OLD_IMAGE` — 整个项目在修改前的显示。
  + `NEW_AND_OLD_IMAGES` — 项目的新旧映像。

您可以随时启用或禁用流。但是，如果您尝试在已有流的表上启用流，则会收到 `ValidationException`。如果您尝试在没有流的表上禁用流，也会收到 `ValidationException`。

当您将 `StreamEnabled` 设置为 `true` 时，DynamoDB 将创建一个新流，并为其分配一个唯一的流描述符。如果您在表上禁用然后重新启用流，则将创建一个具有不同流描述符的新流。

每个流均由一个 Amazon 资源名称（ARN）进行唯一标识。以下是名为 `TestTable` 的 DynamoDB 表中一个流的示例 ARN。

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

要确定表的最新流描述符，请发出一个 DynamoDB `DescribeTable` 请求并在响应中查找 `LatestStreamArn` 元素。

**注意**  
一旦设置了流，就无法编辑 `StreamViewType`。如果您需要在设置流后对其进行更改，则必须禁用当前流并创建一个新的流。

## 读取和处理流
<a name="Streams.Processing"></a>

要读取和处理流，您的应用程序必须连接到 DynamoDB Streams 端点并发出 API 请求。

流由*流记录* 构成。每条流记录均代表流所在的 DynamoDB 表中的一个数据修改。每条流记录均分配有一个序列号，该序列号反映了将记录发布至流的顺序。

流记录将组织到群组或*分区* 中。每个分区可充当多条流记录的容器，并包含访问和迭代这些记录所需的信息。分区中的流记录将在 24 小时后自动删除。

分区是临时的：将根据需要自动创建和删除它们。此外，任何分区均可拆分为多个新分区；这也是自动进行的。（请注意，父分片还可能只有一个子分片。） 分区可能会在响应其父表上的高级写入活动时拆分，以便应用程序可以并行处理来自多个分区的记录。

如果您禁用流，将关闭已打开的分区。流中的数据在 24 小时内可继续读取。

由于分区存在沿袭 (父分区和子分区)，应用程序必须始终先处理父分区，然后再处理子分区。这可帮助确保流记录也会按正确顺序进行处理。（如果您使用 DynamoDB Streams Kinesis Adapter，则会为您进行处理。您的应用程序按正确顺序处理分片和流记录。它自动处理新分片或过期分片，以及在应用程序运行期间拆分的分片。有关更多信息，请参阅[使用 DynamoDB Streams Kinesis Adapter 处理流记录](Streams.KCLAdapter.md)。）

下图显示流、流中的分区和分区中的流记录之间的关系。

![\[DynamoDB Streams 结构。表示数据修改的流记录以分片的形式加以组织。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**注意**  
如果您执行的 `PutItem` 或 `UpdateItem` 操作不更改项目中的任何数据，则 DynamoDB Streams *不会*为该操作编写流记录。

要访问流和处理其中的流记录，您必须执行以下操作：
+ 确定您要访问的流的唯一 ARN。
+ 确定流中的哪些分片包含您感兴趣的流记录。
+ 访问分片并检索所需的流记录。

**注意**  
从同一流的分片中同时读取的进程最多不得超过 2 个。读取器超过 2 个的分片可能会受到限制。

DynamoDB Streams API 提供以下操作以供应用程序使用：
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` — 返回当前账户和端点的流描述符列表。(可选) 您可以只请求特定表名称的流描述符。
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)`：返回有关流的信息，包括流的当前状态、其 Amazon 资源名称（ARN）、其分片的构成及其相应的 DynamoDB 表。您可以选择使用 `ShardFilter` 字段来检索与父分片关联的现有子分片。
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` — 返回一个描述分片中位置的*分片迭代器*。您可以请求该迭代器提供对流中最旧的点、最新的点或某个特定点的访问权。
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` — 返回来自给定分片中的流记录。您必须提供从 `GetShardIterator` 请求中返回的分区迭代器。

有关这些 API 操作的完整描述（包括示例请求和响应），请参阅[Amazon DynamoDB Streams API 参考](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html)。

### 分片发现
<a name="Streams.ShardDiscovery"></a>



使用两种强有力的方法在您的 DynamoDB 流中发现新的分片。作为 Amazon DynamoDB Streams 用户，您可以通过两种有效的方法来跟踪和识别新的分片：

**轮询整个流拓扑**  
使用 `DescribeStream` API 定期对流进行轮询。这将返回流中的所有分片，包括已创建的任何新分片。通过比较一段时间内的结果，您可以检测到新添加的分片。

**发现子分片**  
使用 `DescribeStream` API 及 `ShardFilter` 参数来查找分片的子集。通过在请求中指定父分片，DynamoDB Streams 将返回其直接子分片。当只需跟踪分片沿袭而不需要扫描整个流时，这种方法很有用。  
使用来自 DynamoDB Streams 的数据的应用程序可以使用此 `ShardFilter` 参数，高效地从读取已关闭的分片过渡到其子分片，从而避免重复调用 `DescribeStream` API 来检索和遍历所有已关闭和打开的分片的分片映射。这有助于在父分片关闭后快速发现子分片，从而提高流处理应用程序的响应速度和成本效益。

这两种方法都能让您随时掌握 DynamoDB Streams 不断演变的结构，确保您不会错过任何关键的数据更新或分片修改。

### DynamoDB Streams 的数据留存限制
<a name="Streams.DataRetention"></a>

DynamoDB Streams 中所有数据的生命周期为 24 小时。您可以检索和分析任意给定表过去 24 小时的活动。但是，24 小时之前的数据可能会随时被修剪（删除）。

如果您对某个表禁用一个流，该流中的数据仍在 24 小时内可读。此时间过后，数据将过期，并且流记录将自动被删除。没有手动删除现有流的机制。您必须等待直至保留期限过期（24 小时），然后将删除所有流记录。

# DynamoDB Streams 和生存时间
<a name="time-to-live-ttl-streams"></a>

您可以通过在表中启用 Amazon DynamoDB Streams 并处理已过期项目的流记录来备份或者处理按[生存时间](TTL.md)（TTL）删除的项目。有关更多信息，请参阅 [读取和处理流](Streams.md#Streams.Processing)。

流记录包含用户身份字段`Records[<index>].userIdentity`。

在过期后被生存时间过程删除的项目包含以下字段：
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**注意**  
在全局表中使用 TTL 时，执行 TTL 的区域将设置 `userIdentity` 字段。复制删除操作时，不会在其它区域设置此字段。

以下 JSON 显示单个流记录的相关部分。

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## 使用 DynamoDB Streams 和 Lambda 归档已删除 TTL 的项目
<a name="streams-archive-ttl-deleted-items"></a>

结合使用 [DynamoDB 生存时间（TTL）](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html)、[DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) 和 [AWS Lambda](https://aws.amazon.com/lambda/) 有助于简化数据归档、降低 DynamoDB 存储成本并降低代码复杂性。使用 Lambda 作为流使用者提供了许多优势，最明显的是与 Kinesis Client Library (KCL) 等其他使用者相比，降低了成本。当通过 Lambda 来使用事件时，对 DynamoDB 流的 `GetRecords` API 调用不向您收费，并且 Lambda 可以通过识别流事件中的 JSON 模式来提供事件筛选。借助事件模式内容筛选，您可以定义多达五个不同的筛选条件来控制将哪些事件发送到 Lambda 进行处理。这有助于减少对 Lambda 函数的调用、简化代码并降低总体成本。

尽管 DynamoDB Streams 包含所有数据修改，例如 `Create`、`Modify` 和 `Remove` 操作，但这可能导致不必要地调用归档 Lambda 函数。例如，假设一个每小时有 200 万项数据修改的表流入流中，但其中不到 5％ 的数据修改是将在 TTL 流程中过期而需要归档的项目删除。使用 [Lambda 事件源筛选条件](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，Lambda 函数每小时只调用 100,000 次。事件筛选的结果是，您只需为所需的调用付费。在没有事件筛选的情况下，您需要为获得的 200 万次调用付费。

事件筛选应用于 [Lambda 事件源映射](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)，它是一个从选定事件（DynamoDB 流）读取并调用 Lambda 函数的资源。在下图中，您可以看到 Lambda 函数如何通过流和事件筛选条件使用已删除生存时间的项目。

![\[通过 TTL 流程删除的项目会启动使用流和事件筛选条件的 Lambda 函数。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### DynamoDB 生存时间事件筛选条件模式
<a name="ttl-event-filter-pattern"></a>

将以下 JSON 添加到源映射[筛选标准](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html)仅允许对已删除 TTL 的项目调用 Lambda 函数：

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### 创建 AWS Lambda 事件源映射
<a name="create-event-source-mapping"></a>

使用以下代码段创建筛选的事件源映射，您可以将其连接到表的 DynamoDB 流。每个代码块都包括事件筛选条件模式。

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# 使用 DynamoDB Streams Kinesis Adapter 处理流记录
<a name="Streams.KCLAdapter"></a>

使用 Amazon Kinesis Adapter 是使用来自 Amazon DynamoDB 的流的建议方法。DynamoDB Streams API 有意与 Kinesis Data Streams 的 API 类似。在这两种服务中，数据流都由分片组成，分片是流记录的容器。这两种服务的 API 都包含 `ListStreams`、`DescribeStream`、`GetShards` 和 `GetShardIterator` 操作。（虽然这些 DynamoDB Streams 操作与 Kinesis Data Streams 中的对应操作类似，但它们并不完全相同。）

作为 DynamoDB Streams 用户，您可以使用 KCL 中找到的设计模式来处理 DynamoDB Streams 分片和流记录。若要执行此操作，请使用 DynamoDB Streams Kinesis Adapter。Kinesis Adapter 实现 Kinesis Data Streams 接口，以便 KCL 可用于使用和处理来自 DynamoDB Streams 的记录。有关如何设置和安装 DynamoDB Streams Kinesis Adapter 的说明，请参阅 [GitHub 存储库](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)。

您可以使用 Kinesis 客户端库 (KCL) 为 Kinesis Data Streams 编写应用程序。KCL 提供低级 Kinesis Data Streams API 之上的有用抽象来简化编码。有关 KCL 的更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

DynamoDB 建议将 KCL 版本 3.x 与适用于 Java 的 AWS SDK v2.x 一起使用。在过渡期间，当前 DynamoDB Streams Kinesis Adapter 版本 1.x 与 AWS SDK for 适用于 Java 的 AWS SDK v1.x 将按照 [AWS SDK 和工具维护政策](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)，在其整个生命周期内继续按预期得到全面支持。

**注意**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日终止支持。我们强烈建议您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问 GitHub 上的 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 页面。有关最新 KCL 版本的信息，请参阅 [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅“从 KCL 1.x 迁移到 KCL 3.x”。

以下图表显示了这些库之间的交互方式。

![\[通过 DynamoDB Streams、Kinesis Data Streams 和 KCL 之间的交互处理 DynamoDB Streams 记录。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


有了 DynamoDB Streams Kinesis Adapter，您可以开始针对 KCL 接口进行开发，使 API 调用无缝定向到 DynamoDB Streams 端点。

应用程序启动后，调用 KCL 来实例化工作进程。必须为工作进程提供应用程序的配置信息，如流描述符和 AWS 凭证，以及您提供的记录处理器类的名称。在记录处理器中运行代码时，工作进程执行以下任务：
+ 连接到流
+ 枚举流中的分片
+ 检查并枚举流中已关闭父分片的子分片
+ 协调与其他工作程序的分片关联（如果有）
+ 为其管理的每个分片实例化记录处理器
+ 从流中提取记录
+ 在高吞吐量期间扩展 GetRecords API 调用速率（如果配置了追赶模式）
+ 将记录推送到对应的记录处理器
+ 对已处理记录进行检查点操作
+ 在工作程序实例计数更改时均衡分片与工作程序的关联
+ 在分片被拆分时平衡分片与工作程序的关联

KCL 适配器支持追赶模式，这是一种自动调整调用速率的功能，用于处理临时增加的吞吐量。当流处理滞后超过可配置的阈值（默认为一分钟）时，追赶模式会按可配置值（默认 3 倍）扩展 GetRecords API 调用频率，以更快地检索记录，然后在滞后下降后恢复正常。这在高吞吐量时段非常有用，在这段时间里，DynamoDB 写入活动可能会使用默认轮询速率让使用者不堪重负。可以通过 `catchupEnabled` 配置参数启用追赶模式（默认为 false）。

**注意**  
有关此处列出的 KCL 概念的说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。  
有关将流与 AWS Lambda 配合使用的更多信息，请参阅 [DynamoDB Streams 和 AWS Lambda 触发器](Streams.Lambda.md)

# 从 KCL 1.x 迁移到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概述
<a name="migrating-kcl-overview"></a>

本指南提供有关将使用者应用程序从 KCL 1.x 迁移到 KCL 3.x 的说明。由于 KCL 1.x 和 KCL 3.x 之间的架构差异，迁移需要更新多个组件以确保兼容性。

与 KCL 3.x 相比，KCL 1.x 使用不同的类和接口。必须先将记录处理器、记录处理器工厂和工作线程类迁移到 KCL 3.x 兼容格式，然后按照将 KCL 1.x 迁移到 KCL 3.x 的迁移步骤进行操作。

## 迁移步骤
<a name="migration-steps"></a>

**Topics**
+ [步骤 1：迁移记录处理器](#step1-record-processor)
+ [步骤 2：迁移记录处理器工厂](#step2-record-processor-factory)
+ [步骤 3：迁移工作线程](#step3-worker-migration)
+ [步骤 4：KCL 3.x 配置概述和建议](#step4-configuration-migration)
+ [步骤 5：从 KCL 2.x 迁移到 KCL 3.x](#step5-kcl2-to-kcl3)

### 步骤 1：迁移记录处理器
<a name="step1-record-processor"></a>

以下示例显示了为 KCL 1.x DynamoDB Streams Kinesis Adapter 实现的记录处理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**迁移 RecordProcessor 类**

1. 将接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改为 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的导入语句：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 使用以下新方法替换 `shutdownRequested` 方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

下面是记录处理器类的更新版本：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis Adapter 现在使用 SDKv2 记录模型。在 SDKv2 中，复杂的 `AttributeValue` 对象（`BS`、`NS`、`M`、`L`、`SS`）从不会返回 null。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法来验证这些值是否存在。

### 步骤 2：迁移记录处理器工厂
<a name="step2-record-processor-factory"></a>

记录处理器工厂负责在获得租约时创建记录处理器。下面是 KCL 1.x 工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**迁移到 `RecordProcessorFactory`**
+ 将已实施的接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

下面是 3.0 中的记录处理器工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步骤 3：迁移工作线程
<a name="step3-worker-migration"></a>

在 KCL 版本 3.0 中，名为**调度器**的新类取代了**工作线程**类。下面是 KCL 1.x 工作线程的示例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**迁移工作程序**

1. 将 `import` 类的 `Worker` 语句更改为 `Scheduler` 和 `ConfigsBuilder` 类的导入语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 导入 `StreamTracker` 并将 `StreamsWorkerFactory` 的导入更改为 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 选择从中启动应用程序的位置。它可以为 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 创建一个 `StreamTracker` 实例。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 创建 `AmazonDynamoDBStreamsAdapterClient` 对象。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 创建 `ConfigsBuilder` 对象。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 创建 `Scheduler`，如以下示例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 设置在 KCL v3 与 KCL v1 的 DynamoDB Streams Kinesis Adapter 之间保持兼容性，而未在 KCL v2 和 v3 之间保持兼容性。

### 步骤 4：KCL 3.x 配置概述和建议
<a name="step4-configuration-migration"></a>

有关 KCL 1.x 之后引入的与 KCL 3.x 相关的配置的详细描述，请参阅 [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) 和 [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
在 KCL 3.x 及更高版本中，我们建议不要直接创建 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 和 `retrievalConfig` 的对象，而是使用 `ConfigsBuilder` 设置配置，来避免出现调度器初始化问题。`ConfigsBuilder` 提供了更灵活且更易于维护的方式配置 KCL 应用程序。

#### 在 KCL 3.x 中具有更新默认值的配置
<a name="kcl3-configuration-overview"></a>

`billingMode`  
在 KCL 版本 1.x 中，`billingMode` 的默认值设置为 `PROVISIONED`。但在 KCL 版本 3.x 中，默认 `billingMode` 为 `PAY_PER_REQUEST`（按需模式）。我们建议您对租约表使用按需容量模式，以便根据您的使用情况自动调整容量。有关对租约表使用预置容量的指导，请参阅 [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 版本 1.x 中，`idleTimeBetweenReadsInMillis` 的默认值设置为 1000（或 1 秒）。KCL 版本 3.x 将 i`dleTimeBetweenReadsInMillis` 的默认值设置为 1500（或 1.5 秒），但 Amazon DynamoDB Streams Kinesis Adapter 将默认值改写为 1000（或 1 秒）。

#### KCL 3.x 中的新配置
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此配置定义在新发现的分片开始处理之前的时间间隔，计算方法为 1.5 × `leaseAssignmentIntervalMillis`。如果未显式配置此设置，则时间间隔默认为 1.5 × `failoverTimeMillis`。处理新分片包括扫描租约表并在租约表上查询全局二级索引（GSI）。降低 `leaseAssignmentIntervalMillis` 会增加这些扫描和查询操作的频率，从而导致 DynamoDB 成本更高。建议将此值设置为 2000（即 2 秒），以尽可能减少处理新分片的延迟。

`shardConsumerDispatchPollIntervalMillis`  
此配置定义了分片使用者用于触发状态转换的连续轮询之间的间隔。在 KCL 版本 1.x 中，此行为由 `idleTimeInMillis` 参数控制，该参数未作为可配置的设置公开。在 KCL 版本 3.x 中，我们建议将此配置设置为与 KCL 版本 1.x 设置中用于 ` idleTimeInMillis` 的值相匹配。

### 步骤 5：从 KCL 2.x 迁移到 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

为确保平稳过渡并与最新的 Kinesis Client Library（KCL）版本兼容，请按照迁移指南的 [upgrading from KCL 2.x to KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 说明中的步骤 5-8 进行操作。

有关常见的 KCL 3.x 故障排除问题，请参阅 [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。

# 回滚至先前 KCL 版本
<a name="kcl-migration-rollback"></a>

本主题介绍如何将使用者应用程序回滚到先前 KCL 版本。回滚过程由两个步骤组成：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署以前的 KCL 版本代码。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollback-step1"></a>

当需要回滚到先前 KCL 版本时，必须运行 KCL 迁移工具。此工具可执行两个重要任务：
+ 它在 DynamoDB 中的租约表上移除一个名为工作线程指标表的元数据表和全局二级索引。这些构件由 KCL 3.x 创建，但在回滚到先前版本时并不需要。
+ 它使所有工作线程均在与 KCL 1.x 兼容的模式下运行，并开始使用先前 KCL 版本中使用的负载均衡算法。如果 KCL 3.x 中的新负载均衡算法存在问题，这将立即缓解问题。

**重要**  
DynamoDB 中的协调器状态表必须存在，并且在迁移、回滚和前滚过程中不得删除。

**注意**  
重要的是，使用者应用程序中的所有工作线程在给定时间均使用相同的负载均衡算法。KCL 迁移工具可确保 KCL 3.x 使用者应用程序中的所有工作线程都切换到 KCL 1.x 兼容模式，以便在应用程序回滚到先前 KCL 版本期间，所有工作线程都运行相同的负载均衡算法。

您可以在 [KCL GitHub 存储库](https://github.com/awslabs/amazon-kinesis-client/tree/master)的 scripts 目录中下载 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。从具有相应权限的工作线程或主机运行脚本，以写入协调器状态表、工作线程指标表和租约表。确保为 KCL 使用者应用程序配置了适当的 [IAM permissions](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)。使用指定的命令对每个 KCL 应用程序仅运行一次脚本：

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### 参数
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
将*区域*替换为您的 AWS 区域。

`--application_name`  
如果您为 DynamoDB 元数据表（租约表、协调器状态表和工作线程指标表）使用默认名称，则需要此参数。如果您为这些表指定了自定义名称，则可以忽略此参数。将 *applicationName* 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。

`--lease_table_name`  
如果您在 KCL 配置中为租约表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *leaseTableName* 替换为您为租约表指定的自定义表名称。

`--coordinator_state_table_name`  
如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *coordinatorStateTableName* 替换为您为协调器状态表指定的自定义表名称。

`--worker_metrics_table_name`  
如果您在 KCL 配置中为工作线程指标表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *workerMetricsTableName* 替换为您为工作线程指标表指定的自定义表名称。

## 步骤 2：使用先前 KCL 版本重新部署代码
<a name="kcl-migration-rollback-step2"></a>

**重要**  
在 KCL 迁移工具生成的输出中提及版本 2.x 的任何内容都应解释为指的是 KCL 版本 1.x。运行该脚本不会执行完全回滚，它只会将负载均衡算法切换到 KCL 版本 1.x 中使用的算法。

运行 KCL 迁移工具来进行回滚后，您将看到以下消息之一：

消息 1  
“回滚已完成。应用程序正在运行 2x 兼容功能。请使用先前 KCL 版本部署代码，以回滚到先前的应用程序二进制文件。”  
**所需操作：**这意味着工作线程正在 KCL 1.x 兼容模式下运行。使用先前 KCL 版本将代码重新部署到工作线程。

消息 2  
“回滚已完成。KCL 应用程序正在运行 3x 功能，并将回滚到 2x 兼容功能。如果您在短时间内看不到缓解，请使用先前 KCL 版本部署代码，回滚到先前的应用程序二进制文件。”  
**所需操作：**这意味着工作线程正在 KCL 3.x 模式下运行，KCL 迁移工具已将所有工作线程切换到 KCL 1.x 兼容模式。使用先前 KCL 版本将代码重新部署到工作线程。

消息 3  
“应用程序已经回滚。任何可以删除的 KCLv3 资源都被清理以免产生费用，直至应用程序可以通过迁移进行前滚。”  
**所需操作：**这意味着工作线程已经回滚到在 KCL 1.x 兼容模式下运行。使用先前 KCL 版本将代码重新部署到工作线程。

# 回滚后前滚到 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主题介绍如何在回滚后将使用者应用程序前滚到 KCL 3.x。当您需要前滚时，必须完成一个由两步组成的过程：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署代码。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollforward-step1"></a>

使用以下命令运行 KCL 迁移工具，以便前滚到 KCL 3.x：

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### 参数
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
将*区域*替换为您的 AWS 区域。

`--application_name`  
如果您为协调器状态表使用默认名称，则需要此参数。如果您已为协调器状态表指定了自定义名称，则可以忽略此参数。将 *applicationName* 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。

`--coordinator_state_table_name`  
如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *coordinatorStateTableName* 替换为您为协调器状态表指定的自定义表名称。

在前滚模式下运行迁移工具后，KCL 会创建 KCL 3.x 所需的以下 DynamoDB 资源：
+ 租约表上的全局二级索引
+ 工作线程指标表

## 步骤 2：使用 KCL 3.x 部署代码
<a name="kcl-migration-rollforward-step2"></a>

运行 KCL 迁移工具以进行前滚后，使用 KCL 3.x 将代码部署到工作线程。要完成迁移，请参阅 [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)。

# 演练：DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

本节是使用 Amazon Kinesis Client Library 和 Amazon DynamoDB Streams Kinesis Adapter 的 Java 应用程序的演练。此应用程序演示了数据复制示例，其中将一个表中的写入活动应用于另一个表，并且两个表中的内容保持同步。有关源代码，请参阅 [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)。

此程序执行以下操作：

1. 创建名为 `KCL-Demo-src` 和 `KCL-Demo-dst` 的两个 DynamoDB 表。每个表上均启用一个流。

1. 通过添加、更新和删除项目在源表中生成更新活动。这会导致数据写入表的流中。

1. 从流中读取记录、将记录重新构造为 DynamoDB 请求并将请求应用于目标表。

1. 扫描源表和目标表，以确保其内容一致。

1. 通过删除表进行清除。

以下各节将描述这些步骤，本演练结尾将显示完整的应用程序。

**Topics**
+ [第 1 步：创建 DynamoDB 表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [第 2 步：在源表中生成更新活动](#Streams.KCLAdapter.Walkthrough.Step2)
+ [第 3 步：处理流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [第 4 步：确保两个表具有相同的内容](#Streams.KCLAdapter.Walkthrough.Step4)
+ [第 5 步：清理](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 第 1 步：创建 DynamoDB 表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是创建两个 DynamoDB 表，一个源表和一个目标表。源表的流上的 `StreamViewType` 为 `NEW_IMAGE`。这意味着无论何时修改此表中的项目，项目“之后”的映像都将写入到流中。这样一来，流将跟踪表上的所有写入活动。

以下示例显示用于创建两个表的代码。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 第 2 步：在源表中生成更新活动
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在源表上生成某些写入活动。在此活动发生时，源表的流也会近乎实时更新。

此应用程序通过调用用于写入数据的 `PutItem`、`UpdateItem` 和 `DeleteItem` API 操作的方法来定义帮助程序类。以下代码示例演示如何使用这些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 第 3 步：处理流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

现在，此程序开始处理流。DynamoDB Streams Kinesis Adapter 充当 KCL 和 DynamoDB Streams 端点之间的透明层，以便代码可充分利用 KCL 而不必进行低级 DynamoDB Streams 调用。此程序执行以下任务：
+ 它通过符合 KCL 接口定义的方法 (`StreamsRecordProcessor`、`initialize` 和 `processRecords`) 定义记录处理器类 `shutdown`。`processRecords` 方法包含从源表的流中进行读取以及对目标表进行写入时所需的逻辑。
+ 它定义了记录处理器类的类工厂 (`StreamsRecordProcessorFactory`)。这是使用 KCL 的 Java 程序所需的。
+ 它实例化一个新的 KCL `Worker`，它与类工厂关联。
+ 当记录处理完成时，它会关闭 `Worker`。

或者，在 Streams KCL Adapter 配置中启用追赶模式，以便在流处理滞后超过一分钟（默认值）时，自动将 GetRecords API 调用速率扩展 3 倍（默认值），从而有助于流使用者处理表中的高吞吐量峰值。

要了解有关 KCL 接口定义的详细信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

以下代码示例演示了 `StreamsRecordProcessor` 中的主循环。`case` 语句基于流记录中显示的 `OperationType` 来确定要执行的操作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 第 4 步：确保两个表具有相同的内容
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此时，源表和目标表的内容是同步的。此应用程序针对两个表发送 `Scan` 请求以验证其内容是否实质相同。

`DemoHelper` 类包含调用低级 `ScanTable` API 的 `Scan` 方法。下例说明具体用法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 第 5 步：清理
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

演示完成后，此应用程序将删除源表和目标表。请看下面的代码示例。甚至在删除两个表后，其流也可在自动删除后的最多 24 个小时内保持可用。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完成程序：DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

下文是执行 [演练：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md) 所述任务的完整 Java 程序。当您运行该程序时，将显示与以下内容类似的输出。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 要运行此程序，请确保客户端应用程序可以使用策略来访问 DynamoDB 和 Amazon CloudWatch。有关更多信息，请参阅 [适用于 DynamoDB 的基于身份的策略](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)。

源代码包括四个 `.java` 文件。要构建此程序，请添加以下依赖项，其中包括作为传递依赖项的 Amazon Kinesis Client Library（KCL）3.x 和适用于 Java 的 AWS SDK v2：

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

源文件为：
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# DynamoDB Streams 低级 API：Java 示例
<a name="Streams.LowLevel.Walkthrough"></a>

**注意**  
本页上的代码并不详尽，不会处理使用 Amazon DynamoDB Streams 的所有场景。建议利用 Kinesis Client Library (KCL) 通过 Amazon Kinesis Adapter 使用 DynamoDB 中的流记录，如 [使用 DynamoDB Streams Kinesis Adapter 处理流记录](Streams.KCLAdapter.md) 中所述。

本节包含一个演示 DynamoDB Streams 的实际运用的 Java 程序。此程序执行以下操作：

1. 创建一个启用了流的 DynamoDB 表。

1. 描述此表的流设置。

1. 修改表中的数据。

1. 描述流中的分片。

1. 从分片中读取流记录。

1. 获取子分片并继续读取记录。

1. 清理。

当您运行此程序时，将显示与以下内容类似的输出。

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example 示例**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams 和 AWS Lambda 触发器
<a name="Streams.Lambda"></a>

Amazon DynamoDB 与 AWS Lambda 集成，使您能够创建*触发器*，自动响应 DynamoDB Streams 中的事件的代码片段。利用触发器，您可以创建应对 DynamoDB 表中的数据修改的应用程序。

**Topics**
+ [教程 1：对 Amazon DynamoDB 使用筛选器处理所有事件，以及对 AWS Lambda 使用 AWS CLI 进行处理](Streams.Lambda.Tutorial.md)
+ [教程 2：对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。](Streams.Lambda.Tutorial2.md)
+ [将 DynamoDB Streams 与 Lambda 配合使用的最佳实践](Streams.Lambda.BestPracticesWithDynamoDB.md)

如果您在表中启用 DynamoDB Streams，则可以将流 Amazon 资源名称（ARN）与您编写的 AWS Lambda 函数关联起来。然后，对该 DynamoDB 表执行的所有变更操作都可以作为流中的项目捕获。例如，您可以设置触发器。这样，在修改了表中的某个项目时，该表的流中会立即出现一条新记录。

**注意**  
如果您将两个以上的 Lambda 函数订阅到一个 DynamoDB 流，则可能会发生读取节流。

[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) 服务每秒轮询四次流来查找新记录。在有新的流记录可用时，将同步调用您的 Lambda 函数。同一个 DynamoDB 流上最多只能有两个 Lambda 函数订阅。如果您将两个以上的 Lambda 函数订阅到同一个 DynamoDB 流，则可能会发生读取节流。

Lambda 函数可以发送通知、启动工作流或执行您指定的任意操作。您可以编写一个仅将每个流记录复制到持久性存储（如 Amazon S3 文件网关 (Amazon S3)）中的 Lambda 函数，从而为您表中的写入活动创建永久审计跟踪。或者，假设您有一个写入到 `GameScores` 表的移动游戏应用程序。每当更新 `TopScore` 表的 `GameScores` 属性时，一个相应的流记录将被写入该表的流。然后，此事件会触发一个 Lambda 函数，该函数会在社交媒体网络上发布一条祝贺消息。此函数也可以编写为忽略以下任何流记录：不是对 `GameScores` 的更新，或者未修改 `TopScore` 属性。

如果您的函数返回错误，则 Lambda 将重试批处理，直到它成功处理或数据过期。还可以将 Lambda 配置为以较小批处理进行重试、限制重试次数、在记录变得过旧时丢弃以及其它选项。

作为性能最佳实践，Lambda 函数需要短时间运行。为避免引入不必要的处理延迟，它也不应执行复杂的逻辑。特别是对于高速流，最好是触发异步后处理 Step Function 工作流，而不是长时间运行的 Lambda 函数。

 您可以通过在 DynamoDB 流上配置基于资源的策略来跨不同的 AWS 账户使用 Lambda 触发器，以授予对 Lambda 函数的跨账户读取权限。要了解有关如何配置流以支持跨账户访问的更多信息，请参阅《DynamoDB 开发人员指南》中的[与跨账户 AWS Lambda 函数共享访问权限](rbac-cross-account-access.md#shared-access-cross-acount-lambda)。

有关 AWS Lambda 的更多信息，请参阅《AWS Lambda 开发人员指南》[https://docs.aws.amazon.com/lambda/latest/dg/](https://docs.aws.amazon.com/lambda/latest/dg/)。

# 教程 1：对 Amazon DynamoDB 使用筛选器处理所有事件，以及对 AWS Lambda 使用 AWS CLI 进行处理
<a name="Streams.Lambda.Tutorial"></a>

 

在本教程中，您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流。

**Topics**
+ [第 1 步：创建一个启用了流的 DynamoDB 表](#Streams.Lambda.Tutorial.CreateTable)
+ [第 2 步：创建一个 Lambda 执行角色](#Streams.Lambda.Tutorial.CreateRole)
+ [第 3 步：创建一个 Amazon SNS 主题](#Streams.Lambda.Tutorial.SNSTopic)
+ [第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction)
+ [第 5 步：创建并测试一个触发器](#Streams.Lambda.Tutorial.CreateTrigger)

本教程的场景就是 Woofer 这个简单的社交网络。Woofer 用户使用发送给其他 Woofer 用户的 *bark*（短文本消息）进行通信。下图显示了此应用程序的组件和工作流。

![\[DynamoDB 表、流记录、Lambda 函数和 Amazon SNS 主题的 Woofer 应用程序工作流。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. 用户将项目写入 DynamoDB 表 (`BarkTable`)。表中的每个项目代表一个 bark。

1. 写入新的流记录，体现添加到 `BarkTable` 中的新项目。

1. 新的流记录触发 AWS Lambda 函数 (`publishNewBark`)。

1. 如果流记录指示新项目已添加到 `BarkTable`，则 Lambda 函数会从流记录读取数据并将消息发布到 Amazon Simple Notification Service (Amazon SNS) 中的主题。

1. Amazon SNS 主题的订阅者收到消息。（在本教程中，唯一的订阅者是一个电子邮件地址。）

**开始前的准备工作**  
本教程使用 AWS Command Line Interface AWS CLI。如果您尚未配置，请按照 [AWS Command Line Interface 用户指南](https://docs.aws.amazon.com/cli/latest/userguide/)中的说明安装和配置 AWS CLI。

## 第 1 步：创建一个启用了流的 DynamoDB 表
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

在此步骤中，您将创建 DynamoDB 表 (`BarkTable`) 以存储来自 Woofer 用户的所有 bark。主键由 `Username`（分区键）和 `Timestamp`（排序键）组成。这两个属性的类型为字符串。

`BarkTable` 启用了流。在本教程后面的部分中，您通过将 AWS Lambda 函数与流关联来创建触发器。

1. 输入以下命令以创建表。

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. 在输出中，查找 `LatestStreamArn`。

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   记录 `region` 和 `accountID`，因为您在本教程接下来的步骤中需要这些信息。

## 第 2 步：创建一个 Lambda 执行角色
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

在此步骤中，您将创建 AWS Identity and Access Management (IAM) 角色 (`WooferLambdaRole`) 并向其分配权限。此角色将由您在[第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction)中创建的 Lambda 函数使用。

您还将为角色创建策略。策略包含 Lambda 函数在运行时需要的所有权限。

1. 使用以下内容创建名为 `trust-relationship.json` 的文件。

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. 输入以下命令来创建 `WooferLambdaRole`。

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. 使用以下内容创建名为 `role-policy.json` 的文件。（将 `region` 和 `accountID` 替换为您的 AWS 区域和帐户 ID。）

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   策略有四个语句，允许 `WooferLambdaRole` 执行以下操作：
   + 运行 Lambda 函数 (`publishNewBark`)。您将在本教程的后面部分中创建函数。
   + 访问 Amazon CloudWatch Logs Lambda 函数在运行时将诊断信息写入 CloudWatch Logs。
   + 从 `BarkTable` 的 DynamoDB 流读取数据。
   + 向 Amazon SNS 发布消息。

1. 输入以下命令以将策略附加到 `WooferLambdaRole`。

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## 第 3 步：创建一个 Amazon SNS 主题
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

在此步骤中，您将创建 Amazon SNS 主题 (`wooferTopic`) 并使用电子邮件地址订阅该主题。您的 Lambda 函数使用此主题发布来自 Woofer 用户的新 bark。

1. 输入以下命令以创建新 Amazon SNS 主题。

   ```
   aws sns create-topic --name wooferTopic
   ```

1. 输入以下命令以使用电子邮件地址订阅 `wooferTopic`。（使用您的 AWS 区域和账户 ID 替换 `region` 和 `accountID`，并使用有效的电子邮件地址替换 `example@example.com`。）

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS 将向您的电子邮件地址发送确认邮件。选择该邮件中的**确认订阅**链接以完成订阅过程。

## 第 4 步：创建并测试一个 Lambda 函数
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

在此步骤中，您将创建 AWS Lambda 函数 (`publishNewBark`) 以处理来自 `BarkTable` 的流记录。

`publishNewBark` 函数仅处理与 `BarkTable` 中的新项目对应的流事件。该函数从此类事件读取数据，然后调用 Amazon SNS 以发布该事件。

1. 使用以下内容创建名为 `publishNewBark.js` 的文件。将 `region` 和 `accountID` 替换为您的 AWS 区域和帐户 ID。

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. 创建包含 `publishNewBark.js` 的 zip 文件。如果您有 zip 命令行实用程序，则可以输入以下命令来完成此操作。

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. 当您创建 Lambda 函数时，为 `WooferLambdaRole` 指定您在 [第 2 步：创建一个 Lambda 执行角色](#Streams.Lambda.Tutorial.CreateRole) 中创建的 Amazon 资源名称（ARN）。输入以下命令检索此 ARN。

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   在输出中，查找 `WooferLambdaRole` 的 ARN。

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   输入以下命令以创建 Lambda 函数。将 *roleARN* 替换为 `WooferLambdaRole` 的 ARN。

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. 现在测试 `publishNewBark`，验证它可以正常使用。为此，您将提供类似于来自 DynamoDB Streams 的真实记录的输入。

   使用以下内容创建名为 `payload.json` 的文件。将 `region` 和 `accountID` 替换为您的 AWS 区域和账户 ID。

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   输入以下命令以测试 `publishNewBark` 函数。

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   如果测试成功，您将看到以下输出。

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   此外，`output.txt` 文件将包含以下文本。

   ```
   "Successfully processed 1 records."
   ```

   您还会在数分钟内收到一封新电子邮件。
**注意**  
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误，可以使用这些诊断信息排除故障：  
打开 CloudWatch 控制台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。
在导航窗格中，选择**日志**。
选择下列日志组：`/aws/lambda/publishNewBark`
选择最新日志流以查看函数输出（以及错误）。

## 第 5 步：创建并测试一个触发器
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

在 [第 4 步：创建并测试一个 Lambda 函数](#Streams.Lambda.Tutorial.LambdaFunction) 中，您测试了 Lambda 函数以确保它正确运行。在此步骤中，关联 Lambda 函数 (`publishNewBark`) 与事件源（`BarkTable` 流），创建*触发器*。

1. 在创建触发器时，您需要为 `BarkTable` 流指定 ARN。输入以下命令检索此 ARN。

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   在输出中，查找 `LatestStreamArn`。

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. 输入以下命令以创建触发器。使用实际流 ARN 替换 `streamARN`。

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. 测试触发器。键入以下命令以将项目添加到 `BarkTable`。

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   您应在数分钟内收到一封新电子邮件。

1. 打开 DynamoDB 控制台并再将几个项目添加到 `BarkTable`。您必须为 `Username` 和 `Timestamp` 属性指定值。（您还应为 `Message` 指定值，虽然该值并非必需。） 对于添加到 `BarkTable` 中的每个项目，您应收到一封新电子邮件。

   Lambda 函数仅处理您添加到 `BarkTable` 的新项目。如果您在表中更新或删除项目，函数不执行任何操作。

**注意**  
AWS Lambda 将诊断信息写入 Amazon CloudWatch Logs。如果您的 Lambda 函数出现错误，可以使用这些诊断信息排除故障。  
通过以下网址打开 CloudWatch 控制台：[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)。
在导航窗格中，选择**日志**。
选择下列日志组：`/aws/lambda/publishNewBark`
选择最新日志流以查看函数输出（以及错误）。

# 教程 2：对 DynamoDB 和 Lambda 使用筛选器来处理部分事件。
<a name="Streams.Lambda.Tutorial2"></a>

在本教程中，您将创建 AWS Lambda 触发器以处理来自 DynamoDB 表的流中的部分事件。

**Topics**
+ [组合起来 – CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [组合起来 – CDK](#Streams.Lambda.Tutorial2.CDK)

通过 [Lambda 事件筛选](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)，您可以使用筛选表达式来控制 Lambda 将哪些事件发送给函数进行处理。每个 DynamoDB 流最多可以配置 5 个不同的筛选器。如果您使用的是批处理时段，则 Lambda 会对每个新事件应用筛选条件，以确定是否将其包括在当前批处理中。

筛选器通过名为 `FilterCriteria` 的结构来应用。`FilterCriteria` 的 3 个主要属性为 `metadata properties`、`data properties` 和 `filter patterns`。

DynamoDB Streams 事件的示例结构如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties` 是事件对象的字段。在 DynamoDB Streams 中，`metadata properties` 是 `dynamodb` 或 `eventName` 这样的字段。

`data properties` 是事件主体的字段。要根据 `data properties` 进行筛选，请确保将它们包含在正确的键内的 `FilterCriteria` 中。对于 DynamoDB 事件源，数据键为 `NewImage` 或 `OldImage`。

最后，筛选条件规则将定义要应用到特定属性的筛选条件表达式。下面是一些示例：


| 比较运算符 | 示例 | 规则语法（部分） | 
| --- | --- | --- | 
|  Null  |  产品类型为 null  |  `{ "product_type": { "S": null } } `  | 
|  Empty  |  产品名称为空  |  `{ "product_name": { "S": [ ""] } } `  | 
|  Equals  |  州为佛罗里达州  |  `{ "state": { "S": ["FL"] } } `  | 
|  And  |  产品的州为佛罗里达州且产品类别为巧克力  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  Or  |  产品的州为佛罗里达州或加利佛尼亚州  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  产品的州不是佛罗里达州  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  Exists  |  存在自制产品  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  不存在  |  不存在自制产品  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  始于  |  COMPANY 以 PK 开头  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

您最多可以为一个 Lambda 函数指定 5 个事件筛选模式。请注意，这 5 个事件中的每一个都将作为逻辑 OR 进行求值。因此，如果您配置了名为 `Filter_One` 和 `Filter_Two` 的两个筛选条件，则 Lambda 函数将执行 `Filter_One` OR `Filter_Two`。

**注意**  
在[Lambda 事件筛选](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)页面中，有一些用于筛选和比较数值的选项，但不适用于 DynamoDB 筛选事件，因为 DynamoDB 中的数字作为字符串存储。例如 ` "quantity": { "N": "50" }`，由于 `"N"` 属性，我们知道它是一个数字。

## 组合起来 – CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

为了展示事件筛选功能的实际应用，下面提供了一个示例 CloudFormation 模板。此模板将生成一个简单的 DynamoDB 表，带有分区键 PK 和排序键 SK，并启用了 Amazon DynamoDB Streams。它将创建一个 Lambda 函数和一个简单的 Lambda 执行角色，允许将日志写入 Amazon Cloudwatch，并从 Amazon DynamoDB Stream 中读取事件。它还在 DynamoDB Streams 与 Lambda 函数之间添加事件源映射，因此每次在 Amazon DynamoDB Stream 中出现事件时都可以执行该函数。

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

部署此 CloudFormation 模板后，您可以插入以下 Amazon DynamoDB 项目：

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

由于此 CloudFormation 模板中内嵌了简单的 Lambda 函数，您将在 Amazon CloudWatch 日志组中看到 Lambda 函数的事件，如下所示：

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**筛选示例**
+ **仅限与给定州匹配的产品**

此示例修改了 CloudFormation 模板，使其包含一个筛选条件，用于匹配来自佛罗里达州的所有产品，缩写为“FL”。

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

重新部署堆栈后，可以将以下 DynamoDB 项目添加到表中。请注意，它不会出现在 Lambda 函数日志中，因为本示例中的产品来自加利佛尼亚州。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **仅限以 PK 和 SK 中某些值开头的项目**

此示例修改 CloudFormation 模板，使其包含以下条件：

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

请注意 AND 条件要求条件位于模式内，其中键 PK 和 SK 位于同一个表达式中，以逗号分隔。

或者是以 PK 和 SK 开头的某些值，或者来自特定状态。

此示例修改 CloudFormation 模板，使其包含以下条件：

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

请注意，OR 条件是通过在筛选条件部分引入新模式来添加的。

## 组合起来 – CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

以下示例 CDK 项目 Formation 模板介绍了事件筛选功能。在使用此 CDK 项目之前，您需要[安装先决条件](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)，包括[运行准备脚本](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)。

**创建 CDK 项目**

首先通过在空目录中调用 `cdk init`，创建一个新的 AWS CDK 项目。

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 命令使用项目文件夹的名称来命名项目的各种元素，包括类、子文件夹和文件。文件夹名称中的所有连字符都将转换为下划线。否则，该名称应遵循 Python 标识符的格式。例如，名称不应以数字开头，也不能包含空格。

要使用新项目，请激活其虚拟环境。这允许将项目的依赖项安装在本地项目文件夹中，而不是全局安装。

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**注意**  
您可将其视为用于激活虚拟环境的 Mac/Linux 命令。Python 模板包含一个批处理文件 `source.bat`，该文件允许在 Windows 上使用相同的命令。也可以使用传统的 Windows 命令 `.venv\Scripts\activate.bat`。如果您使用 AWS CDK Toolkit v1.70.0 或更早版本来初始化 AWS CDK 项目，则您的虚拟环境位于 `.env` 目录中，而不是 `.venv`。

**基本基础设施**

使用首选文本编辑器打开文件 `./ddb_filters/ddb_filters_stack.py`。此文件在您创建 AWS CDK 项目时自动生成。

接下来，添加函数 `_create_ddb_table` 和 `_set_ddb_trigger_function`。这些函数将在预置模式/按需模式下创建一个 DynamoDB 表，该表带有分区键 PK 和排序键 SK，并且默认启用了 Amazon DynamoDB Streams 以显示新映像和旧映像。

Lambda 函数将存储在文件夹 `lambda` 下的文件 `app.py` 中。此文件将稍后创建。它包含一个环境变量 `APP_TABLE_NAME`，这将成为此堆栈创建的 Amazon DynamoDB 表的名称。在同一个函数中，我们向 Lambda 函数授予流读取权限。最后，它将订阅 DynamoDB Streams 作为 Lambda 函数的事件源。

在 `__init__` 方法中文件的末尾，您将调用相应的构造以在堆栈中初始化它们。对于需要额外组件和服务的较大项目，最好在基础堆栈之外定义这些构造。

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

现在，我们将创建一个非常简单的 Lambda 函数，它将日志输出到 Amazon CloudWatch 中。为此，请创建一个名为 `lambda` 的新文件夹。

```
mkdir lambda
touch app.py
```

使用您常用的文本编辑器，将以下内容添加到 `app.py` 文件中：

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

确保您位于 `/ddb_filters/` 文件夹中，键入以下命令创建示例应用程序：

```
cdk deploy
```

在某个时候，系统会要求您确认是否要部署解决方案。键入 `Y` 接受更改。

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

部署更改后，打开 AWS 控制台并向表中添加一个项目。

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

现在，CloudWatch 日志应该包含此条目中的所有信息。

**筛选示例**
+ **仅限与给定州匹配的产品**

打开文件 `ddb_filters/ddb_filters/ddb_filters_stack.py` 并进行修改，使其包含与所有等于“FL”的产品相匹配的筛选条件。可以在第 45 行的 `event_subscription` 下方对其进行修改。

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **仅限以 PK 和 SK 中某些值开头的项目**

修改 Python 脚本以包含以下条件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **或者是以 PK 和 SK 开头的某些值，或者来自特定状态。**

修改 Python 脚本以包含以下条件：

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

请注意，向筛选数组添加更多元素时，将会添加 OR 条件。

**清除**

在工作目录的底部找到筛选器堆栈，然后执行 `cdk destroy`。系统将要求您确认删除资源：

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# 将 DynamoDB Streams 与 Lambda 配合使用的最佳实践
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

AWS Lambda 函数在*容器*中运行，这是与其他函数隔离的执行环境。在您首次运行某个函数时，AWS Lambda 创建新容器并开始执行该函数的代码。

Lambda 函数具有对每个调用执行一次的*处理程序*。该处理程序包含函数的主业务逻辑。例如，显示在 [第 4 步：创建并测试一个 Lambda 函数](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) 中的 Lambda 函数具有可处理 DynamoDB 流中记录的处理程序。

您也可以提供仅运行一次的初始化代码，在创建容器之后，但在 AWS Lambda 首次执行处理程序之前运行。[第 4 步：创建并测试一个 Lambda 函数](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction) 中显示的 Lambda 函数具有导入适用于 Node.js 中的 JavaScript 的 SDK，然后为 Amazon SNS 创建客户端的初始化代码。这些对象只应在处理程序外部定义一次。

执行函数之后，AWS Lambda 可选择为后续的函数调用重用容器。在这种情况下，您的函数处理程序可能能够重用您在初始化代码中定义的资源。（请注意，您无法控制 AWS Lambda 保留容器的时间长度，也根本无法控制是否会重用容器。）

对于使用 AWS Lambda 的 DynamoDB 触发器，我们建议：
+ AWS 服务客户端应该在初始化代码而非处理程序中实例化。这样可允许 AWS Lambda 在容器的整个生命周期中重用现有连接。
+ 通常而言，您无需明确管理连接或实施连接池，因为 AWS Lambda 将为您管理它。

DynamoDB 流的 Lambda 使用者不能保证只传输一次，并且可能导致偶尔出现重复。确保您的 Lambda 函数代码是幂等的，以防止由于重复处理而出现意外问题。

有关更多信息，请参阅 *AWS Lambda 开发人员指南*中的[使用 AWS Lambda 函数的最佳实践](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html)。

# DynamoDB Streams 和 Apache Flink
<a name="StreamsApacheFlink.xml"></a>

可以通过 Apache Flink 使用 Amazon DynamoDB Streams 记录。借助[适用于 Apache Flink 的亚马逊托管服务](https://aws.amazon.com/managed-service-apache-flink/)，可以使用 Apache Flink 来实时转换和分析流数据。Apache Flink 是一个用于处理实时数据的开源流处理框架。适用于 Apache Flink 的 Amazon DynamoDB Streams 连接器可简化 Apache Flink 工作负载的构建和管理，并可让您将应用程序与其它 AWS 服务集成。

适用于 Apache Flink 的亚马逊托管服务有助于您快速构建端到端流处理应用程序，以用于日志分析、点击流分析、物联网（IoT）、广告技术、游戏等。四个最常见的用例是流式提取-转换-加载（ETL）、事件驱动型应用程序、响应式实时分析和数据流的交互式查询。有关从 Amazon DynamoDB Streams 写入 Apache Flink 的更多信息，请参阅 [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)。