

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Data Streams コンシューマーを最適化する
<a name="advanced-consumers"></a>

表示される特定の動作に基づいて、Amazon Kinesis Data Streams コンシューマーをさらに最適化できます。

以下のトピックを確認して、解決策を特定します。

**Topics**
+ [低レイテンシー処理を改善する](kinesis-low-latency.md)
+ [Amazon Kinesis プロデューサーライブラリ AWS Lambda で を使用してシリアル化されたデータを処理する](kinesis-record-deaggregation.md)
+ [シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)
+ [重複レコードを処理する](kinesis-record-processor-duplicates.md)
+ [起動、シャットダウン、スロットリングを処理する](kinesis-record-processor-additional-considerations.md)

# 低レイテンシー処理を改善する
<a name="kinesis-low-latency"></a>

*伝達遅延*は、レコードがストリームに書き込まれた瞬間からコンシューマーアプリケーションによって読み取られるまでの、エンドツーエンドのレイテンシーとして定義されます。この遅延はいくつかの要因によって異なりますが、最も大きく影響するのはコンシューマーアプリケーションのポーリング間隔です。

ほとんどのアプリケーションについては、アプリケーションごとに各シャードを 1 秒 1 回ポーリングすることをお勧めします。この設定では、Amazon Kinesis Data Streams の制限 (1 秒あたり 5 回の`GetRecords` 呼び出し) を超えることなく、複数のコンシューマーアプリケーションで同時に 1 つのストリームを処理できます。また、処理するデータバッチが大きいほど、アプリケーション内のネットワークおよびその他ダウンストリームのレイテンシーをより効率的に短縮できる傾向があります。

KCL のデフォルト値は、毎秒のポーリングのベストプラクティスに従うよう設定されています。このデフォルト設定により、平均的な伝達遅延が通常 1 秒未満になります。

Kinesis Data Streams レコードは、書き込まれた後、すぐに読み取り可能になります。ユースケースには、この性能を活用して、ストリームが使用可能になり次第、ストリームからデータを使用することが必要なものもあります。次の例に示されているように、KCL のデフォルト設定を上書きしてポーリングの頻度を高くすると、伝達遅延を大幅に短縮できます。

Java KCL 設定コードを次に示します。

```
kinesisClientLibConfiguration = new
        KinesisClientLibConfiguration(applicationName,
        streamName,               
        credentialsProvider,
        workerId).withInitialPositionInStream(initialPositionInStream).withIdleTimeBetweenReadsInMillis(250);
```

Python および Ruby KCL のプロパティファイル設定を次に示します。

```
idleTimeBetweenReadsInMillis = 250
```

**注記**  
Kinesis Data Streams は、`GetRecords` コールをシャードごとに 1 秒あたり 5 回に制限しているため、`idleTimeBetweenReadsInMillis` プロパティを 200 ms 未満に設定すると、アプリケーションで `ProvisionedThroughputExceededException` 例外が発生する可能性があります。この例外の発生回数が多くなりすぎると、エクスポネンシャルバックオフが発生することになり、処理中の予期しない大幅なレイテンシーの原因になります。このプロパティを 200 ms またはそれより少し高く設定した場合も、処理中のアプリケーションが複数あれば、同様のスロットリングが発生します。

# Amazon Kinesis プロデューサーライブラリ AWS Lambda で を使用してシリアル化されたデータを処理する
<a name="kinesis-record-deaggregation"></a>

[Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (KPL) は、小さなユーザーフォーマットレコードを最大 1 MB のレコードに集約して、Amazon Kinesis Data Streams スループットを有効に利用できます。KCL for Java ではこれらのレコードの集約解除がサポートされていますが、 をストリームのコンシューマー AWS Lambda として使用する場合は、特別なモジュールを使用してレコードの集約を解除する必要があります。必要なプロジェクトコードと手順は、[Amazon Kinesis Producer Library Deaggregation Modules for AWS Lambda ](https://github.com/awslabs/kinesis-deaggregation)の GitHub から入手できます。このプロジェクトのコンポーネントにより、Java、Node.js AWS Lambda、Python で KPL シリアル化されたデータを処理できます。これらのコンポーネントは、[複数言語の KCL アプリケーション](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client-multilang/src/main/java/software/amazon/kinesis/multilang/package-info.java)の一部として使用することもできます。

# シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。
<a name="kinesis-record-processor-scaling"></a>

*リシャーディング*によって、ストリームのデータフロー率の変化に合わせて、ストリーム内のシャードカウントを増減できます。通常、リシャーディングはシャードのデータ処理メトリクスを監視する管理アプリケーションによって実行されます。KCL 自体はリシャーディングオペレーションを開始しませんが、リシャーディングに起因するシャードの数の変化に適応するように設計されています。

[リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する](shared-throughput-kcl-consumers.md#shared-throughput-kcl-consumers-leasetable)で説明したように、KCL は Amazon DynamoDB tテーブルを使用してストリーム内のシャードを追跡します。リシャーディングの結果として新しいシャードが作成されるときに、KCL は新しいシャードを検出し、テーブル内の新しい行に値を入力します。ワーカーは、新しいシャードを自動的に検出して、それらからのデータを処理するためのプロセッサを作成します。また、KCL は、ストリーム内のシャードを、利用可能なすべてのワーカーとレコードプロセッサに分散させます。

KCL は、リシャーディング前にシャードに存在していたすべてのデータが最初に処理されるようにします。このデータが処理されると、新しいシャードからのデータがレコードプロセッサに送信されます。このようにして、KCL は、データレコードが特定のパーティションキーのストリームに追加された順序を保持します。

## 例: リシャーディング、スケーリング、並列処理
<a name="kinesis-record-processor-scaling-example"></a>

次の例は、KCL を使用してスケーリングとリシャーディングを処理する方法を示しています。
+ アプリケーションが 1 つの EC2 インスタンスで実行中であり、4 つのシャードを含む 1 つの Kinesis Data Streams を処理しているとします。この 1 つのインスタンスに 1 つの KCL ワーカーと、4 つのレコードプロセッサ (各シャードに 1 つのレコードプロセッサ) があります。これらの 4 つのレコードプロセッサは、同一のプロセス内で並列実行されます。
+ 次に、別のインスタンスを使用するようにアプリケーションをスケールし、4 つのシャードが含まれる 1 つのストリームを 2 つのインスタンスが処理するとします。KCL ワーカーが 2 番目のインスタンスで起動すると、最初のインスタンスとの間で負荷分散が行われ、各インスタンスで 2 つのシャードが処理されるようになります。
+ その後、4 つのシャードを 5 つのシャードに分割するとします。KCL は再度インスタンスでの処理を調整します。一方のインスタンスが 3 つのシャードを処理し、もう一方のインスタンスが 2 つのシャードを処理するように調整されます。シャードをマージするときにも、同様の調整が行われます。

通常、KCL を使用する場合、インスタンスの数がシャードの数を超過しないように注意します (障害に対するスタンバイを目的とする場合を除く)。各シャードは厳密に 1 つの KCL ワーカーによって処理され、対応するレコードプロセッサが厳密に 1 つ存在するため、1 つのシャードを処理するために複数のインスタンスが必要になることはありません。ただし、1 つのワーカーで任意の数のシャードを処理できるため、シャードの数がインスタンスの数を超えても問題はありません。

アプリケーションでの処理をスケールアップするには、次のようなアプローチの組み合わせをテストするようにしてください。
+ インスタンスのサイズを大きくする (プロセス内ではすべてのレコードプロセッサが並列実行されるため)
+ インスタンスの数をオープンシャードの最大数まで増やす (シャードは個別に処理できるため)
+ シャードの数を増やす (並列性のレベルを向上させる)

Auto Scaling を使用すると、適切なメトリクスに基づいて自動的にインスタンスを拡張できます。詳細については、[Amazon EC2 Auto Scaling ユーザーガイド](https://docs.aws.amazon.com/autoscaling/ec2/userguide/)を参照してください。

リシャーディングによってストリーム内のシャードの数が増加すると、レコードプロセッサの数もそれに合わせて増加するため、これらをホストする EC2 インスタンスの負荷が高くなります。インスタンスが Auto Scaling グループの一部であり、負荷の増加が基準を満たす場合は、Auto Scaling グループがインスタンスを追加して増加した負荷を処理します。新しいインスタンスで追加のワーカーやレコードプロセッサがすぐにアクティブになるように、インスタンスの起動時に Amazon Kinesis Data Streams アプリケーションを起動するように設定してください。

リシャーディングの詳細については、[ストリームをリシャーディングする](kinesis-using-sdk-java-resharding.md)を参照してください。

# 重複レコードを処理する
<a name="kinesis-record-processor-duplicates"></a>

レコードが複数回 Amazon Kinesis Data Streams アプリケーションに配信される理由は、主にプロデューサーの再試行とコンシューマーの再試行の 2 つになります。アプリケーションは、個々のレコードを複数回処理することを見込んで、適切に処理する必要があります。

## プロデューサーの再試行
<a name="kinesis-record-processor-duplicates-producer"></a>

プロデューサーで `PutRecord` を呼び出してから Amazon Kinesis Data Streams の受信確認を受け取るまでの間に、ネットワーク関連のタイムアウトが発生する場合があります。この場合、プロデューサーはレコードが Kinesis Data Streams に配信されたかどうかを確認できません。各レコードがアプリケーションにとって重要であれば、同じデータを使用して呼び出しを再試行するようにプロデューサーが定義されているはずです。同じデータを使用した `PutRecord` の呼び出しが両方とも Kinesis Data Streams に正常にコミットされると、Kinesis Data Streams レコードは 2 つになります。2 つのレコードには同一のデータがありますが、一意のシーケンス番号も付けられています。厳密な保証を必要とするアプリケーションは、レコード内にプライマリキーを埋め込んで、後ほど処理するときに重複を削除する必要があります。プロデューサーの再試行に起因する重複の数が、コンシューマーの再試行に起因する重複の数より通常は少ないことに注意してください。

**注記**  
 AWS SDK を使用する場合は`PutRecord`、「 SDK *AWS とツールユーザーガイドSDKs * [の再試行動作](https://docs.aws.amazon.com/sdkref/latest/guide/feature-retry-behavior.html)について説明します。

## コンシューマーの再試行
<a name="kinesis-record-processor-duplicates-consumer"></a>

コンシューマー (データ処理アプリケーション) の再試行は、レコードプロセッサが再開するときに発生します。同じシャードのレコードプロセッサは、次の場合に再開します。

1. ワーカーが予期せず終了する 

1. ワーカーインスタンスが追加または削除される 

1. シャードがマージまたは分割される 

1. アプリケーションがデプロイされる 

これらすべての場合において、shards-to-worker-to-record-processor マッピングは、処理の負荷を分散するために継続的に更新されます。他のインスタンスに移行されたシャードプロセッサは、最後のチェックポイントからレコードの処理を再開します。これにより、以下の例にあるような重複レコード処理が発生します。負荷分散の詳細については、[シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。](kinesis-record-processor-scaling.md)を参照してください。

### 例: コンシューマーの再試行によるレコードの再配信
<a name="kinesis-record-processor-duplicates-consumer-example"></a>

この例では、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードするアプリケーションがあるとします。分かりやすくするため、1 つのシャードと、このシャードを処理する 1 つのワーカーのみがあるとします。最後のチェックポイントがレコード番号 10,000 であると仮定して、次の例の一連のイベントを考えてみます。

1.  ワーカーで、シャードから次のレコードのバッチを読み込みます (1,0001 から 20000)。

1.  次に、ワーカーがレコードのバッチを関連付けられたレコードプロセッサに渡します。

1.  レコードプロセッサはデータを集約し、Amazon S3 ファイルを作成して、このファイルを Amazon S3 に正常にアップロードします。

1.  新しいチェックポイントが生成される前に、ワーカーが予期せず終了します。

1.  アプリケーション、ワーカー、およびレコードプロセッサが再開します。

1.  ワーカーは、正常な最後のチェックポイント (この場合は 1,0001) から読み込みを開始しました。

したがって、1,0001 から 20000 のレコードは複数回使用されます。

### コンシューマーの再試行に対する弾力性
<a name="kinesis-record-processor-duplicates-consumer-resilience"></a>

レコードが複数回処理される可能性はあるものの、アプリケーションでは、レコードが 1 回だけ処理されたかのような付随効果 (冪等処理) を提示する場合があります。この問題に対するソリューションは、複雑性と正確性に応じて異なります。最終的なデータの送信先が重複を適切に処理できる場合は、冪等処理の実行は最終送信先に任せることをお勧めします。例えば、[Opensearch](https://www.opensearch.org/) では、バージョニングと一意の ID の組み合わせを使用して重複処理を防ぐことができます。

前セクションのアプリケーション例では、ストリームから継続的にレコードを読み取り、レコードをローカルファイルに集約して、ファイルを Amazon S3 にアップロードします。図に示すように、1,0001 から 20000 のレコードが複数回使用されることにより、複数の Amazon S3 ファイルのデータは同じになります。この例からの重複を軽減する方法の 1 つは、ステップ 3 での次のスキーマの使用を確実にすることです。

1.  レコードプロセッサは、各 Amazon S3 ファイルに固定のレコード番号 (5000 など) を使用します。

1.  ファイル名には、このスキーマ (Amazon S3 プレフィックス、シャード ID、および `First-Sequence-Num`) を使用します。この場合は、`sample-shard000001-10001` のようになります。

1.  Amazon S3 ファイルをアップロードした後で、`Last-Sequence-Num` を指定してチェックポイントを作成します。この場合は、レコード番号 15000 にチェックポイントが作成されます。

このスキーマを使用すると、レコードが複数回処理されても、Amazon S3 ファイルには同じ名前と同じデータが保持されます。再試行しても、同じファイルに同じデータが複数回書き込まれるだけになります。

リシャーディング操作の場合は、シャードに残っているレコードの数が必要な一定数よりも少ないことがあります。この場合、`shutdown()` メソッドは Amazon S3 にファイルをフラッシュし、最後のシーケンス番号でチェックポイントを作成する必要があります。上記のスキーマは、リシャーディング操作との互換性もあります。

# 起動、シャットダウン、スロットリングを処理する
<a name="kinesis-record-processor-additional-considerations"></a>

ここでは、Amazon Kinesis Data Streams アプリケーションの設計に取り入れる必要がある、追加の考慮事項を示します。

**Topics**
+ [データプロデューサーとデータコンシューマーを起動する](#kinesis-record-processor-producer-consumer-coordination)
+ [Amazon Kinesis Data Streams アプリケーションをシャットダウンする](#developing-consumers-with-kcl-shutdown)
+ [読み込みのスロットリング](#kinesis-record-processor-read-throttling)

## データプロデューサーとデータコンシューマーを起動する
<a name="kinesis-record-processor-producer-consumer-coordination"></a>

デフォルトでは、KCL はストリームの末尾 (最後に追加されたレコード) からレコードの読み込みを開始します。この設定では、受信側のレコードプロセッサが実行される前に、データプロデューサーアプリケーションがストリームにレコードを追加した場合、レコードプロセッサが起動した後、これらのレコードはレコードプロセッサによって読み込まれません。

レコードプロセッサの動作を変更して、常にストリームの先頭からデータを読み込むには、Amazon Kinesis Data Streams アプリケーションの properties ファイルで次の値を設定します。

```
initialPositionInStream = TRIM_HORIZON
```

デフォルトでは、Amazon Kinesis Data Streams はすべてのデータを 24 時間保存します。また、最大 7 日間の延長保存と最大 365 日間の長期保存もサポートします。この期間は*保持期間*と呼ばれます。開始位置を `TRIM_HORIZON` に設定すると、保持期間で定義されているとおりに、ストリーム内の最も古いデータでレコードプロセッサが起動します。`TRIM_HORIZON` に設定しても、保持期間を上回る時間が経過した後でレコードプロセッサが起動される場合は、ストリーム内のレコードの一部が利用できなくなります。そのため、ストリームから読み込むコンシューマーアプリケーションが常に存在しており、CloudWatch メトリクス `GetRecords.IteratorAgeMilliseconds` を使用してアプリケーションが着信データに追随していることをモニタリングする必要があります。

シナリオによっては、レコードプロセッサがストリームの最初の数レコードを処理しなくても問題ない場合があります。例えば、ストリームがエンドツーエンドで正常に機能していることをテストするために、ストリームで数件の初期レコードを実行する場合があります。この初期確認を行った後でワーカーを起動し、ストリームへの本番データの送信を開始します。

`TRIM_HORIZON` の設定の詳細については、[シャードイテレーターを使用する](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data-shard-iterators)を参照してください。

## Amazon Kinesis Data Streams アプリケーションをシャットダウンする
<a name="developing-consumers-with-kcl-shutdown"></a>

Amazon Kinesis Data Streams アプリケーションが目的のタスクを完了したら、アプリケーションが実行されている EC2 インスタンスを終了することによって、アプリケーションをシャットダウンする必要があります。インスタンスは、[AWS マネジメントコンソール](https://console.aws.amazon.com//ec2/home) または [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/ec2/index.html) を使用して終了することができます。

 Amazon Kinesis Data Streams アプリケーションのシャットダウン後に、KCL がアプリケーションの状態を追跡するために使用した Amazon DynamoDB テーブルを削除する必要があります。

## 読み込みのスロットリング
<a name="kinesis-record-processor-read-throttling"></a>

ストリームのスループットは、シャードレベルでプロビジョニングされます。各シャードは、読み取りに対して 1 秒あたり最大 5 トランザクションのスループットで、最大合計データ読み取り速度は 1 秒あたり 2MB です。アプリケーション (または同じストリームで動作するアプリケーションのグループ) がシャードからデータをより高速に取得しようとすると、Kinesis Data Streams は対応する GET オペレーションを調整します。

Amazon Kinesis Data Streams アプリケーションでは、レコードプロセッサが制限よりも高速にデータを処理している場合 (フェイルオーバーの場合など)、スロットリングが発生します。KCL によってアプリケーションと Kinesis Data Streams とのやり取りが管理されるため、スロットリング例外は、アプリケーションコードではなく KCL コードで発生します。ただし、KCL によってこれらの例外がログに記録されるため、ログで例外を確認できます。

アプリケーションが絶えずスロットリングされていると思われる場合は、ストリームのシャード数を増やすことを検討してください。