Kinesis データストリームコンシューマーのトラブルシューティング
以下のトピックでは、Amazon Kinesis Data Streams コンシューマーの一般的な問題に対するソリューションを提供します。
LeaseManagementConfig コンストラクタでのコンパイルエラー
Kinesis Client Library (KCL) バージョン 3.x 以降にアップグレードする際、LeaseManagementConfig コンストラクタに関連するコンパイルエラーが発生する場合があります。KCL バージョン 3.x 以降で ConfigsBuilder を使用せずに、設定を行うために直接 LeaseManagementConfig オブジェクトを作成している場合、KCL アプリケーションコードのコンパイル時に次のエラーメッセージが表示される可能性があります。
Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'
KCL バージョン 3.x 以降では、tableName パラメータの後に、applicationName (型: String) という追加のパラメータを指定する必要があります。
-
変更前: leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)
-
変更後: leaseManagementConfig = new LeaseManagementConfig(tableName, applicationName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)
LeaseManagementConfig オブジェクトを直接作成するのではなく、KCL 3.x 以降のバージョンでは ConfigsBuilder を使用して設定を行うことをお勧めします。ConfigsBuilder を使用すると、KCL アプリケーションをより柔軟かつ保守しやすい方法で構成できます。
以下は、ConfigsBuilder を使用して KCL の設定を行う例です。
ConfigsBuilder configsBuilder = new ConfigsBuilder( streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory() ); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig() .failoverTimeMillis(60000), // this is an example configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );
Kinesis クライアントライブラリの使用時に一部の Kinesis Data Streams レコードがスキップされる
レコードがスキップされる最も一般的な原因は、processRecords からスローされる処理されない例外です。Kinesis Client Library (KCL) は、processRecords コードを使用して、データレコードの処理で発生するすべての例外を処理します。processRecords からスローされるすべての例外は、 KCLによって吸収されます。反復的なエラーに対する無限再試行を回避するために、KCL では例外の発生時に処理中であったレコードのバッチを再送信しません。KCL は、レコードプロセッサを再起動することなく、データレコードの次のバッチで processRecords を呼び出します。これにより、事実上、コンシューマーアプリケーションではレコードがスキップされたことになります。レコードのスキップを防止するには、processRecords 内ですべての例外を適切に処理します。
同じシャードに属するレコードが、異なるレコードプロセッサによって同時に処理される
実行されている Kinesis Client Library (KCL) アプリケーションでは、シャードの所有者はひとりだけです。ただし、複数のレコードプロセッサが一時的に同じシャードを処理する場合があります。ネットワーク接続を紛失したワーカーインスタンスの場合、KCL はフェイルオーバー時間の期限が切れた後に、到達できないワーカーはレコードを処理していないと仮定し、他のワーカーインスタンスが引き継ぐように指示します。このとき短時間ですが、新しいレコードプロセッサと到達不可能なワーカーのレコードプロセッサが同じシャードのデータを処理する場合があります。
アプリケーションに適したフェイルオーバー時間を設定します。低レイテンシーアプリケーションの場合、10秒のデフォルトは、待機する最大時間を表している場合があります。ただし、より頻繁に接続が失われる地域で通話を行うなどの接続問題が予想される場合、この数値は低すぎる場合があります。
ネットワーク接続は通常、以前の到達不可能なワーカーに復元されるため、アプリケーションではこのシナリオを予期して処理する必要があります。レコードプロセッサのシャードが別のレコードプロセッサに引き継がれた場合、レコードプロセッサは正常なシャットダウンを実行するために次の 2 つのケースを処理する必要があります。
-
processRecordsへの現在の呼び出しが完了した後で、KCL はシャットダウンの理由 'ZOMBIE' を使用してレコードプロセッサでシャットダウンメソッドを呼び出します。レコードプロセッサは、すべてのリソースを必要に応じて適切にクリーンアップした後、終了する必要があります。 -
zombieワーカーからチェックポイントを作成しようとすると、KCL は
ShutdownExceptionをスローします。この例外を受け取った後、コードは現在のメソッドを正常に終了する必要があります。
詳細については、重複レコードを処理するを参照してください。
コンシューマーアプリケーションの読み取りの速度が予想よりも遅い
読み取りのスループットが予想よりも遅くなる最も一般的な理由は次のとおりです。
-
複数のコンシューマーアプリケーションの読み取りの合計が、シャードごとの制限を超えています。詳細については、クォータと制限を参照してください。この場合、Kinesis データストリームのシャードの数が増えます。
-
呼び出しごとの GetRecords の最大数を指定する制限が、低い値で設定されている可能性があります。KCL を使用している場合は、ワーカーに設定した
maxRecordsプロパティの値が低い可能性があります。一般的に、このプロパティにはシステムのデフォルトを使用することをお勧めします。 -
processRecords呼び出し内のロジックに予想よりも時間がかかる場合があります。これには、ロジックが CPU を大量に消費する、I/O をブロックする、同期のボトルネックになっているなど、多くの理由が考えられます。これに該当するかどうかをテストするには、空のレコードプロセッサをテスト実行し、読み取りスループットを比較します。受信データに遅れずに対応する方法については、シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。を参照してください。
コンシューマーアプリケーションが 1 つのみである場合、通常、PUT レートの少なくとも 2 倍高速に読み取りを実行できます。これは、書き込みに対して 1 秒あたり最大 1,000 レコードを書き込むことができ、最大合計データ書き込み速度が 1 秒あたり 1 MB (パーティションキーを含む) になるためです。オープンな各シャードは、読み取りに対して 1 秒あたり最大 5 トランザクションをサポートでき、最大合計データ読み取り速度は 1 秒あたり 2MB です。各読み取り (GetRecords) は、レコードのバッチを取得します。GetRecords によって返されるデータのサイズは、シャードの使用状況によって異なります。GetRecords が返すことができるデータの最大サイズは、10 MB です。呼び出しがその制限を返す場合、次の 5 秒以内に行われるそれ以降の呼び出しは ProvisionedThroughputExceededException をスローします。
ストリームにデータがある場合でも、GetRecords が空のレコード配列を返す
レコードの消費、つまり取得は、プルモデルです。開発者は、バックオフがない連続ループで GetRecords を呼び出す必要があります。GetRecords のすべての呼び出しは、ShardIterator 値も返します。この値は、ループの次のイテレーションで使用する必要があります。
GetRecords オペレーションはブロックしません。その代わりに、関連データレコードまたは空の Records 要素とともに、直ちに制御を戻します。空の Records 要素は、2 つの条件の下で返されます。
-
現在シャードにはそれ以上のデータがない。
-
シャードの
ShardIteratorで指定されたパートの近くにデータがない。
後者の条件は微妙ですが、レコードを取得するときに無限のシーク時間 (レイテンシー) を回避するために必要な設計上のトレードオフです。そのため、ストリームを使用するアプリケーションはループし、GetRecords を呼び出して、当然のこととして空のレコードを処理します。
本稼働シナリオで、連続ループが終了するのは、NextShardIterator の値が NULL である場合のみにする必要があります。NextShardIterator が NULL である場合、現在のシャードが閉じられ、ShardIterator 値は最後のレコードを過ぎたことを示します。コンシューマーアプリケーションが SplitShard または MergeShards を呼び出さない場合、シャードは開いたままになり、GetRecords の呼び出しは NextShardIterator である NULL 値を返しません。
Kinesis Client Library (KCL) を使用する場合、お客様に対しては前述の消費パターンは抽象化されます。これには、動的に変更する一連のシャードの自動処理が含まれます。KCL により、デベロッパーは入力レコードを処理するロジックのみを提供します。ライブラリが自動的に GetRecords の継続的な呼び出しを行うため、これが可能になります。
シャードイテレーターが予期せずに終了する
新しいシャードのイテレータは、GetRecords リクエスト (NextShardIterator として) 返されます。これは次の GetRecordsリクエスト (ShardIterator として) 使用します。通常の場合、このシャードイテレーターは使用する前に有効期限が切れることはありません。ただし、5 分以上 GetRecords を呼び出さなかったため、またはコンシューマーアプリケーションの再起動を実行したため、シャードイテレータの有効期限が切れる場合があります。
シャードイテレーターの有効期限がすぐに切れて使用できない場合、これは Kinesis で使用している DynamoDB テーブルの容量不足でリースデータを保存できないことを示している可能性があります。この状況は、多数のシャードがある場合により発生する可能性が高くなります。この問題を解決するには、シャードテーブルに割り当てられた書き込み容量を増やします。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。
コンシューマーレコードの処理が遅れる
ほとんどのユースケースで、コンシューマーアプリケーションはストリームから最新のデータを読み取ります。特定の状況下では、コンシューマーの読み取りが遅れるという好ましくない事態が発生します。コンシューマーの読み取りの遅れ具合を確認したら、遅れの最も一般的な理由を参照してください。
GetRecords.IteratorAgeMilliseconds メトリクスを起動して、ストリーム内のすべてのシャードとコンシューマーの読み取り位置を追跡します。イテレータの経過日数が保持期間 (デフォルトで 24 時間、最大で 365 日まで設定可能) の 50% を経過すると失効する場合、レコードの有効期限切れによるデータ損失のリスクがあります。とりあえずの解決策は、保持期間を長くすることです。これにより、問題のトラブルシューティングを行う間に重要なデータが失われるのを防ぎます。詳細については、Amazon CloudWatch による Amazon Kinesis Data Streams サービスを監視するを参照してください。次に、Kinesis Client Library (KCL)、MillisBehindLatest が出力するカスタム CloudWatch メトリクスを使用して、コンシューマーアプリケーションの読み取りが各シャードからどのくらい遅れているかを確認します。詳細については、Amazon CloudWatch を使用した Kinesis Client Library を監視するを参照してください。
コンシューマーが遅れる最も一般的な理由:
-
GetRecords.IteratorAgeMillisecondsの突然の上昇またはMillisBehindLatestは、通常ダウンストリームアプリケーションに対する API オペレーションの障害などの一時的な問題を示します。どちらかのメトリクスが恒常的にこのような動きを示す場合、この急激な上昇を調査します。 -
これらのメトリクスが徐々に上昇する場合は、レコードの処理速度が不十分なためストリームにコンシューマーが追いついていないことを示します。この状況に共通の原因は、物理リソースの不足またはストリームスループットの上昇にレコード処理ロジックが追随できないことです。
processTaskオペレーション (RecordProcessor.processRecords.Time、Success、RecordsProcessedなど) に関連して KCL が出力する他のカスタム CloudWatch メトリクスを確認することで、この状況を確認できます。-
スループットの増加に伴う
processRecords.Timeメトリクスの上昇が確認された場合、レコード処理ロジックを分析して、スループットの増加に対応したスケーリングができない理由を調べる必要があります。 -
スループットの上昇とは関連性がない
processRecords.Time値の上昇が認められた場合は、重要なパスでブロック呼び出しを行っていないか確認します。これは、レコード処理の低下を招きます。代替策として、シャードの数を増やして並列処理を増やす方法があります。最後に、ピーク需要時に適切な容量の物理リソース (メモリ、CPU 使用率など) が基盤の処理ノードに存在することを確認します。
-
承認されていない KMS キーの権限エラー
このエラーは、AWS KMS キーのアクセス許可なしで、コンシューマーアプリケーションが暗号化されたストリームから読み取りを行ったときに発生します。KMS キーにアクセスする許可をアプリケーションに割り当てるには、AWS KMS でのキーポリシーの使用およびAWS KMS での IAM ポリシーの使用を参照してください。
DynamoDbException: 更新式で指定されたドキュメントパスが更新に対して無効です
AWS SDK for Java バージョン 2.27.19 から 2.27.23 で KCL 3.x を使用する場合、次の DynamoDB 例外が発生することがあります。
"software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The document path provided in the update expression is invalid for update (Service: DynamoDb, Status Code: 400, Request ID: xxx)"
このエラーは、KCL 3.x が管理する DynamoDB メタデータテーブルに影響を及ぼす、AWS SDK for Java の既知の問題が原因で発生します。この問題は 2.27.19 で発生するようになり、2.27.23 までのすべてのバージョンで問題が発生します。この問題は AWS SDK for Java バージョン 2.27.24 で解決されました。最適なパフォーマンスと安定性のため、2.28.0 以降のバージョンへアップグレードすることをお勧めします。