KCL 2.x から KCL 3.x に移行する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

KCL 2.x から KCL 3.x に移行する

このトピックでは、KCL 2.x のコンシューマーを KCL 3.x へ移行するための手順を段階的に説明します。KCL 3.x は、KCL 2.x コンシューマーのインプレース移行をサポートしています。ワーカーをローリング方式で順次移行している間も、Kinesis データストリームからのデータ取得を継続できます。

重要

KCL 3.x は KCL 2.x と同じインターフェイスおよびメソッドを維持しています。したがって、移行中にレコード処理コードを更新する必要はありません。ただし、適切な設定を行い、移行に必要な手順を確認する必要があります。スムーズに移行するために、以下の移行手順に従うことを強くお勧めします。

ステップ 1: 前提条件

KCL 3.x の使用を始める前に、以下のものが揃っていることを確認してください。

  • Java Development Kit (JDK) 8 以降

  • AWS SDK for Java2.x

  • 依存関係管理用の Maven または Gradle

重要

KCL 3.x でAWS SDK for Javaバージョン 2.27.19 ~ 2.27.23 を使用しないでください。これらのバージョンには、KCL が使用する DynamoDB に関連した例外エラーを引き起こす問題が含まれています。この問題を回避するには、AWS SDK for Javaバージョン 2.28.0 以降を使用することをお勧めします。

ステップ 2: 依存関係を追加する

Maven を使用している場合は、以下の依存関係を pom.xml ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

<dependency> <groupId>software.amazon.kinesis</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>3.x.x</version> <!-- Use the latest version --> </dependency>

Gradle を使用している場合は、以下を build.gradle ファイルに追加します。3.x.x を最新の KCL バージョンに置き換えていることを確認してください。

implementation 'software.amazon.kinesis:amazon-kinesis-client:3.x.x'

最新の KCL バージョンは、Maven Central Repository で確認できます。

ステップ 3: 移行関連の設定をセットアップする

KCL 2.x から KCL 3.x に移行するには、次の設定パラメータを設定する必要があります。

  • CoordinatorConfig.clientVersionConfig: この設定は、アプリケーションをどの KCL バージョン互換モードで実行するかを決定します。KCL 2.x から 3.x へ移行する際は、この設定を CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X に設定する必要があります。この設定を行うには、スケジューラオブジェクトを作成する際に次の行を追加してください。

configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X)

以下は、KCL 2.x から 3.x へ移行する際に、CoordinatorConfig.clientVersionConfig を設定する方法の例です。必要に応じて、その他の設定も要件に合わせて調整できます。

Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordiantorConfig().clientVersionConfig(ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPLATIBLE_WITH_2X), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

KCL 2.x と KCL 3.x では負荷分散アルゴリズムが異なるため、コンシューマーアプリケーション内のすべてのワーカーが、同じ時点では同一の負荷分散アルゴリズムを使用していることが重要です。異なる負荷分散アルゴリズムを使用するワーカーが同時に動作していると、両アルゴリズムが独立して動作するため、負荷分散が最適化されない可能性があります。

この KCL 2.x 互換設定により、KCL 3.x アプリケーションは KCL 2.x と互換性のあるモードで動作し、コンシューマーアプリケーション内のすべてのワーカーが KCL 3.x にアップグレードされるまで、KCL 2.x の負荷分散アルゴリズムを利用できます。移行が完了すると、KCL は自動的に完全な KCL 3.x の機能モードへ切り替わり、稼働中のすべてのワーカーで新しい KCL 3.x の負荷分散アルゴリズムを使用し始めます。

重要

ConfigsBuilder を使用せず、設定を行うために LeaseManagementConfig オブジェクトを作成している場合は、KCL 3.x 以降では applicationName という追加のパラメータを指定する必要があります。詳細については、LeaseManagementConfig コンストラクタでのコンパイルエラーを参照してください。KCL の設定には、ConfigsBuilder を使用することをお勧めします。ConfigsBuilder は、KCL アプリケーションをより柔軟かつ保守しやすい方法で設定できる手段を提供します。

ステップ 4: shutdownRequested() メソッド実装のベストプラクティスに従う

KCL 3.x では、リース再割り当て時にリースが別のワーカーへ引き継がれる際のデータ再処理を最小限に抑えるため、スムーズなリース引き継ぎと呼ばれる機能が導入されています。これは、リースが引き継がれる前に、処理済みの最新シーケンス番号をリーステーブルにチェックポイントすることで実現されます。スムーズなリース引き継ぎを正しく機能させるためには、RecordProcessor クラスの shutdownRequested メソッド内で、checkpointer オブジェクトを必ず呼び出すようにする必要があります。shutdownRequested メソッド内で checkpointer オブジェクトを呼び出さない場合は、次の例に示すように実装できます。

