

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Kinesis Data Streams を作成して管理する
<a name="working-with-streams"></a>

Amazon Kinesis Data Streams は、大量のデータをリアルタイムで取り込み、そのデータを永続的に保存して、消費できるようにします。Kinesis Data Streams によって保存されるデータの単位は、*データレコード*です。*データストリーム*は、データレコードのグループを表します。データストリームのデータレコードはシャードに配分されます。

*シャード*には、ストリーム内のデータレコードのシーケンスです。Kinesis Data Streams の基本スループット単位として機能します。シャードは、オンデマンドモードとプロビジョンド容量モードの両方で、書き込み**用に 1 MB/秒 と 1,000 レコード/秒、読み取り**用に 2 MB/秒をサポートします。シャード制限により、予測可能なパフォーマンスが保証され、信頼性の高いデータストリーミングワークフローの設計と運用が容易になります。

このセクションでは、ストリームのキャパシティモードを設定する方法と、 AWS マネジメントコンソール または APIs を使用してストリームを作成する方法について説明します。その後、ストリームに対して追加のアクションを実行できます。

**Topics**
+ [適切なストリームモードを選ぶ](how-do-i-size-a-stream.md)
+ [を使用してストリームを作成する AWS マネジメントコンソール](how-do-i-create-a-stream.md)
+ [API を使用してストリームを作成する](kinesis-using-sdk-java-create-stream.md)
+ [ストリームを更新する](updating-a-stream.md)
+ [ストリームの一覧表示](kinesis-using-sdk-java-list-streams.md)
+ [シャードを一覧表示する](kinesis-using-sdk-java-list-shards.md)
+ [ストリームを削除する](kinesis-using-sdk-java-delete-stream.md)
+ [ストリームをリシャーディングする](kinesis-using-sdk-java-resharding.md)
+ [データ保持期間を変更する](kinesis-extended-retention.md)
+ [Amazon Kinesis Data Streams リソースにタグを付ける](tagging.md)
+ [大きなレコードの処理](large-records.md)
+ [でレジリエンステストを実行する AWS Fault Injection Service](kinesis-fis.md)

# 適切なストリームモードを選ぶ
<a name="how-do-i-size-a-stream"></a>

以下のトピックでは、アプリケーションに適したモードを選択する方法と、必要に応じてモードを切り替える方法について説明します。

