

# 使用 DynamoDB Streams Kinesis Adapter 处理流记录
<a name="Streams.KCLAdapter"></a>

使用 Amazon Kinesis Adapter 是使用来自 Amazon DynamoDB 的流的建议方法。DynamoDB Streams API 有意与 Kinesis Data Streams 的 API 类似。在这两种服务中，数据流都由分片组成，分片是流记录的容器。这两种服务的 API 都包含 `ListStreams`、`DescribeStream`、`GetShards` 和 `GetShardIterator` 操作。（虽然这些 DynamoDB Streams 操作与 Kinesis Data Streams 中的对应操作类似，但它们并不完全相同。）

作为 DynamoDB Streams 用户，您可以使用 KCL 中找到的设计模式来处理 DynamoDB Streams 分片和流记录。若要执行此操作，请使用 DynamoDB Streams Kinesis Adapter。Kinesis Adapter 实现 Kinesis Data Streams 接口，以便 KCL 可用于使用和处理来自 DynamoDB Streams 的记录。有关如何设置和安装 DynamoDB Streams Kinesis Adapter 的说明，请参阅 [GitHub 存储库](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)。

您可以使用 Kinesis 客户端库 (KCL) 为 Kinesis Data Streams 编写应用程序。KCL 提供低级 Kinesis Data Streams API 之上的有用抽象来简化编码。有关 KCL 的更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

DynamoDB 建议将 KCL 版本 3.x 与适用于 Java 的 AWS SDK v2.x 一起使用。在过渡期间，当前 DynamoDB Streams Kinesis Adapter 版本 1.x 与 AWS SDK for 适用于 Java 的 AWS SDK v1.x 将按照 [AWS SDK 和工具维护政策](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)，在其整个生命周期内继续按预期得到全面支持。

