

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 将消费端从 KCL 1.x 迁移到 KCL 2.x
<a name="kcl-migration"></a>

**重要**  
Amazon Kinesis Client Library（KCL）版本 1.x 和 2.x 已过时。KCL 1.x 将于 2026 年 1 月 30 日 end-of-support上市。我们**强烈建议**您在 2026 年 1 月 30 日之前，将使用版本 1.x 的 KCL 应用程序迁移到最新的 KCL 版本。要查找最新的 KCL 版本，请访问上的 [Amazon Kinesis 客户端库页面](https://github.com/awslabs/amazon-kinesis-client)。 GitHub有关最新 KCL 版本的信息，请参阅[使用 Kinesis Client Library](kcl.md)。有关从 KCL 1.x 迁移到 KCL 3.x 的信息，请参阅[从 KCL 1.x 迁移到 KCL 3.x](kcl-migration-1-3.md)。

本主题介绍 Kinesis Client Library（KCL）版本 1.x 和 2.x 之间的区别。其中还向您展示如何将消费端从 KCL 版本 1.x 迁移到版本 2.x。在迁移您的客户端后，它将从最后一个检查点位置开始处理记录。

KCL 版本 2.0 引入了以下接口更改：


**KCL 接口更改**  

| KCL 1.x 接口 | KCL 2.0 接口 | 
| --- | --- | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor | software.amazon.kinesis.processor.ShardRecordProcessor | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory | software.amazon.kinesis.processor.ShardRecordProcessorFactory | 
| com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware | 折叠为 software.amazon.kinesis.processor.ShardRecordProcessor | 

**Topics**
+ [迁移记录处理器类](#recrod-processor-migration)
+ [迁移记录处理器工厂](#recrod-processor-factory-migration)
+ [迁移工作线程](#worker-migration)
+ [配置 Amazon Kinesis 客户端](#client-configuration)
+ [闲置时间删除](#idle-time-removal)
+ [客户端配置删除](#client-configuration-removals)

## 迁移记录处理器类
<a name="recrod-processor-migration"></a>

以下示例显示了为 KCL 1.x 实现的记录处理器：

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        //
        // Process records, and possibly checkpoint
        //
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**迁移记录处理器类**

1. 将接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 和 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessor`，如下所示：

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
   public class TestRecordProcessor implements ShardRecordProcessor {
   ```

1. 更新 `import` 和 `initialize` 方法的 `processRecords` 语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
   ```

1. 使用以下新方法替换 `shutdown` 方法：`leaseLost`、`shardEnded` 和 `shutdownRequested`。

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

下面是记录处理器类的更新版本。

```
package com.amazonaws.kcl;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.kinesis.processor.ShardRecordProcessor;

public class TestRecordProcessor implements ShardRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

## 迁移记录处理器工厂
<a name="recrod-processor-factory-migration"></a>

记录处理器工厂负责在获得租约时创建记录处理器。下面是 KCL 1.x 工厂的示例。

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class TestRecordProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new TestRecordProcessor();
    }
}
```

**迁移记录处理器工厂**

1. 将已实施的接口从 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory` 更改为 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`，如下所示。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   import software.amazon.kinesis.processor.ShardRecordProcessor;
   
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
   import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
   
   // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
   public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
   ```

1. 更改 `createProcessor` 的返回签名。

   ```
   // public IRecordProcessor createProcessor() {
   public ShardRecordProcessor shardRecordProcessor() {
   ```

下面是 2.0 中的记录处理器工厂的示例：

```
package com.amazonaws.kcl;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new TestRecordProcessor();
    }
}
```

## 迁移工作线程
<a name="worker-migration"></a>

在 KCL 版本 2.0 中，名为 `Scheduler` 的新类取代了 `Worker` 类。下面是 KCL 1.x 工作程序的示例。

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = new Worker.Builder()
    .recordProcessorFactory(recordProcessorFactory)
    .config(config)
    .build();
```

**迁移工作程序**

1. 将 `Worker` 类的 `import` 语句更改为 `Scheduler` 和 `ConfigsBuilder` 类的导入语句。

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. 创建 `ConfigsBuilder` 和 `Scheduler`，如以下示例所示。

   建议您使用 `KinesisClientUtil` 创建 `KinesisAsyncClient`，并在 `KinesisAsyncClient` 中配置 `maxConcurrency`。
**重要**  
Amazon Kinesis 户端可能会看到延迟大幅增加，除非您将 `KinesisAsyncClient` 配置为具有足够高的 `maxConcurrency`，以允许所有租期以及额外使用 `KinesisAsyncClient`。

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.kinesis.common.ConfigsBuilder;
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
   Region region = Region.AP_NORTHEAST_2;
   KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
   ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory());
   
   Scheduler scheduler = new Scheduler(
       configsBuilder.checkpointConfig(),
       configsBuilder.coordinatorConfig(),
       configsBuilder.leaseManagementConfig(),
       configsBuilder.lifecycleConfig(),
       configsBuilder.metricsConfig(),
       configsBuilder.processorConfig(),
       configsBuilder.retrievalConfig()
       );
   ```

## 配置 Amazon Kinesis 客户端
<a name="client-configuration"></a>

随着 Kinesis Client Library 版本 2.0 的发布，客户端的配置已从单个配置类（`KinesisClientLibConfiguration`）变为 6 个配置类。下表描述了迁移。


**配置字段及其新类**  

| 原始字段 | 新配置类 | 说明 | 
| --- | --- | --- | 
| applicationName | ConfigsBuilder | 此 KCL 应用程序的名称。用作 tableName 和 consumerName 的默认名称。 | 
| tableName | ConfigsBuilder | 允许覆盖用于 Amazon DynamoDB 租赁表的表名称。 | 
| streamName | ConfigsBuilder | 此应用程序从其中处理记录的流的名称。 | 
| kinesisEndpoint | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBEndpoint | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| initialPositionInStreamExtended | RetrievalConfig | 分片中 KCL 开始获取记录的位置，从应用程序的初始运行开始。 | 
| kinesisCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| cloudWatchCredentialsProvider | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| failoverTimeMillis | LeaseManagementConfig | 在您可以将租赁所有者视为已失败之前必须经过的毫秒数。 | 
| workerIdentifier | ConfigsBuilder | 表示应用程序处理器的这种实例化的唯一标识符。此值必须唯一。 | 
| shardSyncIntervalMillis | LeaseManagementConfig | 分片同步调用之间的时间。 | 
| maxRecords | PollingConfig | 允许设置 Kinesis 返回的最大记录数。 | 
| idleTimeBetweenReadsInMillis | CoordinatorConfig | 此选项已删除。请参阅“闲置时间删除”。 | 
| callProcessRecordsEvenForEmptyRecordList | ProcessorConfig | 如果设置，即使 Kinesis 中未提供任何记录，也会调用记录处理器。 | 
| parentShardPollIntervalMillis | CoordinatorConfig | 记录处理器应轮询多少时间才能查看是否已完成父分片。 | 
| cleanupLeasesUponShardCompletion | LeaseManagementConfig | 如果设置，只要子租赁已开始处理，即可删除租赁。 | 
| ignoreUnexpectedChildShards | LeaseManagementConfig | 如果设置，将忽略具有打开的分片的子分片。这主要适用于 DynamoDB Streams。 | 
| kinesisClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| dynamoDBClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| cloudWatchClientConfig | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| taskBackoffTimeMillis | LifecycleConfig | 等待重试失败任务的时间。 | 
| metricsBufferTimeMillis | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsMaxQueueSize | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsLevel | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| metricsEnabledDimensions | MetricsConfig | 控制 CloudWatch 指标发布。 | 
| validateSequenceNumberBeforeCheckpointing | CheckpointConfig | 此选项已删除。请参阅“检查点序列号验证”。 | 
| regionName | ConfigsBuilder | 此选项已删除。请参阅“客户端配置删除”。 | 
| maxLeasesForWorker | LeaseManagementConfig | 应用程序的单个实例应接受的最大租赁数量。 | 
| maxLeasesToStealAtOneTime | LeaseManagementConfig | 应用程序一次应尝试窃取的最大租赁数量。 | 
| initialLeaseTableReadCapacity | LeaseManagementConfig | 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 | 
| initialLeaseTableWriteCapacity | LeaseManagementConfig | 在 Kinesis 客户端库需要创建新的 DynamoDB 租用表时使用的 DynamoDB IOPs 读取。 | 
| initialPositionInStreamExtended | LeaseManagementConfig | 应用程序应在流中开始的初始位置。此值仅在创建初始租赁时使用。 | 
| skipShardSyncAtWorkerInitializationIfLeasesExist | CoordinatorConfig | 如果租赁表包含现有租赁，请禁用同步的分片数据。待办事项： KinesisEco-438 | 
| shardPrioritization | CoordinatorConfig | 要使用的分片优先级。 | 
| shutdownGraceMillis | 不适用 | 此选项已删除。参见 “ MultiLang 移除”。 | 
| timeoutInSeconds | 不适用 | 此选项已删除。参见 “ MultiLang 移除”。 | 
| retryGetRecordsInSeconds | PollingConfig | 配置 GetRecords 尝试失败之间的延迟。 | 
| maxGetRecordsThreadPool | PollingConfig | 使用的线程池大小 GetRecords。 | 
| maxLeaseRenewalThreads | LeaseManagementConfig | 控制租赁续订线程池的大小。您的应用程序可以容纳的租赁越多，此池应该就越大。 | 
| recordsFetcherFactory | PollingConfig | 允许替换用于创建从流中检索的提取程序的工厂。 | 
| logWarningForTaskAfterMillis | LifecycleConfig | 任务尚未完成的情况下在记录警告之前要等待的时长。 | 
| listShardsBackoffTimeInMillis | RetrievalConfig | 发生故障时在调用 ListShards 之间要等待的时间（以毫秒为单位）。 | 
| maxListShardsRetryAttempts | RetrievalConfig | ListShards 在放弃之前重试的最长时间。 | 

## 闲置时间删除
<a name="idle-time-removal"></a>

在 KCL 版本 1.x 中，`idleTimeBetweenReadsInMillis` 对应了两个数量：
+ 任务分派检查之间的时间量。您现在可以通过设置 `CoordinatorConfig#shardConsumerDispatchPollIntervalMillis` 来在任务之间配置此时间。
+ 未从 Kinesis Data Streams 中返回任何记录时的睡眠时间量。在版本 2.0 中，带增强型扇出功能的记录是从其各自的检索器中推送的。分片消费端上的活动仅发生在推送的请求到达时。

## 客户端配置删除
<a name="client-configuration-removals"></a>

在版本 2.0 中，KCL 不再创建客户端。这取决于用户提供有效的客户端。进行此更改后，已删除控制客户端创建的所有配置参数。如果需要这些参数，您可以在向 `ConfigsBuilder` 提供客户端之前在客户端上设置它们。


****  

| 已删除字段 | 等效配置 | 
| --- | --- | 
| kinesisEndpoint | 使用以下首选端点配置开发工具包 KinesisAsyncClient：KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()。 | 
| dynamoDBEndpoint | 使用以下首选端点配置开发工具包 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()。 | 
| kinesisClientConfig | 使用以下所需配置来配置开发工具包 KinesisAsyncClient：KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| dynamoDBClientConfig | 使用以下所需配置来配置开发工具包 DynamoDbAsyncClient：DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| cloudWatchClientConfig | 使用以下所需配置来配置开发工具包 CloudWatchAsyncClient：CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()。 | 
| regionName | 使用首选区域配置开发工具包。这对所有开发工具包客户端均相同。例如 KinesisAsyncClient.builder().region(Region.US\$1WEST\$12).build()。 | 