**Topics**
+ [Kinesis Data Streams の各種モードについて](#diff-modes-kds)
+ [オンデマンド標準モードの機能とユースケース](#ondemandmode)
+ [オンデマンドアドバンテージモードの機能とユースケース](#ondemand-advantage-mode)
+ [プロビジョンドモード機能とユースケース](#provisionedmode)
+ [モードの切り替え](#switchingmodes)

## Kinesis Data Streams の各種モードについて
<a name="diff-modes-kds"></a>

モードは、データストリームの容量の管理方法と、データストリームの使用に対する課金方法を決定します。Amazon Kinesis Data Streams では、データストリームのモードとして**オンデマンド標準**、**オンデマンドアドバンテージ**、**プロビジョンド**を選択できます。
+ **オンデマンド標準** - オンデマンドモードのデータストリームは、容量計画を必要とせず、1 分あたりの書き込みスループットおよび読み込みスループットのギガバイトを処理するように自動的にスケールされます。オンデマンドモードでは、Kinesis Data Streams は必要なスループットを提供するために、シャードを自動的に管理します。
+  **オンデマンドアドバンテージ** - オンデマンドストリームに対してより多くの機能を利用でき、かつ単純化された料金体系を提供するアカウントレベルのモードです。このモードでは、ストリームの書き込みスループットキャパシティを、任意のタイミングで事前にウォームアップできます。料金については、ストリーム単位の固定料金がなくなり、すべてのオンデマンドストリームにおけるデータインジェスト、データ取得、および拡張保持の使用量に対する料金が、**オンデマンド標準**の料金と比べて少なくとも 60% 低くなります。
+ **プロビジョンド** - プロビジョンドモードのデータストリームの場合、データストリームのシャードカウントを指定する必要があります。データストリームの総容量は、シャードの容量の合計です。必要に応じて、データストリームのシャードの数を増減することができます。

Kinesis Data Streams の `PutRecord` および `PutRecords` API を使用して、任意のモードでデータストリームにデータを書き込むことができます。データを取得するために、3 つのモードはすべて、`GetRecords` API を使用するデフォルトのコンシューマーと `SubscribeToShard` API を使用する拡張ファンアウト (EFO) のコンシューマーをサポートします。

保持モード、暗号化、モニタリングメトリクスなど、すべての Kinesis Data Streams の機能は、オンデマンドモードとプロビジョンドモードの両方でサポートされています。Kinesis Data Streams は、オンデマンドおよびプロビジョンドの容量モードの両方で、高い耐久性と可用性を提供します。

## オンデマンド標準モードの機能とユースケース
<a name="ondemandmode"></a>

オンデマンドモードのデータストリームは、容量計画を必要とせず、1 分あたりの書き込みスループットおよび読み込みスループットのギガバイトを処理するように自動的にスケーリングされます。オンデマンドモードでは、サーバー、ストレージ、またはスループットのプロビジョニングや管理が不要になるため、低レイテンシーで大量のデータを取り込んで保存することが簡単になります。運用上のオーバーヘッドなしで、1 日あたり数十億ものレコードを取り込むことができます。

オンデマンドモードは、変動性が高く、予測不可能なアプリケーショントラフィックのニーズに対応するのに最適です。これらのワークロードをピーク容量にプロビジョニングする必要がなくなり、使用率が低いため、コストが高くなる可能性があります。オンデマンドモードは、予測不能で変動性の高いトラフィックパターンを持つワークロードに適しています。

オンデマンド容量モードでは、データストリームから読み書きされるデータのギガバイトあたりの料金が発生します。アプリケーションで実行することが予測される読み込みおよび書き込みスループットを指定する必要はありません。Kinesis Data Streams は、ワークロードが増加または減少するときに、即座にワークロードに対応します。詳細については、[Amazon Kinesis Data Streams の料金表](https://aws.amazon.com/kinesis/data-streams/pricing/)を参照してください。

オンデマンドモードのデータストリームは、過去 30 日間に観測されたピーク書き込みスループットの最大 2 倍に対応します。データストリームの書き込みスループットが新しいピークに達すると、Kinesis Data Streams はデータストリームの容量を自動的にスケーリングします。例えば、データストリームの書き込みスループットが 10 MB/秒から 40 MB/秒の間で変化する場合、Kinesis Data Streams を使用すると、以前のピークスループットの 2 倍または 80 MB/秒に簡単にバーストできます。同じデータストリームが 50 MB/秒の新しいピークスループットを維持する場合、Kinesis Data Streams は 100 MB/秒の書き込みスループットを取り込むのに十分な容量を確保します。ただし、15 分以内にトラフィックが以前のピークの 2 倍以上に増加すると、書き込みスロットリングが発生する可能性があります。これらのスロットルされたリクエストは再試行する必要があります。

オンデマンドモードのデータストリームの総読み込み容量は、書き込みスループットに比例して増加します。これにより、コンシューマーアプリケーションは、受信データをリアルタイムで処理するための適切な読み取りスループットを常に確保できます。`GetRecords` API を使用したデータの読み取りと比較して、書き込みスループットは少なくとも 2 倍になります。`GetRecord` API で 1 つのコンシューマーアプリケーションを使用することをお勧めします。これにより、アプリケーションがダウンタイムから回復する必要があるときに追いつくのに十分なスペースが確保されます。複数のコンシューマーアプリケーションを追加する必要があるシナリオでは、Kinesis Data Streams の拡張ファンアウト機能を使用することをお勧めします。拡張ファンアウトは、`SubscribeToShard` API を使用して最大 20 のコンシューマーアプリケーションをデータストリームに追加することをサポートし、各コンシューマーアプリケーションは専用のスループットを備えています。

### 読み取りおよび書き込みスループットの例外処理
<a name="hotshards"></a>

オンデマンドモード (プロビジョンド容量モードと同じ) では、データストリームにデータを書き込むために、各レコードでパーティションキーを指定する必要があります。Kinesis Data Streams は、パーティションキーを使用して、シャード間でデータを分散します。Kinesis Data Streams は、各シャードのトラフィックをモニタリングします。着信トラフィックがシャードあたり 500 KB/秒を超えると、15 分以内にシャードが分割されます。親シャードのハッシュキー値は、子シャード間で均等に再配分されます。

 着信トラフィックが以前のピークの 2 倍を超えると、データがシャード全体に均等に分散されていても、約 15 分間、読み取りまたは書き込みの例外が発生する可能性があります。すべてのレコードが Kinesis Data Streams に適切に保存されるように、このようなリクエストをすべて再試行することをお勧めします。

不均等なデータ分散につながるパーティションキーを使用し、特定のシャードに割り当てられたレコードがその制限を超えると、読み取りおよび書き込みの例外が発生することがあります。オンデマンドモードでは、単一のパーティションキーがシャードの 1 MB/秒のスループットおよび 1,000 レコード/秒の制限を超えない限り、データストリームは不均等なデータ分散パターンを処理するように自動的に適応します。

オンデマンドモードでは、トラフィックの増加が検出されると、Kinesis Data Streams はシャードを均等に分割します。ただし、特定のシャードへの着信トラフィックの上位部分を駆動しているハッシュキーは検出および分離されません。非常に不均等なパーティションキーを使用している場合は、書き込みの例外を引き続き受け取る可能性があります。このようなユースケースでは、きめ細かくシャードの分割をサポートするプロビジョンド容量モードを使用することをお勧めします。

## オンデマンドアドバンテージモードの機能とユースケース
<a name="ondemand-advantage-mode"></a>

オンデマンドアドバンテージは、リージョン内のすべてのオンデマンドストリームに追加機能を提供し、異なる料金体系を適用するアカウントレベルの設定です。このモードでは、オンデマンドストリームは引き続き既存の機能を保持し、実際のデータ使用量に基づいて自動的に容量をスケールします。ストリームの書き込みスループットキャパシティを事前にウォームアップしたい場合は、ウォームスループットを設定できます。たとえば、データストリームの書き込みスループットが 10 MB/秒〜40 MB/秒の範囲であれば、スロットリングなしで最大 80 MB/秒の瞬間的なスループット増加に対応できます。一方で、今後 200 MB/秒程度のピークが予測される場合は、ウォームスループットを 200 MB/秒に設定しておくことで、データ流入時に必要な容量を確保できます。ウォームスループットを使用しても追加料金は発生しません。

オンデマンドアドバンテージモードのもう一つの利点は、オンデマンドストリームがよりシンプルな料金体系へ移行することです。このモードを有効にすると、アカウントにはストリーム単位の固定料金が発生しなくなり、データインジェスト、データ取得、およびオプションの長期保持料金のみが課金対象となります。また、それぞれの料金項目は、オンデマンド標準と比べて大幅に割引された価格が適用されます。詳細については、「[Amazon Kinesis Data Streams の料金表](https://aws.amazon.com/kinesis/data-streams/pricing/)」を参照してください。

このモードでは、拡張ファンアウトによるデータ取得も標準のデータ取得と比べて追加料金は発生しません。さらに、オンデマンドアドバンテージモードでは、ストリームごとに最大 50 のコンシューマーを登録して、拡張ファンアウトを使用できます。オンデマンドアドバンテージを有効化すると、アカウントはオンデマンドストリーム全体で、少なくとも 25 MiB/秒のデータインジェストと 25 MiB/秒のデータ取得を利用することが前提となります。最小利用要件を満たしているアカウントについて、Kinesis Data Streams コンソールでは、アカウントの利用パターンが**オンデマンドアドバンテージモード**に適しているかどうかを確認できる機能が用意されています。

アカウントのデータ使用量が要件を下回る場合は、その不足分について料金が発生しますが、同じ割引率が適用されます。オンデマンドアドバンテージを有効化した場合、無効化できるのは 24 時間経過後となる最低利用期間があります。総合的に見て、オンデマンドアドバンテージは、最低コミットメントに近い、あるいはそれを上回るスループットを継続的に使用する場合、多数の拡張ファンアウトコンシューマーが必要な場合、または数百単位のデータストリームを運用する場合に、Kinesis Data Streams を利用するための最適なモードです。

## プロビジョンドモード機能とユースケース
<a name="provisionedmode"></a>

プロビジョニングモードでは、データストリームを作成した後、 AWS マネジメントコンソール または [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html) API を使用してシャード容量を動的にスケールアップまたはスケールダウンできます。Kinesis Data Streams プロデューサーまたはコンシューマーアプリケーションが、ストリームに対してデータを書き込んだり、ストリームからデータを読み取ったりしている間に更新を行うことができます。

プロビジョンドモードは、予測しやすい容量要件を持つ予測可能なトラフィックに適しています。シャード間でのデータの分散方法をきめ細かく制御したい場合は、プロビジョンドモードを使用できます。

プロビジョンドモードでは、データストリームのシャードカウントを指定する必要があります。プロビジョンドモードでデータストリームのサイズを決定するには、以下の入力値が必要です。
+ ストリームに書き込まれるデータレコードの KB 単位での平均サイズ (近似の KB 単位 (`average_data_size_in_KB`) まで切り上げられます)。
+ 1 秒間にストリームで読み書きされるデータレコードの数 (`records_per_second`) です。
+ ストリームから独立して同時にデータを消費する Kinesis Data Streams アプリケーションであるコンシューマーの数 (`number_of_consumers`)。
+ KB 単位での受信書き込み帯域幅 (`incoming_write_bandwidth_in_KB`)。`average_data_size_in_KB`を `records_per_second` に乗算した値に等しくなります。
+ KB 単位の送信読み取り帯域幅 (`outgoing_read_bandwidth_in_KB`)。`incoming_write_bandwidth_in_KB`を `number_of_consumers` に乗算した値に等しくなります。

ストリームに必要なシャードの数 (`number_of_shards`) を計算するには、入力値を以下の式にあてはめます。

```
number_of_shards = ceiling(max(incoming_write_bandwidth_in_KiB/1024, outgoing_read_bandwidth_in_KiB/2048))
```

ピークスループットを処理するようにデータストリームを設定しないと、プロビジョンドモードで読み取りおよび書き込みのスループットの例外が発生する可能性があります。この場合、データトラフィックに対応するようにデータストリームを手動でスケーリングする必要があります。

不均等なデータ分散につながるパーティションキーを使用し、あるシャードに割り当てられたレコードがその制限を超えると、読み取りおよび書き込みの例外が発生することがあります。プロビジョンドモードでこの問題を解決するには、このようなシャードを特定し、トラフィックに上手く対応できるように手動で分割します。詳細については、[ストリームのリシャーディング](https://docs.aws.amazon.com/streams/latest/dev/kinesis-using-sdk-java-resharding.html)を参照してください。

## モードの切り替え
<a name="switchingmodes"></a>

内のデータストリームごとに AWS アカウント、オンデマンドモードとプロビジョニングモードを 24 時間以内に 2 回切り替えることができます。モードを切り替えても、このデータストリームを使用するアプリケーションが中断することはありません。このデータストリームへの書き込みとデータストリームからの読み取りを続行できます。オンデマンドからプロビジョンド、またはプロビジョンドからオンデマンドのいずれかでモードを切り替えると、ストリームのステータスは更新中に設定されます。**プロパティを再度変更するには、データストリームのステータスが*アクティブ*になるのを待つ必要があります。

プロビジョンド容量モードからオンデマンド容量モードに切り替えると、データストリームは最初に移行前のシャードカウントを保持します。この時点から、Kinesis Data Streams はデータトラフィックをモニタリングし、書き込みスループットに応じて、このオンデマンドデータストリームのシャードカウントをスケーリングします。オンデマンドモードからプロビジョンドモードに切り替えると、データストリームは最初に移行前のシャードカウントを保持しますが、この時点から、書き込みスループットに適切に対応するために、このデータストリームのシャードカウントをモニタリングおよび調整する必要があります。

アカウントレベルの設定を有効化することで、**オンデマンド標準**モードから**オンデマンドアドバンテージ**モードに切り替えることができます。この設定を有効化すると、アカウントはリージョン内のすべてのオンデマンドストリームに対して、データインジェスト 25 MiB/秒、データ取得 25 MiB/秒の最低使用量をコミットすることになります。一度**オンデマンドアドバンテージ**を有効化すると、無効化できるようになるまで最低 24 時間待つ必要がありますが、切り替えリクエスト自体はいつでも行えます。**オンデマンドアドバンテージ**から**オンデマンドスタンダード**に戻したい場合は、まずオンデマンドストリームに設定されているウォームスループットをすべて削除する必要があります。

# を使用してストリームを作成する AWS マネジメントコンソール
<a name="how-do-i-create-a-stream"></a>

Kinesis Data Streams コンソール、Kinesis Data Streams API、または AWS Command Line Interface (AWS CLI) を使用して、ストリームを作成できます。

**コンソールを使用してデータストリームを作成するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. ナビゲーションバーで、リージョンセレクターを展開し、リージョンを選択します。

1. [**データストリームの作成**] を選択します。

1. **[Create Kinesis stream]** (Kinesis ストリームの作成) ページで、データストリームの名前を入力し、**[On-demand]** (オンデマンド) または**[Provisioned]** (プロビジョンド) のどちらかの容量モードを選択します。デフォルトでは、**[On-demand]** (オンデマンド) モードが選択されます。詳細については、[適切なストリームモードを選ぶ](how-do-i-size-a-stream.md)を参照してください。

   **[On-demand]** (オンデマンド) モードの場合、**[Create Kinesis stream]** (Kinesis ストリームの作成) を選択して、データストリームを作成することができます。**[Provisioned]** (プロビジョンド) モードの場合、必要なシャードの数を指定してから、**[Create Kinesis stream]** (Kinesis ストリームの作成) を選択します。

   ストリームの作成中、[**Kinesis ストリーム**] ページのストリームの**ステータス**は、**Creating** になります。ストリームを使用する準備が完了すると、**ステータス**は **Active** に変わります。

1. ストリームの名前を選択します。[**ストリームの詳細**] ページには、ストリーム設定の概要とモニタリング情報が表示されます。

**Kinesis Data Streams API を使用してストリームを作成するには**
+ Kinesis Data Streams API を使用したストリームの作成については、[API を使用してストリームを作成する](kinesis-using-sdk-java-create-stream.md)を参照してください。

**を使用してストリームを作成するには AWS CLI**
+ を使用してストリームを作成する方法については AWS CLI、[create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html) コマンドを参照してください。

# API を使用してストリームを作成する
<a name="kinesis-using-sdk-java-create-stream"></a>

次の手順で Kinesis Data Streams を作成します。

## Kinesis Data Streams クライアントの構築
<a name="kinesis-using-sdk-java-create-client"></a>

Kinesis Data Streams を使用する前に、クライアントオブジェクトを構築する必要があります。次の Java コードは、クライアントビルダーをインスタンス化し、それを使用してリージョン、認証情報、およびクライアント設定を指定します。次に、クライアントオブジェクトを構築します。

```
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
clientBuilder.setRegion(regionName);
clientBuilder.setCredentials(credentialsProvider);
clientBuilder.setClientConfiguration(config);
        
AmazonKinesis client = clientBuilder.build();
```

詳細については、AWS 全般のリファレンスで [Kinesis Data Streams のリージョンとエンドポイント](https://docs.aws.amazon.com/general/latest/gr/rande.html#ak_region)を参照してください。**

## ストリームを作成する
<a name="kinesis-using-sdk-java-create-the-stream"></a>

Kinesis Data Streams クライアントを作成したら、コンソールまたはプログラムでストリームを作成できます。プログラムでストリームを作成するには、`CreateStreamRequest` オブジェクトをインスタンス化し、ストリームの名前を指定します。プロビジョンドモードを使用する場合は、使用するデータストリームのシャード数を指定する必要があります。
+ **オンデマンド**:

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  ```
+ **プロビジョンド**:

  ```
  CreateStreamRequest createStreamRequest = new CreateStreamRequest();
  createStreamRequest.setStreamName( myStreamName );
  createStreamRequest.setShardCount( myStreamSize );
  ```

ストリーム名はストリームを識別するために使用されます。名前は、アプリケーションで使用される AWS アカウントに限定されます。また、リージョンにも限定されます。つまり、2 つの異なる AWS アカウントの 2 つのストリームは同じ名前を持つことができ、2 つの異なるリージョンの AWS 2 つのストリームは同じ名前を持つことができますが、同じアカウントと同じリージョンの 2 つのストリームを持つことはできません。

ストリームのスループットはシャードの数の関数です。プロビジョニングされたスループットを向上させるには、より多くのシャードが必要です。シャードが増えると、ストリームに AWS 課金されるコストも増加します。アプリケーションに適切なシャードの数の計算の詳細については、[適切なストリームモードを選ぶ](how-do-i-size-a-stream.md)を参照してください。

 `createStreamRequest` オブジェクトを設定した後、クライアントの `createStream` メソッドを呼び出すことで、ストリームを作成します。`createStream` の呼び出し後、ストリームに対してさらにオペレーションを実行するには、ストリームが `ACTIVE` 状態になるまで待機します。ストリームの状態を確認するには、`describeStream` メソッドを呼び出します。ただし、ストリームが存在しない場合、`describeStream` は例外をスローします。そのために、`describeStream` の呼び出しは `try/catch` ブロックで囲みます。

```
client.createStream( createStreamRequest );
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) {
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
    //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) {
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

# ストリームを更新する
<a name="updating-a-stream"></a>

Kinesis Data Streams コンソール、Kinesis Data Streams API、または AWS CLIを使用して、ストリームの詳細を更新できます。

**注記**  
既存のストリーム、または最近作成したストリームに対して、サーバー側の暗号化を有効にすることができます。

## コンソールを使用する
<a name="update-stream-console"></a>

**コンソールを使用してデータストリームを更新するには**

1. Amazon Kinesis コンソール ([https://console.aws.amazon.com/kinesis/](https://console.aws.amazon.com/kinesis/)) を開きます。

1. ナビゲーションバーで、リージョンセレクターを展開し、リージョンを選択します。

1. リストのストリームの名前を選択します。[**ストリームの詳細**] ページには、ストリーム設定の概要とモニタリング情報が表示されます。

1. データストリームのオンデマンド容量モードとプロビジョンド容量モードを切り替えるには、**[Configuratio]** (設定) タブの **[Edit capacity mode]** (容量モードの編集) を選択します。詳細については、「[適切なストリームモードを選ぶ](how-do-i-size-a-stream.md)」を参照してください。
**重要**  
 AWS アカウントのデータストリームごとに、オンデマンドモードとプロビジョニングモードを 24 時間以内に 2 回切り替えることができます。

1. プロビジョンドモードのデータストリームの場合、シャードの数を編集するには、**[Configuration]** (設定) タブの **[Edit provisioned shards]** (プロビジョニングされたシャードの編集) を選択し、新しいシャードカウントを入力します。

1. データレコードのサーバー側の暗号化を有効にするには、[**サーバー側の暗号化**] セクションの [**編集**] を選択します。暗号化のマスターキーとして使用する KMS キーを選択するか、 Kinesis によって管理されるデフォルトのマスターキー **aws/kinesis** を使用します。ストリームの暗号化を有効にし、独自の AWS KMS マスターキーを使用する場合は、プロデューサーアプリケーションとコンシューマーアプリケーションが、使用した AWS KMS マスターキーにアクセスできることを確認してください。ユーザーが生成した AWS KMS キーにアクセスするためのアクセス許可をアプリケーションに割り当てるには、[ユーザー生成の KMS キーを使用するための許可](permissions-user-key-KMS.md)を参照してください。

1. データ保持期間を編集するには、[**Data retention period**] セクションの [**Edit**] を選択し、新しいデータ保持期間を入力します。

1. アカウントでカスタムメトリクスを有効にした場合は、[**シャードレベルメトリクスの編集**] セクションの [**編集**] を選択し、ストリームのメトリクスを指定します。詳細については、「[Amazon CloudWatch による Amazon Kinesis Data Streams サービスを監視する](monitoring-with-cloudwatch.md)」を参照してください。

## API を使用する
<a name="update-stream-api"></a>

API を使用してストリームの詳細を更新するには、次の方法を参照してください。
+ [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)
+ [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html)
+ [DisableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DisableEnhancedMonitoring.html)
+ [EnableEnhancedMonitoring](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_EnableEnhancedMonitoring.html)
+ [IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html)
+ [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)
+ [StartStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StartStreamEncryption.html)
+ [StopStreamEncryption](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_StopStreamEncryption.html)
+ [UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html)

## を使用する AWS CLI
<a name="update-stream-cli"></a>

を使用してストリームを更新する方法については AWS CLI、[Kinesis CLI リファレンス](https://docs.aws.amazon.com/cli/latest/reference/kinesis/index.html)を参照してください。

# ストリームの一覧表示
<a name="kinesis-using-sdk-java-list-streams"></a>

ストリームの範囲は、Kinesis Data Streams クライアントのインスタンス化に使用される AWS 認証情報に関連付けられた AWS アカウントと、クライアントに指定されたリージョンに限定されます。 AWS アカウントを使用して多数のストリームを 1 度にアクティブにできます。ストリームは、Kinesis Data Streams コンソールでリストするか、プログラムによってリストすることができます。このセクションのコードは、 AWS アカウントのすべてのストリームを一覧表示する方法を示しています。

```
ListStreamsRequest listStreamsRequest = new ListStreamsRequest();
listStreamsRequest.setLimit(20); 
ListStreamsResult listStreamsResult = client.listStreams(listStreamsRequest);
List<String> streamNames = listStreamsResult.getStreamNames();
```

このコード例では、最初に `ListStreamsRequest` の新しいインスタンスを作成し、その `setLimit` メソッドを呼び出して、最大 20 個のストリームが `listStreams` の呼び出しごとに返されるように指定しています。`setLimit` の値を指定しない場合は、アカウント内のストリーム数以下のストリームが Kinesis Data Streams によって返されます。次に、コードはクライアントの `listStreamsRequest` メソッドに `listStreams` を渡します。`listStreams` の戻り値は `ListStreamsResult` オブジェクトに格納されます。コードはこのオブジェクトの `getStreamNames` メソッドを呼び出して、返されたストリームの名前を `streamNames` リストに格納します。アカウントとリージョンにこの制限で指定したよりも多くのストリームがある場合でも、Kinesis Data Streams によって返されるストリームの数が指定した制限に満たないことがあります。確実にすべてのストリームを取得するには、次のコード例で説明している `getHasMoreStreams` メソッドを使用します。

```
while (listStreamsResult.getHasMoreStreams()) 
{
    if (streamNames.size() > 0) {
      listStreamsRequest.setExclusiveStartStreamName(streamNames.get(streamNames.size() - 1));
    }
    listStreamsResult = client.listStreams(listStreamsRequest);
    streamNames.addAll(listStreamsResult.getStreamNames());
}
```

このコードは、`getHasMoreStreams` の `listStreamsRequest` メソッドを呼び出して、`listStreams` の最初の呼び出しで返されたストリームの数よりも多いストリームがあるかどうかを確認します。ある場合、コードは `setExclusiveStartStreamName` メソッドを呼び出して、`listStreams` の前の呼び出しで返された最後のストリームの名前を指定します。`setExclusiveStartStreamName` メソッドは `listStreams` の次の呼び出しをそのストリームの後から開始します。その呼び出しによって返されたストリーム名のグループが `streamNames` リストに追加されます。すべてのストリームの名前がリストに収集されるまで、この処理を続行します。

 `listStreams` で返されるストリームは以下のいずれかの状態になります。
+ `CREATING`
+ `ACTIVE`
+ `UPDATING`
+ `DELETING`

前の`describeStream`で示した [API を使用してストリームを作成する](kinesis-using-sdk-java-create-stream.md) メソッドを使用して、ストリームの状態を確認できます。

# シャードを一覧表示する
<a name="kinesis-using-sdk-java-list-shards"></a>

データストリームは 1 つ以上のシャードを持つことができます。データストリームからシャードを一覧表示または取得するのに推奨される方法は、[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html) API を使用することです。次の例では、データストリーム内のシャードを一覧表示する方法を示します。この例で使用しているメインオペレーションと、このオペレーションで設定できるすべてのパラメータの詳細については、[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)を参照してください。

```
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;

import java.util.concurrent.TimeUnit;

public class ShardSample {

    public static void main(String[] args) {

        KinesisAsyncClient client = KinesisAsyncClient.builder().build();

        ListShardsRequest request = ListShardsRequest
                .builder().streamName("myFirstStream")
                .build();

        try {
            ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS);
            System.out.println(response.toString());
        } catch (Exception e) {
            System.out.println(e.getMessage());
        }
    }
}
```

前のコード例を実行するには、次のような POM ファイルを使用できます。

```
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>kinesis.data.streams.samples</groupId>
    <artifactId>shards</artifactId>
    <version>1.0-SNAPSHOT</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>software.amazon.awssdk</groupId>
            <artifactId>kinesis</artifactId>
            <version>2.0.0</version>
        </dependency>
    </dependencies>
</project>
```

`ListShards` API では、[ShardFilter](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html) パラメーターを使用して API の応答をフィルターで除外できます。一度に 1 つのフィルターしか指定できません。

ListShards API を呼び出すときに `ShardFilter` パラメータを使用する場合、`Type` は必須プロパティであり、指定する必要があります。`AT_TRIM_HORIZON`、`FROM_TRIM_HORIZON`、または `AT_LATEST` タイプを指定する場合は、`ShardId` または `Timestamp` のオプションのプロパティを指定する必要はありません。

`AFTER_SHARD_ID` タイプを指定する場合は、オプションの `ShardId` プロパティの値も指定する必要があります。`ShardId` プロパティは、機能的には ListShards API の `ExclusiveStartShardId` パラメーターと同じです。`ShardId` プロパティが指定されている場合、レスポンスには、指定した `ShardId` の直後に ID が続くシャードで始まるシャードが含まれます。

`AT_TIMESTAMP` または `FROM_TIMESTAMP_ID` タイプを指定する場合は、オプションの `Timestamp` プロパティの値も指定する必要があります。`AT_TIMESTAMP` タイプを指定する場合は、指定されたタイムスタンプで開いていたすべてのシャードが返されます。`FROM_TIMESTAMP` タイプを指定する場合は、TIP に指定されたタイムスタンプから始まるすべてのシャードが返されます。

**重要**  
`DescribeStreamSummary` および `ListShard` API は、データストリームに関する情報を取得するための、よりスケーラブルな方法を提供します。具体的には、DescribeStream API のクォータによってスロットリングが発生する可能性があります。詳細については、「[クォータと制限](service-sizes-and-limits.md)」を参照してください。また、`DescribeStream`クォータは AWS 、アカウント内のすべてのデータストリームとやり取りするすべてのアプリケーション間で共有されることに注意してください。一方、ListShards API のクォータは、単一のデータストリームに固有です。したがって、ListShards API を使用すると TPS が高くなるだけでなく、データストリームを作成するにつれてアクションのスケーリングが向上します。  
DescribeStream API を呼び出すすべてのプロデューサーとコンシューマーを移行して、代わりに DescribeStreamSummary および ListShard API を呼び出すことをお勧めします。これらのプロデューサーとコンシューマーを特定するには、KPL および KCL のユーザーエージェントが API コールでキャプチャされるため、Athena を使用して CloudTrail ログを解析することをお勧めします。  

```
SELECT useridentity.sessioncontext.sessionissuer.username, 
useridentity.arn,eventname,useragent, count(*) FROM 
cloudtrail_logs WHERE Eventname IN ('DescribeStream')  AND 
eventtime
    BETWEEN ''
        AND ''
GROUP BY  useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent
ORDER BY  count(*) DESC LIMIT 100
```
API を呼び出す AWS Lambda および Amazon Firehose と Kinesis Data Streams の統合`DescribeStream`は、統合が代わりに `DescribeStreamSummary`および を呼び出すように再設定することをお勧めします`ListShards`。具体的には、 AWS Lambda の場合は、イベントソースマッピングを更新する必要があります。Amazon Firehose については、対応する IAM 許可を更新して、それらに `ListShards` IAM 許可を含める必要があります。

# ストリームを削除する
<a name="kinesis-using-sdk-java-delete-stream"></a>

ストリームは Kinesis Data Streams コンソールで削除するか、プログラムによって削除することができます。ストリームをプログラムで削除するには、次のコードに示されているように `DeleteStreamRequest` を使用します。

```
DeleteStreamRequest deleteStreamRequest = new DeleteStreamRequest();
deleteStreamRequest.setStreamName(myStreamName);
client.deleteStream(deleteStreamRequest);
```

ストリームを削除する前に、そのストリーム上で動作しているアプリケーションをすべてシャットダウンします。削除したストリームとアプリケーションがやり取りしようとすると、`ResourceNotFound` 例外を受け取ります。また、削除前のストリームと同じ名前の新しいストリームを作成した場合、前のストリームとやり取りしていたアプリケーションが実行されていると、それらのアプリケーションが前のストリームであるかのように新しいストリームとやり取りしようとして、予期しない動作につながることがあります。

# ストリームをリシャーディングする
<a name="kinesis-using-sdk-java-resharding"></a>

**重要**  
[UpdateShardCount](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UpdateShardCount.html) API を使用して、ストリームのシャードを組み直すことができます。それ以外の場合は、ここで説明したように分割とマージを実行できます。

Amazon Kinesis Data Streams では、*リシャーディング*がサポートされています。リシャーディングでは、ストリーム内のシャードの数を調整して、ストリームのデータフロー率の変化に適応させることができます。リシャーディングは高度なオペレーションと見なされます。Kinesis Data Streams を初めて使用する場合は、Kinesis Data Streams の他のあらゆる機能に詳しくなってから、このトピックをお読みください。

リシャーディングには、シャードの分割と結合という 2 種類のオペレーションがあります。シャードの分割では、1 つのシャードを 2 つシャードに分けます。シャードの結合では、2 つシャードを 1 つのシャードに組み合わせます。リシャーディングは、1 回のオペレーションでシャードに分割できる数と 1 回のオペレーションで結合できるシャードの数が 2 個以下に限られるという意味で、常に*ペアワイズ*です。リシャーディングオペレーションの対象となるシャードまたはシャードペアは、*親*シャードと呼ばれます。リシャーディングオペレーションを実行した結果のシャードまたはシャードペアは、*子*シャードと呼ばれます。

分割によりストリーム内のシャードの数が増え、したがってストリームのデータ容量は増えます。シャード単位で請求されるため、分割によりストリームのコストが増えます。同様に、マージによってストリーム内のシャードの数が減り、それに伴いストリームのデータ容量 (コスト) が減少します。

リシャーディングは、通常、プロデューサー (入力) アプリケーションやコンシューマー (取得) アプリケーションとは別の管理アプリケーションによって実行されます。このような管理アプリケーションは、Amazon CloudWatch が提供するメトリクス、またはプロデューサーとコンシューマーから収集されたメトリクスに基づいて、ストリームの全体的なパフォーマンスをモニタリングします。管理アプリケーションには、コンシューマーまたはプロデューサーよりも広範な IAM 許可も必要になります。コンシューマーとプロデューサーは通常、リシャーディングに使用される API にアクセスする必要がないためです。Kinesis Data Streams の IAM 許可の詳細については、[IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御](controlling-access.md) を参照してください。

リシャーディングの詳細については、[Kinesis Data Streams で開いているシャードの数を変更するにはどうすればよいですか?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-streams-open-shards/) を参照してください。

**Topics**
+ [リシャーディング戦略を決定する](kinesis-using-sdk-java-resharding-strategies.md)
+ [シャードを分割する](kinesis-using-sdk-java-resharding-split.md)
+ [2 つのシャードを結合する](kinesis-using-sdk-java-resharding-merge.md)
+ [リシャーディングアクションを完了する](kinesis-using-sdk-java-after-resharding.md)

# リシャーディング戦略を決定する
<a name="kinesis-using-sdk-java-resharding-strategies"></a>

Amazon Kinesis Data Streams におけるリシャーディングの目的は、ストリームをデータの流量の変化に適応させることです。シャードを分割すると、ストリームの容量 (およびコスト) が増えます。シャードを結合すると、ストリームのコスト (および容量) が減ります。

 リシャーディングへの 1 つのアプローチとして考えられるのは、ストリーム内のすべてのシャードを分割することです。これにより、ストリームの容量は倍増します。ただし、実際に必要になるよりも多くの容量が追加されるため、不要なコストが生じる可能性があります。

メトリクスを使用して、シャードが*ホット*であるか*コールド*であるか、つまり、想定より過多なデータを受け取っているか、過少なデータを受け取っているかを判断できます。ホットシャードは分割して、それらのハッシュキーに対応した容量を増やすことができます。同様に、コールドシャードは結合して、未使用の容量をより有効に活用できます。

Kinesis Data Streams が発行する Amazon CloudWatch メトリクスから、ストリームのパフォーマンスデータを取得できます。ただし、ストリームについて独自のメトリックを収集することもできます。1 つのアプローチとして考えられるのは、データレコードのパーティションキーによって生成されたハッシュキー値をログに記録することです。ストリームにレコードを追加するときにパーティションキーを指定していることを思い出してください。

```
putRecordRequest.setPartitionKey( String.format( "myPartitionKey" ) );
```

Kinesis Data Streams では、[MD5](http://en.wikipedia.org/wiki/MD5) を使用してパーティションキーからハッシュキーを計算します。レコードのパーティションキーを指定しているため、MD5 を使用してそのレコードのハッシュキー値を計算し、ログに記録できます。

また、データレコードが割り当てられているシャードの ID をログに記録することもできます。シャード ID は、`getShardId` メソッドによって返される `putRecordResults` オブジェクトおよび `putRecords` メソッドによって返される `putRecordResult` オブジェクトの `putRecord` メソッドを使用することによって利用できます。

```
String shardId = putRecordResult.getShardId();
```

シャード ID とハッシュキー値を使用すると、最も多いまたは少ないトラフィックを受け取っているシャードとハッシュキーを特定できます。その後、リシャーディングによりこれらのハッシュキーに対応した容量を増やすか減らすことができます。

# シャードを分割する
<a name="kinesis-using-sdk-java-resharding-split"></a>

Amazon Kinesis Data Streams のシャードを分割するには、親シャードのハッシュキー値を子シャードに再配分する方法を指定する必要があります。データレコードをストリームに追加すると、レコードはハッシュキー値に基づいてシャードに割り当てられます。ハッシュキー値は、ストリームに追加するデータレコードに指定するパーティションキーの [MD5](http://en.wikipedia.org/wiki/MD5) ハッシュです。パーティションキーが同じデータレコードはハッシュキー値も同じです。

指定したシャードに使用可能なハッシュキー値は、順序付けられた連続する正の整数で構成されます。ハッシュキーの一連の値は以下の式を使用して導き出します。

```
shard.getHashKeyRange().getStartingHashKey();
shard.getHashKeyRange().getEndingHashKey();
```

シャードを分割するときは、この一連の値を指定します。そのハッシュキー値とそれより上位のすべてのハッシュキー値は、いずれかの子シャードの配分されます。それより下位のすべてのハッシュキー値は、その他の子のシャードに配分されます。

以下のコードでは、子シャード間でハッシュキーを均等に再配分し、親シャードを半分に分割する基本的なシャード分割オペレーションを示します。これは、親シャードを分割する方法の 1 つに過ぎません。たとえば、親シャードの下位 1/3 のキーを 1 つの子シャードに配分し、上位 2/3 のキーをその他の子シャードに配分して、シャードを分割することもできます。ただし、多くアプリケーションに効果的なのは、シャードを半分に分割することです。

以下のコードでは、`myStreamName` にストリームの名前が格納され、オブジェクト変数 `shard` に分割するシャードが格納されるとします。新しい `splitShardRequest` オブジェクトをインスタンス化し、ストリーム名とシャード ID を設定することから始めます。

```
SplitShardRequest splitShardRequest = new SplitShardRequest();
splitShardRequest.setStreamName(myStreamName);
splitShardRequest.setShardToSplit(shard.getShardId());
```

シャード内の最小値と最大値の中間にあるハッシュキー値を決定します。これは、子シャードの開始ハッシュキー値になり、親シャードのハッシュキーの上位半分が含まれます。この値を `setNewStartingHashKey` メソッドで指定します。この値だけを指定する必要があります。この値より下位のハッシュキーは、分割によって作成されたその他の子シャードに、Kinesis Data Streams によって自動的に配分されます。最後のステップとして、Kinesis Data Streams クライアントで `splitShard` メソッドを呼び出します。

```
BigInteger startingHashKey = new BigInteger(shard.getHashKeyRange().getStartingHashKey());
BigInteger endingHashKey   = new BigInteger(shard.getHashKeyRange().getEndingHashKey());
String newStartingHashKey  = startingHashKey.add(endingHashKey).divide(new BigInteger("2")).toString();

splitShardRequest.setNewStartingHashKey(newStartingHashKey);
client.splitShard(splitShardRequest);
```

この方法の後の最初の手順は、[ストリームが再度アクティブになるまで待機する](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)に示されています。

# 2 つのシャードを結合する
<a name="kinesis-using-sdk-java-resharding-merge"></a>

 シャードの結合オペレーションは、指定した 2 つのシャードを取得し、1 つシャードに組み合わせます。結合後、1 つの子シャードは 2 つの親シャードのすべてのハッシュキー値のデータを受け取ります。

**シャードの隣接**  
2 つのシャードを結合するには、シャードが*隣接*している必要があります。2 つのシャードのハッシュキー範囲が途切れておらず連続している場合、2 つのシャードは隣接していると考えられます。たとえば、2 つのシャードがあり、1 つのハッシュキー範囲が 276〜381、もう 1 つのハッシュキー範囲が 382〜454 であるとします。この 2 つのシャードは 1 つのシャードに結合可能であり、結合した場合のハッシュキー範囲は 276〜454 となります。

別の例として 2 つのシャードがあり、1 つのハッシュキー範囲が 276〜381、もう 1 つのハッシュキー範囲が 455〜560 であるとします。この 2 つのシャードは結合できません。これらの間に 1 つ以上のシャード (ハッシュキー範囲が 382〜454) が介在している可能性があります。

ストリーム内のすべての `OPEN` シャードのセットは、グループとして、常に MD5 ハッシュキー値の範囲全体にまたがります。`CLOSED` など、シャード状態の詳細については、[リシャーディング後のデータのルーティング、データの永続化、シャードの状態を考慮する](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-data-routing) を参照してください。

結合候補になるシャードを特定するには、`CLOSED` 状態にあるすべてのシャードを除外する必要があります。`OPEN` であり、`CLOSED` ではないシャードには、`null` の終了シーケンス番号があります。以下のコードを使用してシャードの終了シーケンス番号をテストできます。

```
if( null == shard.getSequenceNumberRange().getEndingSequenceNumber() ) 
{
  // Shard is OPEN, so it is a possible candidate to be merged.
}
```

CLOSED 状態のシャードを除外した後、各シャードでサポートされている最大ハッシュキー値で、残りのシャードを並べ替えます。以下のコード使用してこの値を取得できます。

```
shard.getHashKeyRange().getEndingHashKey();
```

 このフィルタリングして並び替えたリストで 2 つシャードが隣接している場合、それらのシャードは結合できます。

**結合オペレーションのコード**  
 以下のコードでは、2 つシャードを結合しています。`myStreamName` には、ストリームの名前が格納され、オブジェクト変数 `shard1` と `shard2` には、結合する 2 つの隣接するシャードが格納されるとします。

結合オペレーションの場合、新しい `mergeShardsRequest` オブジェクトをインスタンス化することから始めます。`setStreamName` メソッドでストリーム名を指定します。次に、`setShardToMerge` と `setAdjacentShardToMerge` のメソッドを使用して、結合する 2 つのシャードを指定します。最後に、Kinesis Data Streams クライアントで `mergeShards` メソッドを呼び出して、このオペレーションを実行します。

```
MergeShardsRequest mergeShardsRequest = new MergeShardsRequest();
mergeShardsRequest.setStreamName(myStreamName);
mergeShardsRequest.setShardToMerge(shard1.getShardId());
mergeShardsRequest.setAdjacentShardToMerge(shard2.getShardId());
client.mergeShards(mergeShardsRequest);
```

この方法の後の最初の手順は、[ストリームが再度アクティブになるまで待機する](kinesis-using-sdk-java-after-resharding.md#kinesis-using-sdk-java-resharding-wait-until-active)に示されています。

# リシャーディングアクションを完了する
<a name="kinesis-using-sdk-java-after-resharding"></a>

Amazon Kinesis Data Streams でリシャーディングの手順が終了し、通常のレコード処理を再開する前に必要な手順や検討事項があります。以下のセクションでは、これらについて説明します。

**Topics**
+ [ストリームが再度アクティブになるまで待機する](#kinesis-using-sdk-java-resharding-wait-until-active)
+ [リシャーディング後のデータのルーティング、データの永続化、シャードの状態を考慮する](#kinesis-using-sdk-java-resharding-data-routing)

## ストリームが再度アクティブになるまで待機する
<a name="kinesis-using-sdk-java-resharding-wait-until-active"></a>

リシャーディングオペレーションとして `splitShard` または `mergeShards` のいずれかを呼び出した後、ストリームが再びアクティブになるまで待機する必要があります。使用するコードは、[ストリームの作成](kinesis-using-sdk-java-create-stream.md)後にストリームがアクティブになるまで待機する場合のものと同じです。コードは次のとおりです。

```
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
describeStreamRequest.setStreamName( myStreamName );

long startTime = System.currentTimeMillis();
long endTime = startTime + ( 10 * 60 * 1000 );
while ( System.currentTimeMillis() < endTime ) 
{
  try {
    Thread.sleep(20 * 1000);
  } 
  catch ( Exception e ) {}
  
  try {
    DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest );
    String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus();
    if ( streamStatus.equals( "ACTIVE" ) ) {
      break;
    }
   //
    // sleep for one second
    //
    try {
      Thread.sleep( 1000 );
    }
    catch ( Exception e ) {}
  }
  catch ( ResourceNotFoundException e ) {}
}
if ( System.currentTimeMillis() >= endTime ) 
{
  throw new RuntimeException( "Stream " + myStreamName + " never went active" );
}
```

## リシャーディング後のデータのルーティング、データの永続化、シャードの状態を考慮する
<a name="kinesis-using-sdk-java-resharding-data-routing"></a>

Kinesis Data Streams は、リアルタイムのデータストリーミングサービスです。アプリケーションは、データがストリーム内のシャードを継続的に流れることを前提とする必要があります。リシャーディングすると、親シャードに流れていたデータレコードは、データレコードのパーティションキーがマッピングされるハッシュキー値に基づいて、子シャードに流れるように再ルーティングされます。ただし、リシャーディング前に親シャードにあったデータレコードはすべて、それらのシャードに残ります。リシャーディング後に親シャードが失われることはありません。それらのシャードはリシャーディング前に格納されていたデータと共に保持されます。親シャードのデータレコードには、Kinesis Data Streams API の [`getShardIterator` および `getRecords`](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data) オペレーションを使用するか、Kinesis Client Library を介してアクセスできます。

**注記**  
データレコードは、現在の保持期間にストリームを追加した時間からアクセスできます。これは、その期間内のストリームのシャードの変更に関係なく当てはまります。ストリームの保持期間の詳細については、[データ保持期間を変更する](kinesis-extended-retention.md)を参照してください。

リシャーディングの過程で、親シャードは `OPEN` 状態から `CLOSED` 状態に、さらに `EXPIRED` 状態へと移行します。
+  **OPEN**: リシャーディングオペレーションに先立って、親シャードは `OPEN` 状態にあります。つまり、データレコードはシャードに追加したり、シャードから取得したりできます。
+  **CLOSED**: リシャーディングオペレーション後に、親シャードは `CLOSED` 状態に移行します。つまり、データレコードはシャードに追加されなくなります。このシャードに追加されることになっていたデータレコードは、子シャードに追加されるようになります。ただし、データレコードは引き続き、制限された時間内にシャードから取得できます。
+  **EXPIRED**: ストリーム保持期間の有効期限が切れると、親シャードのすべてのデータレコードが期限切れとなり、アクセスできなくなります。この時点で、シャード自体は `EXPIRED` 状態に移行します。`getStreamDescription().getShards` を呼び出してストリーム内のシャードを列挙しても、返されるシャードのリストには 状態のシャードは含まれません。`EXPIRED`ストリームの保持期間の詳細については、[データ保持期間を変更する](kinesis-extended-retention.md)を参照してください。

リシャーディング後、ストリームが再び `ACTIVE` 状態になるとすぐに、子シャードからのデータの読み取りを開始できます。ただし、リシャーディング後に残った親シャードには、リシャーディング前にストリームに追加されてまだ読み取られていないデータがそのまま格納されている可能性があります。親シャードからすべてのデータを読み取る前に、子シャードからデータを読み取った場合は、特定のハッシュキーが原因で、読み取ったデータがデータレコードのシーケンス番号に基づいた順序に並ばない可能性があります。したがって、データの順序が重要である場合は、リシャーディング後そのデータを使い切るまで、親シャードからのデータの読み取りを続行する必要があります。子シャードからのデータの読み取りは必ずその後で開始してください。`getRecordsResult.getNextShardIterator` が を返した場合は、親シャード内のすべてのデータを読み取ったということです。`null`

# データ保持期間を変更する
<a name="kinesis-extended-retention"></a>

Amazon Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。Kinesis data stream はデータレコードの順序付けられたシーケンスで、リアルタイムで書き込みと読み取りができることが前提となっています。したがって、データレコードはシャードに一時的に保存されます。レコードが追加されてからアクセスできなくなるまでの期間は、*保持期間*と呼ばれます。Kinesis data stream は、デフォルトでは 24 時間レコードを保持し、最大値は 8,760 時間 (365 日) です。

保持期間は、Kinesis Data Streams コンソールまたは[IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html) および [DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) オペレーションで更新できます。Kinesis Data Streams コンソールでは、複数のデータストリームの保持期間を同時に一括編集できます。[IncreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_IncreaseStreamRetentionPeriod.html) オペレーションまたは Kinesis Data Streams コンソールを使用して、保持期間を最大 8760 時間 (365 日) まで増やすことができます。また、[DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) オペレーションまたは Kinesis Data Streams コンソールを使用して、保持期間を最短 24 時間まで減らすことができます。両方のオペレーションに対するリクエスト構文には、時間にストリーム名と保存期間が含まれます。最後に、[DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html) オペレーションを呼び出すことで、ストリームの現在の保持期間を確認できます。

 AWS CLIを使用して保持期間を変更する例を以下に示します。

```
aws kinesis increase-stream-retention-period --stream-name retentionPeriodDemo --retention-period-hours 72
```

Kinesis Data Streams は、数分間延長した保持期間内で古い保持期間のアクセス停止を解除することができます。たとえば、保持期間を 24 時間から 48 時間に変更すると、23 時間 55 分前にストリームに追加されたレコードは、さらに 24 時間後まで使用できます。

Kinesis Data Streams は、保持期間が短縮されると、新しい保持期間よりも古いレコードをほぼ即座にアクセス不能にします。そのため、[DecreaseStreamRetentionPeriod](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DecreaseStreamRetentionPeriod.html) オペレーションを呼び出すときは、細心の注意が必要です。

問題が発生した場合は、期限が切れる前にデータを読めるようにデータ保持期間を設定します。レコード処理ロジックの問題または長期間にわたるダウンストリームの依存関係など、あらゆる可能性を慎重に検討してください。データコンシューマーの回復時間に余裕が出るように、保持期間は慎重に設定します。保持期間 API オペレーションでは、この期間を事前に設定したり、オペレーションイベントにリアクティブに対応したりできます。

 24 時間を超える保持期間を設定されたストリームに追加料金が適用されます。詳細については、「[Amazon Kinesis Data Streams の料金](https://aws.amazon.com/kinesis/data-streams/pricing/)」を参照してください。

# Amazon Kinesis Data Streams リソースにタグを付ける
<a name="tagging"></a>

Amazon Kinesis Data Streams で作成したストリームおよび拡張ファンアウトコンシューマーに、独自のメタデータをタグ**形式で割り当てることができます。タグは、ストリームに対して定義するキーと値のペアです。タグの使用は、 AWS リソースを管理し、請求データを含むデータを整理するためのシンプルで強力な方法です。

**Topics**
+ [タグの基本を確認する](#tagging-basics)
+ [タグ付けを使用したコストを追跡する](#tagging-billing)
+ [タグの制限を理解する](#tagging-restrictions)
+ [Kinesis Data Streams コンソールを使用してストリームにタグ付けする](#tagging-console)
+ [を使用してストリームにタグを付ける AWS CLI](#tagging-cli)
+ [Kinesis Data Streams API を使用したストリームにタグを付ける](#tagging-api)
+ [を使用してコンシューマーにタグを付ける AWS CLI](#tagging-consumers-cli)
+ [Kinesis Data Streams API を使用してコンシューマーにタグ付けする](#tagging-consumers-api)

## タグの基本を確認する
<a name="tagging-basics"></a>

タグ付けできる Kinesis Data Streams リソースには、データストリームと拡張ファンアウトコンシューマーが含まれます。Kinesis Data Streams コンソール AWS CLI、または Kinesis Data Streams API を使用して、次のタスクを完了します。
+ タグの付いたリソースを作成する
+ リソースにタグを追加する
+ リソースのタグを一覧表示する
+ リソースからタグを削除する

**注記**  
Kinesis Data Streams コンソールでは、拡張ファンアウトコンシューマーにタグを適用することはできません。コンシューマーにタグを適用するには、 AWS CLI または Kinesis Data Streams API を使用します。

タグを使用して リソースを分類できます。例えば、目的、所有者、環境などに基づいてリソースを分類できます。タグごとにキーと値を定義するため、特定のニーズに合わせてカテゴリのカスタムセットを作成できます。たとえば、所有者と、関連するアプリケーションに基づいてリソースを追跡するのに役立つタグのセットを定義できます。次にいくつかのタグの例を示します。
+ プロジェクト: プロジェクト名
+ 所有者: 名前
+ 目的: 負荷テスト 
+ アプリケーション: アプリケーション名
+ 環境:本稼働 

**重要**  
ストリームの作成中にタグを追加するには、そのストリームに対して `kinesis:CreateStream` と `kinesis:AddTagsToStream` の権限を付与する必要があります。ストリーム作成時に `kinesis:TagResource` 権限を使用してタグ付けすることは**できません**。
コンシューマー登録中にタグを追加するには、`kinesis:TagResource` と`kinesis:RegisterStreamConsumer` の権限を付与する必要があります。

## タグ付けを使用したコストを追跡する
<a name="tagging-billing"></a>

タグを使用して、 AWS コストを分類および追跡できます。Kinesis Data Streams リソースにタグを適用すると、 AWS コスト配分レポートには、タグ別に集計された使用量とコストが含まれます。コストセンター、アプリケーション名、所有者などの自社のカテゴリを表すタグを適用すると、複数のサービスにわたってコストを分類することができます。詳細については、*AWS Billing ユーザーガイド*の[コスト配分タグを使用したカスタム請求レポート](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)を参照してください。

## タグの制限を理解する
<a name="tagging-restrictions"></a>

タグには以下の制限があります。

**基本制限**
+ 1 つのリソースに付けることができるタグの最大数は 50 です。
+ タグキーと値は大文字と小文字が区別されます。
+ 削除されたリソースのタグを変更または編集することはできません。

**タグキーの制限**
+ 各タグキーは一意である必要があります。既に使用されているキーを持つタグを追加すると、新しいタグによって既存のキーと値のペアが上書きされます。
+ このプレフィックスはユーザーに代わって`aws:`このプレフィックスで始まるタグ AWS AWS を作成するため、 でタグキーを開始することはできませんが、編集または削除することはできません。
+ タグキーの長さは 1～128 文字 (Unicode) にする必要があります。
+ タグキーは、次の文字で構成する必要があります。Unicode 文字、数字、空白、特殊文字 (`_ . / = + - @`)。

**タグ値の制限**
+ タグ値の長さは 0～255 文字 (Unicode) にする必要があります。
+ タグ値は空白にすることができます。空白にしない場合は、次の文字で構成する必要があります。Unicode 文字、数字、空白、特殊文字 (`_ . / = + - @`)。

## Kinesis Data Streams コンソールを使用してストリームにタグ付けする
<a name="tagging-console"></a>

Kinesis Data Streams コンソールを使用して、ストリームにタグを追加、更新、一覧表示、および削除できます。

**ストリームのタグを表示するには**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) で Kinesis コンソールを開きます。

1. 左のナビゲーションペインの [**データストリーム**] を選択します。

1. [**データストリーム**] ページで、タグ付けするストリームを選択します。

1. [ストリームの詳細] ページで、[**設定**] タブを選択します。

1. [**タグ**] セクションで、ストリームに適用されているタグを表示します。

**タグを使用してデータストリームを作成するには**

1. Kinesis Data Streams コンソールを開きます。

1. 左のナビゲーションペインの [**データストリーム**] を選択します。

1. [**データストリームの作成**] を選択します。

1. [**データストリームの作成**] ページで、データストリームの名前を入力します。

1. [**データストリームの容量**] には、[**オンデマンド**] または [**プロビジョンド**] のキャパシティモードを選択します。

   キャパシティモードの詳細については、[適切なストリームモードを選ぶ](how-do-i-size-a-stream.md) を参照してください。

1. **[タグ]** セクションで、次の操作を行います。

   1. [**新しいタグを追加**] をクリックします。

   1. [**キー**] フィールドでタグを入力し、オプションとして [**値**] フィールドで値を指定します。

      エラーが表示された場合は、指定したタグキーまたは値がタグ制限を満たしていません。詳細については、「[タグの制限を理解する](#tagging-restrictions)」を参照してください。

1. [**データストリームの作成**] を選択します。

**ストリームにタグを追加または更新するには**

1. Kinesis Data Streams コンソールを開きます。

1. 左のナビゲーションペインの [**データストリーム**] を選択します。

1. [**データストリーム**] ページで、タグを追加または更新するストリームを選択します。

1. [ストリームの詳細] ページで、[**設定**] タブを選択します。

1. **[タグ]** セクションで、**[タグを管理]** を選択します。

1. [**タグ**] で、次のいずれかを実行します。
   + 新しいタグを追加するには、[**新しいタグを追加**] を選択し、タグの [**キー**] と [**値**] を入力します。このステップを必要な回数繰り返します。

     ストリームあたり追加できるタグの最大数は 50 です。
   + 既存のタグを更新するには、そのタグの [**キー**] の [**値**] フィールドに新しいタグ値を入力します。

   エラーが表示された場合は、指定したタグキーまたは値がタグ制限を満たしていません。詳細については、「[タグの制限を理解する](#tagging-restrictions)」を参照してください。

1. **[Save changes]** (変更の保存) をクリックします。

**ストリームからタグを削除するには**

1. Kinesis Data Streams コンソールを開きます。

1. 左のナビゲーションペインの [**データストリーム**] を選択します。

1. [**データストリーム**] ページで、タグを削除するストリームを選択します。

1. [ストリームの詳細] ページで、[**設定**] タブを選択します。

1. **[タグ]** セクションで、**[タグを管理]** を選択します。

1. 削除するタグの [**キー**] と [**値**] のペアを検出します。次に、**[削除]** を選択します。

1. **[Save changes]** (変更の保存) をクリックします。

## を使用してストリームにタグを付ける AWS CLI
<a name="tagging-cli"></a>

 AWS CLIを使用して、ストリームにタグの追加、一覧表示、および削除できます 例については、次のドキュメントを参照してください。

 [create-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/create-stream.html)   
タグ付きのストリームを作成します。

 [add-tags-to-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/add-tags-to-stream.html)   
指定したストリームのタグを追加または更新します。

 [list-tags-for-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/list-tags-for-stream.html)  
指定したストリームのタグを一覧表示します。

 [remove-tags-from-stream](https://docs.aws.amazon.com/cli/latest/reference/kinesis/remove-tags-from-stream.html)  
指定したストリームからタグを削除します。

## Kinesis Data Streams API を使用したストリームにタグを付ける
<a name="tagging-api"></a>

Kinesis Data Streams API を使用して、ストリームにタグを追加、一覧表示、および削除できます。例については、次のドキュメントを参照してください。

 [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)   
タグ付きのストリームを作成します。

 [AddTagsToStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_AddTagsToStream.html)   
指定したストリームのタグを追加または更新します。

 [ListTagsForStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForStream.html)  
指定したストリームのタグを一覧表示します。

 [RemoveTagsFromStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RemoveTagsFromStream.html)  
指定したストリームからタグを削除します。

## を使用してコンシューマーにタグを付ける AWS CLI
<a name="tagging-consumers-cli"></a>

 AWS CLIを使用して、コンシューマーにタグを追加、一覧表示、および削除できます。例については、次のドキュメントを参照してください。

[register-stream-consumer](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/register-stream-consumer.html)  
タグを使用して Kinesis データストリームにコンシューマーを登録します。

[tag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/tag-resource.html)  
指定された Kinesis リソースのタグを追加または更新します。

[list-tags-for-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/list-tags-for-resource.html)  
指定された Kinesis リソースのタグを一覧表示します。

[untag-resource](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesis/untag-resource.html)  
指定された Kinesis リソースからタグを削除します。

## Kinesis Data Streams API を使用してコンシューマーにタグ付けする
<a name="tagging-consumers-api"></a>

Kinesis Data Streams API を使用してコンシューマーにタグを追加、一覧表示、および削除できます。例については、次のドキュメントを参照してください。

[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)  
タグを使用して Kinesis データストリームにコンシューマーを登録します。

[TagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_TagResource.html)  
指定された Kinesis リソースのタグを追加または更新します。

[ListTagsForResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListTagsForResource.html)  
指定された Kinesis リソースのタグを一覧表示します。

[UntagResource](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_UntagResource.html)  
指定された Kinesis リソースからタグを削除します。

# 大きなレコードの処理
<a name="large-records"></a>

Amazon Kinesis Data Streams は、最大 10 メビバイト (MiB) のレコードをサポートします。この機能は、デフォルトの 1 MiB のレコードサイズ制限を超える断続的なデータペイロードを処理する場合に推奨されます。既存および新規に作成されたストリームのデフォルトの最大レコードサイズは 1 MiB に設定されています。

この機能は、断続的に大きなデータペイロードを処理する必要がある IoT アプリケーション、変更データキャプチャ (CDC) パイプライン、機械学習ワークフローに有用です。ストリームで大きなレコードの使用を開始するには、ストリームの最大レコードサイズ制限を更新します。

**重要**  
1 シャードあたりのスループット制限 (書き込み 1 MB/秒、読み取り 2 MB/秒) は、大きなレコードサイズをサポートしても変更されません。Kinesis Data Streams は、1 MiB 以下のレコードをベースとしたトラフィックに加えて、断続的に発生する大きなレコードを処理できるように設計されています。継続的に大量の大きなレコードを取り込む用途向けには設計されていません。

## 大きなレコードを使用するようにストリームを更新する
<a name="update-stream"></a>

**Kinesis Data Streams で大きなレコードを処理するには、次の手順を実行します。**

1. Kinesis Data Streams コンソールに移動します。

1. ストリームを選択し、[**設定**] タブを開きます。

1. [**最大レコードサイズ**] の横にある [**編集**] をクリックします。

1. 最大レコードサイズ (最大 10 MiB) を設定します。

1. 変更内容を保存します。

この設定は、この Kinesis データストリームの最大レコードサイズのみを調整します。この上限を引き上げる前に、すべての下流アプリケーションが大きなレコードを処理できることを確認してください。

 AWS CLI を使用してこの設定を更新することもできます。

```
aws kinesis update-max-record-size \ --stream-arn  \
        --max-record-size-in-ki-b 5000
```

## 大きなレコードでストリームのパフォーマンスを最適化する
<a name="optimizing-performance"></a>

大きなレコードは、全体のトラフィックの 2% 未満に維持することをお勧めします。1 つのストリーム内で、各シャードのスループットキャパシティは毎秒 1 MiB です。Kinesis Data Streams は、大きなレコードに対応するため、一時的に最大 10 MiB までバーストできますが、平均では毎秒 1 MiB に抑えられます。この大きなレコードをサポートするための容量は、ストリーム内で継続的に補充されます。補充の速度は、大きなレコードのサイズとベースラインレコードのサイズによって異なります。最良の結果を得るには、均一に分散されたパーティションキーを使用します。Kinesis のオンデマンドスケーリングの仕組みについて詳しくは、「[オンデマンド標準モードの機能とユースケース](how-do-i-size-a-stream.html#ondemandmode)」を参照してください。

## 大きなレコードによるスロットリングを軽減する
<a name="mitigate-throttling"></a>

**スロットリングを軽減するには**

1. プロデューサーアプリケーションに指数バックオフを伴う再試行ロジックを実装します。

1. 利用可能なシャード全体に大きなレコードを分散させるために、ランダム化されたパーティションキーを使用します。

1. 継続的に大きなレコードをストリーミングする場合は、ペイロードを Amazon S3 に保存し、メタデータ参照のみをストリームに送信します。詳細については、[Processing large records with Amazon Kinesis Data Streams](https://aws.amazon.com/blogs/big-data/processing-large-records-with-amazon-kinesis-data-streams/) を参照してください。

## Kinesis Data Streams API を使用して大きなレコードを処理する
<a name="records-apis"></a>

大きなレコードのサポートでは、新しい API が 1 つ追加され、既存の 2 つのコントロールプレーン API が更新され、最大 10 MiB までのレコードを処理できるようになりました。

レコードサイズを変更するための API:
+ `UpdateMaxRecordSize`: 既存のストリームに対して最大 10 MiB までのレコードサイズ上限を設定します。

既存 API の更新点:
+ `CreateStream`: ストリーム作成時にレコードサイズ上限を設定できるオプションパラメータ `MaxRecordSizeInKiB` が追加されました。
+ `DescribeStreamSummary`: 現在のストリーム設定を表示するための `MaxRecordSizeInKiB` フィールドが返されます。

ここに挙げたすべての API は、既存のストリームとの下位互換性を維持しています。API ドキュメントの詳細については、[Amazon Kinesis Data Streams Service API リファレンス](https://docs.aws.amazon.com/kinesis/latest/APIReference/Welcome.html)を参照してください。

## AWS 大きなレコードと互換性のあるコンポーネント
<a name="record-compatability"></a>

次の AWS コンポーネントは、大きなレコードと互換性があります。


| コンポーネント | 説明 | 
| --- | --- | 
|  AWS SDK | AWS SDK は、大規模なレコードの処理をサポートしています。 AWS SDKs で使用可能なメソッドを使用して、ストリームの最大レコードサイズを最大 10 MiB に更新できます。詳細については、[AWS 「 SDK でこのサービスを使用する](https://docs.aws.amazon.com/streams/latest/dev/sdk-general-information-section.html)」を参照してください。 | 
|  Kinesis Consumer Library (KCL) | バージョン 2.x 以降、KCL は大きなレコードの処理をサポートしています。大きなレコードのサポートを使用するには、ストリームの `maxRecordSize` を更新し、KCL を使用します。詳細については、[Use Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/kcl.html) を参照してください。 | 
|  Kinesis Producer Library (KPL) | バージョン 1.0.5 以降、KPL は大きなレコードの処理をサポートしています。大きなレコードのサポートを使用するには、ストリームの maxRecordSize を更新し、KPL を使用します。詳細については、[Develop producers using the Amazon Kinesis Producer Library (KPL)](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) を参照してください。 | 
|  Amazon EMR | Apache Spark を使用した Amazon EMR は、Kinesis Data Streams の上限 (10 MiB) までの大きなレコードの処理をサポートしています。大きなレコードのサポートを使用するには、`readStream` 関数を使用します。詳細については、[Amazon EMR and Amazon Kinesis integration](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-kinesis.html) を参照してください。 | 
|  Amazon Data Firehose | Kinesis Data Streams と併用する場合、Amazon Data Firehose の大きなレコードの動作は配信先によって異なります。 [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/large-records.html) 大きなレコードを Snowflake または Redshift に配信する必要がある場合は、まず Amazon S3 にデータを配信します。その後、抽出、変換、ロード (ETL) プロセスを使用してデータをロードします。その他の配信先については、本番環境にスケールする前に概念実証 (PoC) 環境で大きなレコードの動作を確認してください。大きなレコードの処理方法は配信先によって異なります。  | 
|  AWS Lambda | AWS Lambda は最大 6 MiBs のペイロードをサポートします。この上限には、base64 エンコードに変換された Kinesis ペイロードと、イベントソースマッピング (ESM) に関連付けられたメタデータが含まれます。6 MiB 未満のレコードは、追加設定なしで ESM を使用して処理されます。6 MiB を超えるレコードは、Lambda が失敗時の送信先を使用して処理します。処理上限を超えるレコードを扱うには、ESM を使用して失敗時の送信先を設定する必要があります。失敗時の送信先に送信される各イベントは、失敗した呼び出しに関するメタデータを含む JSON ドキュメントです。 レコードサイズにかかわらず、ESM 内で失敗時の送信先を作成することをお勧めします。これにより、いかなるレコードも破棄されないことが保証されます。詳細については、「[失敗した呼び出しの送信先の設定](https://docs.aws.amazon.com/lambda/latest/dg/kinesis-on-failure-destination.html#kinesis-on-failure-destination-console)」を参照してください。 | 
|  Amazon Redshift | Amazon Redshift は、Kinesis Data Streams からデータをストリーミングする場合、1 MiB 未満のレコードサイズのみをサポートしています。この上限を超えるレコードは処理されません。処理されなかったレコードは `sys_stream_scan_errors` としてログに記録されます。詳細については、[SYS\$1STREAM\$1SCAN\$1ERRORS](https://docs.aws.amazon.com/redshift/latest/dg/r_SYS_STREAM_SCAN_ERRORS.html) を参照してください。 | 
|  Flink connector for Kinesis Data Streams | Kinesis Data Streams からデータを取り込む方法には、Kinesis ソースコネクタ と Kinesis シンクコネクタ の 2 つのアプローチがあります。ソースコネクタは 1 MiB 未満から最大 10 MiB までのレコードの処理をサポートしています。1 MiB を超えるレコードにはシンクコネクタを使用しないでください。詳細については、[Use connectors to move data in Amazon Managed Service for Apache Flink with the DataStream API](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html) を参照してください。 | 

## 大きなレコードがサポートされるリージョン
<a name="supported-regions"></a>

この Amazon Kinesis Data Streams 機能は、次の AWS リージョンでのみ使用できます。


| AWS リージョン | リージョン名 | 
| --- | --- | 
|  eu-north-1 | 欧州 (ストックホルム) | 
|  me-south-1 | 中東 (バーレーン) | 
|  ap-south-1 | アジアパシフィック (ムンバイ) | 
|  eu-west-3 | 欧州 (パリ) | 
|  ap-southeast-3 | アジアパシフィック (ジャカルタ) | 
|  us-east-2 | 米国東部 (オハイオ) | 
|  af-south-1 | アフリカ (ケープタウン) | 
|  eu-west-1 | 欧州 (アイルランド) | 
|  me-central-1 | 中東 (アラブ首長国連邦) | 
|  eu-central-1 | 欧州 (フランクフルト) | 
|  sa-east-1 | 南米 (サンパウロ) | 
|  ap-east-1 | アジアパシフィック (香港) | 
|  ap-south-2 | アジアパシフィック (ハイデラバード) | 
|  us–east–1 | 米国東部 (バージニア北部) | 
|  ap-northeast-2 | アジアパシフィック (ソウル) | 
|  ap-northeast-3 | アジアパシフィック (大阪) | 
|  eu-west-2 | 欧州 (ロンドン) | 
|  ap-southeast-4 | アジアパシフィック (メルボルン) | 
|  ap-northeast-1 | アジアパシフィック (東京) | 
|  us-west-2 | 米国西部 (オレゴン) | 
|  us-west-1 | 米国西部 (北カリフォルニア) | 
|  ap-southeast-1 | アジアパシフィック (シンガポール) | 
|  ap-southeast-2 | アジアパシフィック (シドニー) | 
|  il-central-1 | イスラエル (テルアビブ) | 
|  ca-central-1 | カナダ (中部) | 
|  ca-west-1 | カナダ西部 (カルガリー) | 
|  eu-south-2 | 欧州 (スペイン) | 
|  cn-northwest-1 | 中国 (寧夏) | 
|  eu-central-2 | 欧州 (チューリッヒ) | 
| us-gov-east-1 | AWS GovCloud (米国東部) | 
| us-gov-west-1 | AWS GovCloud (米国西部) | 

# でレジリエンステストを実行する AWS Fault Injection Service
<a name="kinesis-fis"></a>

AWS Fault Injection Service は、 AWS ワークロードでフォールトインジェクション実験を実行するのに役立つフルマネージドサービスです。Amazon Kinesis Data Streams と AWS FIS の統合により、制御された環境で一般的な Amazon Kinesis Data Streams API エラーに対するアプリケーションの耐障害性をテストできます。この機能により、障害が発生する前にエラー処理や再試行ロジックを検証し、システムをモニタリングすることができます。詳細については、[「 とは AWS Fault Injection Service](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)」を参照してください。

**アクション**
+ API 内部エラー: ターゲットの IAM ロールからのリクエストに内部エラーを挿入します。特定のレスポンスは、各サービスと API によって異なります。アクション `aws:fis:inject-api-internal-error` により、`InternalFailure` エラー (HTTP 500) が発生します。
+ API スロットルエラー: ターゲットの IAM ロールからのリクエストに内部エラーを挿入します。特定のレスポンスは、各サービスと API によって異なります。アクション `aws:fis:inject-api-throttle-error` により、`ThrottlingException` エラー (HTTP 400) が発生します。
+ API 使用不可エラー: ターゲットの IAM ロールからのリクエストに内部エラーを挿入します。特定のレスポンスは、各サービスと API によって異なります。アクション `aws:fis:inject-api-unavailable-error` により、`ServiceUnavailable` エラー (HTTP 503) が発生します。
+ API プロビジョンドスループット例外: ターゲットの IAM ロールからのリクエストに内部エラーを挿入します。特定のレスポンスは、各サービスと API によって異なります。アクション `aws:kinesis:inject-api-provisioned-throughput-exception` により、`ProvisionedThroughputExceededException` エラー (HTTP 400) が発生します。
+ API 期限切れイテレーター例外: ターゲットの IAM ロールからのリクエストに内部エラーを挿入します。特定のレスポンスは、各サービスと API によって異なります。アクション `aws:kinesis:inject-api-expired-iterator-exception` により、`ExpiredIteratorException` エラー (HTTP 400) が発生します。

詳細については、[Amazon Kinesis Data Streams actions](https://docs.aws.amazon.com/fis/latest/userguide/fis-actions-reference.html#aws-kinesis-actions) を参照してください。

**考慮事項**
+ 上記のアクションは、Amazon Kinesis Data Streams のプロビジョンド型およびオンデマンド型の両方のオファリングで使用できます。
+ 選択した期間に基づいて実験が完了すると、ストリーミングが再開されます。実行中の実験は、完了する前に停止することもできます。または、Amazon CloudWatch Application Insights でアプリケーションの状態を定義するアラームに基づいて実験を停止するための停止条件を定義することもできます。
+ 最大 280 個のストリームをテストできます。

リージョンのサポートについて詳しくは、[AWS Fault Injection Service endpoints and quotas](https://docs.aws.amazon.com/general/latest/gr/fis.html) を参照してください。

# プロビジョンドスループット例外エラー
<a name="kinesis-fis-provisioned-throughput"></a>

Kinesis ストリームに対するリクエストレートが 1 つ以上のシャードのスループット制限を超えた場合、プロビジョンドスループット超過例外エラー (HTTP 400) が発生します。各シャードには読み取りおよび書き込みの容量制限があり、その制限を超えるとこの例外が発生します。この例外が発生する主なシナリオには、データインジェストまたは消費の急増、処理中のデータ量に対して不十分なシャード容量、またはパーティションキーの不均等な分散などがあります。

**例外の処理に関する推奨事項**
+ 指数バックオフと再試行メカニズムを実装します。
+ スループットの増加に対応できるように、シャード数を増やします。
+ パーティションキーが適切に分散されていることを確認します。
+ ストリームメトリクスをモニタリングします。

さらに、Kinesis のオンデマンドキャパシティモードを使用すると、ワークロードが自動的に調整され、この例外の発生を最小限に抑えることができます。詳細については、「[What is AWS Fault Injection Service?](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)」を参照してください。

**注記**  
不適切な分散による問題は、オンデマンドモードの自動スケーリング機能の対象外です。

**基本的な実験を実行するには**

1. ベースラインメトリクスを使用する: テストを実施する前に、通常時のスループットパターンを記録します。

1. 実験を作成する: `aws:kinesis:inject-api-provisioned-throughput-exception` アクションを使用します。

1. 強度を設定する: まずはリクエストスロットリングを 25% に設定します。

1. レスポンスをモニタリングする: 指数バックオフを使用して再試行ロジックを検証します。

1. スケーリングを検証する: 自動スケーリングによってアクティベーションがトリガーされることを確認します。

1. アラームを確認する: `CloudWatch` アラームが想定どおりに動作していることを確認します。

アプリケーションは、適切なバックオフ戦略を実装し、`WriteProvisionedThroughputExceeded` メトリクスおよび `ReadProvisionedThroughputExceeded` メトリクスをモニタリングし、必要に応じてシャードスケーリングをトリガーする必要があります。

**アクションの詳細**
+ **リソースタイプ**: IAM ロール ARN
+ **ターゲットオペレーション**: `PutRecord`、`PutRecords`、`GetRecords`
+ ****エラーコード****: `ProvisionedThroughputExceededException` (HTTP 400)
+ ****説明****: リクエストレートがシャードの容量制限を超えるシナリオをシミュレートし、アプリケーションのスロットリングおよびスケーリング応答をテストします。

**パラメータ**
+ **IAM ロール ARN**: アプリケーションが Kinesis Data Streams オペレーションに使用するロール。
+ **オペレーション**: ターゲットオペレーション: `PutRecord`、`PutRecords`、`GetRecords`。
+ **リソースリスト**: 特定のストリーム名またはシャード識別子。
+ **期間**: 実験期間。1 分から 12 時間です。 AWS FIS API では、値は ISO 8601 形式の文字列です。例えば、PT1M は 1 分を表します。 AWS FIS コンソールで、秒数、分数、または時間数を入力します。
+ **強度: **スロットリングするリクエストの割合。

**必要なアクセス許可**
+ `kinesis:InjectApiError`

実験テンプレートの例

 次の例は、指定されたタグを持つ最大 5 つの Kinesis Data ストリームに対するプロビジョニングされたスループット例外を示しています。 は、ランダムに影響するストリーム AWS FIS を選択します。5 分後に障害が除去されます。

```
{
    "description": "Kinesis stream experiment",
    "targets": {
        "KinesisStreams-Target-1": {
            "resourceType": "aws:kinesis:stream",
            "resourceTags": {
                   "tag-key": "tag-value"
            },
            "selectionMode": "COUNT(5)"
        }
    },
    "actions": {
         "kinesis": {
              "actionId": "aws:kinesis:stream-provisioned-throughput-exception",
              "description": "my-stream",
              "parameters": {
                   "duration": "PT5M",
                   "percentage": "100",
                   "service": "kinesis"
              },
              "targets": {
                    "KinesisStreams": "KinesisStreams-Target-1"
              }
         }
   },
   "stopConditions": [
         {
              "source": "none"
         }
   ],
   "roleArn": "arn:aws:iam::111122223333:role/role-name",
   "tags": {},
   "experimentOptions": {
       "accountTargeting": "single-account",
       "emptyTargetResolutionMode": "fail"
   }    
}
```

実験ロールのアクセス許可の例

次のアクセス許可により、リクエストの 50% に影響する特定のストリームで `aws:kinesis:stream-provisioned-throughput-exception` および `aws:kinesis:stream-expired-iterator-exception` のアクションを実行できます。

# 期限切れイテレーター例外エラー
<a name="kinesis-fis-expired-iterator"></a>

 期限切れイテレーター例外エラー (HTTP 400) は、シャードイテレーターが有効期限切れとなり、`GetRecords` の呼び出し時にストリームレコードの取得に使用できなくなった場合に発生します。これは、長時間実行されるデータ処理タスク、ネットワークの問題、またはアプリケーションのダウンタイムによって読み取り処理の間に遅延が発生した場合に起こります。

**注記**  
シャードイテレーターは、発行されてから 5 分間有効です。

**例外の処理に関する推奨事項**
+ シャードイテレーターが期限切れになる前に更新します。
+ 新しいイテレーターを取得するためのエラー処理を組み込みます。
+ シャードイテレーターの有効期限を自動的に管理する Kinesis Client Library (KCL) を使用します。

詳細については、[「 とは」を参照してください AWS Fault Injection Service。](https://docs.aws.amazon.com/fis/latest/userguide/what-is.html)

**基本的な実験を実行するには**

1. 実験テンプレートを作成する: AWS FIS コンソールを使用します。

1. アクションを選択する: `aws:kinesis:inject-api-expired-iterator-exception` アクションを使用します。

1. ターゲットを設定する: IAM ロールと Kinesis Data Streams オペレーションを指定します。

1. 期間を設定する: 初回テストでは 5〜10 分から開始します。

1. 停止条件の追加: [の停止条件 AWS FIS](https://docs.aws.amazon.com/fis/latest/userguide/stop-conditions.html)。

1. 実験を実行する: アプリケーションの動作をモニタリングします。

**アクションの詳細**
+ **リソースタイプ**: IAM ロール ARN
+ **ターゲットオペレーション**: `GetRecords`
+ ****エラーコード****: `ExpiredIteratorException`(HTTP 400)
+ ****説明****: 指定されたイテレーターが許可されている最大の有効期間を超えた場合に発生し、レコード処理が遅すぎる、またはチェックポイント処理ロジックが失敗するシナリオをシミュレートします。

**パラメータ**
+ **IAM ロール ARN**: アプリケーションが Kinesis Data Streams オペレーションに使用するロール。
+ **オペレーション**: ターゲットオペレーション: `GetRecords`
+ **リソースリスト**: 特定のストリーム名または ARN。
+ **期間**: 実験期間。これは設定可能です。
+ **強度: **スロットリングするリクエストの割合。

**必要なアクセス許可**
+ `kinesis:InjectApiError`