This page is a machine translation of the English version. If there is any ambiguity or inconsistency, the English version prevails.
Migrating from KCL 1.x to KCL 3.x
Overview
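This guide explains how to migrate consumer applications from KCL 1.x to KCL 3.x. Because of architectural differences between KCL 1.x and KCL 3.x, the migration requires updating several components to ensure compatibility.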
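KCL 1.x uses different classes and interfaces than KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to the KCL 3.x-compatible format, following the steps for migrating from KCL 1.x to KCL 3.x.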
Migration steps
Step 1: Migrate the record processor
The following example shows a record processor implemented for the KCL 1.x DynamoDB Streams Kinesis Adapter:
package com.amazonaws.kcl;

import java.nio.charset.Charset;

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Set up the record processor
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // Record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Checkpoint only when the shard has been fully processed
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }
}
Migrate the RecordProcessor class
- Change the implemented interfaces from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor and com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware to com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor, as follows:

// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
- Update the import statements for the initialize and processRecords methods:

// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import software.amazon.kinesis.lifecycle.events.InitializationInput;

// import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
- Replace the shutdown method with the following new methods: leaseLost, shardEnded, and shutdownRequested.

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

// @Override
// public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
//     //
//     // This is moved to shardEnded(...) and shutdownRequested(ShutdownRequestedInput)
//     //
//     try {
//         checkpointer.checkpoint();
//     } catch (ShutdownException | InvalidStateException e) {
//         // Swallow exception
//         e.printStackTrace();
//     }
// }

@Override
public void leaseLost(LeaseLostInput leaseLostInput) {
    // No checkpointer is provided here: another worker now holds this shard's lease.
}

@Override
public void shardEnded(ShardEndedInput shardEndedInput) {
    try {
        shardEndedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        // Swallow the exception
        e.printStackTrace();
    }
}

@Override
public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
    try {
        shutdownRequestedInput.checkpointer().checkpoint();
    } catch (ShutdownException | InvalidStateException e) {
        // Swallow the exception
        e.printStackTrace();
    }
}
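When moving logic out of the KCL 1.x shutdown method, it can help to see which new callback each KCL 1.x ShutdownReason corresponds to. As we understand the KCL interface changes, ZOMBIE (lease lost) maps to leaseLost, TERMINATE (shard end) maps to shardEnded, and REQUESTED maps to shutdownRequested. The sketch below encodes that mapping; the class ShutdownMigrationMap and the local enum V1ShutdownReason are hypothetical stand-ins so that it compiles without KCL 1.x on the classpath.

```java
import java.util.EnumMap;
import java.util.Map;

final class ShutdownMigrationMap {

    // Local stand-in for the KCL 1.x ShutdownReason constants (hypothetical copy).
    enum V1ShutdownReason { ZOMBIE, TERMINATE, REQUESTED }

    private static final Map<V1ShutdownReason, String> V3_CALLBACK =
            new EnumMap<>(V1ShutdownReason.class);

    static {
        V3_CALLBACK.put(V1ShutdownReason.ZOMBIE, "leaseLost");
        V3_CALLBACK.put(V1ShutdownReason.TERMINATE, "shardEnded");
        V3_CALLBACK.put(V1ShutdownReason.REQUESTED, "shutdownRequested");
    }

    // Name of the KCL 3.x ShardRecordProcessor method that takes over this case.
    static String v3CallbackFor(V1ShutdownReason reason) {
        return V3_CALLBACK.get(reason);
    }

    // leaseLost receives no checkpointer, so only the shard-end and
    // requested-shutdown paths checkpoint, matching the methods above.
    static boolean shouldCheckpoint(V1ShutdownReason reason) {
        return reason != V1ShutdownReason.ZOMBIE;
    }
}
```

The key design point is that the KCL 1.x branch on getShutdownReason() disappears: each case becomes its own method, and the lease-lost case no longer has a checkpointer at all.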
The following is the updated version of the record processor class:
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        // Set up the record processor
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record : processRecordsInput.records()) {
            Record ddbRecord = record.getRecord();
            // Processing and checkpointing logic for ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        // No checkpointer is provided here: another worker now holds this shard's lease.
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            // Checkpoint at shard end so that processing of child shards can begin
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            // Swallow the exception
            e.printStackTrace();
        }
    }
}
Note
The DynamoDB Streams Kinesis Adapter now uses the SDKv2 record model. In SDKv2, complex AttributeValue objects (BS, NS, M, L, SS) never return null. Use the hasBs(), hasNs(), hasM(), hasL(), and hasSs() methods to verify whether these values exist.
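To illustrate the note, here is a minimal sketch that reports which type is set on an SDKv2 AttributeValue. The class name AttributeValueChecks is hypothetical. For the collection types it relies on the has*() methods, because in the SDKv2 model the getters m(), l(), ss(), ns(), and bs() return empty collections rather than null when that type is not set; a null check such as m() != null therefore succeeds even for a plain string attribute.

```java
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

final class AttributeValueChecks {

    // Returns a short label for the type that is actually set on the value.
    // Collection types are tested with has*(), never with null checks.
    static String describe(AttributeValue value) {
        if (value.hasM()) return "M";
        if (value.hasL()) return "L";
        if (value.hasSs()) return "SS";
        if (value.hasNs()) return "NS";
        if (value.hasBs()) return "BS";
        // Scalar getters still return null when unset.
        if (value.s() != null) return "S";
        if (value.n() != null) return "N";
        if (value.b() != null) return "B";
        if (value.bool() != null) return "BOOL";
        if (Boolean.TRUE.equals(value.nul())) return "NULL";
        return "UNSET";
    }

    // SDK v1-style check that no longer works: in SDKv2 this is true even
    // when the attribute is not a map, because m() is never null.
    static boolean naiveMapCheck(AttributeValue value) {
        return value.m() != null;
    }
}
```

In a record processor, the same pattern applies to values from ddbRecord.dynamodb().newImage(): test hasM() or hasL() before reading nested attributes instead of comparing against null.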
Step 2: Migrate the record processor factory
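The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory: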
package com.amazonaws.codesamples;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

    @Override
    public IRecordProcessor createProcessor() {
        // Create a new record processor for each shard lease this worker acquires
        return new StreamsRecordProcessor();
    }
}
Migrate the RecordProcessorFactory

- Change the implemented interface from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows:

// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessor;

// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

// public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

- Change the return signature of createProcessor:

// public IRecordProcessor createProcessor() {
public ShardRecordProcessor shardRecordProcessor() {
The following is an example of the record processor factory in KCL 3.0:
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
Step 3: Migrate the worker
In KCL 3.0, a new class called Scheduler replaces the Worker class. The following is an example of a KCL 1.x worker:
final KinesisClientLibConfiguration workerConfig = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new StreamsRecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
Migrate the worker
- Change the import statement for the Worker class to import statements for the Scheduler and ConfigsBuilder classes.

// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.common.ConfigsBuilder;
- Import StreamTracker, and change the import of StreamsWorkerFactory to StreamsSchedulerFactory.

import software.amazon.kinesis.processor.StreamTracker;
// import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
- Choose the position in the stream from which the application starts reading. It can be TRIM_HORIZON or LATEST.

import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
- Create a StreamTracker instance.

StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
        streamArn,
        InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));
- Create an AmazonDynamoDBStreamsAdapterClient object.

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
...
AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
        credentialsProvider,
        awsRegion);
- Create a ConfigsBuilder object.

import java.util.UUID;
import software.amazon.kinesis.common.ConfigsBuilder;
...
ConfigsBuilder configsBuilder = new ConfigsBuilder(
        streamTracker,
        applicationName,
        adapterClient,
        dynamoDbAsyncClient,
        cloudWatchAsyncClient,
        UUID.randomUUID().toString(),   // unique worker identifier
        new StreamsRecordProcessorFactory());
- Create the Scheduler, as shown in the following example:

import java.util.UUID;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.kinesis.coordinator.CoordinatorConfig;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.retrieval.RetrievalConfig;
...
// Create these clients before constructing the ConfigsBuilder shown above.
DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder().region(region).build();

// Poll the stream through the adapter client.
DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);

RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
retrievalConfig.retrievalSpecificConfig(pollingConfig);

CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);

Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
        configsBuilder.checkpointConfig(),
        coordinatorConfig,
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig,
        adapterClient);
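The steps above end once the Scheduler is built. A minimal sketch of one way to run and stop it, assuming you previously ran the KCL 1.x Worker on its own thread, follows. It relies on Scheduler implementing Runnable and on Scheduler.startGracefulShutdown() returning a future; the helper class SchedulerRunner, the thread name, and the timeout are hypothetical choices, and the helpers take plain JDK types so they can be checked without KCL on the classpath.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class SchedulerRunner {

    // Starts the scheduler (any Runnable) on a dedicated daemon thread.
    // A daemon thread does not keep the JVM alive by itself, so the main
    // thread must keep running until you decide to shut down.
    static Thread start(Runnable scheduler) {
        Thread thread = new Thread(scheduler, "kcl-scheduler");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    // Triggers a graceful shutdown (for example, scheduler::startGracefulShutdown)
    // and waits up to timeoutSeconds. Returns false on timeout, interruption,
    // or failure instead of throwing.
    static boolean awaitGracefulShutdown(Supplier<? extends Future<Boolean>> shutdownTrigger,
                                         long timeoutSeconds) {
        try {
            return Boolean.TRUE.equals(shutdownTrigger.get().get(timeoutSeconds, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        }
    }
}
```

With the scheduler from the previous step, usage could look like SchedulerRunner.start(scheduler) at startup and SchedulerRunner.awaitGracefulShutdown(scheduler::startGracefulShutdown, 20) when stopping. A graceful shutdown gives record processors a chance to run shutdownRequested and checkpoint before the application exits.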
Important
The CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X setting maintains compatibility between the DynamoDB Streams Kinesis Adapter for KCL v3 and KCL v1, not between KCL v2 and KCL v3.
Step 4: KCL 3.x configuration overview and recommendations
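For detailed descriptions of the configurations introduced after KCL 1.x that are relevant to KCL 3.x, see KCL configurations and KCL migration client configuration.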
Configurations with updated default values in KCL 3.x
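billingMode
In KCL 1.x, the default value of billingMode is PROVISIONED. In KCL 3.x, the default value of billingMode is PAY_PER_REQUEST (on-demand mode). We recommend using on-demand capacity mode for the lease table so that capacity scales automatically with your usage. For guidance on using provisioned capacity for your lease tables, see Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis
In KCL 1.x, the default value of idleTimeBetweenReadsInMillis is 1,000 (1 second). KCL 3.x sets the default value of idleTimeBetweenReadsInMillis to 1,500 (1.5 seconds), but the Amazon DynamoDB Streams Kinesis Adapter overrides the default to 1,000 (1 second).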
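If you decide to keep provisioned capacity for the lease table, the billing-mode override could be applied roughly as in the following sketch. It assumes the fluent KCL LeaseManagementConfig setters billingMode, initialLeaseTableReadCapacity, and initialLeaseTableWriteCapacity; the helper class LeaseTableBilling is hypothetical, and the capacity values are left to the caller. Because ConfigsBuilder.leaseManagementConfig() appears to build a new object on every call, create one instance, apply the override, and pass that same instance to StreamsSchedulerFactory.createScheduler(...) in place of configsBuilder.leaseManagementConfig().

```java
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.kinesis.leases.LeaseManagementConfig;

final class LeaseTableBilling {

    // Switches the lease table back to provisioned capacity, overriding the
    // KCL 3.x default of PAY_PER_REQUEST. Size the capacities using the
    // provisioned-capacity best practices; no values are implied here.
    static LeaseManagementConfig useProvisionedCapacity(LeaseManagementConfig leaseManagementConfig,
                                                        int readCapacity,
                                                        int writeCapacity) {
        return leaseManagementConfig
                .billingMode(BillingMode.PROVISIONED)
                .initialLeaseTableReadCapacity(readCapacity)
                .initialLeaseTableWriteCapacity(writeCapacity);
    }
}
```

As the setter names suggest, the capacity values most likely take effect only when KCL creates the lease table, so an existing table may need its capacity changed directly in DynamoDB.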
New configurations in KCL 3.x
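leaseAssignmentIntervalMillis
This configuration defines the interval before newly discovered shards begin processing, calculated as 1.5 × leaseAssignmentIntervalMillis. If this setting is not explicitly configured, the interval defaults to 1.5 × failoverTimeMillis. Processing new shards requires scanning the lease table and querying a global secondary index (GSI) on the lease table. Lowering leaseAssignmentIntervalMillis increases the frequency of these scan and query operations, which results in higher DynamoDB costs. We recommend setting this value to 2000 to minimize the delay in processing new shards.

shardConsumerDispatchPollIntervalMillis
This configuration defines the interval between successive polls by the shard consumer to trigger state transitions. In KCL 1.x, this behavior was controlled by the idleTimeInMillis parameter, which was not exposed as a configurable setting. With KCL 3.x, we recommend setting this configuration to match the value used for idleTimeInMillis in your KCL 1.x setup.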
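To make the leaseAssignmentIntervalMillis timing concrete, the following sketch computes the delay described above from the two inputs the text names. The class NewShardDelay is hypothetical, and the failoverTimeMillis value of 10,000 ms used in the example is illustrative; check the value in your own configuration.

```java
final class NewShardDelay {

    // Multiplier stated in the text: new shards begin processing after
    // 1.5 x leaseAssignmentIntervalMillis.
    private static final double MULTIPLIER = 1.5;

    // leaseAssignmentIntervalMillis is null when not explicitly configured;
    // in that case failoverTimeMillis is used instead.
    static long delayBeforeNewShardProcessingMillis(Long leaseAssignmentIntervalMillis,
                                                    long failoverTimeMillis) {
        long interval = (leaseAssignmentIntervalMillis != null)
                ? leaseAssignmentIntervalMillis
                : failoverTimeMillis;
        return Math.round(MULTIPLIER * interval);
    }
}
```

With the recommended 2,000 ms, new shards begin processing after about 3,000 ms. Left unset with a failoverTimeMillis of 10,000 ms, the wait grows to about 15,000 ms, which is the latency the recommendation trades against more frequent lease-table scans and GSI queries.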
Configurations not overridden by the Amazon DynamoDB Streams Kinesis Adapter
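shardSyncIntervalMillis
The DynamoDB Streams Kinesis Adapter compatible with KCL 1.x explicitly sets shardSyncIntervalMillis to 0. In contrast, the DynamoDB Streams Kinesis Adapter compatible with KCL 3.x no longer sets a value for this configuration. To keep the same adapter behavior as in version 1.x, set this configuration to 0.

leasesRecoveryAuditorExecutionFrequencyMillis
The DynamoDB Streams Kinesis Adapter compatible with KCL 1.x explicitly sets leasesRecoveryAuditorExecutionFrequencyMillis to 1000. In contrast, the DynamoDB Streams Kinesis Adapter compatible with KCL 3.x no longer sets a default value for this configuration. To keep the same adapter behavior as in version 1.x, set this configuration to 1000.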
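A minimal sketch of restoring both KCL 1.x adapter values, assuming the fluent KCL LeaseManagementConfig setters shardSyncIntervalMillis and leasesRecoveryAuditorExecutionFrequencyMillis, follows. The class AdapterV1Defaults is hypothetical. Apply it to a single LeaseManagementConfig instance (for example, one obtained from configsBuilder.leaseManagementConfig()) and pass that same instance to StreamsSchedulerFactory.createScheduler(...), since a fresh call to configsBuilder.leaseManagementConfig() would not carry the overrides.

```java
import software.amazon.kinesis.leases.LeaseManagementConfig;

final class AdapterV1Defaults {

    // Values the KCL 1.x-compatible adapter set explicitly, per the text above.
    static final long SHARD_SYNC_INTERVAL_MILLIS = 0L;
    static final long LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS = 1000L;

    // Restores the KCL 1.x adapter behavior on the given config and returns it.
    static LeaseManagementConfig restore(LeaseManagementConfig leaseManagementConfig) {
        return leaseManagementConfig
                .shardSyncIntervalMillis(SHARD_SYNC_INTERVAL_MILLIS)
                .leasesRecoveryAuditorExecutionFrequencyMillis(
                        LEASES_RECOVERY_AUDITOR_EXECUTION_FREQUENCY_MILLIS);
    }
}
```

Keeping the values as named constants makes it easy to see which settings exist only for parity with the KCL 1.x adapter, in case you later decide to adopt the KCL 3.x defaults instead.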
Step 5: Migrate from KCL 2.x to KCL 3.x
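To ensure a smooth transition to, and compatibility with, the latest Kinesis Client Library (KCL) version, follow steps 5 through 8 of the instructions for upgrading from KCL 2.x to KCL 3.x in the migration guide.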