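Migrating from KCL 1.x to KCL 3.x - Amazon DynamoDB

Migrating from KCL 1.x to KCL 3.x

Overview

This guide provides instructions for migrating your consumer application from KCL 1.x to KCL 3.x. Because of architectural differences between KCL 1.x and KCL 3.x, the migration requires updating several components to ensure compatibility.

KCL 1.x uses different classes and interfaces than KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to a KCL 3.x-compatible format, and then follow the migration steps for moving from KCL 1.x to KCL 3.x.

Migration steps

Step 1: Migrate the record processor

The following example shows a record processor implemented for the KCL 1.x DynamoDB Streams Kinesis Adapter: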

package com.amazonaws.kcl;

import java.nio.charset.Charset;

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Setup record processor
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Checkpoint only when the shard has been fully processed (TERMINATE)
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow exception
            e.printStackTrace();
        }
    }
}
To migrate the RecordProcessor class
  1. Change the interfaces from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor and com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware to com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
  2. Update the import statements for the initialize and processRecords methods:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;

    // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Replace the shutdown method with the following new methods: leaseLost, shardEnded, and shutdownRequested.

    // @Override
    // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
    //     // This is moved to shardEnded(...) and shutdownRequested(ShutdownRequestedInput)
    //     try {
    //         checkpointer.checkpoint();
    //     } catch (ShutdownException | InvalidStateException e) {
    //         // Swallow exception
    //         e.printStackTrace();
    //     }
    // }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

The following is the updated version of the record processor class:

package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record : processRecordsInput.records()) {
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }
}
Note

The DynamoDB Streams Kinesis Adapter now uses the SDKv2 record model. In SDKv2, complex AttributeValue objects (BS, NS, M, L, SS) never return null. Use the hasBs(), hasNs(), hasM(), hasL(), and hasSs() methods to verify whether these values exist.
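
The difference between a null check and the has*() methods can be illustrated with a short sketch. It assumes the AWS SDK for Java 2.x AttributeValue class (software.amazon.awssdk.services.dynamodb.model.AttributeValue) is on the classpath. The class name AttributeValueChecks and the helper collectionType are hypothetical names chosen for this example.

```java
import java.util.Collections;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public class AttributeValueChecks {

    /**
     * Returns the type tag of the collection attribute that is actually set,
     * or "none" if the value holds no map, list, or set.
     * (Hypothetical helper, not part of the SDK or the adapter.)
     */
    static String collectionType(AttributeValue value) {
        if (value.hasM()) return "M";
        if (value.hasL()) return "L";
        if (value.hasSs()) return "SS";
        if (value.hasNs()) return "NS";
        if (value.hasBs()) return "BS";
        return "none";
    }

    public static void main(String[] args) {
        AttributeValue scalar = AttributeValue.builder().s("hello").build();

        // m() is non-null even though no map was set, so a null check
        // cannot tell "absent" from "present" here.
        System.out.println("m() != null: " + (scalar.m() != null));
        System.out.println("collection type: " + collectionType(scalar));

        AttributeValue map = AttributeValue.builder()
                .m(Collections.singletonMap("k", AttributeValue.builder().n("1").build()))
                .build();
        System.out.println("collection type: " + collectionType(map));
    }
}
```

Code migrated from the SDKv1 record model that tested these getters against null would need the same kind of change.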

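Step 2: Migrate the record processor factory

The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory: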

package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

    // dynamoDBClient and tableName are not declared in this excerpt.

    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
To migrate the RecordProcessorFactory
  • Change the implemented interface from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessor;

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

    // public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

  • Change the return signature for createProcessor:

    // public IRecordProcessor createProcessor() {
    public ShardRecordProcessor shardRecordProcessor() {

The following is an example of the record processor factory in 3.0:

package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}

Step 3: Migrate the worker

In KCL version 3.0, a new class called Scheduler replaces the Worker class. The following is an example of a KCL 1.x worker:

final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
To migrate the worker
  1. Change the import statement for the Worker class to the import statements for the Scheduler and ConfigsBuilder classes.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.common.ConfigsBuilder;
  2. Import StreamTracker and change the import of StreamsWorkerFactory to StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker;
    // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
    import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Choose the position from which to start the application. It can be TRIM_HORIZON or LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream;
    import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Create a StreamTracker instance.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
            streamArn,
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
    );
  5. Create the AmazonDynamoDBStreamsAdapterClient object.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    ...

    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
            credentialsProvider, awsRegion);
  6. Create the ConfigsBuilder object.

    import software.amazon.kinesis.common.ConfigsBuilder;

    ...

    ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamTracker,
            applicationName,
            adapterClient,
            dynamoDbAsyncClient,
            cloudWatchAsyncClient,
            UUID.randomUUID().toString(),
            new StreamsRecordProcessorFactory());
  7. Create the Scheduler, as shown in the following example:

    import java.util.UUID;

    import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.kinesis.common.KinesisClientUtil;
    import software.amazon.kinesis.coordinator.CoordinatorConfig;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.retrieval.RetrievalConfig;

    ...

    // The async clients that are passed to ConfigsBuilder in step 6
    DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder().region(region).build();

    // Polling configuration for reading from DynamoDB Streams
    DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
    pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);

    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
    retrievalConfig.retrievalSpecificConfig(pollingConfig);

    CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
    coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);

    Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
            configsBuilder.checkpointConfig(),
            coordinatorConfig,
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            retrievalConfig,
            adapterClient
    );
Important

The CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X setting maintains compatibility between the DynamoDB Streams Kinesis Adapter for KCL v3 and the adapter for KCL v1. It does not maintain compatibility between KCL v2 and v3.
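
Once created, the scheduler still has to be started. KCL's Scheduler implements java.lang.Runnable, so one common approach is to hand it to a thread. The following minimal sketch is written against Runnable so that it stays self-contained and runs without AWS dependencies. The class SchedulerRunner and the method startInBackground are hypothetical names, not part of KCL or the adapter.

```java
public class SchedulerRunner {

    /**
     * Starts the given task on a daemon thread and returns that thread.
     * In a real application the argument would be the Scheduler created in step 7.
     */
    static Thread startInBackground(Runnable scheduler) {
        Thread thread = new Thread(scheduler, "kcl-scheduler");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the real Scheduler so that this sketch can run on its own.
        Runnable fakeScheduler = () -> System.out.println("scheduler running");

        Thread schedulerThread = startInBackground(fakeScheduler);
        schedulerThread.join();
    }
}
```

A real Scheduler runs until it is shut down, so an application would normally keep the process alive and stop the scheduler explicitly, for example through the Scheduler's startGracefulShutdown() method, rather than joining the thread right away.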

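Step 4: KCL 3.x configuration overview and recommendations

For a detailed description of the configurations introduced after KCL 1.x that are relevant in KCL 3.x, see KCL configurations and KCL migration client configuration.

Configurations with updated default values in KCL 3.x

billingMode

In KCL version 1.x, the default value for billingMode is PROVISIONED. In KCL version 3.x, the default billingMode is PAY_PER_REQUEST (on-demand mode). We recommend that you use on-demand capacity mode for your lease table so that capacity adjusts automatically to your usage. For guidance on using provisioned capacity for your lease table, see Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis

In KCL version 1.x, the default value for idleTimeBetweenReadsInMillis is 1000 (1 second). KCL version 3.x sets the default value for idleTimeBetweenReadsInMillis to 1500 (1.5 seconds), but the Amazon DynamoDB Streams Kinesis Adapter overrides the default value to 1000 (1 second).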

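New configurations in KCL 3.x

leaseAssignmentIntervalMillis

This configuration defines the time interval before newly discovered shards begin processing, calculated as 1.5 × leaseAssignmentIntervalMillis. If this setting is not explicitly configured, the time interval defaults to 1.5 × failoverTimeMillis. Processing new shards involves scanning the lease table and querying a global secondary index (GSI) on the lease table. Lowering leaseAssignmentIntervalMillis increases the frequency of these scan and query operations, which results in higher DynamoDB costs. We recommend setting this value to 2000 to minimize the delay in processing new shards.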
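
The rule above can be worked through with a small calculation. The following is an illustrative sketch only: the class NewShardDelay and the method delayMillis are hypothetical names, and the failoverTimeMillis value of 10000 is just an example input, not a value taken from this guide.

```java
public class NewShardDelay {

    /**
     * Approximate delay in milliseconds before a newly discovered shard starts
     * processing, following the rule described above:
     * 1.5 x leaseAssignmentIntervalMillis, or 1.5 x failoverTimeMillis when
     * leaseAssignmentIntervalMillis is not configured (passed as null here).
     */
    static long delayMillis(Long leaseAssignmentIntervalMillis, long failoverTimeMillis) {
        long base = (leaseAssignmentIntervalMillis != null)
                ? leaseAssignmentIntervalMillis
                : failoverTimeMillis;
        return base * 3 / 2;
    }

    public static void main(String[] args) {
        // Recommended setting of 2000 ms
        System.out.println(delayMillis(2000L, 10000L)); // prints 3000

        // Setting not configured: falls back to failoverTimeMillis (example value)
        System.out.println(delayMillis(null, 10000L));  // prints 15000
    }
}
```

With the recommended value of 2000, the delay is about 3 seconds. A larger fallback value lengthens the delay proportionally.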

shardConsumerDispatchPollIntervalMillis

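This configuration defines the interval between successive polls that the shard consumer uses to trigger state transitions. In KCL version 1.x, this behavior was controlled by the idleTimeInMillis parameter, which was not exposed as a configurable setting. In KCL version 3.x, we recommend setting this configuration to match the value used for idleTimeInMillis in your KCL version 1.x setup.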

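Configurations not overridden by the Amazon DynamoDB Streams Kinesis Adapter

shardSyncIntervalMillis

The DynamoDB Streams Kinesis Adapter compatible with KCL version 1.x explicitly sets shardSyncIntervalMillis to 0. In contrast, the DynamoDB Streams Kinesis Adapter compatible with KCL version 3.x no longer sets a value for this configuration. To keep the same adapter behavior as in version 1.x, set the value of this configuration to 0.

leasesRecoveryAuditorExecutionFrequencyMillis

The DynamoDB Streams Kinesis Adapter compatible with KCL version 1.x explicitly sets leasesRecoveryAuditorExecutionFrequencyMillis to 1000. In contrast, the DynamoDB Streams Kinesis Adapter compatible with KCL version 3.x no longer sets a default value for this configuration. To keep the same adapter behavior as in version 1.x, set the value of this configuration to 1000.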
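
As a configuration sketch, the two values above could be applied to the lease management configuration before the scheduler is created. This assumes that KCL 3.x exposes both settings on LeaseManagementConfig through fluent setters named after the configurations, which should be checked against the KCL version in use. The class AdapterCompatibleLeaseConfig and its method are hypothetical names.

```java
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.leases.LeaseManagementConfig;

public class AdapterCompatibleLeaseConfig {

    /**
     * Returns a LeaseManagementConfig carrying the two values that the
     * KCL 1.x-compatible adapter used to set explicitly.
     * Assumption: both setters exist on LeaseManagementConfig under these names.
     */
    static LeaseManagementConfig leaseManagementConfig(ConfigsBuilder configsBuilder) {
        LeaseManagementConfig config = configsBuilder.leaseManagementConfig();
        config.shardSyncIntervalMillis(0L);
        config.leasesRecoveryAuditorExecutionFrequencyMillis(1000L);
        return config;
    }
}
```

The returned object would then be passed to StreamsSchedulerFactory.createScheduler in place of configsBuilder.leaseManagementConfig().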

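Step 5: Migrate from KCL 2.x to KCL 3.x

To ensure a smooth transition and compatibility with the latest Kinesis Client Library (KCL) version, follow steps 5-8 in the migration guide's instructions for upgrading from KCL 2.x to KCL 3.x.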