This page is a machine translation of the English version. In case of any ambiguity or inconsistency, the English version prevails.
Migrating from KCL 1.x to KCL 3.x
Overview
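This guide explains how to migrate a consumer application from KCL 1.x to KCL 3.x. Because KCL 1.x and KCL 3.x differ architecturally, the migration requires updates to several components to ensure compatibility.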
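KCL 1.x uses different classes and interfaces than KCL 3.x. You must first migrate the record processor, record processor factory, and worker classes to a KCL 3.x-compatible format, following the KCL 1.x to KCL 3.x migration steps below.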
Migration steps
Step 1: Migrate the record processor
The following example shows a record processor implemented for the KCL 1.x DynamoDB Streams Kinesis Adapter:
package com.amazonaws.kcl;

import java.nio.charset.Charset;

import com.amazonaws.services.dynamodbv2.streamsadapter.model.RecordAdapter;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {

    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        // Checkpoint only when the shard has been fully processed
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
Migrate the RecordProcessor class
- Change the implemented interfaces from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor and com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware to software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
    import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;

- Update the import statements used by the initialize and processRecords methods:

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
    import software.amazon.kinesis.lifecycle.events.InitializationInput;
    // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
    import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;

- Replace the shutdown(ShutdownInput) and shutdownRequested(IRecordProcessorCheckpointer) methods with the new leaseLost, shardEnded, and shutdownRequested methods:

    // @Override
    // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
    //     //
    //     // This is moved to shardEnded(...) and shutdownRequested(ShutdownRequestedInput)
    //     //
    //     try {
    //         checkpointer.checkpoint();
    //     } catch (ShutdownException | InvalidStateException e) {
    //         //
    //         // Swallow exception
    //         //
    //         e.printStackTrace();
    //     }
    // }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
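The new callbacks take over the work that KCL 1.x routed through shutdown(ShutdownInput) and its ShutdownReason. The stdlib-only Java sketch below records the usual mapping, so you can decide where each branch of an old shutdown method belongs. Kcl1ShutdownReason, Kcl3Callback, and ShutdownMigration are hypothetical names used only for illustration; they are not KCL types. The canCheckpoint flag reflects that a processor that has lost its lease can no longer checkpoint.

```java
// Illustrative only: these types are hypothetical and are NOT part of KCL.
// Kcl1ShutdownReason mirrors the values of the KCL 1.x ShutdownReason enum;
// Kcl3Callback names the KCL 3.x ShardRecordProcessor lifecycle methods.
enum Kcl1ShutdownReason { TERMINATE, ZOMBIE, REQUESTED }

enum Kcl3Callback {
    SHARD_ENDED(true),        // shardEnded(ShardEndedInput): checkpoint at shard end
    LEASE_LOST(false),        // leaseLost(LeaseLostInput): lease is gone, cannot checkpoint
    SHUTDOWN_REQUESTED(true); // shutdownRequested(ShutdownRequestedInput): last chance to checkpoint

    final boolean canCheckpoint;

    Kcl3Callback(boolean canCheckpoint) {
        this.canCheckpoint = canCheckpoint;
    }
}

final class ShutdownMigration {
    private ShutdownMigration() {}

    // Returns the KCL 3.x method that should hold the logic from a
    // KCL 1.x shutdown(ShutdownInput) branch for the given reason.
    static Kcl3Callback callbackFor(Kcl1ShutdownReason reason) {
        switch (reason) {
            case TERMINATE: return Kcl3Callback.SHARD_ENDED;
            case ZOMBIE:    return Kcl3Callback.LEASE_LOST;
            case REQUESTED: return Kcl3Callback.SHUTDOWN_REQUESTED;
            default: throw new IllegalArgumentException("Unknown reason: " + reason);
        }
    }
}
```

In practice this means the TERMINATE branch of the old shutdown method moves into shardEnded, ZOMBIE handling (if any) moves into leaseLost without a checkpoint call, and the logic from IShutdownNotificationAware.shutdownRequested moves into the new shutdownRequested.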
The updated record processor class looks like this:
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record : processRecordsInput.records()) {
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        // Another worker now holds this lease; do not checkpoint here.
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            // Checkpoint at shard end so that processing of child shards can begin.
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
Note
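The DynamoDB Streams Kinesis Adapter now uses the SDKv2 (AWS SDK for Java 2.x) record model. In SDKv2, the collection-valued AttributeValue members (BS, NS, M, L, SS) are never returned as null; an unset member comes back as an empty collection. Use the hasBs(), hasNs(), hasM(), hasL(), and hasSs() methods to check whether these values are actually present.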
Step 2: Migrate the record processor factory
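The record processor factory is responsible for creating record processors when a lease is acquired. The following is an example of a KCL 1.x factory: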
package com.amazonaws.codesamples;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {

    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor();
    }
}
Migrate the RecordProcessorFactory
- Change the implemented interface from com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows:

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
    import software.amazon.kinesis.processor.ShardRecordProcessor;
    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
    import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

    // public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

- Replace the createProcessor method with shardRecordProcessor, which returns a ShardRecordProcessor:

    // public IRecordProcessor createProcessor() {
    public ShardRecordProcessor shardRecordProcessor() {
The following example shows the record processor factory migrated to KCL 3.x:
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
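KCL asks the factory for a processor each time the worker takes a lease on a shard, so shardRecordProcessor() should return a new instance rather than a shared one. The stdlib-only sketch below models that contract with hypothetical stand-in types (StandInShardProcessor, SimpleShardProcessor, LeaseSimulator) in place of ShardRecordProcessor and KCL's lease handling; it illustrates the idea and is not KCL code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical stand-in for ShardRecordProcessor (NOT a KCL type).
interface StandInShardProcessor {
    void initialize(String shardId);
    String shardId();
}

// Per-shard state lives in the processor instance, which is why
// instances must not be shared between shards.
final class SimpleShardProcessor implements StandInShardProcessor {
    private String shardId;

    @Override
    public void initialize(String shardId) { this.shardId = shardId; }

    @Override
    public String shardId() { return shardId; }
}

// Hypothetical model of a worker acquiring leases; the Supplier plays
// the role of ShardRecordProcessorFactory.shardRecordProcessor().
final class LeaseSimulator {
    private final Supplier<StandInShardProcessor> factory;
    private final Map<String, StandInShardProcessor> processorsByShard = new HashMap<>();

    LeaseSimulator(Supplier<StandInShardProcessor> factory) { this.factory = factory; }

    // On each acquired lease, ask the factory for a fresh processor.
    StandInShardProcessor onLeaseAcquired(String shardId) {
        StandInShardProcessor processor = factory.get();
        processor.initialize(shardId);
        processorsByShard.put(shardId, processor);
        return processor;
    }
}
```

Passing SimpleShardProcessor::new as the factory gives each shard its own processor; a factory that returned one cached instance would let state from one shard overwrite another's.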
Step 3: Migrate the worker
In KCL 3.x, a class named Scheduler replaces the Worker class. The following is an example of a KCL 1.x worker:
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...);
final IRecordProcessorFactory recordProcessorFactory = new StreamsRecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        config,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
Migrate the worker
- Replace the import statement for the Worker class with import statements for the Scheduler and ConfigsBuilder classes.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.common.ConfigsBuilder;

- Import StreamTracker, and change the StreamsWorkerFactory import to StreamsSchedulerFactory.

    import software.amazon.kinesis.processor.StreamTracker;
    // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
    import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;

- Choose the position from which the application starts reading: TRIM_HORIZON (the oldest record still available in each shard) or LATEST (only records written after the application starts).

    import software.amazon.kinesis.common.InitialPositionInStream;
    import software.amazon.kinesis.common.InitialPositionInStreamExtended;

- Create a StreamTracker instance.

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
            streamArn,
            InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
    );

- Create an AmazonDynamoDBStreamsAdapterClient object.

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
    import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
    import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;

    ...

    AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
    AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
            credentialsProvider, awsRegion);

- Create a ConfigsBuilder object. The dynamoDbAsyncClient and cloudWatchAsyncClient arguments are the SDKv2 async clients whose creation is shown in the next step.

    import java.util.UUID;
    import software.amazon.kinesis.common.ConfigsBuilder;

    ...

    ConfigsBuilder configsBuilder = new ConfigsBuilder(
            streamTracker,
            applicationName,
            adapterClient,
            dynamoDbAsyncClient,
            cloudWatchAsyncClient,
            UUID.randomUUID().toString(),  // unique worker identifier
            new StreamsRecordProcessorFactory());

- Use the ConfigsBuilder to create the Scheduler, as shown in the following example:

    import software.amazon.awssdk.regions.Region;
    import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
    import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
    import software.amazon.kinesis.coordinator.CoordinatorConfig;
    import software.amazon.kinesis.coordinator.Scheduler;
    import software.amazon.kinesis.retrieval.RetrievalConfig;

    ...

    DynamoDbAsyncClient dynamoDbAsyncClient = DynamoDbAsyncClient.builder().region(region).build();
    CloudWatchAsyncClient cloudWatchAsyncClient = CloudWatchAsyncClient.builder().region(region).build();

    DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
    pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);

    // Use ConfigsBuilder to configure settings
    RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
    retrievalConfig.retrievalSpecificConfig(pollingConfig);

    CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
    coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);

    Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
            configsBuilder.checkpointConfig(),
            coordinatorConfig,
            configsBuilder.leaseManagementConfig(),
            configsBuilder.lifecycleConfig(),
            configsBuilder.metricsConfig(),
            configsBuilder.processorConfig(),
            retrievalConfig,
            adapterClient
    );
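Creating the Scheduler does not start it. In KCL, Scheduler implements Runnable and exposes startGracefulShutdown(), which returns a Future<Boolean>, so a common pattern is to run it on its own thread and request a graceful shutdown when the application stops. The sketch below shows that pattern with a hypothetical StandInScheduler so that it runs without AWS dependencies; StandInScheduler and SchedulerLauncher are illustrative names, not KCL types. A real Scheduler could be passed to the same launch method because it is a Runnable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for software.amazon.kinesis.coordinator.Scheduler.
// It models only the Runnable + startGracefulShutdown() shape; no AWS calls.
final class StandInScheduler implements Runnable {
    private volatile boolean shutdownRequested = false;
    private final CompletableFuture<Boolean> finished = new CompletableFuture<>();

    @Override
    public void run() {
        try {
            while (!shutdownRequested) {
                // A real Scheduler would manage leases and dispatch shard consumers here.
                Thread.sleep(10);
            }
            finished.complete(true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished.complete(false);
        }
    }

    // Ask the run loop to stop; the future completes once the loop has exited.
    Future<Boolean> startGracefulShutdown() {
        shutdownRequested = true;
        return finished;
    }
}

final class SchedulerLauncher {
    private SchedulerLauncher() {}

    // Run the scheduler on its own daemon thread so the caller is not blocked.
    static Thread launch(Runnable scheduler) {
        Thread thread = new Thread(scheduler, "kcl-scheduler");
        thread.setDaemon(true);
        thread.start();
        return thread;
    }
}
```

In a real application, calling startGracefulShutdown() from a JVM shutdown hook (Runtime.getRuntime().addShutdownHook) gives shutdownRequested a chance to checkpoint before the process exits.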
Important
For the DynamoDB Streams Kinesis Adapter, the CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X setting maintains compatibility between KCL v3 and KCL v1, not between KCL v2 and KCL v3.
Step 4: KCL 3.x configuration overview and recommendations
For details on the configurations introduced in KCL 3.x since KCL 1.x, see KCL configurations and KCL migration client configuration.
Important
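In KCL 3.x and later, we recommend using ConfigsBuilder to set configurations, instead of directly creating checkpointConfig, coordinatorConfig, leaseManagementConfig, metricsConfig, processorConfig, and retrievalConfig objects, to avoid Scheduler initialization issues. ConfigsBuilder provides a more flexible and maintainable way to configure your KCL application.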
Updated default configurations in KCL 3.x
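billingMode
In KCL 1.x, the default value of billingMode is PROVISIONED. In KCL 3.x, the default is PAY_PER_REQUEST (on-demand mode). We recommend using on-demand capacity mode for the lease table so that its capacity scales automatically with usage. For guidance on using provisioned capacity for lease tables, see Best practices for the lease table with provisioned capacity mode.

idleTimeBetweenReadsInMillis
In KCL 1.x, the default value of idleTimeBetweenReadsInMillis is 1,000 milliseconds (1 second). KCL 3.x sets the default to 1,500 milliseconds (1.5 seconds), but the Amazon DynamoDB Streams Kinesis Adapter overrides this default to 1,000 milliseconds (1 second).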
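The effect of the idleTimeBetweenReadsInMillis default is easiest to see as a rough read rate. The sketch below assumes the consumer waits exactly idleTimeBetweenReadsInMillis between GetRecords calls and ignores processing time, so the result is only an upper bound per shard; PollingRate is a hypothetical helper, not part of KCL.

```java
// Rough, illustrative arithmetic only (PollingRate is hypothetical):
// assumes a fixed wait of idleTimeBetweenReadsInMillis between GetRecords
// calls and ignores the time spent processing each batch.
final class PollingRate {
    private PollingRate() {}

    static long maxReadsPerShardPerMinute(long idleTimeBetweenReadsInMillis) {
        if (idleTimeBetweenReadsInMillis <= 0) {
            throw new IllegalArgumentException("idle time must be positive");
        }
        return 60_000L / idleTimeBetweenReadsInMillis;
    }
}
```

Under these assumptions, the adapter's 1,000 ms default allows at most 60 reads per shard per minute, compared with 40 at the plain KCL 3.x default of 1,500 ms.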
New configurations in KCL 3.x
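leaseAssignmentIntervalMillis
This configuration defines how long it takes before a newly discovered shard starts being processed; the interval is calculated as 1.5 × leaseAssignmentIntervalMillis. If this setting isn't configured explicitly, the interval defaults to 1.5 × failoverTimeMillis. Processing new shards involves scanning the lease table and querying a global secondary index (GSI) on the lease table. Lowering leaseAssignmentIntervalMillis increases the frequency of these scan and query operations, which results in higher DynamoDB costs. We recommend setting this value to 2000 (2 seconds) to minimize the delay in processing new shards.

shardConsumerDispatchPollIntervalMillis
This configuration defines the interval between consecutive polls by the shard consumer to trigger state transitions. In KCL 1.x, this behavior was controlled by the idleTimeInMillis parameter, which was not exposed as a configurable setting. With KCL 3.x, we recommend setting this configuration to match the value you used for idleTimeInMillis with KCL 1.x.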
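The new-shard delay rule for leaseAssignmentIntervalMillis is simple arithmetic, sketched below. The sketch assumes KCL's default failoverTimeMillis of 10,000 ms (check the value in your own lease management configuration) and uses a hypothetical NewShardDelay helper, not a KCL API.

```java
// Illustrative arithmetic for the rule above (NewShardDelay is hypothetical).
// DEFAULT_FAILOVER_TIME_MILLIS assumes KCL's default failoverTimeMillis.
final class NewShardDelay {
    static final long DEFAULT_FAILOVER_TIME_MILLIS = 10_000L;

    private NewShardDelay() {}

    // Pass null for leaseAssignmentIntervalMillis when it is not set explicitly.
    static long delayMillis(Long leaseAssignmentIntervalMillis, long failoverTimeMillis) {
        long base = (leaseAssignmentIntervalMillis != null)
                ? leaseAssignmentIntervalMillis
                : failoverTimeMillis;
        return base * 3 / 2; // 1.5 x base, in whole milliseconds
    }
}
```

With the recommended value of 2000, a newly discovered shard starts processing after about 3,000 ms, compared with about 15,000 ms when the setting is left unset and failoverTimeMillis keeps its default.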
Step 5: Migrate from KCL 2.x to KCL 3.x
To ensure a smooth transition to, and compatibility with, the latest Kinesis Client Library (KCL) version, follow steps 5 through 8 in the migration guide Upgrading from KCL 2.x to KCL 3.x.
For common KCL 3.x troubleshooting issues, see Troubleshooting KCL consumer applications.