KCL 1.x から KCL 3.x への移行
概要
このガイドでは、コンシューマーアプリケーションを KCL 1.x から KCL 3.x に移行する手順について説明します。KCL 1.x と KCL 3.x のアーキテクチャの違いにより、移行では互換性を確保するために複数のコンポーネントを更新する必要があります。
KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 3.x 互換形式に移行し、KCL 1.x から KCL 3.x への移行手順に従う必要があります。
移行手順
トピック
ステップ 1: レコードプロセッサを移行する
以下の例は、KCL 1.x DynamoDB Streams Kinesis Adapter に実装されたレコードプロセッサを示しています。
package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
RecordProcessor クラスを移行するには
-
インターフェイスを
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor
およびcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware
からcom.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
に変更します。以下に例を示します。// import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
-
initialize
メソッドprocessRecords
とメソッドの import ステートメントを更新します。// import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
-
shutdown
メソッドを以下の新しいメソッドに置き換えます。leaseLost
、shardEnded
、およびshutdownRequested
。// @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }
以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。
package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
注記
DynamoDB Streams Kinesis Adapter は SDKv2 レコードモデルを使用するようになりました。SDKv2 では、複雑な AttributeValue
オブジェクト (BS
、NS
、M
、L
、SS
) が null を返すことはありません。hasBs()
、hasNs()
、hasM()
、hasL()
、hasSs()
メソッドを使用して、これらの値が存在するかどうかを確認します。
ステップ 2: レコードプロセッサファクトリーを移行する
レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。
package com.amazonaws.codesamples; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
RecordProcessorFactory
を移行するには
-
実装されているインターフェイスを
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory
からsoftware.amazon.kinesis.processor.ShardRecordProcessorFactory
に変更します。以下に例を示します。com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory to software.amazon.kinesis.processor.ShardRecordProcessorFactory, as follows. // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {
以下は、3.0 のレコードプロセッサファクトリーの例です。
package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }
ステップ 3: ワーカーを移行する
バージョン 3.0 の KCL では、新しいクラス Scheduler によって Worker クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
ワーカーを移行するには
-
Worker
クラスのimport
ステートメントをScheduler
クラスとConfigsBuilder
クラスのインポートステートメントに変更します。// import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
-
StreamTracker
をインポートし、StreamsWorkerFactory
のインポートをStreamsSchedulerFactory
に変更します。import software.amazon.kinesis.processor.StreamTracker; // import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsWorkerFactory; import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
-
アプリケーションを起動するポジションを選択します。
TRIM_HORIZON
、LATEST
のいずれかになります。import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
-
StreamTracker
インスタンスを作成します。StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
-
AmazonDynamoDBStreamsAdapterClient
オブジェクトを作成します。import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
-
ConfigsBuilder
オブジェクトを作成します。import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
-
次の例に示すように、
Scheduler
を作成します。import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X) Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
重要
この CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X
設定は、KCL v2 と v3 ではなく、KCL v3 と KCL v1 の DynamoDB Streams Kinesis Adapter 間の互換性を維持します。
ステップ 4: KCL 3.x 設定の概要と推奨事項
KCL 3.x に関連する KCL 1.x 後に導入された設定の詳細については、「KCL 設定」と「KCL 移行クライアント設定」を参照してください。
KCL 3.x でデフォルト値を更新する設定
billingMode
-
KCL バージョン 1.x では、
billingMode
のデフォルト値はPROVISIONED
に設定されます。ただし、KCL バージョン 3.x では、デフォルトのbillingMode
はPAY_PER_REQUEST
(オンデマンドモード) です。使用量に基づいて容量を自動的に調整するには、リーステーブルのオンデマンドキャパシティモードを使用することをお勧めします。リーステーブルにプロビジョンドキャパシティを使用するガイダンスについては、「プロビジョンドキャパシティモードのリーステーブルのベストプラクティス」を参照してください。 idleTimeBetweenReadsInMillis
-
KCL バージョン 1.x では、
idleTimeBetweenReadsInMillis
のデフォルト値は 1,000 (または 1 秒) に設定されています。KCL バージョン 3.x は idleTimeBetweenReadsInMillis
のデフォルト値を 1,500 (または 1.5 秒) に設定しますが、Amazon DynamoDB Streams Kinesis Adapter はデフォルト値を 1,000 (または 1 秒) に上書きします。
KCL 3.x の新しい設定
leaseAssignmentIntervalMillis
-
この設定では、新しく検出されたシャードが処理を開始するまでの時間間隔を定義し、1.5 ×
leaseAssignmentIntervalMillis
として計算されます。この設定が明示的に設定されていない場合、時間間隔はデフォルトで 1.5 ×failoverTimeMillis
に設定されます。新しいシャードの処理には、リーステーブルをスキャンし、リーステーブルのグローバルセカンダリインデックス (GSI) をクエリする必要があります。leaseAssignmentIntervalMillis
を小さくすると、これらのスキャンおよびクエリオペレーションの頻度が増加し、DynamoDB のコストが高くなります。新しいシャードの処理の遅延を最小限に抑えるために、この値を 2,000 に設定することをお勧めします。 shardConsumerDispatchPollIntervalMillis
-
この設定では、シャードコンシューマーによる連続するポーリング間の間隔を定義して、状態遷移をトリガーします。KCL バージョン 1.x では、この動作は
idleTimeInMillis
パラメータによって制御され、設定可能な設定として公開されませんでした。KCL バージョン 3.x では、KCL バージョン 1.x のセットアップでidleTimeInMillis
で使用される値と一致するようにこの設定を行うことをお勧めします。
Amazon DynamoDB Streams Kinesis Adapter によって上書きされない設定
shardSyncIntervalMillis
-
KCL バージョン 1.x と互換性のある DynamoDB Streams Kinesis Adapter は、明示的に
shardSyncIntervalMillis
を 0 に設定します。これに対して、KCL バージョン 3.x と互換性のある DynamoDB Streams Kinesis Adapter では、この設定の値が設定されなくなりました。バージョン 1.x と同じアダプター動作を維持するには、この設定の値を 0 に設定します。 leasesRecoveryAuditorExecutionFrequencyMillis
-
KCL バージョン 1.x と互換性のある DynamoDB Streams Kinesis Adapter は、明示的に
leasesRecoveryAuditorExecutionFrequencyMillis
を 1,000 に設定します。これに対して、KCL バージョン 3.x と互換性のある DynamoDB Streams Kinesis Adapter では、この設定のデフォルト値が設定されなくなりました。バージョン 1.x と同じアダプター動作を維持するには、この設定の値を 1,000 に設定します。
ステップ 5: KCL 2.x から KCL 3.x に移行する
最新の Kinesis Client Library (KCL) バージョンとのスムーズな移行と互換性を確保するには、KCL 2.x から KCL 3.x へのアップグレードに関する移行ガイドの手順のステップ 5~8 に従います。