

# 从 KCL 1.x 迁移到 KCL 3.x
<a name="streams-migrating-kcl"></a>

## 概述
<a name="migrating-kcl-overview"></a>

本指南提供有关将使用者应用程序从 KCL 1.x 迁移到 KCL 3.x 的说明。由于 KCL 1.x 和 KCL 3.x 之间的架构差异，迁移需要更新多个组件以确保兼容性。

与 KCL 3.x 相比，KCL 1.x 使用不同的类和接口。必须先将记录处理器、记录处理器工厂和工作线程类迁移到 KCL 3.x 兼容格式，然后按照将 KCL 1.x 迁移到 KCL 3.x 的迁移步骤进行操作。

## 迁移步骤
<a name="migration-steps"></a>

**Topics**
+ [步骤 1：迁移记录处理器](#step1-record-processor)
+ [步骤 2：迁移记录处理器工厂](#step2-record-processor-factory)
+ [步骤 3：迁移工作线程](#step3-worker-migration)
+ [步骤 4：KCL 3.x 配置概述和建议](#step4-configuration-migration)
+ [步骤 5：从 KCL 2.x 迁移到 KCL 3.x](#step5-kcl2-to-kcl3)

### 步骤 1：迁移记录处理器
<a name="step1-record-processor"></a>

以下示例显示了为 KCL 1.x DynamoDB Streams Kinesis Adapter 实现的记录处理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**迁移 RecordProcessor 类**

1. 将接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改为 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. 更新 `initialize` 和 `processRecords` 方法的导入语句：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. 使用以下新方法替换 `shutdownRequested` 方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

下面是记录处理器类的更新版本：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**注意**  
DynamoDB Streams Kinesis Adapter 现在使用 SDKv2 记录模型。在 SDKv2 中，复杂的 `AttributeValue` 对象（`BS`、`NS`、`M`、`L`、`SS`）从不会返回 null。使用 `hasBs()`、`hasNs()`、`hasM()`、`hasL()`、`hasSs()` 方法来验证这些值是否存在。

### 步骤 2：迁移记录处理器工厂
<a name="step2-record-processor-factory"></a>

记录处理器工厂负责在获得租约时创建记录处理器。下面是 KCL 1.x 工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**迁移到 `RecordProcessorFactory`**
+ 将已实施的接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示：

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

下面是 3.0 中的记录处理器工厂的示例：

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 步骤 3：迁移工作线程
<a name="step3-worker-migration"></a>

在 KCL 版本 3.0 中，名为**调度器**的新类取代了**工作线程**类。下面是 KCL 1.x 工作线程的示例：

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**迁移工作程序**

1. 将 `import` 类的 `Worker` 语句更改为 `Scheduler` 和 `ConfigsBuilder` 类的导入语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 导入 `StreamTracker` 并将 `StreamsWorkerFactory` 的导入更改为 `StreamsSchedulerFactory`。

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 选择从中启动应用程序的位置。它可以为 `TRIM_HORIZON` 或 `LATEST`。

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. 创建一个 `StreamTracker` 实例。

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. 创建 `AmazonDynamoDBStreamsAdapterClient` 对象。

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. 创建 `ConfigsBuilder` 对象。

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 使用 `ConfigsBuilder` 创建 `Scheduler`，如以下示例所示：

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**重要**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 设置在 KCL v3 与 KCL v1 的 DynamoDB Streams Kinesis Adapter 之间保持兼容性，而未在 KCL v2 和 v3 之间保持兼容性。

### 步骤 4：KCL 3.x 配置概述和建议
<a name="step4-configuration-migration"></a>

有关 KCL 1.x 之后引入的与 KCL 3.x 相关的配置的详细描述，请参阅 [KCL configurations](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) 和 [KCL migration client configuration](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)。

**重要**  
在 KCL 3.x 及更高版本中，我们建议不要直接创建 `checkpointConfig`、`coordinatorConfig`、`leaseManagementConfig`、`metricsConfig`、`processorConfig` 和 `retrievalConfig` 的对象，而是使用 `ConfigsBuilder` 设置配置，来避免出现调度器初始化问题。`ConfigsBuilder` 提供了更灵活且更易于维护的方式配置 KCL 应用程序。

#### 在 KCL 3.x 中具有更新默认值的配置
<a name="kcl3-configuration-overview"></a>

`billingMode`  
在 KCL 版本 1.x 中，`billingMode` 的默认值设置为 `PROVISIONED`。但在 KCL 版本 3.x 中，默认 `billingMode` 为 `PAY_PER_REQUEST`（按需模式）。我们建议您对租约表使用按需容量模式，以便根据您的使用情况自动调整容量。有关对租约表使用预置容量的指导，请参阅 [Best practices for the lease table with provisioned capacity mode](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)。

`idleTimeBetweenReadsInMillis`  
在 KCL 版本 1.x 中，`idleTimeBetweenReadsInMillis` 的默认值设置为 1000（或 1 秒）。KCL 版本 3.x 将 i`dleTimeBetweenReadsInMillis` 的默认值设置为 1500（或 1.5 秒），但 Amazon DynamoDB Streams Kinesis Adapter 将默认值改写为 1000（或 1 秒）。

#### KCL 3.x 中的新配置
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
此配置定义在新发现的分片开始处理之前的时间间隔，计算方法为 1.5 × `leaseAssignmentIntervalMillis`。如果未显式配置此设置，则时间间隔默认为 1.5 × `failoverTimeMillis`。处理新分片包括扫描租约表并在租约表上查询全局二级索引（GSI）。降低 `leaseAssignmentIntervalMillis` 会增加这些扫描和查询操作的频率，从而导致 DynamoDB 成本更高。建议将此值设置为 2000（即 2 秒），以尽可能减少处理新分片的延迟。

`shardConsumerDispatchPollIntervalMillis`  
此配置定义了分片使用者用于触发状态转换的连续轮询之间的间隔。在 KCL 版本 1.x 中，此行为由 `idleTimeInMillis` 参数控制，该参数未作为可配置的设置公开。在 KCL 版本 3.x 中，我们建议将此配置设置为与 KCL 版本 1.x 设置中用于 ` idleTimeInMillis` 的值相匹配。

### 步骤 5：从 KCL 2.x 迁移到 KCL 3.x
<a name="step5-kcl2-to-kcl3"></a>

为确保平稳过渡并与最新的 Kinesis Client Library（KCL）版本兼容，请按照迁移指南的 [upgrading from KCL 2.x to KCL 3.x](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics) 说明中的步骤 5-8 进行操作。

有关常见的 KCL 3.x 故障排除问题，请参阅 [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)。