從 KCL 1.x 移轉到 KCL 3.x - Amazon DynamoDB

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 KCL 1.x 移轉到 KCL 3.x

概觀

本指南提供如何將取用者應用程式從 KCL 1.x 移轉到 KCL 3.x 的說明。由於 KCL 1.x 和 KCL 3.x 之間的架構差異,移轉需要更新多個元件以確保相容性。

KCL 1.x 使用的類別和介面與 KCL 3.x 不同。您必須先將記錄處理器、記錄處理器處理站及工作者類別移轉至 KCL 3.x 相容格式,並遵循 KCL 1.x 移轉至 KCL 3.x 的移轉步驟。

移轉步驟

步驟 1:移轉記錄處理器

以下範例顯示基於 KCL 1.x DynamoDB Streams Kinesis 轉接器所實作的記錄處理器:

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
移轉 RecordProcessor 類別
  1. 將介面從 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware 更改為 software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor,如下所示:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. 更新 initializeprocessRecords 方法的匯入陳述式:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. shutdownRequested 方法取代為以下的新方法:leaseLostshardEndedshutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

記錄處理器類別經更新後的版本如下:

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
注意

DynamoDB Streams Kinesis 轉接器現在使用 SDKv2 記錄模型。在 SDKv2 中,複雜 AttributeValue 物件 (BSNSMLSS) 永遠不會傳回空值。使用 hasBs()hasNs()hasM()hasL()hasSs() 方法驗證這些值是否存在。

步驟 2:移轉記錄處理器處理站

記錄處理器處理站負責在取得租用時建立記錄處理器。以下是 KCL 1.x 處理站的範例:

package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
移轉 RecordProcessorFactory
  • 將實作的介面從 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory 更改為 software.amazon.kinesis.processor.ShardRecordProcessorFactory,如下所示:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

以下是 3.0 中記錄處理器工廠的範例:

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

步驟 3:移轉工作者

在 KCL 的版本 3.0,名為排程器​的新類別會取代工作者類別。以下是 KCL 1.x 工作者的範例:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
移轉至工作者
  1. Worker 類別的 import 陳述式變更為 SchedulerConfigsBuilder 類別的匯入陳述式。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 匯入 StreamTracker 並將匯入 StreamsWorkerFactory 變更至 StreamsSchedulerFactory

    import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
  3. 選擇要啟動應用程式的位置。其可能是 TRIM_HORIZONLATEST

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. 建立 StreamTracker 執行個體。

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. 建立 AmazonDynamoDBStreamsAdapterClient 物件。

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. 建立 ConfigsBuilder 物件。

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. 使用 ConfigsBuilder 建立 Scheduler,如下列範例所示:

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
重要

CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X 設定可維持 KCL v3 與 KCL v1 之間的 DynamoDB Streams Kinesis Adapter 相容性,而非 KCL v2 與 v3 之間的相容性。

步驟 4:KCL 3.x 組態概觀和建議

如需詳細了解 KCL 3.x 中,在 KCL 1.x 之後推出的相關組態,請參閱 KCL 組態KCL 移轉用戶端組態

重要

我們建議使用 ConfigsBuilder 在 KCL 3.x 及更新版本中設定組態,而非直接建立 checkpointConfigcoordinatorConfigleaseManagementConfigmetricsConfigprocessorConfigretrievalConfig 的物件,以避免排程器初始化問題。ConfigsBuilder 則提供更靈活且跟更容易維護的 KCL 應用程式設定方式。

KCL 3.x 的更新預設值組態

billingMode

KCL 1.x 版中,billingMode 的預設值設定為 PROVISIONED。不過,KCL 3.x 版 billingMode 的預設值為 PAY_PER_REQUEST (隨需模式)。我們建議租用資料表使用隨需容量模式,根據用量自動調整容量。如需租用資料表使用佈建容量的指南,請參閱具有佈建容量模式的租用資料表最佳實務

idleTimeBetweenReadsInMillis

在 KCL 1.x 版中,idleTimeBetweenReadsInMillis 的預設值設定為 1,000 (或 1 秒)。KCL 3.x 版會將 idleTimeBetweenReadsInMillis 的預設值設定為 1,500 (或 1.5 秒),但 Amazon DynamoDB Streams Kinesis 轉接器會將預設值覆寫為 1,000 (或 1 秒)。

KCL 3.x 中的新組態

leaseAssignmentIntervalMillis

此組態會定義新發現碎片開始處理之前的時間間隔,計算方式為 1.5 x leaseAssignmentIntervalMillis。如果未明確配置此設定,則時間間隔預設為 1.5 x failoverTimeMillis。處理新碎片包含掃描租用資料表,並在租用資料表上查詢全域次要索引 (GSI)。降低 leaseAssignmentIntervalMillis 會增加掃描和查詢操作的頻率,進而產生更高的 DynamoDB 成本。我們建議將此值設定為 2000 (或 2 秒),將處理新碎片的延遲降至最低。

shardConsumerDispatchPollIntervalMillis

此組態定義了碎片取用者進行連續輪詢以觸發狀態轉換的間隔時間。在 KCL 1.x 版中,此行為由 idleTimeInMillis 參數控制,該參數並未公開為可配置設定。使用 KCL 3.x 版時,我們建議設定此組態,符合您在 KCL 1.x 版設定用於 idleTimeInMillis 的值。

步驟 5:從 KCL 2.x 移轉至 KCL 3.x

為了確保順利轉換至最新的 Kinesis Client Library (KCL) 版本及其相容性,請遵循移轉指南從 KCL 2.x 升級到 KCL 3.x 中,步驟 5-8 的指示。

如需常見的 KCL 3.x 疑難排解問題,請參閱 KCL 取用者應用程式疑難排解