重要
  • 次の実装例は、スムーズなリース引き継ぎの最小要件です。必要に応じて、チェックポイントに関連する追加のロジックを含めるように拡張できます。非同期処理を実行している場合は、チェックポイントを呼び出す前に、ダウンストリームに配信されたすべてのレコードが処理されていることを確認してください。

  • スムーズなリース引き継ぎによって、リース移行時のデータ再処理が発生する可能性は大幅に低減されますが、その可能性が完全になくなるわけではありません。データの整合性と一貫性を維持するには、ダウンストリームのコンシューマーアプリケーションをべき等になるように設計します。つまり、重複してレコードを処理する可能性があっても、システム全体に悪影響を及ぼさずに処理できるようにしておく必要があります。

/** * Invoked when either Scheduler has been requested to gracefully shutdown * or lease ownership is being transferred gracefully so the current owner * gets one last chance to checkpoint. * * Checkpoints and logs the data a final time. * * @param shutdownRequestedInput Provides access to a checkpointer, allowing a record processor to checkpoint * before the shutdown is completed. */ public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { // Ensure that all delivered records are processed // and has been successfully flushed to the downstream before calling // checkpoint // If you are performing any asynchronous processing or flushing to // downstream, you must wait for its completion before invoking // the below checkpoint method. log.info("Scheduler is shutting down, checkpointing."); shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at requested shutdown. Giving up.", e); } }

ステップ 5: KCL 3.x でワーカーメトリクスを収集するための前提条件を確認する

KCL 3.x は、ワーカー間で負荷を均等に分散するため、CPU 使用率などの CPU 利用メトリクスを収集します。コンシューマーアプリケーションのワーカーは、Amazon EC2、Amazon ECS、Amazon EKS、または AWS Fargate 上で実行できます。KCL 3.x がワーカーから CPU 使用率メトリクスを収集できるのは、以下の前提条件が満たされている場合のみです。

Amazon Elastic Compute Cloud(Amazon EC2)

  • オペレーティングシステムは Linux OS である必要があります。

  • EC2 インスタンスで IMDSv2 を有効にする必要があります。

Amazon Elastic Container Service (Amazon ECS) on Amazon EC2

での Amazon ECSAWS Fargate

Amazon Elastic Kubernetes Service (Amazon EKS) on Amazon EC2

  • オペレーティングシステムは Linux OS である必要があります。

での Amazon EKSAWS Fargate

  • Fargate プラットフォーム 1.3.0 以降

重要

前提条件が満たされず、KCL 3.x がワーカーから CPU 使用率メトリクスを収集できない場合、KCL はリースごとのスループット量に基づいて負荷を再分散します。このフォールバックの負荷再分散メカニズムにより、各ワーカーに割り当てられたリースから得られる合計スループット量が、ワーカー間で均等になるように調整されます。詳細については、「KCL がワーカーにリースを割り当て、負荷を分散する方法」を参照してください。

ステップ 6: KCL 3.x 用に IAM アクセス許可を更新する

KCL 3.x コンシューマーアプリケーションに関連付けられている IAM ロールまたは IAM ポリシーに、次の許可を追加する必要があります。これは、KCL アプリケーションで使用中の既存の IAM ポリシーを更新する作業を伴います。詳細については、「KCL コンシューマーアプリケーションに必要な IAM アクセス許可」を参照してください。

重要

既存の KCL アプリケーションでは、KCL 2.x では不要であったため、IAM ポリシーに次の IAM アクションおよびリソースが追加されていない可能性があります。KCL 3.x アプリケーションを実行する前に、これらが追加されていることを必ず確認してください。

  • アクション: UpdateTable

    • リソース (ARN): arn:aws:dynamodb:region:account:table/KCLApplicationName

  • アクション: Query

    • リソース (ARN): arn:aws:dynamodb:region:account:table/KCLApplicationName/index/*

  • アクション: CreateTableDescribeTableScanGetItemPutItemUpdateItemDeleteItem

    • リソース (ARN): arn:aws:dynamodb:region:account:table/KCLApplicationName-WorkerMetricStatsarn:aws:dynamodb:region:account:table/KCLApplicationName-CoordinatorState

    ARNsKCLApplicationName」をそれぞれ独自のアプリケーション名AWS リージョン、AWS アカウント数値、および KCL アプリケーション名に置き換えます。KCL が作成するメタデータテーブルの名前を設定でカスタマイズしている場合は、KCL アプリケーション名ではなく、その指定したテーブル名を使用してください。

ステップ 7: KCL 3.x のコードをワーカーにデプロイする

移行に必要な設定を行い、これまでの移行チェックリストをすべて完了したら、コードをビルドしてワーカーへデプロイできます。

注記

LeaseManagementConfig コンストラクタでコンパイルエラーが発生した場合は、トラブルシューティング情報について LeaseManagementConfig コンストラクタでのコンパイルエラーを参照してください。

ステップ 8: 移行を完了する

KCL 3.x のコードをデプロイしている間、KCL は引き続き KCL 2.x のリース割り当てアルゴリズムを使用します。すべてのワーカーに KCL 3.x のコードを正常にデプロイすると、KCL がそれを自動的に検知し、ワーカーのリソース使用状況に基づく新しいリース割り当てアルゴリズムに切り替わります。新しいリース割り当てアルゴリズムの詳細については、KCL がワーカーにリースを割り当て、負荷を分散する方法 を参照してください。

デプロイ中は、CloudWatch に出力される次のメトリクスを使用して移行状況をモニタリングできます。これらのメトリクスは Migration オペレーションの下でモニタリングできます。すべてのメトリクスは KCL アプリケーション単位のメトリクスであり、メトリクスレベルは SUMMARY に設定されています。CurrentState:3xWorker メトリクスの Sum 統計値が、KCL アプリケーション内のワーカー合計数と一致した場合、KCL 3.x への移行が正常に完了したことを示します。

重要

すべてのワーカーが新しいリース割り当てアルゴリズムを実行できる状態になってから、KCL がその新しいアルゴリズムに切り替えるまでには、最低でも 10 分かかります。

KCL 移行プロセスの CloudWatch メトリクス
メトリクス 説明
CurrentState:3xWorker

KCL 3.x へ正常に移行し、新しいリース割り当てアルゴリズムを実行している KCL ワーカーの数です。このメトリクスの Sum 値がワーカーの総数と一致した場合、KCL 3.x への移行が正常に完了したことを示します。

  • メトリクスレベル: Summary

  • 単位: カウント

  • 統計: 最も有用な統計は Sum です。

CurrentState:2xCompatibleWorker

移行プロセス中に KCL 2.x 互換モードで実行されている KCL ワーカーの数です。このメトリクスの値がゼロ以外である場合、移行がまだ進行中であることを示します。

  • メトリクスレベル: Summary

  • 単位: カウント

  • 統計: 最も有用な統計は Sum です。

Fault

移行プロセス中に発生した例外の数です。これらの例外の多くは一時的なエラーであり、KCL 3.x は自動的にリトライして移行を完了しようとします。Fault メトリクスの値が継続的に記録されている場合は、移行期間のログを確認して追加のトラブルシューティングを行ってください。問題が解決しない場合は、 にお問い合わせくださいサポート。

  • メトリクスレベル: Summary

  • 単位: カウント

  • 統計: 最も有用な統計は Sum です。

GsiStatusReady

リーステーブル上のグローバルセカンダリインデックス (GSI) の作成状況です。このメトリクスは、KCL 3.x を実行するための前提条件である、リーステーブル上の GSI が作成されているかどうかを示します。値は 0 または 1 で、1 の場合は GSI が正常に作成されたことを意味します。ロールバック状態の間、このメトリクスは出力されません。ロールフォワード後は、再びこのメトリクスをモニタリングできるようになります。

  • メトリクスレベル: Summary

  • 単位: カウント

  • 統計: 最も有用な統計は Sum です。

workerMetricsReady

すべてのワーカーからのワーカーメトリクス送信状況です。このメトリクスは、すべてのワーカーが CPU 使用率などのメトリクスを送信しているかどうかを示します。値は 0 または 1 で、1 の場合はすべてのワーカーが正常にメトリクスを送信しており、新しいリース割り当てアルゴリズムの利用準備が整っていることを示します。ロールバック状態の間、このメトリクスは出力されません。ロールフォワード後は、再びこのメトリクスをモニタリングできるようになります。

  • メトリクスレベル: Summary

  • 単位: カウント

  • 統計: 最も有用な統計は Sum です。

KCL は、移行中に 2.x 互換モードへロールバックできる機能を提供しています。KCL 3.x への移行が完了し、ロールバックが不要になった場合は、CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2XCoordinatorConfig.clientVersionConfig 設定を削除することをお勧めします。この設定を削除すると、KCL アプリケーションから移行関連のメトリクスが出力されなくなります。

注記

移行期間中および移行完了後の一定期間は、アプリケーションのパフォーマンスと安定性をモニタリングすることをお勧めします。問題が発生した場合は、KCL 移行ツールを使用して、ワーカーを KCL 2.x 互換の機能にロールバックすることができます。