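Migrating from KCL 1.x to KCL 3.x
Overview
This guide explains how to migrate consumer applications from KCL 1.x to KCL 3.x. Because the two versions have different architectures, the migration requires updating several components to keep the application compatible.
KCL 1.x uses different classes and interfaces than KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to the KCL 3.x-compatible format, and then follow the migration steps for moving from KCL 1.x to KCL 3.x.
Migration steps
Step 1: Migrate the record processor
The following example shows a record processor implemented for the KCL 1.x DynamoDB Streams Kinesis Adapter: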
package com.amazonaws.kcl;

import java.nio.charset.Charset;

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
Migrate the RecordProcessor class
- Change the implemented interfaces from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor and com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware to com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor, as follows:
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;

    // public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {
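- Update the import statements for the initialize and processRecords methods, and change the processRecords parameter type to match:
    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;

    // public void processRecords(ProcessRecordsInput processRecordsInput) {
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {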
- Replace the shutdown(ShutdownInput) and shutdownRequested(IRecordProcessorCheckpointer) methods with the new methods leaseLost, shardEnded, and shutdownRequested(ShutdownRequestedInput):
    // @Override
    // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
    //     //
    //     // This is moved to shardEnded(...) and shutdownRequested(ShutdownRequestedInput)
    //     //
    //     try {
    //         checkpointer.checkpoint();
    //     } catch (ShutdownException | InvalidStateException e) {
    //         //
    //         // Swallow exception
    //         //
    //         e.printStackTrace();
    //     }
    // }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
The following is the updated version of the record processor class:
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record : processRecordsInput.records()) {
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
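Note
The DynamoDB Streams Kinesis Adapter now uses the SDKv2 record model. In SDKv2, the accessors for the collection-typed AttributeValue fields (BS, NS, M, L, SS) never return null; when a field is absent they return an empty collection. Use the hasBs(), hasNs(), hasM(), hasL(), and hasSs() methods to check whether these values are actually present.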
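A null check ported from SDK v1 code (where, for example, getSS() could return null) silently stops working under this contract. The stdlib-only sketch below models the behavior for one field, SS, using a hypothetical AttributeValueModel class rather than the real software.amazon.awssdk.services.dynamodb.model.AttributeValue, so it runs without the SDK: the accessor returns an empty list when SS is absent, and only hasSs() tells you whether it was set.

```java
import java.util.List;

// Hypothetical model of the SDKv2 contract for one collection field (SS);
// not the real AttributeValue class.
final class AttributeValueModel {
    private final List<String> ss; // null means "SS was never set"

    private AttributeValueModel(List<String> ss) {
        this.ss = ss;
    }

    // A value of some other type (for example, S), so SS is absent.
    static AttributeValueModel withoutStringSet() {
        return new AttributeValueModel(null);
    }

    static AttributeValueModel withStringSet(List<String> values) {
        return new AttributeValueModel(List.copyOf(values));
    }

    // Mirrors SDKv2: never null, an empty list when SS is absent.
    List<String> ss() {
        if (ss == null) {
            return List.of();
        }
        return ss;
    }

    // Mirrors SDKv2 hasSs(): true only when SS was actually set.
    boolean hasSs() {
        return ss != null;
    }
}

final class PresenceChecks {
    private PresenceChecks() {}

    // Ported SDK v1-style null check: always true under SDKv2 semantics,
    // so it can no longer tell "absent" from "present".
    static boolean legacyNullCheck(AttributeValueModel value) {
        return value.ss() != null;
    }

    // SDKv2-style presence check.
    static boolean presenceCheck(AttributeValueModel value) {
        return value.hasSs();
    }
}
```

With the real SDKv2 class the same rule applies to BS, NS, M, and L through hasBs(), hasNs(), hasM(), and hasL().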
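Step 2: Migrate the record processor factory
The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory: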
package com.amazonaws.codesamples;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor();
    }
}
Migrate to ShardRecordProcessorFactory
- Change the implemented interface from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows:
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

    // public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
- Change the signature of createProcessor: the method is now shardRecordProcessor and returns a ShardRecordProcessor:
    // public IRecordProcessor createProcessor() {
    public ShardRecordProcessor shardRecordProcessor() {
The following is an example of the record processor factory in KCL 3.0:
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
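Because the factory is called whenever the worker acquires a lease, shardRecordProcessor() should return a new processor each time, so that per-shard state is never shared between shards. If the processor needs shared, read-only dependencies (such as a table name), a common approach is to hold them in the factory and pass them to each new processor. The stdlib-only sketch below illustrates this with hypothetical Processor and ProcessorFactory interfaces standing in for ShardRecordProcessor and ShardRecordProcessorFactory; the class names and the tableName field are illustrative only.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ShardRecordProcessor.
interface Processor {
    String tableName();
}

// Hypothetical stand-in for ShardRecordProcessorFactory.
interface ProcessorFactory {
    Processor shardRecordProcessor();
}

final class TableProcessor implements Processor {
    private final String tableName;

    TableProcessor(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public String tableName() {
        return tableName;
    }
}

final class TableProcessorFactory implements ProcessorFactory {
    private final String tableName; // shared, immutable configuration
    private final AtomicInteger created = new AtomicInteger();

    TableProcessorFactory(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public Processor shardRecordProcessor() {
        created.incrementAndGet();
        // A fresh processor for every lease, so per-shard state is never shared.
        return new TableProcessor(tableName);
    }

    int createdCount() {
        return created.get();
    }
}
```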
Step 3: Migrate the worker
In KCL 3.0, a new class called Scheduler replaces the Worker class. The following is an example of a KCL 1.x worker:
final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new StreamsRecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
Migrate the worker
- Replace the import statement for the Worker class with import statements for the Scheduler and ConfigsBuilder classes:
    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.common.ConfigsBuilder;
- Import StreamTracker, and change the import of StreamsWorkerFactory to StreamsSchedulerFactory:
    import software.amazon.kinesis.processor.StreamTracker;
    // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
    import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
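- Choose the position from which the application starts reading the stream: TRIM_HORIZON (the oldest record still available in the stream) or LATEST (only records written after the application starts). This setting applies only to shards that don't yet have a checkpoint in the lease table.
    import software.amazon.kinesis.common.InitialPositionInStream;
    import software.amazon.kinesis.common.InitialPositionInStreamExtended;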
- Create a StreamTracker instance:
    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
            streamArn,
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
- Create an AmazonDynamoDBStreamsAdapterClient object:
    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
    ...
    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
    AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
            credentialsProvider,
            awsRegion);
- Create a ConfigsBuilder object:
    import java.util.UUID;
    import software.amazon.kinesis.common.ConfigsBuilder;
    ...
    ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamTracker,
            applicationName,
            adapterClient,
            dynamoDbAsyncClient,
            cloudWatchAsyncClient,
            UUID.randomUUID().toString(),   // unique worker identifier
            new StreamsRecordProcessorFactory());
- Create the Scheduler, as shown in the following example:
    import java.util.UUID;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.kinesis.coordinator.CoordinatorConfig;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.retrieval.RetrievalConfig;
    import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
    ...
    // The ConfigsBuilder takes these two clients as arguments, so create them before it.
    DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder().region(region).build();

    DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
    pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);

    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
    retrievalConfig.retrievalSpecificConfig(pollingConfig);

    CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
    coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);

    Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
            configsBuilder.checkpointConfig(),
            coordinatorConfig,
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            retrievalConfig,
            adapterClient);
Important
The CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X setting maintains compatibility between KCL v3 and the DynamoDB Streams Kinesis Adapter for KCL v1. It does not maintain compatibility between KCL v2 and v3.
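Creating the Scheduler does not start it. In KCL 2.x and 3.x, Scheduler implements Runnable and provides startGracefulShutdown(), which returns a CompletableFuture<Boolean>. The stdlib-only sketch below shows a common pattern, running it on a dedicated daemon thread and stopping it with a bounded wait, against a hypothetical GracefulRunnable interface that stands in for Scheduler so the example runs without KCL on the classpath. SchedulerRunner and the thread name are illustrative choices, not adapter APIs.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical stand-in for the parts of Scheduler used here.
interface GracefulRunnable extends Runnable {
    CompletableFuture<Boolean> startGracefulShutdown();
}

final class SchedulerRunner {
    private SchedulerRunner() {}

    // Runs the scheduler on its own daemon thread and returns that thread.
    static Thread start(GracefulRunnable scheduler) {
        Thread thread = new Thread(scheduler, "kcl-scheduler");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Requests a graceful shutdown and waits up to timeoutMillis for it.
    // Returns false on timeout or failure instead of throwing.
    static boolean stop(GracefulRunnable scheduler, long timeoutMillis) {
        try {
            return scheduler.startGracefulShutdown().get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

With the real classes, the equivalent calls are new Thread(scheduler).start() and scheduler.startGracefulShutdown().get(timeout, unit), for example from a JVM shutdown hook registered with Runtime.getRuntime().addShutdownHook(...).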
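Step 4: KCL 3.x configuration overview and recommendations
For detailed descriptions of the KCL 3.x configurations introduced after KCL 1.x, see KCL configurations and KCL migration client configuration.
Configurations with updated default values in KCL 3.x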
billingMode
- In KCL 1.x, the default value of billingMode is PROVISIONED. In KCL 3.x, the default billingMode is PAY_PER_REQUEST (on-demand mode). We recommend using on-demand capacity mode for the lease table so that its capacity adjusts automatically to your usage. For guidance on using provisioned capacity for lease tables, see Best practices for the lease table with provisioned capacity mode.
idleTimeBetweenReadsInMillis
- In KCL 1.x, the default value of idleTimeBetweenReadsInMillis is 1000 (1 second). KCL 3.x sets the default to 1500 (1.5 seconds), but the Amazon DynamoDB Streams Kinesis Adapter overrides it to 1000 (1 second).
New configurations in KCL 3.x
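leaseAssignmentIntervalMillis
- This configuration controls how long a newly discovered shard waits before processing starts: the delay is 1.5 × leaseAssignmentIntervalMillis. If the setting isn't configured explicitly, the delay defaults to 1.5 × failoverTimeMillis. Processing new shards involves scanning the lease table and querying a global secondary index (GSI) on it, so lowering leaseAssignmentIntervalMillis makes these scans and queries more frequent and increases DynamoDB costs. We recommend setting this value to 2000 to minimize the delay in processing new shards.
shardConsumerDispatchPollIntervalMillis
- This configuration defines the interval between consecutive polls that the shard consumer uses to trigger state transitions. In KCL 1.x, this behavior was controlled by the idleTimeInMillis parameter, which was not exposed as a configurable setting. In KCL 3.x, we recommend setting this configuration to the value that your KCL 1.x setup used for idleTimeInMillis.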
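The delay rule for leaseAssignmentIntervalMillis is simple arithmetic. The sketch below encodes it with a hypothetical helper, LeaseAssignmentDelay, that takes the stated fallback to failoverTimeMillis literally; it is an illustration of the formula, not a KCL API.

```java
import java.util.OptionalLong;

// Hypothetical helper for the new-shard delay rule described above.
final class LeaseAssignmentDelay {
    private LeaseAssignmentDelay() {}

    // Delay before a newly discovered shard starts processing:
    // 1.5 x leaseAssignmentIntervalMillis when it is set,
    // otherwise 1.5 x failoverTimeMillis.
    static long newShardDelayMillis(OptionalLong leaseAssignmentIntervalMillis, long failoverTimeMillis) {
        long base = leaseAssignmentIntervalMillis.orElse(failoverTimeMillis);
        return base * 3 / 2; // 1.5 x base, rounded down for odd values
    }
}
```

For example, with the recommended value of 2000 the delay is 3000 ms; if the setting is left unset and failoverTimeMillis is 10000 ms (which we understand to be the KCL default; check your own configuration), the delay is 15000 ms.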
Configurations not overridden by the Amazon DynamoDB Streams Kinesis Adapter
shardSyncIntervalMillis
- The DynamoDB Streams Kinesis Adapter for KCL 1.x explicitly sets shardSyncIntervalMillis to 0. The adapter for KCL 3.x no longer sets a value for this configuration. To keep the same adapter behavior as in version 1.x, set this configuration to 0.
leasesRecoveryAuditorExecutionFrequencyMillis
- The DynamoDB Streams Kinesis Adapter for KCL 1.x explicitly sets leasesRecoveryAuditorExecutionFrequencyMillis to 1000. The adapter for KCL 3.x no longer sets a default value for this configuration. To keep the same adapter behavior as in version 1.x, set this configuration to 1000.
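Because the KCL 3.x adapter no longer sets these two values, it can help to check them at startup. The stdlib-only sketch below compares effective settings against the values that the 1.x-compatible adapter used (shardSyncIntervalMillis = 0, leasesRecoveryAuditorExecutionFrequencyMillis = 1000) and lists the ones that still need to be set. The Map-based representation and the AdapterParityCheck class are hypothetical; in real code you would set the values on the KCL configuration objects (we assume LeaseManagementConfig; verify this against the KCL configuration reference).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical startup check for the two settings the 3.x adapter no longer sets.
final class AdapterParityCheck {
    private AdapterParityCheck() {}

    // Values the KCL 1.x-compatible adapter set explicitly (from the text above).
    static final Map<String, Long> KCL1_ADAPTER_VALUES;
    static {
        Map<String, Long> values = new LinkedHashMap<>(); // keeps a stable order
        values.put("shardSyncIntervalMillis", 0L);
        values.put("leasesRecoveryAuditorExecutionFrequencyMillis", 1000L);
        KCL1_ADAPTER_VALUES = Collections.unmodifiableMap(values);
    }

    // Returns the settings that are missing or differ from the 1.x adapter values.
    static List<String> settingsToOverride(Map<String, Long> effective) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : KCL1_ADAPTER_VALUES.entrySet()) {
            // equals(null) is false, so a missing setting is reported too.
            if (!entry.getValue().equals(effective.get(entry.getKey()))) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}
```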
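Step 5: Migrate from KCL 2.x to KCL 3.x
To ensure a smooth transition and compatibility with the latest Kinesis Client Library (KCL) version, follow steps 5-8 of the upgrading from KCL 2.x to KCL 3.x instructions in the migration guide.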