

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 DynamoDB Streams Kinesis 轉接器處理串流記錄
<a name="Streams.KCLAdapter"></a>

建議透過 Amazon Kinesis 轉接器耗用來自 Amazon DynamoDB 的串流。DynamoDB Streams API 類似於 Kinesis Data Streams 的 API 是刻意為之。在這兩種服務中，資料串流由碎片組成，碎片是用於串流紀錄的容器。這兩種服務的 API 都包含 `ListStreams`、`DescribeStream`、`GetShards` 以及 `GetShardIterator` 操作。(雖然這些 DynamoDB Streams 動作與其在 Kinesis Data Streams 中的對應動作類似，但它們並非完全相同。)

身為 DynamoDB Streams 使用者，您可以使用在 KCL 內找到的設計模式來處理 DynamoDB Streams 碎片和串流紀錄。為此，您可以使用 DynamoDB Streams Kinesis 轉接器。Kinesis 轉接器會實作 Kinesis Data Streams 介面，以便您將 KCL 用於耗用和處理來自 DynamoDB Streams 的紀錄。如需如何設定和安裝 DynamoDB Streams Kinesis Adapter 的指示，請參閱 [GitHub 儲存庫](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)。

您可以使用 Kinesis Client Library (KCL) 為 Kinesis Data Streams 撰寫應用程式。KCL 會在低階 Kinesis Data Streams API 上提供有用的抽象，可以簡化程式碼。如需 KCL 的詳細資訊，請參閱《Amazon Kinesis Data Streams 開發人員指南》中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

DynamoDB 建議使用 KCL 3.x 版搭配適用於 Java 的 AWS SDK v2.x。目前 DynamoDB Streams Kinesis Adapter 1.x 版搭配 適用於 Java 的 AWS SDK 適用於 v1.x 的 AWS SDK，在過渡期間會繼續如預期完全支援整個生命週期，以符合 [AWS SDKs和工具維護政策](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)。

**注意**  
Amazon Kinesis Client Library (KCL) 版本 1.x 和 2.x 已過期。KCL 1.x 將於 2026 年 1 月 30 日終止支援。我們強烈建議您，在 2026 年 1 月 30 日之前，將使用 1.x 版的 KCL 應用程式移轉至最新的 KCL 版本。如需尋找最新的 KCL 版本，請參閱 GitHub 上的 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 頁面。如需最新 Kinesis Client Library 版本的相關詳細資訊，請參閱[使用 Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)。如需有關從 KCL 1.x 移轉至 3.x 的詳細資訊，請參閱《從 KCL 1.x 移轉到 KCL 3.x》。

下圖顯示這些程式庫彼此如何互動。

![\[DynamoDB Streams、Kinesis Data Streams 和 KCL 之間的互動，用於處理 DynamoDB Streams 記錄。\]](http://docs.aws.amazon.com/zh_tw/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


有了 DynamoDB Streams Kinesis 轉接器，您就可以開始針對 KCL 介面進行開發，並將 API 呼叫順暢地導向 DynamoDB Streams 端點。

當應用程式啟動後，其會呼叫 KCL 以將工作者執行個體化。您必須向工作者提供應用程式的組態資訊，例如串流描述項和 AWS 登入資料，以及您提供的記錄處理器類別名稱。當其在紀錄處理器中執行程式碼時，工作者會執行下列任務：
+ 連線到串流
+ 列舉串流內的碎片
+ 檢查並列舉串流中已關閉父碎片的子碎片
+ 與其他工作者 (若有) 協調碎片關聯性
+ 為其所管理的每個碎片執行個體化記錄處理器
+ 從串流提取紀錄
+ 在高輸送量期間擴展 GetRecords API 呼叫速率 （如果已設定追趕模式）
+ 將記錄推送至對應的記錄處理器
+ 對已處理的記錄執行檢查點作業
+ 當工作者執行個體數目變更時，平衡碎片與工作者的關聯
+ 當碎片進行分割時，平衡碎片與工作者的關聯

KCL 轉接器支援追趕模式，這是一種自動呼叫速率調整功能，用於處理暫時輸送量增加。當串流處理延遲超過可設定的閾值 （預設一分鐘） 時，追趕模式會將 GetRecords API 呼叫頻率擴展為可設定的 值 （預設 3 倍），以更快地擷取記錄，然後在延遲下降時恢復正常。在高傳輸量期間，DynamoDB 寫入活動可能會使用預設輪詢率讓消費者負擔過重，這非常寶貴。可以透過`catchupEnabled`組態參數啟用追趕模式 （預設 false)。