**注意**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日终止支持。我们强烈建议您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问 GitHub 上的 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 页面。有关最新 KCL 版本的信息，请参阅 [Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅“从 KCL 1.x 迁移到 KCL 3.x”。

以下图表显示了这些库之间的交互方式。

![\[通过 DynamoDB Streams、Kinesis Data Streams 和 KCL 之间的交互处理 DynamoDB Streams 记录。\]](http://docs.aws.amazon.com/zh_cn/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


有了 DynamoDB Streams Kinesis Adapter，您可以开始针对 KCL 接口进行开发，使 API 调用无缝定向到 DynamoDB Streams 端点。

应用程序启动后，调用 KCL 来实例化工作进程。必须为工作进程提供应用程序的配置信息，如流描述符和 AWS 凭证，以及您提供的记录处理器类的名称。在记录处理器中运行代码时，工作进程执行以下任务：
+ 连接到流
+ 枚举流中的分片
+ 检查并枚举流中已关闭父分片的子分片
+ 协调与其他工作程序的分片关联（如果有）
+ 为其管理的每个分片实例化记录处理器
+ 从流中提取记录
+ 在高吞吐量期间扩展 GetRecords API 调用速率（如果配置了追赶模式）
+ 将记录推送到对应的记录处理器
+ 对已处理记录进行检查点操作
+ 在工作程序实例计数更改时均衡分片与工作程序的关联
+ 在分片被拆分时平衡分片与工作程序的关联

KCL 适配器支持追赶模式，这是一种自动调整调用速率的功能，用于处理临时增加的吞吐量。当流处理滞后超过可配置的阈值（默认为一分钟）时，追赶模式会按可配置值（默认 3 倍）扩展 GetRecords API 调用频率，以更快地检索记录，然后在滞后下降后恢复正常。这在高吞吐量时段非常有用，在这段时间里，DynamoDB 写入活动可能会使用默认轮询速率让使用者不堪重负。可以通过 `catchupEnabled` 配置参数启用追赶模式（默认为 false）。

**注意**  
有关此处列出的 KCL 概念的说明，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。  
有关将流与 AWS Lambda 配合使用的更多信息，请参阅 [DynamoDB Streams 和 AWS Lambda 触发器](Streams.Lambda.md)

# 从 KCL 1.x 迁移到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概述
<a name="migrating-kcl-overview"></a>

本指南提供有关将使用者应用程序从 KCL 1.x 迁移到 KCL 3.x 的说明。由于 KCL 1.x 和 KCL 3.x 之间的架构差异，迁移需要更新多个组件以确保兼容性。

与 KCL 3.x 相比，KCL 1.x 使用不同的类和接口。必须先将记录处理器、记录处理器工厂和工作线程类迁移到 KCL 3.x 兼容格式，然后按照将 KCL 1.x 迁移到 KCL 3.x 的迁移步骤进行操作。

## 迁移步骤
<a name="migration-steps"></a>

**Topics**
+ [步骤 1：迁移记录处理器](#step1-record-processor)
+ [步骤 2：迁移记录处理器工厂](#step2-record-processor-factory)
+ [步骤 3：迁移工作线程](#step3-worker-migration)
+ [步骤 4：KCL 3.x 配置概述和建议](#step4-configuration-migration)
+ [步骤 5：从 KCL 2.x 迁移到 KCL 3.x](#step5-kcl2-to-kcl3)

### 步骤 1：迁移记录处理器
<a name="step1-record-processor"></a>

以下示例显示了为 KCL 1.x DynamoDB Streams Kinesis Adapter 实现的记录处理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**迁移 RecordProcessor 类**

1. 将接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改为 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的导入语句：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 使用以下新方法替换 `shutdownRequested` 方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

下面是记录处理器类的更新版本：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis Adapter 现在使用 SDKv2 记录模型。在 SDKv2 中，复杂的 `AttributeValue` 对象（`BS`、`NS`、`M`、`L`、`SS`）从不会返回 null。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法来验证这些值是否存在。

### 步骤 2：迁移记录处理器工厂
<a name="step2-record-processor-factory"></a>

记录处理器工厂负责在获得租约时创建记录处理器。下面是 KCL 1.x 工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**迁移到 `RecordProcessorFactory`**
+ 将已实施的接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

下面是 3.0 中的记录处理器工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步骤 3：迁移工作线程
<a name="step3-worker-migration"></a>

在 KCL 版本 3.0 中，名为**调度器**的新类取代了**工作线程**类。下面是 KCL 1.x 工作线程的示例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**迁移工作程序**

1. 将 `import` 类的 `Worker` 语句更改为 `Scheduler` 和 `ConfigsBuilder` 类的导入语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 导入 `StreamTracker` 并将 `StreamsWorkerFactory` 的导入更改为 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 选择从中启动应用程序的位置。它可以为 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 创建一个 `StreamTracker` 实例。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 创建 `AmazonDynamoDBStreamsAdapterClient` 对象。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 创建 `ConfigsBuilder` 对象。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 创建 `Scheduler`，如以下示例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 设置在 KCL v3 与 KCL v1 的 DynamoDB Streams Kinesis Adapter 之间保持兼容性，而未在 KCL v2 和 v3 之间保持兼容性。

### 步骤 4：KCL 3.x 配置概述和建议
<a name="step4-configuration-migration"></a>

有关 KCL 1.x 之后引入的与 KCL 3.x 相关的配置的详细描述，请参阅 [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) 和 [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
在 KCL 3.x 及更高版本中，我们建议不要直接创建 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 和 `retrievalConfig` 的对象，而是使用 `ConfigsBuilder` 设置配置，来避免出现调度器初始化问题。`ConfigsBuilder` 提供了更灵活且更易于维护的方式配置 KCL 应用程序。

#### 在 KCL 3.x 中具有更新默认值的配置
<a name="kcl3-configuration-overview"></a>

`billingMode`  
在 KCL 版本 1.x 中，`billingMode` 的默认值设置为 `PROVISIONED`。但在 KCL 版本 3.x 中，默认 `billingMode` 为 `PAY_PER_REQUEST`（按需模式）。我们建议您对租约表使用按需容量模式，以便根据您的使用情况自动调整容量。有关对租约表使用预置容量的指导，请参阅 [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 版本 1.x 中，`idleTimeBetweenReadsInMillis` 的默认值设置为 1000（或 1 秒）。KCL 版本 3.x 将 i`dleTimeBetweenReadsInMillis` 的默认值设置为 1500（或 1.5 秒），但 Amazon DynamoDB Streams Kinesis Adapter 将默认值改写为 1000（或 1 秒）。

#### KCL 3.x 中的新配置
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此配置定义在新发现的分片开始处理之前的时间间隔，计算方法为 1.5 × `leaseAssignmentIntervalMillis`。如果未显式配置此设置，则时间间隔默认为 1.5 × `failoverTimeMillis`。处理新分片包括扫描租约表并在租约表上查询全局二级索引（GSI）。降低 `leaseAssignmentIntervalMillis` 会增加这些扫描和查询操作的频率，从而导致 DynamoDB 成本更高。建议将此值设置为 2000（即 2 秒），以尽可能减少处理新分片的延迟。

`shardConsumerDispatchPollIntervalMillis`  
此配置定义了分片使用者用于触发状态转换的连续轮询之间的间隔。在 KCL 版本 1.x 中，此行为由 `idleTimeInMillis` 参数控制，该参数未作为可配置的设置公开。在 KCL 版本 3.x 中，我们建议将此配置设置为与 KCL 版本 1.x 设置中用于 ` idleTimeInMillis` 的值相匹配。

### 步骤 5：从 KCL 2.x 迁移到 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

为确保平稳过渡并与最新的 Kinesis Client Library（KCL）版本兼容，请按照迁移指南的 [upgrading from KCL 2.x to KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 说明中的步骤 5-8 进行操作。

有关常见的 KCL 3.x 故障排除问题，请参阅 [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。

# 回滚至先前 KCL 版本
<a name="kcl-migration-rollback"></a>

本主题介绍如何将使用者应用程序回滚到先前 KCL 版本。回滚过程由两个步骤组成：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署以前的 KCL 版本代码。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollback-step1"></a>

当需要回滚到先前 KCL 版本时，必须运行 KCL 迁移工具。此工具可执行两个重要任务：
+ 它在 DynamoDB 中的租约表上移除一个名为工作线程指标表的元数据表和全局二级索引。这些构件由 KCL 3.x 创建，但在回滚到先前版本时并不需要。
+ 它使所有工作线程均在与 KCL 1.x 兼容的模式下运行，并开始使用先前 KCL 版本中使用的负载均衡算法。如果 KCL 3.x 中的新负载均衡算法存在问题，这将立即缓解问题。

**重要**  
DynamoDB 中的协调器状态表必须存在，并且在迁移、回滚和前滚过程中不得删除。

**注意**  
重要的是，使用者应用程序中的所有工作线程在给定时间均使用相同的负载均衡算法。KCL 迁移工具可确保 KCL 3.x 使用者应用程序中的所有工作线程都切换到 KCL 1.x 兼容模式，以便在应用程序回滚到先前 KCL 版本期间，所有工作线程都运行相同的负载均衡算法。

您可以在 [KCL GitHub 存储库](https://github.com/awslabs/amazon-kinesis-client/tree/master)的 scripts 目录中下载 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。从具有相应权限的工作线程或主机运行脚本，以写入协调器状态表、工作线程指标表和租约表。确保为 KCL 使用者应用程序配置了适当的 [IAM permissions](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)。使用指定的命令对每个 KCL 应用程序仅运行一次脚本：

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### 参数
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
将*区域*替换为您的 AWS 区域。

`--application_name`  
如果您为 DynamoDB 元数据表（租约表、协调器状态表和工作线程指标表）使用默认名称，则需要此参数。如果您为这些表指定了自定义名称，则可以忽略此参数。将 *applicationName* 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。

`--lease_table_name`  
如果您在 KCL 配置中为租约表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *leaseTableName* 替换为您为租约表指定的自定义表名称。

`--coordinator_state_table_name`  
如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *coordinatorStateTableName* 替换为您为协调器状态表指定的自定义表名称。

`--worker_metrics_table_name`  
如果您在 KCL 配置中为工作线程指标表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *workerMetricsTableName* 替换为您为工作线程指标表指定的自定义表名称。

## 步骤 2：使用先前 KCL 版本重新部署代码
<a name="kcl-migration-rollback-step2"></a>

**重要**  
在 KCL 迁移工具生成的输出中提及版本 2.x 的任何内容都应解释为指的是 KCL 版本 1.x。运行该脚本不会执行完全回滚，它只会将负载均衡算法切换到 KCL 版本 1.x 中使用的算法。

运行 KCL 迁移工具来进行回滚后，您将看到以下消息之一：

消息 1  
“回滚已完成。应用程序正在运行 2x 兼容功能。请使用先前 KCL 版本部署代码，以回滚到先前的应用程序二进制文件。”  
**所需操作：**这意味着工作线程正在 KCL 1.x 兼容模式下运行。使用先前 KCL 版本将代码重新部署到工作线程。

消息 2  
“回滚已完成。KCL 应用程序正在运行 3x 功能，并将回滚到 2x 兼容功能。如果您在短时间内看不到缓解，请使用先前 KCL 版本部署代码，回滚到先前的应用程序二进制文件。”  
**所需操作：**这意味着工作线程正在 KCL 3.x 模式下运行，KCL 迁移工具已将所有工作线程切换到 KCL 1.x 兼容模式。使用先前 KCL 版本将代码重新部署到工作线程。

消息 3  
“应用程序已经回滚。任何可以删除的 KCLv3 资源都被清理以免产生费用，直至应用程序可以通过迁移进行前滚。”  
**所需操作：**这意味着工作线程已经回滚到在 KCL 1.x 兼容模式下运行。使用先前 KCL 版本将代码重新部署到工作线程。

# 回滚后前滚到 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主题介绍如何在回滚后将使用者应用程序前滚到 KCL 3.x。当您需要前滚时，必须完成一个由两步组成的过程：

1. 运行 [KCL Migration Tool](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署代码。

## 步骤 1：运行 KCL 迁移工具
<a name="kcl-migration-rollforward-step1"></a>

使用以下命令运行 KCL 迁移工具，以便前滚到 KCL 3.x：

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### 参数
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
将*区域*替换为您的 AWS 区域。

`--application_name`  
如果您为协调器状态表使用默认名称，则需要此参数。如果您已为协调器状态表指定了自定义名称，则可以忽略此参数。将 *applicationName* 替换为实际的 KCL 应用程序名称。如果未提供自定义名称，该工具将使用此名称来派生默认表名称。

`--coordinator_state_table_name`  
如果您在 KCL 配置中为协调器状态表设置了自定义名称，则需要此参数。如果您使用的是默认表名称，则可以忽略此参数。将 *coordinatorStateTableName* 替换为您为协调器状态表指定的自定义表名称。

在前滚模式下运行迁移工具后，KCL 会创建 KCL 3.x 所需的以下 DynamoDB 资源：
+ 租约表上的全局二级索引
+ 工作线程指标表

## 步骤 2：使用 KCL 3.x 部署代码
<a name="kcl-migration-rollforward-step2"></a>

运行 KCL 迁移工具以进行前滚后，使用 KCL 3.x 将代码部署到工作线程。要完成迁移，请参阅 [Step 8: Complete the migration](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)。

# 演练：DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough"></a>

本节是使用 Amazon Kinesis Client Library 和 Amazon DynamoDB Streams Kinesis Adapter 的 Java 应用程序的演练。此应用程序演示了数据复制示例，其中将一个表中的写入活动应用于另一个表，并且两个表中的内容保持同步。有关源代码，请参阅 [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)。

此程序执行以下操作：

1. 创建名为 `KCL-Demo-src` 和 `KCL-Demo-dst` 的两个 DynamoDB 表。每个表上均启用一个流。

1. 通过添加、更新和删除项目在源表中生成更新活动。这会导致数据写入表的流中。

1. 从流中读取记录、将记录重新构造为 DynamoDB 请求并将请求应用于目标表。

1. 扫描源表和目标表，以确保其内容一致。

1. 通过删除表进行清除。

以下各节将描述这些步骤，本演练结尾将显示完整的应用程序。

**Topics**
+ [第 1 步：创建 DynamoDB 表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [第 2 步：在源表中生成更新活动](#Streams.KCLAdapter.Walkthrough.Step2)
+ [第 3 步：处理流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [第 4 步：确保两个表具有相同的内容](#Streams.KCLAdapter.Walkthrough.Step4)
+ [第 5 步：清理](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完成程序：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 第 1 步：创建 DynamoDB 表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是创建两个 DynamoDB 表，一个源表和一个目标表。源表的流上的 `StreamViewType` 为 `NEW_IMAGE`。这意味着无论何时修改此表中的项目，项目“之后”的映像都将写入到流中。这样一来，流将跟踪表上的所有写入活动。

以下示例显示用于创建两个表的代码。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 第 2 步：在源表中生成更新活动
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在源表上生成某些写入活动。在此活动发生时，源表的流也会近乎实时更新。

此应用程序通过调用用于写入数据的 `PutItem`、`UpdateItem` 和 `DeleteItem` API 操作的方法来定义帮助程序类。以下代码示例演示如何使用这些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 第 3 步：处理流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

现在，此程序开始处理流。DynamoDB Streams Kinesis Adapter 充当 KCL 和 DynamoDB Streams 端点之间的透明层，以便代码可充分利用 KCL 而不必进行低级 DynamoDB Streams 调用。此程序执行以下任务：
+ 它通过符合 KCL 接口定义的方法 (`StreamsRecordProcessor`、`initialize` 和 `processRecords`) 定义记录处理器类 `shutdown`。`processRecords` 方法包含从源表的流中进行读取以及对目标表进行写入时所需的逻辑。
+ 它定义了记录处理器类的类工厂 (`StreamsRecordProcessorFactory`)。这是使用 KCL 的 Java 程序所需的。
+ 它实例化一个新的 KCL `Worker`，它与类工厂关联。
+ 当记录处理完成时，它会关闭 `Worker`。

或者，在 Streams KCL Adapter 配置中启用追赶模式，以便在流处理滞后超过一分钟（默认值）时，自动将 GetRecords API 调用速率扩展 3 倍（默认值），从而有助于流使用者处理表中的高吞吐量峰值。

要了解有关 KCL 接口定义的详细信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*的[使用 Kinesis 客户端库开发使用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

以下代码示例演示了 `StreamsRecordProcessor` 中的主循环。`case` 语句基于流记录中显示的 `OperationType` 来确定要执行的操作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 第 4 步：确保两个表具有相同的内容
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此时，源表和目标表的内容是同步的。此应用程序针对两个表发送 `Scan` 请求以验证其内容是否实质相同。

`DemoHelper` 类包含调用低级 `ScanTable` API 的 `Scan` 方法。下例说明具体用法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 第 5 步：清理
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

演示完成后，此应用程序将删除源表和目标表。请看下面的代码示例。甚至在删除两个表后，其流也可在自动删除后的最多 24 个小时内保持可用。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完成程序：DynamoDB Streams Kinesis Adapter
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

下文是执行 [演练：DynamoDB Streams Kinesis Adapter](Streams.KCLAdapter.Walkthrough.md) 所述任务的完整 Java 程序。当您运行该程序时，将显示与以下内容类似的输出。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 要运行此程序，请确保客户端应用程序可以使用策略来访问 DynamoDB 和 Amazon CloudWatch。有关更多信息，请参阅 [适用于 DynamoDB 的基于身份的策略](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)。

源代码包括四个 `.java` 文件。要构建此程序，请添加以下依赖项，其中包括作为传递依赖项的 Amazon Kinesis Client Library（KCL）3.x 和适用于 Java 的 AWS SDK v2：

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

源文件为：
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```