Migrating from KCL 1.x to KCL 3.x - Amazon DynamoDB

Migrating from KCL 1.x to KCL 3.x

Overview

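This guide provides instructions for migrating your consumer application from KCL 1.x to KCL 3.x. Because of architectural differences between KCL 1.x and KCL 3.x, the migration requires updating several components to ensure compatibility.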

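KCL 1.x uses different classes and interfaces than KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to a KCL 3.x-compatible format, and then follow the remaining steps for migrating from KCL 1.x to KCL 3.x.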

Migration steps

Step 1: Migrate the record processor

The following example shows a record processor implemented for the KCL 1.x DynamoDB Streams Kinesis Adapter:

    package com.amazonaws.kcl;

    import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
    import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
    import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
    import com.amazonaws.services.kinesis.model.Record;

    import java.nio.charset.Charset;

    public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

        @Override
        public void initialize(InitializationInput initializationInput) {
            // Set up the record processor
        }

        @Override
        public void processRecords(ProcessRecordsInput processRecordsInput) {
            for (Record record : processRecordsInput.getRecords()) {
                String data = new String(record.getData().array(), Charset.forName("UTF-8"));
                System.out.println(data);
                if (record instanceof RecordAdapter) {
                    // record processing and checkpointing logic
                }
            }
        }

        @Override
        public void shutdown(ShutdownInput shutdownInput) {
            // Checkpoint only when the shard has been fully consumed
            if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
                try {
                    shutdownInput.getCheckpointer().checkpoint();
                } catch (ShutdownException | InvalidStateException e) {
                    throw new RuntimeException(e);
                }
            }
        }

        @Override
        public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
            try {
                checkpointer.checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                // Swallow exception
                e.printStackTrace();
            }
        }
    }
To migrate the RecordProcessor class
  1. Change the interfaces from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor and com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware to com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
  2. Update the import statements for the initialize and processRecords methods:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. Replace the shutdown method with the following new methods: leaseLost, shardEnded, and shutdownRequested.

    // @Override
    // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
    //     // This is moved to shardEnded(...) and shutdownRequested(ShutdownRequestedInput)
    //     try {
    //         checkpointer.checkpoint();
    //     } catch (ShutdownException | InvalidStateException e) {
    //         // Swallow exception
    //         e.printStackTrace();
    //     }
    // }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

The following is the updated version of the record processor class:

    package com.amazonaws.codesamples;

    import software.amazon.kinesis.exceptions.InvalidStateException;
    import software.amazon.kinesis.exceptions.ShutdownException;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
    import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
    import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
    import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
    import software.amazon.awssdk.services.dynamodb.model.Record;

    public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

        @Override
        public void initialize(InitializationInput initializationInput) {
        }

        @Override
        public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
            for (DynamoDBStreamsKinesisClientRecord record : processRecordsInput.records()) {
                Record ddbRecord = record.getRecord();
                // processing and checkpointing logic for the ddbRecord
            }
        }

        @Override
        public void leaseLost(LeaseLostInput leaseLostInput) {
        }

        @Override
        public void shardEnded(ShardEndedInput shardEndedInput) {
            try {
                shardEndedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                // Swallow the exception
                e.printStackTrace();
            }
        }

        @Override
        public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
            try {
                shutdownRequestedInput.checkpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                // Swallow the exception
                e.printStackTrace();
            }
        }
    }

Note

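The DynamoDB Streams Kinesis Adapter now uses the SDKv2 Record model. In SDKv2, the accessors for complex AttributeValue objects (BS, NS, M, L, SS) never return null. Use the hasBs(), hasNs(), hasM(), hasL(), and hasSs() methods to verify whether these values exist.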
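
To illustrate why a null check is no longer sufficient, the following sketch uses a simplified stand-in class rather than the real SDKv2 AttributeValue. StandInAttributeValue and HasCheckDemo are hypothetical names introduced only for this example; the sketch mirrors just the behavior described in the note (ss() never returns null, hasSs() reports presence) and is not the SDK's implementation.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical demo class; shows a presence check in place of a null check. */
final class HasCheckDemo {

    static String describe(StandInAttributeValue value) {
        // value.ss() != null would always be true, so it cannot detect a missing set.
        return value.hasSs() ? "SS=" + value.ss() : "no SS";
    }

    public static void main(String[] args) {
        System.out.println(describe(new StandInAttributeValue(null)));
        System.out.println(describe(new StandInAttributeValue(List.of("a", "b"))));
    }
}

/** Simplified stand-in for illustration only; NOT the SDKv2 AttributeValue class. */
final class StandInAttributeValue {
    private final List<String> stringSet; // null internally means "not set"

    StandInAttributeValue(List<String> stringSet) {
        this.stringSet = stringSet;
    }

    /** Never returns null: an unset value is reported as an empty list. */
    List<String> ss() {
        return stringSet == null ? Collections.emptyList() : stringSet;
    }

    /** Returns true only when a string set was actually provided. */
    boolean hasSs() {
        return stringSet != null;
    }
}
```

With the real SDKv2 model, the same pattern would apply to the other complex types through hasBs(), hasNs(), hasL(), and hasM().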

Step 2: Migrate the record processor factory

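The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory: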

    package com.amazonaws.codesamples;

    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

    public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

        @Override
        public IRecordProcessor createProcessor() {
            // Assumes dynamoDBClient and tableName are fields of this factory
            // and that StreamsRecordProcessor has a matching constructor.
            return new StreamsRecordProcessor(dynamoDBClient, tableName);
        }
    }

To migrate the RecordProcessorFactory
  • Change the implemented interface from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

    // public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

  • Change the return signature for createProcessor:

    // public IRecordProcessor createProcessor() {
    public ShardRecordProcessor shardRecordProcessor() {

The following is an example of the record processor factory in KCL 3.0:

    package com.amazonaws.codesamples;

    import software.amazon.kinesis.processor.ShardRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

    public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

        @Override
        public ShardRecordProcessor shardRecordProcessor() {
            return new StreamsRecordProcessor();
        }
    }

Step 3: Migrate the worker

In version 3.0 of the KCL, a new class called Scheduler replaces the Worker class. The following is an example of a KCL 1.x worker:

    final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(...);
    final IRecordProcessorFactory recordProcessorFactory = new StreamsRecordProcessorFactory();
    final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
            recordProcessorFactory,
            workerConfig,
            adapterClient,
            amazonDynamoDB,
            amazonCloudWatchClient);

To migrate the worker
  1. Change the import statement for the Worker class to the import statements for the Scheduler and ConfigsBuilder classes.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.common.ConfigsBuilder;
  2. Import StreamTracker, and change the import of StreamsWorkerFactory to StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker;
    // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
    import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
  3. Choose the position from which to start the application. It can be TRIM_HORIZON or LATEST.

    import software.amazon.kinesis.common.InitialPositionInStream;
    import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. Create a StreamTracker instance.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
            streamArn,
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
    );
  5. Create the AmazonDynamoDBStreamsAdapterClient object.

    import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
    ...
    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
    AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
            credentialsProvider, awsRegion);
  6. Create the ConfigsBuilder object.

    import software.amazon.kinesis.common.ConfigsBuilder;
    ...
    ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamTracker,
            applicationName,
            adapterClient,
            dynamoDbAsyncClient,
            cloudWatchAsyncClient,
            UUID.randomUUID().toString(),
            new StreamsRecordProcessorFactory());
  7. Create the Scheduler, as shown in the following example:

    import java.util.UUID;
    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
    import software.amazon.kinesis.common.KinesisClientUtil;
    import software.amazon.kinesis.coordinator.CoordinatorConfig;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.retrieval.RetrievalConfig;
    ...
    // Build these clients before they are passed to the ConfigsBuilder in step 6.
    DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder().region(region).build();

    // Polling configuration specific to DynamoDB Streams
    DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
    pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);

    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
    retrievalConfig.retrievalSpecificConfig(pollingConfig);

    CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
    coordinatorConfig.clientVersionConfig(
            CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);

    Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
            configsBuilder.checkpointConfig(),
            coordinatorConfig,
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            retrievalConfig,
            adapterClient
    );
Important

The CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X setting maintains compatibility between the DynamoDB Streams Kinesis Adapter for KCL v3 and KCL v1, not between KCL v2 and v3.

Step 4: KCL 3.x configuration overview and recommendations

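For a detailed description of the configurations introduced after KCL 1.x that are relevant in KCL 3.x, see KCL configurations and KCL migration client configuration.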

Configurations with updated default values in KCL 3.x

billingMode

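In KCL version 1.x, the default value for billingMode is PROVISIONED. In KCL version 3.x, however, the default billingMode is PAY_PER_REQUEST (on-demand mode). We recommend that you use the on-demand capacity mode for your lease table so that capacity adjusts automatically based on your usage. For guidance on using provisioned capacity for your lease tables, see Best practices for the lease table with provisioned capacity mode.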

idleTimeBetweenReadsInMillis

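In KCL version 1.x, the default value for idleTimeBetweenReadsInMillis is 1,000 (or 1 second). KCL version 3.x sets the default value for idleTimeBetweenReadsInMillis to 1,500 (or 1.5 seconds), but the Amazon DynamoDB Streams Kinesis Adapter overrides the default value to 1,000 (or 1 second).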

New configurations in KCL 3.x

leaseAssignmentIntervalMillis

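This configuration defines the time interval before newly discovered shards begin processing, and is calculated as 1.5 × leaseAssignmentIntervalMillis. If this setting isn't explicitly configured, the time interval defaults to 1.5 × failoverTimeMillis. Processing new shards involves scanning the lease table and querying a global secondary index (GSI) on the lease table. Lowering leaseAssignmentIntervalMillis increases the frequency of these scan and query operations, which results in higher DynamoDB costs. We recommend setting this value to 2000 to minimize the delay in processing new shards.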
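
As a rough illustration of the arithmetic described above, the following sketch computes the delay before a newly discovered shard begins processing. NewShardDelay and delayMillis are hypothetical names that are not part of KCL, and the failoverTimeMillis value of 10000 used in main is an assumed figure chosen only for the example.

```java
/** Hypothetical helper; not part of KCL. Illustrates the 1.5x rule from the text. */
final class NewShardDelay {

    /**
     * Returns the approximate delay, in milliseconds, before a newly
     * discovered shard begins processing.
     *
     * @param leaseAssignmentIntervalMillis the explicit setting, or null if not set
     * @param failoverTimeMillis            the fallback used when the setting is absent
     */
    static long delayMillis(Long leaseAssignmentIntervalMillis, long failoverTimeMillis) {
        long base = (leaseAssignmentIntervalMillis != null)
                ? leaseAssignmentIntervalMillis
                : failoverTimeMillis;
        return base * 3 / 2; // 1.5 x base, using integer arithmetic
    }

    public static void main(String[] args) {
        long assumedFailoverTimeMillis = 10_000L; // assumed value for illustration

        // With the recommended setting of 2000, the delay is 3000 ms.
        System.out.println(delayMillis(2_000L, assumedFailoverTimeMillis));

        // Without an explicit setting, the delay falls back to 1.5 x failoverTimeMillis.
        System.out.println(delayMillis(null, assumedFailoverTimeMillis));
    }
}
```

Under these assumptions, setting the value to 2000 shortens the delay from 15 seconds to 3 seconds, at the cost of more frequent scan and query operations on the lease table.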

shardConsumerDispatchPollIntervalMillis

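This configuration defines the interval between successive polls by the shard consumer to trigger state transitions. In KCL version 1.x, this behavior was controlled by the idleTimeInMillis parameter, which was not exposed as a configurable setting. With KCL version 3.x, we recommend that you set this configuration to match the value used for idleTimeInMillis in your KCL version 1.x setup.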

Configurations not overridden by the Amazon DynamoDB Streams Kinesis Adapter

shardSyncIntervalMillis

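The DynamoDB Streams Kinesis Adapter that is compatible with KCL version 1.x explicitly sets shardSyncIntervalMillis to 0. In contrast, the DynamoDB Streams Kinesis Adapter that is compatible with KCL version 3.x no longer sets a value for this configuration. To maintain the same adapter behavior as version 1.x, set the value of this configuration to 0.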

leasesRecoveryAuditorExecutionFrequencyMillis

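The DynamoDB Streams Kinesis Adapter that is compatible with KCL version 1.x explicitly sets leasesRecoveryAuditorExecutionFrequencyMillis to 1000. In contrast, the DynamoDB Streams Kinesis Adapter that is compatible with KCL version 3.x no longer sets a default value for this configuration. To maintain the same adapter behavior as version 1.x, set the value of this configuration to 1000.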
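
The two settings in this section can be kept together as a small checklist. The sketch below is plain Java with hypothetical names (V1AdapterBehavior, withV1Behavior) and does not call any KCL or adapter API; it only merges the 1.x adapter values stated above with whatever values an application has chosen explicitly. The 60000 value in main is an arbitrary example.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper; not part of KCL or the adapter. */
final class V1AdapterBehavior {

    // Values that the 1.x-compatible adapter set explicitly, per the text above.
    static final Map<String, Long> V1_ADAPTER_VALUES = Map.of(
            "shardSyncIntervalMillis", 0L,
            "leasesRecoveryAuditorExecutionFrequencyMillis", 1000L);

    /** Fills in the 1.x adapter values for any setting the caller left unset. */
    static Map<String, Long> withV1Behavior(Map<String, Long> explicitSettings) {
        Map<String, Long> merged = new TreeMap<>(V1_ADAPTER_VALUES);
        merged.putAll(explicitSettings); // explicit choices take precedence
        return merged;
    }

    public static void main(String[] args) {
        // No explicit settings: both 1.x adapter values are carried over.
        System.out.println(withV1Behavior(Map.of()));

        // An explicit value (arbitrary example) is kept; the other setting is filled in.
        System.out.println(withV1Behavior(Map.of("shardSyncIntervalMillis", 60_000L)));
    }
}
```

The merged values would then be applied to the corresponding KCL 3.x configuration objects before the Scheduler is created.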

Step 5: Migrate from KCL 2.x to KCL 3.x

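To ensure a smooth transition to, and compatibility with, the latest Kinesis Client Library (KCL) version, follow steps 5-8 of the instructions for upgrading from KCL 2.x to KCL 3.x in the migration guide.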