**注意**  
如需此處所列 KCL 概念的描述，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。  
如需搭配 使用串流的詳細資訊， AWS Lambda 請參閱 [DynamoDB 串流和 AWS Lambda 觸發](Streams.Lambda.md)

# 從 KCL 1.x 移轉到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概觀
<a name="migrating-kcl-overview"></a>

本指南提供如何將取用者應用程式從 KCL 1.x 移轉到 KCL 3.x 的說明。由於 KCL 1.x 和 KCL 3.x 之間的架構差異，移轉需要更新多個元件以確保相容性。

KCL 1.x 使用的類別和介面與 KCL 3.x 不同。您必須先將記錄處理器、記錄處理器處理站及工作者類別移轉至 KCL 3.x 相容格式，並遵循 KCL 1.x 移轉至 KCL 3.x 的移轉步驟。

## 移轉步驟
<a name="migration-steps"></a>

**Topics**
+ [步驟 1：移轉記錄處理器](#step1-record-processor)
+ [步驟 2：移轉記錄處理器處理站](#step2-record-processor-factory)
+ [步驟 3：移轉工作者](#step3-worker-migration)
+ [步驟 4：KCL 3.x 組態概觀和建議](#step4-configuration-migration)
+ [步驟 5：從 KCL 2.x 移轉至 KCL 3.x](#step5-kcl2-to-kcl3)

### 步驟 1：移轉記錄處理器
<a name="step1-record-processor"></a>

以下範例顯示基於 KCL 1.x DynamoDB Streams Kinesis 轉接器所實作的記錄處理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**移轉 RecordProcessor 類別**

1. 將介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改為 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的匯入陳述式：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 將 `shutdownRequested` 方法取代為以下的新方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

記錄處理器類別經更新後的版本如下：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis 轉接器現在使用 SDKv2 記錄模型。在 SDKv2 中，複雜 `AttributeValue` 物件 (`BS`、`NS`、`M`、`L`、`SS`) 永遠不會傳回空值。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法驗證這些值是否存在。

### 步驟 2：移轉記錄處理器處理站
<a name="step2-record-processor-factory"></a>

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**移轉 `RecordProcessorFactory`**
+ 將實作的介面從 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改為 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

以下是 3.0 中記錄處理器工廠的範例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步驟 3：移轉工作者
<a name="step3-worker-migration"></a>

在 KCL 的版本 3.0，名為**排程器**​的新類別會取代**工作者**類別。以下是 KCL 1.x 工作者的範例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**移轉至工作者**

1. 將 `Worker` 類別的 `import` 陳述式變更為 `Scheduler` 和 `ConfigsBuilder` 類別的匯入陳述式。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 匯入 `StreamTracker` 並將匯入 `StreamsWorkerFactory` 變更至 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 選擇要啟動應用程式的位置。其可能是 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 建立 `StreamTracker` 執行個體。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 建立 `AmazonDynamoDBStreamsAdapterClient` 物件。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 建立 `ConfigsBuilder` 物件。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 建立 `Scheduler`，如下列範例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 設定可維持 KCL v3 與 KCL v1 之間的 DynamoDB Streams Kinesis Adapter 相容性，而非 KCL v2 與 v3 之間的相容性。

### 步驟 4：KCL 3.x 組態概觀和建議
<a name="step4-configuration-migration"></a>

如需詳細了解 KCL 3.x 中，在 KCL 1.x 之後推出的相關組態，請參閱 [KCL 組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html)和 [KCL 移轉用戶端組態](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
我們建議使用 `ConfigsBuilder` 在 KCL 3.x 及更新版本中設定組態，而非直接建立 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 及 `retrievalConfig` 的物件，以避免排程器初始化問題。`ConfigsBuilder` 則提供更靈活且跟更容易維護的 KCL 應用程式設定方式。

#### KCL 3.x 的更新預設值組態
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL 1.x 版中，`billingMode` 的預設值設定為 `PROVISIONED`。不過，KCL 3.x 版 `billingMode` 的預設值為 `PAY_PER_REQUEST` (隨需模式)。我們建議租用資料表使用隨需容量模式，根據用量自動調整容量。如需租用資料表使用佈建容量的指南，請參閱[具有佈建容量模式的租用資料表最佳實務](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 1.x 版中，`idleTimeBetweenReadsInMillis` 的預設值設定為 1,000 (或 1 秒)。KCL 3.x 版會將 i`dleTimeBetweenReadsInMillis` 的預設值設定為 1,500 (或 1.5 秒)，但 Amazon DynamoDB Streams Kinesis 轉接器會將預設值覆寫為 1,000 (或 1 秒)。

#### KCL 3.x 中的新組態
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此組態會定義新發現碎片開始處理之前的時間間隔，計算方式為 1.5 x `leaseAssignmentIntervalMillis`。如果未明確配置此設定，則時間間隔預設為 1.5 x `failoverTimeMillis`。處理新碎片包含掃描租用資料表，並在租用資料表上查詢全域次要索引 (GSI)。降低 `leaseAssignmentIntervalMillis` 會增加掃描和查詢操作的頻率，進而產生更高的 DynamoDB 成本。我們建議將此值設定為 2000 (或 2 秒)，將處理新碎片的延遲降至最低。

`shardConsumerDispatchPollIntervalMillis`  
此組態定義了碎片取用者進行連續輪詢以觸發狀態轉換的間隔時間。在 KCL 1.x 版中，此行為由 `idleTimeInMillis` 參數控制，該參數並未公開為可配置設定。使用 KCL 3.x 版時，我們建議設定此組態，符合您在 KCL 1.x 版設定用於 ` idleTimeInMillis` 的值。

### 步驟 5：從 KCL 2.x 移轉至 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

為了確保順利轉換至最新的 Kinesis Client Library (KCL) 版本及其相容性，請遵循移轉指南[從 KCL 2.x 升級到 KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 中，步驟 5-8 的指示。

如需常見的 KCL 3.x 疑難排解問題，請參閱 [KCL 取用者應用程式疑難排解](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。

# 轉返為先前的 KCL 版本
<a name="kcl-migration-rollback"></a>

本主題說明如何將取用者應用程式轉返為先前的 KCL 版本。轉返程序包含兩個步驟：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 重新部署先前 KCL 版本的程式碼。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollback-step1"></a>

當您需要轉返至先前的 KCL 版本時，您必須執行 KCL 移轉工具。此工具會執行兩項重要任務：
+ 移除 DynamoDB 租用資料表上，稱為工作者指標資料表和全域次要索引的中繼資料資料表。這些成品由 KCL 3.x 建立，但當您轉返至先前版本時，不需要這些成品。
+ 此工具可讓所有工作者在 KCL 1.x 相容模式下執行，並開始使用先前 KCL 版本的負載平衡演算法。如果您使用 KCL 3.x 新負載平衡演算法時遇到問題，此步驟會立即緩解該問題。

**重要**  
DynamoDB 中的協調器狀態資料表必須存在，且不得在移轉、轉返及向前復原過程中刪除。

**注意**  
請務必讓取用者應用程式中的所有工作者，在指定時間使用相同的負載平衡演算法。KCL 移轉工具可確保 KCL 3.x 取用者應用程式中的所有工作者全數切換到 KCL 1.x 相容模式，以便在應用程式轉返至先前的 KCL 版本期間，所有工作者都執行相同的負載平衡演算法。

您可以在 [KCL GitHub 儲存庫](https://github.com/awslabs/amazon-kinesis-client/tree/master)的指令碼目錄中，下載 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。從具備適當許可的工作者或主機執行指令碼，以寫入協調器狀態資料表、工作者指標資料表和租用資料表。確保已針對 KCL 取用者應用程式設定適當的 [IAM 許可](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)。使用指定的命令，每個 KCL 應用程式僅執行一次指令碼：

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### Parameters
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
將*區域*取代為 AWS 區域。

`--application_name`  
如果您使用 DynamoDB 中繼資料資料表 (租用資料表、協調器狀態資料表和工作者指標資料表) 的預設名稱，則需要此參數。如果您已為這些資料表指定自訂名稱，可以省略此參數。使用您現有應用程式的名稱取代 *applicationName*。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。

`--lease_table_name`  
當您已在 KCL 組態中自訂租用資料表的名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為租用資料表指定的自訂資料表名稱，取代 *leaseTableName*。

`--coordinator_state_table_name`  
當您在 KCL 組態中為自訂協調器狀態資料表名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為協調器狀態資料表指定的自訂資料表名稱，取代 *coordinatorStateTableName*。

`--worker_metrics_table_name`  
當您在 KCL 組態中自訂工作者指標資料表的名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為工作者指標資料表指定的自訂資料表名稱，取代 *workerMetricsTableName*。

## 步驟 2：使用先前的 KCL 版本重新部署程式碼
<a name="kcl-migration-rollback-step2"></a>

**重要**  
KCL 移轉工具產生的輸出中提及 2.x 版時，應解釋為提及 KCL 1.x 版。執行指令碼不會執行完整的轉返，只會切換到 KCL 1.x 版使用的負載平衡演算法。

執行轉返 KCL 移轉工具後，您會看到以下其中一則訊息：

訊息 1  
「轉返已完成。您的應用程式先前執行 2x 相容功能。請使用先前的 KCL 版本部署程式碼，轉返至先前的應用程式二進位檔。」  
**必要動作：**這表示您的工作者在 KCL 1.x 相容模式下執行。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

訊息 2  
「轉返已完成。您的 KCL 應用程式先前執行 3x 功能，並會轉返至 2x 相容功能。如果您在短時間內沒看到緩解措施，請使用先前的 KCL 版本部署程式碼，轉返到先前的應用程式二進位檔。」  
**必要動作：**這表示您的工作者是在 KCL 3.x 模式下執行，而 KCL 移轉工具會將所有工作者切換到 KCL 1.x 相容模式。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

訊息 3  
「應用程式已轉返。所有可刪除的 KCLv3 資源都已清除，以避免產生費用，直到應用程式可以透過移轉向前復原為止。」  
**必要動作：**這表示您的工作者已轉返，以在 KCL 1.x 相容模式下執行。將具有先前 KCL 版本的程式碼，重新部署到您的工作者。

# 轉返後向前復原至 KCL 3.x
<a name="kcl-migration-rollforward"></a>

本主題說明如何在轉返後將取用者應用程式向前復原至 KCL 3.x。當您需要向前復原時，您必須完成兩步驟的程序：

1. 執行 [KCL 移轉工具](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)。

1. 使用 KCL 3.x 部署程式碼。

## 步驟 1：執行 KCL 移轉工具
<a name="kcl-migration-rollforward-step1"></a>

使用以下命令執行 KCL 移轉工具，以向前復原至 KCL 3.x：

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### Parameters
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
將*區域*取代為您的 AWS 區域。

`--application_name`  
如果您使用協調器狀態資料表的預設名稱，則需要此參數。如果您已指定協調器狀態資料表的自訂名稱，可以省略此參數。使用您現有應用程式的名稱取代 *applicationName*。如果未提供自訂名稱，工具會使用此名稱衍生預設資料表名稱。

`--coordinator_state_table_name`  
當您在 KCL 組態中為自訂協調器狀態資料表名稱時，需要此參數。如果您使用的是預設資料表名稱，可以省略此參數。使用您為協調器狀態資料表指定的自訂資料表名稱，取代 *coordinatorStateTableName*。

以向前復原模式執行移轉工具後，KCL 會建立 KCL 3.x 所需的下列 DynamoDB 資源：
+ 租用資料表上的全域次要索引
+ 工作者指標資料表

## 步驟 2：使用 KCL 3.x 部署程式碼
<a name="kcl-migration-rollforward-step2"></a>

執行向前復原的 KCL 移轉工具之後，請使用 KCL 3.x 將程式碼部署至工作者。若要完成移轉，請參閱[步驟 8：完成移轉](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)。

# 逐步解說：DynamoDB Streams Kinesis 轉接器
<a name="Streams.KCLAdapter.Walkthrough"></a>

本節會逐步解說使用 Amazon Kinesis Client Library 與 Amazon DynamoDB Streams Kinesis 轉接器的 Java 應用程式。此應用程式示範資料複寫的範例，將一份資料表的寫入活動套用至第二份資料表，讓兩份資料表的內容保持同步。如需來源碼，請參閱「[完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)」。

此程式執行下列操作：

1. 建立兩個 DynamoDB 資料表，並命名為 `KCL-Demo-src` 和 `KCL-Demo-dst`。這些資料表各會啟用串流。

1. 在來源資料表中新增、更新與刪除項目，以產生更新活動。這會導致將資料寫入資料表的串流。

1. 從串流讀取紀錄、將其重新建構為 DynamoDB 請求，再將請求套用至目標資料表。

1. 掃描來源與目標資料表，以確定其內容相同。

1. 刪除資料表以清除。

下列章節將說明這些步驟，並在演練結尾顯示完整的應用程式。

**Topics**
+ [步驟 1：建立 DynamoDB 資料表](#Streams.KCLAdapter.Walkthrough.Step1)
+ [步驟 2：在來源資料表中產生更新活動](#Streams.KCLAdapter.Walkthrough.Step2)
+ [步驟 3：處理串流](#Streams.KCLAdapter.Walkthrough.Step3)
+ [步驟 4：確定這兩個資料表的內容相同](#Streams.KCLAdapter.Walkthrough.Step4)
+ [步驟 5：清除](#Streams.KCLAdapter.Walkthrough.Step5)
+ [完整程式：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 步驟 1：建立 DynamoDB 資料表
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

第一步是建立兩個 DynamoDB 資料表：一個來源資料表與一個目標資料表。來源資料表串流中的 `StreamViewType` 是 `NEW_IMAGE`。這表示每當在此資料表中修改項目時，項目的「修改後」影像就會寫入串流。串流可透過此方法追蹤資料表上的所有寫入活動。

下列範例顯示用來建立這兩份資料表的程式碼。

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 步驟 2：在來源資料表中產生更新活動
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

下一步是在來源資料表上產生一些寫入活動。在此活動進行期間，來源資料表的串流也會近乎即時地更新。

此應用程式會使用呼叫 `PutItem`、`UpdateItem` 與 `DeleteItem` API 操作的方法，來定義小幫手類別，以寫入資料。下列程式碼範例示範如何使用這些方法。

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 步驟 3：處理串流
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

程式現在會開始處理串流。DynamoDB Streams Kinesis 轉接器會作為 KCL 與 DynamoDB Streams 端點之間的透明層，讓程式碼可以充分利用 KCL，而不需要發出低階 DynamoDB Streams 呼叫。此程式會執行下列任務：
+ 它會以遵守 KCL 介面定義的方法 (`StreamsRecordProcessor`、`initialize` 與 `processRecords`) 來定義紀錄處理器類別 `shutdown`。`processRecords` 方法包含從來源資料表串流讀取與寫入目標資料表所需的邏輯。
+ 它會定義紀錄處理器類別的類別處理站 (`StreamsRecordProcessorFactory`)。這是使用 KCL 之 Java 程式的必要任務。
+ 它會執行個體化與類別處理站相關聯的新 KCL `Worker`。
+ 它會在紀錄處理完成時關閉 `Worker`。

或者，在 Streams KCL Adapter 組態中啟用追趕模式，以便在串流處理延遲超過一分鐘 （預設） 時自動將 GetRecords API 呼叫速率擴展 3 倍 （預設），協助您的串流取用者處理資料表中的高輸送量峰值。

若要進一步了解 KCL 介面定義，請參閱 *Amazon Kinesis Data Streams 開發人員指南*中的[使用 Kinesis Client Library 開發取用者](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)。

下列程式碼範例顯示 `StreamsRecordProcessor` 中的主迴圈。`case` 陳述式會根據串流紀錄中顯示的 `OperationType` 來決定要執行的動作。

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 步驟 4：確定這兩個資料表的內容相同
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

此時，來源與目標資料表的內容會同步。此應用程式會對這兩個資料表發出 `Scan` 請求，以確認其內容事實上相同。

`DemoHelper` 類別包含呼叫低階 `Scan` API 的 `ScanTable` 方法。下列範例示範其使用方法。

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 步驟 5：清除
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

示範已完成，因此應用程式會刪除來源與目標資料表。請參閱以下程式碼範例。即使在刪除資料表後，其串流也會保持可用長達 24 小時，然後就自動刪除。

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 完整程式：DynamoDB Streams Kinesis 轉接器
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

以下是執行此[逐步解說：DynamoDB Streams Kinesis 轉接器](Streams.KCLAdapter.Walkthrough.md)所述之任務的完整 Java 程式。當您執行它時，應該會看到類似如下的輸出。

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**重要**  
 若要執行此程式，請確定用戶端應用程式可使用政策存取 DynamoDB 和 Amazon CloudWatch。如需詳細資訊，請參閱[適用於 DynamoDB 的以身分為基礎的政策](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies)。

原始程式碼包含四個`.java`檔案。若要建置此程式，請新增下列相依性，其中包括 Amazon Kinesis Client Library (KCL) 3.x 和適用於 Java v2 的 AWS SDK 做為暫時性相依性：

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

來源檔案為：
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```