KCL 1.x から KCL 3.x への移行 - Amazon DynamoDB

KCL 1.x から KCL 3.x への移行

概要

このガイドでは、コンシューマーアプリケーションを KCL 1.x から KCL 3.x に移行する手順について説明します。KCL 1.x と KCL 3.x のアーキテクチャの違いにより、移行では互換性を確保するために複数のコンポーネントを更新する必要があります。

KCL 1.x は、KCL 3.x とは異なるクラスとインターフェイスを使用します。まずレコードプロセッサ、レコードプロセッサファクトリ、ワーカークラスを KCL 3.x 互換形式に移行し、KCL 1.x から KCL 3.x への移行手順に従う必要があります。

移行手順

ステップ 1: レコードプロセッサを移行する

以下の例は、KCL 1.x DynamoDB Streams Kinesis Adapter に実装されたレコードプロセッサを示しています。

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { for (Record record : processRecordsInput.getRecords()) { String data = new String(record.getData().array(), Charset.forName("UTF-8")); System.out.println(data); if (record instanceof RecordAdapter) { // record processing and checkpointing logic } } } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
RecordProcessor クラスを移行するには
  1. インターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor および com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware から software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor
  2. initialize メソッド processRecords とメソッドの import ステートメントを更新します。

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
  3. shutdownRequested メソッドを以下の新しいメソッドに置き換えます。leaseLostshardEnded、および shutdownRequested

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

以下に示しているのは、レコードプロセッサのクラスの更新されたバージョンです。

package com.amazonaws.codesamples; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord; import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor; import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord; import software.amazon.awssdk.services.dynamodb.model.Record; public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) { for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records()) Record ddbRecord = record.getRecord(); // processing and checkpointing logic for the ddbRecord } } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }
注記

DynamoDB Streams Kinesis Adapter は SDKv2 レコードモデルを使用するようになりました。SDKv2 では、複雑な AttributeValue オブジェクト (BSNSMLSS) が null を返すことはありません。hasBs()hasNs()hasM()hasL()hasSs() メソッドを使用して、これらの値が存在するかどうかを確認します。

ステップ 2: レコードプロセッサファクトリーを移行する

レコードプロセッサファクトリーは、リースが取得された際にレコードプロセッサの作成を担当します。以下に示しているのは、KCL 1.x ファクトリーの例です。

package com.amazonaws.codesamples; import software.amazon.dynamodb.AmazonDynamoDB; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class StreamsRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new StreamsRecordProcessor(dynamoDBClient, tableName); } }
RecordProcessorFactory を移行するには
  • 実装されているインターフェイスを com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory から software.amazon.kinesis.processor.ShardRecordProcessorFactory に変更します。以下に例を示します。

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { Change the return signature for createProcessor. // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

以下は、3.0 のレコードプロセッサファクトリーの例です。

package com.amazonaws.codesamples; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new StreamsRecordProcessor(); } }

ステップ 3: ワーカーを移行する

バージョン 3.0 の KCL では、新しいクラス Scheduler によって Worker クラスが置き換えられます。KCL 1.x のワーカーの例を次に示します。

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker( recordProcessorFactory, workerConfig, adapterClient, amazonDynamoDB, amazonCloudWatchClient);
ワーカーを移行するには
  1. import クラスの Worker ステートメントを Scheduler クラスと ConfigsBuilder クラスのインポートステートメントに変更します。

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. StreamTracker をインポートし、StreamsWorkerFactory のインポートを StreamsSchedulerFactory に変更します。

    import software.amazon.kinesis.processor.StreamTracker; // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory; import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
  3. アプリケーションを起動するポジションを選択します。TRIM_HORIZONLATEST のいずれかになります。

    import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
  4. StreamTracker インスタンスを作成します。

    StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker( streamArn, InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON) );
  5. AmazonDynamoDBStreamsAdapterClient オブジェクトを作成します。

    import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; ... AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create(); AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient( credentialsProvider, awsRegion);
  6. ConfigsBuilder オブジェクトを作成します。

    import software.amazon.kinesis.common.ConfigsBuilder; ... ConfigsBuilder configsBuilder = new ConfigsBuilder( streamTracker, applicationName, adapterClient, dynamoDbAsyncClient, cloudWatchAsyncClient, UUID.randomUUID().toString(), new StreamsRecordProcessorFactory());
  7. 次の例に示すように、ConfigsBuilder を使用して Scheduler を作成します。

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient); pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis); // Use ConfigsBuilder to configure settings RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig(); retrievalConfig.retrievalSpecificConfig(pollingConfig); CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig(); coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X); Scheduler scheduler = StreamsSchedulerFactory.createScheduler( configsBuilder.checkpointConfig(), coordinatorConfig, configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), retrievalConfig, adapterClient );
重要

この CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X 設定は、KCL v2 と v3 ではなく、KCL v3 と KCL v1 の DynamoDB Streams Kinesis Adapter 間の互換性を維持します。

ステップ 4: KCL 3.x 設定の概要と推奨事項

KCL 3.x に関連する KCL 1.x 後に導入された設定の詳細については、「KCL 設定」と「KCL 移行クライアント設定」を参照してください。

重要

KCL 3.x 以降のバージョンでは、checkpointConfigcoordinatorConfigleaseManagementConfigmetricsConfigprocessorConfig、および retrievalConfig オブジェクトを直接作成する代わりに、スケジューラの初期化の問題を回避するために、ConfigsBuilder を使用して設定を行うことをお勧めします。ConfigsBuilder を使用すると、KCL アプリケーションをより柔軟で保守性の高い方法で構成できます。

KCL 3.x でデフォルト値を更新する設定

billingMode

KCL バージョン 1.x では、billingMode のデフォルト値は PROVISIONED に設定されます。ただし、KCL バージョン 3.x では、デフォルトの billingModePAY_PER_REQUEST (オンデマンドモード) です。使用量に基づいて容量を自動的に調整するには、リーステーブルのオンデマンドキャパシティモードを使用することをお勧めします。リーステーブルにプロビジョンドキャパシティを使用するガイダンスについては、「プロビジョンドキャパシティモードのリーステーブルのベストプラクティス」を参照してください。

idleTimeBetweenReadsInMillis

KCL バージョン 1.x では、idleTimeBetweenReadsInMillis のデフォルト値は 1,000 (または 1 秒) に設定されています。KCL バージョン 3.x は idleTimeBetweenReadsInMillis のデフォルト値を 1,500 (または 1.5 秒) に設定しますが、Amazon DynamoDB Streams Kinesis Adapter はデフォルト値を 1,000 (または 1 秒) に上書きします。

KCL 3.x の新しい設定

leaseAssignmentIntervalMillis

この設定では、新しく検出されたシャードが処理を開始するまでの時間間隔を定義し、1.5 × leaseAssignmentIntervalMillis として計算されます。この設定が明示的に設定されていない場合、時間間隔はデフォルトで 1.5 × failoverTimeMillis に設定されます。新しいシャードの処理には、リーステーブルをスキャンし、リーステーブルのグローバルセカンダリインデックス (GSI) をクエリする必要があります。leaseAssignmentIntervalMillis を小さくすると、これらのスキャンおよびクエリオペレーションの頻度が増加し、DynamoDB のコストが高くなります。新しいシャードの処理の遅延を最小限に抑えるために、この値を 2,000 (または 2 秒) に設定することをお勧めします。

shardConsumerDispatchPollIntervalMillis

この設定では、シャードコンシューマーによる連続するポーリング間の間隔を定義して、状態遷移をトリガーします。KCL バージョン 1.x では、この動作は idleTimeInMillis パラメータによって制御され、設定可能な設定として公開されませんでした。KCL バージョン 3.x では、KCL バージョン 1.x のセットアップで idleTimeInMillis で使用される値と一致するようにこの設定を行うことをお勧めします。

ステップ 5: KCL 2.x から KCL 3.x に移行する

最新の Kinesis Client Library (KCL) バージョンとのスムーズな移行と互換性を確保するには、KCL 2.x から KCL 3.x へのアップグレードに関する移行ガイドの手順のステップ 5~8 に従います。

一般的な KCL 3.x のトラブルシューティングの問題については、「Troubleshooting KCL consumer applications」を参照してください。