

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Data Streams にデータを書き込む
<a name="building-producers"></a>

*プロデューサー*は、Amazon Kinesis Data Streams にデータを書き込むアプリケーションです。 AWS SDK for Java と Kinesis Producer Library (KPL) を使用して、Kinesis Data Streams のプロデューサーを構築できます。

Kinesis Data Streams を初めて利用する場合は、[Amazon Kinesis Data Streams とは](introduction.md)および[AWS CLI を使用して Amazon Kinesis Data Streams オペレーションを実行する](getting-started.md)で説明されている概念と用語について理解することから始めてください。

**重要**  
Kinesis Data Streams は、データストリームのデータレコードの保持期間の変更をサポートしています。詳細については、[データ保持期間を変更する](kinesis-extended-retention.md)を参照してください。

ストリームにデータを送信するには、ストリームの名前、パーティションキー、ストリームに追加するデータ BLOB を指定する必要があります。パーティションキーは、データレコードが追加されるストリーム内のシャードを決定するために使用されます。

シャード内のすべてのデータは、そのシャードを処理する同じワーカーに送信されます。使用するパーティションキーはアプリケーションのロジックによって異なります。パーティションキーの数は、通常、シャードカウントよりかなり大きくする必要があります。これは、データレコードを特定のシャードにマッピングする方法を決定するために、パーティションキーが使用されるからです。十分なパーティションキーがある場合、ストリーム内のシャードに均等にデータを分散することができます。

**Topics**
+ [Amazon Kinesis Producer Library (KPL) を使用してプロデューサーを開発する](developing-producers-with-kpl.md)
+ [で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java](developing-producers-with-sdk.md)
+ [Kinesis エージェントを使用して Amazon Kinesis Data Streams に書き込む](writing-with-agents.md)
+ [他の AWS サービスを使用して Kinesis Data Streams に書き込む](using-other-services.md)
+ [サードパーティーの統合を使用して Kinesis Data Streams に書き込む](using-other-services-third-party.md)
+ [Amazon Kinesis Data Streams プロデューサーのトラブルシューティング](troubleshooting-producers.md)
+ [Kinesis Data Streams プロデューサーを最適化する](advanced-producers.md)

# Amazon Kinesis Producer Library (KPL) を使用してプロデューサーを開発する
<a name="developing-producers-with-kpl"></a>

Amazon Kinesis Data Streams プロデューサーは、ユーザーデータレコードを Kinesis data stream に配置する (*データの取り込み*とも呼ばれます) アプリケーションです。Amazon Kinesis Producer Library (KPL) を使用すると、プロデューサーアプリケーションの開発が簡素化され、デベロッパーは Kinesis Data Streams に対する優れた書き込みスループットを実現できます。

Amazon CloudWatch で KPL をモニタリングできます。詳細については、「[Amazon CloudWatch を使用した Kinesis Client Library を監視する](monitoring-with-kpl.md)」を参照してください。

**Topics**
+ [KPL のロールを確認する](#developing-producers-with-kpl-role)
+ [KPL を使用するメリットを実現する](#developing-producers-with-kpl-advantage)
+ [KPL を使用しないタイミングを理解する](#developing-producers-with-kpl-when)
+ [KPL をインストールする](kinesis-kpl-dl-install.md)
+ [KPL 0.x から KPL 1.x に移行する](kpl-migration-1x.md)
+ [KPL の Amazon Trust Services (ATS) 証明書への移行](kinesis-kpl-upgrades.md)
+ [KPL でサポートされるプラットフォーム](kinesis-kpl-supported-plats.md)
+ [KPL の主要な概念](kinesis-kpl-concepts.md)
+ [KPL をプロデューサーコードと統合する](kinesis-kpl-integration.md)
+ [KPL を使用して Kinesis Data Stream に書き込む](kinesis-kpl-writing.md)
+ [Amazon Kinesis Producer Library を設定する](kinesis-kpl-config.md)
+ [コンシューマーの集約解除を実装する](kinesis-kpl-consumer-deaggregation.md)
+ [Amazon Data Firehose で KPL を使用する](kpl-with-firehose.md)
+ [Schema Registry で KPL AWS Glue を使用する](kpl-with-schemaregistry.md)
+ [KPL プロキシ設定を設定する](kpl-proxy-configuration.md)
+ [KPL バージョンライフサイクルポリシー](kpl-version-lifecycle-policy.md)

**注記**  
KPL は、最新バージョンにアップグレードすることが推奨されます。KPL は新しいリリースに伴って定期的に更新されています。これには、最新の依存関係パッチ、セキュリティパッチ、バグ修正、および下位互換性のある新機能が含まれます。詳細については、[https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/) を参照してください。

## KPL のロールを確認する
<a name="developing-producers-with-kpl-role"></a>

KPL は使いやすく、Kinesis Data Streams への書き込みに役立つ、高度な設定可能ライブラリです。これは、プロデューサーアプリケーションのコードと Kinesis Data Streams API アクション間の仲介として機能します。KPL は次の主要なタスクを実行します。
+ 自動的で設定可能な再試行メカニズムにより 1 つ以上の Kinesis Data Streams へ書き込む
+ レコードを収集し、`PutRecords` を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
+ ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
+ コンシューマーで [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) とシームレスに統合して、バッチ処理されたレコードを集約解除する
+ Amazon CloudWatch メトリクスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする

KPL は [AWS SDK](https://aws.amazon.com/tools/) で使用できる Kinesis Data Streams API とは異なることに注意してください。Kinesis Data Streams API では Kinesis Data Streams の多くの機能 (ストリームの作成、リシャーディング、レコードの入力と取得など) を管理できます。KPL はデータの取り込みに特化した抽象化レイヤーを提供します。Kinesis Data Streams API の詳細については、[Amazon Kinesis API リファレンス](https://docs.aws.amazon.com/kinesis/latest/APIReference/)を参照してください。

## KPL を使用するメリットを実現する
<a name="developing-producers-with-kpl-advantage"></a>

Kinesis Data Streams プロデューサーの開発に KPL を使用する主な利点を以下に示します。

KPL は、同期または非同期のユースケースで使用できます。同期動作を使用する特別な理由がないかぎり、非同期インターフェイスの優れたパフォーマンスを使用することを推奨します。これら 2 つのユースケースの詳細とコード例については、[KPL を使用して Kinesis Data Stream に書き込む](kinesis-kpl-writing.md)を参照してください。

 **パフォーマンスのメリット**   
KPL は、高性能のプロデューサーの構築に役立ちます。Amazon EC2 インスタンスをプロキシとして使用し、100 バイトのイベントを数百または数千の低電力デバイスから収集して、レコードを Kinesis Data Streams に書き込む場合を考えてみます。これらの EC2 インスタンスはそれぞれ、毎秒数千イベントをデータストリームに書き込む必要があります。必要なスループットを実現するには、お客様の側で、再試行ロジックとレコード集約解除に加え、バッチ処理やマルチスレッドなどの複雑なロジックをプロデューサーに実装する必要があります。KPL が、これらのタスクをすべて実行します。

 **コンシューマー側の使いやすさ**   
コンシューマー側のデベロッパーが Java で KCL を使用する場合、追加作業なしで KPL が統合されます。KCL で、複数の KPL ユーザーレコードで構成されている集約された Kinesis Data Streams レコードを取得するときは、自動的に KPL が呼び出され、個々のユーザーレコードが抽出され、ユーザーに返されます。  
KCL を使用せずに API オペレーション `GetRecords` を直接使用するコンシューマー側のデベロッパーの場合、KPL Java ライブラリを使用して個々のユーザーレコードを抽出して、これらのレコードをユーザーに返すことができます。

 **プロデューサーのモニタリング**   
Amazon CloudWatch と KPL を使用して、Kinesis Data Streams プロデューサーを収集、モニタリング、分析できます。KPL は、スループット、エラー、およびその他のメトリクスをユーザーに代わって CloudWatch に送信し、ストリーム、シャード、またはプロデューサーレベルでモニタリングするように設定できます。

 **非同期アーキテクチャ**   
KPL は、レコードを Kinesis Data Streams に送信する前にそれらのレコードをバッファ処理する場合があるため、ランタイムを続行する前にレコードがサーバーに到着したことを確認するために、発信者アプリケーションを強制的にブロックし待機させることはしません。レコードを KPL に配置する呼び出しは、必ずすぐに処理が戻り、レコードの送信やサーバーからの応答の受信を待ちません。代わりに、レコードを Kinesis Data Streams に送信した結果を後で受信するための `Future` オブジェクトが作成されます。これは AWS SDK の非同期クライアントと同じ動作です。

## KPL を使用しないタイミングを理解する
<a name="developing-producers-with-kpl-when"></a>

KPL では、ライブラリ内で最大 `RecordMaxBufferedTime` まで追加の処理遅延が生じる場合があります (ユーザーが設定可能)。`RecordMaxBufferedTime` の値が大きいほど、パッキング効率とパフォーマンスが向上します。この追加的な遅延を許容できないアプリケーションは、 AWS SDK を直接使用することが必要になる場合があります。Kinesis Data Streams で AWS SDK を使用する方法の詳細については、「」を参照してください[で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java](developing-producers-with-sdk.md)。`RecordMaxBufferedTime` やその他のユーザー設定可能な KPLのプロパティの詳細については、[Amazon Kinesis Producer Library を設定する](kinesis-kpl-config.md) を参照してください。

# KPL をインストールする
<a name="kinesis-kpl-dl-install"></a>

Amazon では、macOS、Windows、最新の Linux ディストリビューション向けに C\$1\$1 Amazon Kinesis Producer Library (KPL) のビルド済みバイナリを提供しています (サポートされているプラッフォームの詳細については、次のセクションを参照してください)。これらのバイナリは、Java の .jar ファイルの一部としてパッケージ化されており、Maven を使用してパッケージをインストールする場合、自動的に呼び出され、使用されます。KPL と KCL の最新バージョンを確認するには、次の Maven 検索リンクをご利用ください。
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

Linux のバイナリは、GNU コンパイラコレクション (GCC) でコンパイルされ、Linux の libstdc\$1\$1 に静的にリンクされています。これらのバイナリは、glibc バージョン 2.5 以降を含むすべての 64 ビット Linux ディストリビューションで動作することが推定されています。

以前のバージョンの Linux ディストリビューションのユーザーは、GitHub のソースとともに提供されるビルド手順で KPL をビルドできます。KPL を GitHub からダウンロードするには、[Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer) を参照してください。

**重要**  
Amazon Kinesis Producer Library (KPL) 0.x は、2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 0.x を使用して KPL アプリケーションを最新の KPL バージョンに移行することを**強くお勧めします**。最新の KPL バージョンについては、[Github の KPL ページ](https://github.com/awslabs/amazon-kinesis-producer)を参照してください。KPL 0.x から KPL 1.x への移行については、[KPL 0.x から KPL 1.x に移行する](kpl-migration-1x.md) を参照してください。

# KPL 0.x から KPL 1.x に移行する
<a name="kpl-migration-1x"></a>

このトピックでは、コンシューマーを KPL 0.x から KPL 1.x に移行する方法について、段階的に説明します。KPL 1.x では、以前のバージョンとのインターフェイス互換性を維持しながら、 AWS SDK for Java 2.x のサポートが導入されています。KPL 1.x へ移行する際、データ処理の中核となるロジックを更新する必要はありません。

1. **次の前提条件を満たしていることを確認する**
   + Java Development Kit (JDK) 8 以降
   + AWS SDK for Java 2.x
   + 依存関係管理用の Maven または Gradle

1. **依存関係を追加する**

   Maven を使用している場合は、以下の依存関係を pom.xml ファイルに追加します。必ず groupId を `com.amazonaws` から `software.amazon.kinesis` に更新し、バージョンを `1.x.x` から最新の KPL バージョンに更新してください。

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   Gradle を使用している場合は、以下を `build.gradle` ファイルに追加します。必ず `1.x.x` を最新の KPCL バージョンに置き換えてください。

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   最新の KPL バージョンは、[Maven Central Repository](https://central.sonatype.com/search?q=amazon-kinesis-producer) で確認できます。

1. **KPL の import ステートメントを更新する**

   KPL 1.x は AWS SDK for Java 2.x を使用し、 で始まる更新されたパッケージ名を使用します。これは`software.amazon.kinesis`、 で始まる以前の KPL のパッケージ名とは異なります`com.amazonaws.services.kinesis`。

   `com.amazonaws.services.kinesis` の import ステートメントを `software.amazon.kinesis` に置き換えます。次の表に、置き換える必要がある import ステートメントを示します。  
**Import の置き換え**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kpl-migration-1x.html)

1. ** AWS 認証情報プロバイダークラスのインポートステートメントを更新する**

   KPL 1.x に移行するときは、 AWS SDK for Java 1.x に基づく KPL アプリケーションコードのインポートのパッケージとクラスを、 AWS SDK for Java 2.x に基づく対応するパッケージとクラスに更新する必要があります。KPL アプリケーションでよく使用される import ステートメントは、認証情報プロバイダークラスにあります。[認証情報プロバイダーの変更の完全なリストについては、2.x 移行ガイドのドキュメントの](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)「認証情報プロバイダーの変更」を参照してください。 AWS SDK for Java 以下は、KPL アプリケーションで一般的に必要となる import ステートメントの変更例です。

   **KPL 0.x の import ステートメント**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **KPL 1.x の import ステートメント**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

    AWS SDK for Java 1.x に基づいて他の認証情報プロバイダーをインポートする場合は、それらを AWS SDK for Java 2.x に相当するものに更新する必要があります。 AWS SDK for Java 1.x からクラス/パッケージをインポートしなかった場合は、このステップを無視できます。

1. **KPL 設定内の認証情報プロバイダー設定を更新する**

   KPL 1.x の認証情報プロバイダー設定には、 AWS SDK for Java 2.x の認証情報プロバイダーが必要です。デフォルトの認証情報プロバイダーを上書き`KinesisProducerConfiguration`して の AWS SDK for Java 1.x の認証情報プロバイダーを渡す場合は、 AWS SDK for Java 2.x 認証情報プロバイダーで更新する必要があります。[認証情報プロバイダーの変更の一覧については、2.x 移行ガイドのドキュメントの](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)「認証情報プロバイダーの変更」を参照してください。 AWS SDK for Java KPL 設定でデフォルトの認証情報プロバイダーを上書きしていない場合は、この手順は無視できます。

   たとえば、KPL のデフォルト認証情報プロバイダーを次のコードで上書きしている場合です。

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

    AWS SDK for Java 2.x の認証情報プロバイダーを使用するためには、これらを次のコードに更新する必要があります。

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# KPL の Amazon Trust Services (ATS) 証明書への移行
<a name="kinesis-kpl-upgrades"></a>

2018 年 2 月 9 日の午前 9:00 (太平洋標準時) に、Amazon Kinesis Data Streams は ATS 証明書をインストールしました。Kinesis Producer Library (KPL) を使用して、Amazon Kinesis Data Streams にレコードを継続して書き込むには、KPL のインストールを[バージョン 0.12.6](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar) 以降にアップグレードする必要があります。この変更は、すべての AWS リージョンに影響します。

ATS への移行の詳細については、[AWS「独自の認証機関への移行を準備する方法](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/)」を参照してください。

問題が発生し、技術サポートが必要な場合は、 AWS サポートセンターで[サポートケースを作成](https://console.aws.amazon.com/support/v1#/case/create)してください。

# KPL でサポートされるプラットフォーム
<a name="kinesis-kpl-supported-plats"></a>

Amazon Kinesis Producer Library (KPL) は、C\$1\$1 で書かれており、メインユーザープロセスの子プロセスとして実行されます。プリコンパイルされている 64 ビットのネイティブバイナリは、Java ベースにバンドルされており、Java wrapper によって管理されます。

次のオペレーティングシステムでは、追加ライブラリをインストールすることなく Java のパッケージを実行できます。
+ カーネルバージョン 2.6.18 (2006 年 9 月) の Linux ディストリビューション以降
+ Apple iOS X 10.9 以降
+ Windows Server 2008 以降
**重要**  
Windows Server 2008 以降は、バージョン 0.14.0 までのすべての KPL バージョンでサポートされています。  
Windows プラットフォームは、KPL バージョン 0.14.0 以降ではサポートされていません。

KPL は、64 ビット版のみであることに注意してください。

## ソースコード
<a name="kinesis-kpl-supported-plats-source-code"></a>

KPL のインストールで提供されるバイナリがお客様の環境に適さない場合は、KPL のコアが C\$1\$1 のモジュールとして書き込まれます。C\$1\$1 モジュールと Java インターフェイスのソースコードは、Amazon パブリックライセンスの下で公開され、GitHub の [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer) で入手できます。KPL は、最近の規格に準拠した C\$1\$1 コンパイラと JRE を使用できるすべてのプラットフォームで使用できますが、Amazon では、サポートされるプラットフォームの一覧にないプラットフォームを正式にはサポートしません。

# KPL の主要な概念
<a name="kinesis-kpl-concepts"></a>

以下のセクションでは、Amazon Kinesis Producer Library (KPL) を理解し、その利点を引き出すために必要な概念と用語について説明します。

**Topics**
+ [レコード](#kinesis-kpl-concepts-records)
+ [バッチ処理](#kinesis-kpl-concepts-batching)
+ [集計](#kinesis-kpl-concepts-aggretation)
+ [収集](#kinesis-kpl-concepts-collection)

## レコード
<a name="kinesis-kpl-concepts-records"></a>

このガイドでは、*KPL ユーザーレコード*と *Kinesis Data Streams レコード*を区別します。修飾語を付けずに*レコード*という用語を使用する場合は、*KPL ユーザーレコード*を意味します。Kinesis Data Streams レコードを意味するときは、明示的に *Kinesis Data Streams レコード*と表現します。

KPL ユーザーレコードは、ユーザーにとって特定の意味のあるデータの BLOB です。たとえば、ウェブサイトの UI イベントまたはウェブサーバーのログエントリを表す JSON BLOB がそれに該当します。

Kinesis Data Streams レコードは、Kinesis Data Streams サービス API で定義される`Record` データ構造のインスタンスです。これには、パーティションキー、シーケンス番号、データの BLOB が含まれています。

## バッチ処理
<a name="kinesis-kpl-concepts-batching"></a>

*バッチ処理*は、各項目に対して単一のアクションを繰り返し実行する代わりに、複数の項目に対してそのアクションを実行することを意味します。

ここでは、項目はレコードに対応し、アクションはレコードを Kinesis Data Streams に送信することに対応します。バッチ処理を使用しない場合、各レコードを別々の Kinesis Data Streams レコードに配置し、それぞれを Kinesis Data Streams に送信するたびに HTTP リクエストを実行します。バッチ処理では、各 HTTP リクエストにより、1 つではなく複数のレコードを処理できます。

KPL では、2 種類のバッチ処理がサポートされます。
+ *集約* - 複数のレコードを単一の Kinesis Data Streams レコードに保存します。
+ *収集* - API オペレーション `PutRecords` を使用して、Kinesis Data Streams 内の 1 つ以上のシャードに複数の Kinesis Data Streams レコードを送信します。

2 種類の KPL バッチ処理は、共存できるように設計されており、互いに独立して有効または無効にできます。デフォルトでは、どちらも有効です。

## 集計
<a name="kinesis-kpl-concepts-aggretation"></a>

*集約*は、複数レコードを 1 つの Kinesis Data Streams レコードに保存することを意味します。集約を使用すると、API コールごとに送信されるレコード数を増やすことができ、効率的にプロデューサーのスループットを高めることができます。

Kinesis Data Streams シャードは、1 秒あたり最大で 1,000 レコードまたは 1 MB のスループットをサポートします。1 秒あたりの Kinesis Data Streams レコードの制限により、お客様のレコードは 1 KB 未満に制限されます。レコードの集約を使用すると、複数のレコードを単一の Kinesis Data Streams レコードに結合できます。そのため、お客様はシャードあたりのスループットを改善することができます。

リージョンが us-east-1 の 1 つのシャードで、1 つが 512 バイトのレコードを 1 秒あたり 1,000 レコードの一定割合で処理する場合を考えます。KPL 集約を使用すると、1,000 レコードを 10 Kinesis Data Streams レコードに詰めることができ、RPS を 10 に減らすことができます (それぞれ 50 KB)。

## 収集
<a name="kinesis-kpl-concepts-collection"></a>

*収集*は、各 Kinesis Data Streams レコードをそれぞれの HTTP リクエストで送信するのではなく、複数の Kinesis Data Streams レコードをバッチ処理し、API オペレーション `PutRecords` を呼び出して単一の HTTP リクエストでそれらを送信することを意味します。

これにより、個別の HTTP リクエストを多数実行するオーバーヘッドが減るため、収集を使用しない場合に比べスループットが向上します。実際、`PutRecords` 自体が、この目的のために設計されています。

収集は、Kinesis Data Streams レコードのグループを使用している点で集約と異なります。収集された Kinesis Data Streams レコードには、ユーザーの複数のレコードをさらに含めることができます。この関係は、次のように図示できます。

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# KPL をプロデューサーコードと統合する
<a name="kinesis-kpl-integration"></a>

Amazon Kinesis Producer Library (KPL) は独立したプロセスで実行され、IPC を使用して親ユーザープロセスと通信します。このアーキテクチャは、[マイクロサービス](http://en.wikipedia.org/wiki/Microservices)と呼ばれる場合があり、次の 2 つの主な理由からこれが選択されます。

**1) KPL がクラッシュしても、ユーザープロセスはクラッシュしません**  
プロセスには Kinesis Data Streams と無関係なタスクが含まれている場合があり、KPL がクラッシュしてもオペレーションを続行できます。また、親ユーザープロセスが KPL を再起動し、完全に機能する状態に復旧することもできます (この機能は、正式なラッパーに含まれています)。

メトリクスを Kinesis Data Streams に送信するウェブサーバーがその例です。このサーバーは、Kinesis Data Streams 部分が動作を停止してもページの提供を続行できます。そのため、KPL のバグが原因でサーバー全体がクラッシュすると、不要なサービス停止が発生します。

**2) 任意のクライアントをサポートできます**  
正式にサポートされている言語以外の言語を使用するお客様もいます。これらのお客様も KPL を簡単に使用できます。

## 推奨される使用状況
<a name="kinesis-kpl-integration-usage"></a>

使用状況の異なるユーザーに推奨される設定を次の表に示します。この表を参考に、KPL を使用できるかどうか、どのように使用できるかを判断できます。集約が有効な場合、コンシューマー側で集約解除を使用してレコードを抽出する必要があることにも注意してください。


| プロデューサー側の言語 | コンシューマー側の言語 | KCL バージョン | チェックポイントロジック | KPL の使用可否 | 注意 | 
| --- | --- | --- | --- | --- | --- | 
| Java 以外 | \$1 | \$1 | \$1 | 不可 | 該当なし | 
| Java | Java | Java SDK を直接使用 | 該当なし | 可 | 集約を使用する場合、GetRecords を呼び出した後に、提供された集約解除ライブラリを使用する必要があります。 | 
| Java | Java 以外 | SDK を直接使用 | 該当なし | 可 | 集約を無効にする必要があります。 | 
| Java | Java | 1.3.x | 該当なし | 可 | 集約を無効にする必要があります。 | 
| Java | Java  | 1.4.x | 引数なしでチェックポイントを呼び出す | 可 | なし | 
| Java | Java | 1.4.x | 明示的なシーケンス番号を使用してチェックポイントを呼び出す | 可 | 集約を無効にするかコードを変更し、チェックポイント作成用の拡張されたシーケンス番号を使用します。 | 
| Java | Java 以外  | 1.3.x \$1 複数言語デーモン \$1 言語固有のラッパー | 該当なし | 可 | 集約を無効にする必要があります。 | 

# KPL を使用して Kinesis Data Stream に書き込む
<a name="kinesis-kpl-writing"></a>

以下のセクションでは、最も基本的なプロデューサーから完全に非同期なコードまで順にサンプルコードを示します。

## 最低限のプロデューサーコード
<a name="kinesis-kpl-writing-code"></a>

次のコードは、最小限の機能するプロデューサーを書くために必要なものがすべて含まれています。Amazon Kinesis Producer Library (KPL) ユーザーレコードはバックグラウンドで処理されます。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 結果に対する同期的な応答
<a name="kinesis-kpl-writing-synchronous"></a>

前のコード例では、 ユーザーレコードが成功したかどうかをチェックしませんでした。KPL は、失敗に対処するために必要な再試行を実行します。ただし、結果を確認する必要がある場合は、次の例 (分かりやすくするため前の例を使用しています) のように、`addUserRecord` から返される `Future` オブジェクトを使用して結果を確認します。

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 結果に対する非同期的な応答
<a name="kinesis-kpl-writing-asynchronous"></a>

前の例では、`get()` オブジェクトに対して `Future` を呼び出しているため、ランタイムがブロックされます。ランタイムのブロックを避ける必要がある場合には、次の例に示すように非同期コールバックを使用できます。

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# Amazon Kinesis Producer Library を設定する
<a name="kinesis-kpl-config"></a>

デフォルト設定のままで、ほとんどのユースケースに問題なく使用できますが、デフォルト設定の一部を変更することで、ニーズに合わせて `KinesisProducer` の動作を調整することができます。それには、`KinesisProducerConfiguration` クラスのインスタンスを `KinesisProducer` コンストラクタに渡します。たとえば、次のようにします。

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

プロパティファイルから設定をロードすることもできます。

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

ユーザープロセスがアクセスできる任意のパスとファイル名に置き換えることができます。さらに、このようにして作成した `KinesisProducerConfiguration` インスタンスに対して設定メソッドを呼び出して、設定をカスタマイズできます。

プロパティファイルでは、PascalCase 内の名前を使用してパラメータを指定する必要があります。その名前は、`KinesisProducerConfiguration` クラスの設定メソッドで使用されるものと一致します。例: 

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

設定パラメータの使用方法と値の制限の詳細については、[sample configuration properties file on GitHub](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties)を参照してください。

`KinesisProducer` の初期化後に、使用した `KinesisProducerConfiguration` インスタンスを変更しても何の変化もないことに注意してください。現在、`KinesisProducer` は動的設定をサポートしていません。

# コンシューマーの集約解除を実装する
<a name="kinesis-kpl-consumer-deaggregation"></a>

KCL は、リリース 1.4.0 から KPL ユーザーレコードの自動集計解除をサポートしています。以前のバージョンの KCL で書かれたコンシューマーアプリケーションのコードは、KCL を更新した後、コードを何も修正せずにコンパイルできます。ただし、プロデューサー側で KPL の集約を使用している場合、チェックポイントが多少関係してきます。集約されたレコード内のすべてのサブレコードは同じシーケンス番号を持っているため、サブレコード間の区別が必要な場合、チェックポイントを使用して追加のデータを保存する必要があります。この追加データは、*サブシーケンス番号*と呼ばれます。

**Topics**
+ [以前のバージョンの KCL から移行する](#kinesis-kpl-consumer-deaggregation-migration)
+ [KPL の集約解除のための KCL の拡張を使用する](#kinesis-kpl-consumer-deaggregation-extensions)
+ [GetRecords を直接使用する](#kinesis-kpl-consumer-deaggregation-getrecords)

## 以前のバージョンの KCL から移行する
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

集約を使用している場合でも、既存のチェックポイント呼び出しを変更する必要はありません。Kinesis Data Streams に保存されているすべてのレコードを正しく取得できることが保証されています。以下で説明する特定のユースケースをサポートするために、現在 KCL には、2 つの新しいチェックポイントオペレーションが用意されています。

既存のコードが KPL サポート以前の KCL 用に書かれていて、チェックポイントオペレーションが引数なしで呼び出される場合、そのコードの動作は、バッチ内にある最後の KPL ユーザーレコードのシーケンス番号に対するチェックポイントの作成と同等です。シーケンス番号文字列を使用してチェックポイントオペレーションを呼び出す場合は、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチの指定されたシーケンス番号に対するチェックポイントの作成と同等です。

引数なしで新しいKCL チェックポイントオペレーション `checkpoint()` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチ内の最後の `Record` 呼び出しのシーケンス番号に対するチェックポイントの作成と意味的に同等です。

新しい KCL チェックポイントオペレーション `checkpoint(Record record)` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、指定された `Record` のシーケンス番号に対するチェックポイントの作成と意味的に同等です。`Record` 呼び出しが実際には `UserRecord` である場合、`UserRecord` のシーケンス番号とサブシーケンス番号にチェックポイントが作成されます。

新しい KCL チェックポイントオペレーション `checkpoint(String sequenceNumber, long subSequenceNumber)` を呼び出すと、指定されたシーケンス番号とサブシーケンス番号に明示的にチェックポイントが作成されます。

いずれの場合も、チェックポイントが Amazon DynamoDB チェックポイントテーブルに保存された後は、アプリケーションがクラッシュして再起動した場合、KCL により、レコードの取得が正常に再開されます。さらにレコードがシーケンス内に含まれている場合は、最後にチェックポイントが作成されたシーケンス番号が付けられているレコード内の次のサブシーケンス番号のレコードから取得が開始されます。前のシーケンス番号のレコードにある最後のサブシーケンス番号が、最新のチェックポイントに含まれている場合、その次のシーケンス番号が付けられているレコードから取得が開始されます。

次のセクションでは、レコードのスキップや重複を避けるために必要な、コンシューマーのシーケンスとサブシーケンスのチェックポイントの詳細について説明します。コンシューマーのレコード処理を停止し再起動するときに、レコードのスキップや重複が重要でない場合は、変更せずに既存のコードを実行してかまいません。

## KPL の集約解除のための KCL の拡張を使用する
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL の集約解除ではサブシーケンスチェックポイントを使用できます。サブシーケンスチェックポイントを使いやすくするために、`UserRecord` クラスが KCL に追加されています。

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

このクラスは、現在 `Record` の代わりに使用されています。これは `Record` のサブクラスであるため、既存のコードは影響を受けません。`UserRecord` クラスは、実際のサブレコードと通常の集約されていないレコードの両方を表します。集約されていないレコードは、サブレコードを 1 つだけ含む集約されたレコードと考えることができます。

さらに、2 つの新しいオペレーションが `IRecordProcessorCheckpointer` に追加されています。

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

サブシーケンス番号チェックポイントの使用を開始するには、次の変更を行います。次のフォームコードを変更します。

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新しいフォームコードは次のようになります。

```
checkpointer.checkpoint(record);
```

サブシーケンスチェックポイントでは、`checkpoint(Record record)` フォームを使用することをお勧めします。ただし、チェックポイントの作成で使用する文字列にすでに `sequenceNumbers` を保存している場合は、次の例に示すように、`subSequenceNumber` も保存する必要があります。

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

この実装では `UserRecord` を必ず使用するため、`UserRecord` から `Record` へのキャストは必ず成功します。シーケンス番号の計算を実行する必要がない場合、この方法はお勧めしません。

KPL ユーザーレコードの処理中に、CL は、サブシーケンス番号を Amazon DynamoDB に各行の追加フィールドとして書き込みます。以前のバージョンの KCL では、チェックポイントを再開するときに `AFTER_SEQUENCE_NUMBER` を使用してレコードを取得していました。KPL サポートを含む現在の KCL では、代わりに `AT_SEQUENCE_NUMBER` を使用します。チェックポイントが作成されたシーケンス番号のレコードを取得するとき、チェックポイントが作成されたサブシーケンス番号がチェックされ、サブレコードが必要に応じて削除されます (最後のサブレコードにチェックポイントが作成されている場合、すべてのサブレコードが削除されます)。ここでも、集約されていないレコードは、単一のサブレコードを含む集約されたレコードと考えることができ、集約されたレコードと集約されていないレコードの両方で同じアルゴリズムを使用できます。

## GetRecords を直接使用する
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

KCL の使用を選択せずに、API オペレーション `GetRecords` を直接呼び出して Kinesis Data Streams レコードを取得することもできます。これらの取得したレコードを元の KPL ユーザーレコードに解凍するには、`UserRecord.java` にある次の静的なオペレーションの 1 つを呼び出します。

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

最初のオペレーションでは、`startingHashKey` のデフォルト値 `0` (ゼロ) と `endingHashKey` のデフォルト値 `2^128 -1` を使用します。

これらの各オペレーションは、Kinesis Data Streams レコードの指定されたリストを KPL ユーザーレコードのリストに集約解除します。KPL ユーザーレコードの明示的なハッシュキーまたはパーティションキーが `startingHashKey` と `endingHashKey` の範囲 (境界を含む) 外にある場合、これらのユーザーレコードは、返されるレコードのリストから破棄されます。

# Amazon Data Firehose で KPL を使用する
<a name="kpl-with-firehose"></a>

Kinesis Producer Library (KPL) を使用して Kinesis データストリームにデータを書き込む場合、集約を使用してその Kinesis データストリームに書き込むレコードを結合できます。その後そのデータストリームを Firehose 配信ストリームのソースとして使用する場合、Firehose はレコードの集約を解除してから送信先に配信します。データを変換するように配信ストリームを設定する場合、Firehose はレコードの集約を解除してから AWS Lambdaに配信します。詳細については、「[Writing to Amazon Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html)」を参照してください。

# Schema Registry で KPL AWS Glue を使用する
<a name="kpl-with-schemaregistry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue Schema Registry を使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを向上させることができます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、Java で KPL および Kinesis Client Library (KCL) ライブラリを使用することです。

**重要**  
現在、Kinesis Data Streams と AWS Glue スキーマレジストリの統合は、Java に実装された KPL プロデューサーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。

KPL を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、[「ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合」の「KPL/KCL ライブラリを使用したデータの操作](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)」セクションを参照してください。

# KPL プロキシ設定を設定する
<a name="kpl-proxy-configuration"></a>

インターネットに直接接続できないアプリケーションの場合、すべての AWS SDK クライアントは HTTP または HTTPS プロキシの使用をサポートしています。一般的なエンタープライズ環境では、すべてのアウトバウンドネットワークトラフィックがプロキシサーバーを経由する必要があります。アプリケーションが Kinesis プロデューサーライブラリ (KPL) を使用してプロキシサーバーを使用する AWS 環境でデータを収集して に送信する場合、アプリケーションには KPL プロキシ設定が必要です。KPL は、Kinesis SDK AWS 上に構築された高レベルのライブラリです。これは、ネイティブプロセスとラッパーに分割されています。ネイティブプロセスがレコードの処理ジョブと送信ジョブのすべてを実行する一方で、ラッパーはネイティブプロセスの管理と、ネイティブプロセスとの通信を実行します。詳細については、「[Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/)」を参照してください。

ラッパーは Java で記述され、ネイティブプロセスは Kinesis SDK を使用して C\$1\$1 で記述されます。KPL バージョン 0.14.7 以降では、すべてのプロキシ設定をネイティブプロセスに渡すことができる、Java ラッパー内のプロキシ設定がサポートされるようになりました。詳細については、[https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7) を参照してください。

KPL アプリケーションへのプロキシ設定の追加には、以下のコードを使用できます。

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL バージョンライフサイクルポリシー
<a name="kpl-version-lifecycle-policy"></a>

このトピックでは、Amazon Kinesis Producer Library (KPL) のバージョンライフサイクルポリシーの概要を説明します。 では、新機能と機能強化、バグ修正、セキュリティパッチ、依存関係の更新をサポートするために、KPL バージョンの新しいリリース AWS を定期的に提供しています。最新の機能、セキュリティ更新、基本的な依存関係に対応するため、KPL を常に最新バージョンへ更新しておくことをお勧めします。サポートが終了した KPL バージョンを継続使用することは**お勧めしません**。

主要な KPL バージョンのライフサイクルは、次の 3 つのフェーズで構成されます。
+ **一般提供 (GA)** – このフェーズでは、メジャーバージョンが完全にサポートされています。 は、Kinesis Data Streams の新機能や API アップデートのサポート、バグやセキュリティの修正を含む、定期的なマイナーバージョンとパッチバージョンのリリース AWS を提供します。
+ **メンテナンスモード** – パッチバージョンのリリース AWS を制限して、重大なバグ修正とセキュリティの問題にのみ対処します。Kinesis Data Streams の新機能や API に対する更新は、このメジャーバージョンには行われません。
+ **サポート終了** – そのメジャーバージョンに対して更新やリリースは一切提供されません。以前に公開されたリリースは引き続き公開パッケージマネージャーから入手でき、コードは GitHub に残ります。サポートが終了したバージョンの使用は、ユーザーの裁量で行われます。最新のメジャーバージョンにアップグレードすることをお勧めします。


| メジャーバージョン | 現在のフェーズ | リリース日 | メンテナンスモード日 | サポート終了日 | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | メンテナンスモード | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | 一般提供 | 2024-12-15 | -- | -- | 

# で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java
<a name="developing-producers-with-sdk"></a>

 AWS SDK for Java で Amazon Kinesis Data Streams API を使用してプロデューサーを開発できます。Kinesis Data Streams を初めて利用する場合は、[Amazon Kinesis Data Streams とは](introduction.md)および[AWS CLI を使用して Amazon Kinesis Data Streams オペレーションを実行する](getting-started.md)で説明されている概念と用語について理解することから始めてください。

以下の例では、[Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) について説明し、[AWS SDK for Java](https://aws.amazon.com/sdk-for-java/) を使用してストリームにデータを追加 (入力) します。ただし、ほとんどのユースケースでは、Kinesis Data Streams KPL ライブラリを使用します。詳細については、[Amazon Kinesis Producer Library (KPL) を使用してプロデューサーを開発する](developing-producers-with-kpl.md)を参照してください。

この章で紹介する Java サンプルコードは、基本的な Kinesis Data Streams API オペレーションを実行する方法を示しており、オペレーションタイプ別に論理的に分割されています。これらのサンプルは、すべての例外を確認しているわけではなく、すべてのセキュリティやパフォーマンスの側面を考慮しているわけでもない点で、本稼働環境に使用できるコードを表すものではありません。また、他のプログラミング言語を使用して [Kinesis Data Streams API](https://docs.aws.amazon.com/kinesis/latest/APIReference/) を呼び出すこともできます。利用可能なすべての AWS SDKs[「アマゾン ウェブ サービスの開発を開始する](https://aws.amazon.com/developers/getting-started/)」を参照してください。

各タスクには前提条件があります。たとえば、ストリームを作成するまではストリームにデータを追加できず、ストリームを作成するにはクライアントを作成する必要があります。詳細については、「[Kinesis Data Streams を作成して管理する](working-with-streams.md)」を参照してください。

**Topics**
+ [ストリームにデータを追加する](#kinesis-using-sdk-java-add-data-to-stream)
+ [AWS Glue Schema Registry を使用してデータを操作する](kinesis-integration-glue-schema-registry.md)

## ストリームにデータを追加する
<a name="kinesis-using-sdk-java-add-data-to-stream"></a>

ストリームを作成したら、レコードの形式でストリームにデータを追加できます。レコードはデータ BLOB の形式で処理するデータを格納するデータ構造です。データをレコードに保存した後、Kinesis Data Streams ではいずれの方法でもデータが検査、解釈、または変更されることはありません。各レコードにはシーケンス番号とパーティションキーも関連付けられます。

Kinesis Data Streams API には、ストリームにデータを追加するオペレーションとして [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) と [https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) の 2 つの異なるオペレーションがあります。`PutRecords` オペレーションは HTTP リクエストごとストリームに複数のレコードを送信し、単数形の `PutRecord` オペレーションは一度に 1 つずつストリームにレコードを送信します (各レコードについて個別の HTTP リクエストが必要です)。データプロデューサーあたりのスループットが向上するため、ほとんどのアプリケーションでは `PutRecords` を使用してください。これらの各オペレーションの詳細については、後のそれぞれのサブセクションを参照してください。

**Topics**
+ [PutRecords を使用して複数のレコードを追加する](#kinesis-using-sdk-java-putrecords)
+ [PutRecord を使用して単一レコードを追加する](#kinesis-using-sdk-java-putrecord)

ソースアプリケーションは Kinesis Data Streams API を使用してストリームにデータを追加するため、1 つ以上のコンシューマーアプリケーションが同時にストリームからデータを取得して処理する可能性があることを常に念頭に置いてください。コンシューマーが Kinesis Data Streams API を使用してデータを取得する方法の詳細については、[ストリームからのデータを取得する](developing-consumers-with-sdk.md#kinesis-using-sdk-java-get-data)を参照してください。

**重要**  
[データ保持期間を変更する](kinesis-extended-retention.md)

### PutRecords を使用して複数のレコードを追加する
<a name="kinesis-using-sdk-java-putrecords"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) オペレーションは、1 つのリクエストで Kinesis Data Streams に複数のレコードを送信します。`PutRecords` を使用することによって、プロデューサーは Kinesis Data Streams にデータを送信するときに高スループットを実現できます。各`PutRecords` リクエストは、最大 500 レコードをサポートできます。リクエストに含まれる各レコードは 1 MB、リクエスト全体の上限はパーティションキーを含めて最大 5 MB。後で説明する単一の `PutRecord` オペレーションと同様に、`PutRecords` はシーケンス番号とパーティションキーを使用します。ただし、`PutRecord` の `SequenceNumberForOrdering` パラメータは、`PutRecords` の呼び出しには含まれません。`PutRecords` オペレーションでは、リクエストの自然な順序ですべてのレコードを処理するよう試みます。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、`client.putRecords` を呼び出してストリームにデータレコードを追加した後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。`PutRecords`リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

**注記**  
シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

`PutRecords` リクエストには、異なるパーティションキーのレコードを含めることができます。リクエストのスコープはストリームです。各リクエストには、リクエストの制限まで、パーティションキーとレコードのあらゆる組み合わせを含めることができます。複数の異なるパーティションキーを使用して、複数の異なるシャードを含むストリームに対して実行されたリクエストは、少数のパーティションキーを使用して少数のシャードに対して実行されたリクエストよりも一般的に高速です。レイテンシーを低減し、スループットを最大化するには、パーティションキーの数をシャードの数よりも大きくする必要があります。

#### PutRecords の例
<a name="kinesis-using-sdk-java-putrecords-example"></a>

次のコードでは、シーケンシャルなパーティションキーを持つ 100 件のデータレコードを作成し、`DataStream` という名前のストリームに格納しています。

```
        AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard();
        
        clientBuilder.setRegion(regionName);
        clientBuilder.setCredentials(credentialsProvider);
        clientBuilder.setClientConfiguration(config);
        
        AmazonKinesis kinesisClient = clientBuilder.build();
 
        PutRecordsRequest putRecordsRequest  = new PutRecordsRequest();
        putRecordsRequest.setStreamName(streamName);
        List <PutRecordsRequestEntry> putRecordsRequestEntryList  = new ArrayList<>(); 
        for (int i = 0; i < 100; i++) {
            PutRecordsRequestEntry putRecordsRequestEntry  = new PutRecordsRequestEntry();
            putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes()));
            putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i));
            putRecordsRequestEntryList.add(putRecordsRequestEntry); 
        }

        putRecordsRequest.setRecords(putRecordsRequestEntryList);
        PutRecordsResult putRecordsResult  = kinesisClient.putRecords(putRecordsRequest);
        System.out.println("Put Result" + putRecordsResult);
```

`PutRecords` のレスポンスには、レスポンスの `Records` の配列が含まれます。レスポンス配列の各レコードは、リクエスト配列内のレコードと自然な順序 (リクエストやレスポンスの上から下へ) で直接相互に関連付けられます。レスポンスの `Records` 配列には、常にリクエスト配列と同じ数のレコードが含まれます。

#### PutRecords 使用時のエラーを処理する
<a name="kinesis-using-sdk-java-putrecords-handling-failures"></a>

デフォルトでは、リクエスト内の個々のレコードでエラーが発生しても、`PutRecords` リクエスト内のそれ以降のレコードの処理は停止されません。つまり、レスポンスの `Records` 配列には、正常に処理されたレコードと、正常に処理されなかったレコードの両方が含まれていることを意味します。正常に処理されなかったレコードを検出し、それ以降の呼び出しに含める必要があります。

正常に処理されたレコードには `SequenceNumber` 値と `ShardID` 値が、正常に処理されなかったレコードには `ErrorCode` 値と `ErrorMessage` 値が含まれます。`ErrorCode` パラメータはエラーのタイプを反映し、`ProvisionedThroughputExceededException` または `InternalFailure` のいずれかの値になります。`ErrorMessage`は、`ProvisionedThroughputExceededException` 例外に関するより詳細な情報として、スロットリングされたレコードのアカウント ID、ストリーム名、シャード ID などを示します。次の例では、`PutRecords` リクエストに 3 つのレコードがあります。2 番目のレコードは失敗し、レスポンスに反映されます。

**Example PutRecords リクエストの構文**  

```
{
    "Records": [
        {
    	"Data": "XzxkYXRhPl8w",
	    "PartitionKey": "partitionKey1"
        },
        {
    	"Data": "AbceddeRFfg12asd",
	    "PartitionKey": "partitionKey1"	
        },
        {
    	"Data": "KFpcd98*7nd1",
	    "PartitionKey": "partitionKey3"
        }
    ],
    "StreamName": "myStream"
}
```

**Example PutRecords レスポンスの構文**  

```
{
    "FailedRecordCount”: 1,
    "Records": [
        {
	    "SequenceNumber": "21269319989900637946712965403778482371",
	    "ShardId": "shardId-000000000001"

        },
        {
	    “ErrorCode":”ProvisionedThroughputExceededException”,
	    “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."

        },
        {
	    "SequenceNumber": "21269319989999637946712965403778482985",
	    "ShardId": "shardId-000000000002"
        }
    ]
}
```

正常に処理されなかったレコードは、以降の `PutRecords` リクエストに含めることができます。最初に、`FailedRecordCount` の `putRecordsResult` パラメータを調べて、リクエスト内にエラーとなったレコードがあるかどうかを確認します。このようなレコードがある場合は、`putRecordsEntry` が `ErrorCode` 以外である各 `null` を、以降のリクエストに追加してください。このタイプのハンドラーの例については、次のコードを参照してください。

**Example PutRecords エラーハンドラー**  

```
PutRecordsRequest putRecordsRequest = new PutRecordsRequest();
putRecordsRequest.setStreamName(myStreamName);
List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>();
for (int j = 0; j < 100; j++) {
    PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry();
    putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes()));
    putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j));
    putRecordsRequestEntryList.add(putRecordsRequestEntry);
}

putRecordsRequest.setRecords(putRecordsRequestEntryList);
PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);

while (putRecordsResult.getFailedRecordCount() > 0) {
    final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>();
    final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords();
    for (int i = 0; i < putRecordsResultEntryList.size(); i++) {
        final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i);
        final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i);
        if (putRecordsResultEntry.getErrorCode() != null) {
            failedRecordsList.add(putRecordRequestEntry);
        }
    }
    putRecordsRequestEntryList = failedRecordsList;
    putRecordsRequest.setRecords(putRecordsRequestEntryList);
    putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest);
}
```

### PutRecord を使用して単一レコードを追加する
<a name="kinesis-using-sdk-java-putrecord"></a>

[https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) の各呼び出しは、1 つのレコードに対して動作します。アプリケーションで常にリクエストごとに 1 つのレコードを送信する必要がある場合や、`PutRecords` を使用できないその他の理由がある場合を除いて、[PutRecords を使用して複数のレコードを追加する](#kinesis-using-sdk-java-putrecords)で説明している `PutRecords` オペレーションを使用します。

各データレコードには一意のシーケンス番号があります。シーケンス番号は、`client.putRecord` を呼び出してストリームにデータレコードを追加した後に、Kinesis Data Streams によって割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。`PutRecord`リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

 入力が立て続けに行われた場合、返されるシーケンス番号は大きくなるとは限りません。入力オペレーションが基本的に Kinesis Data Streams に対して同時に実行されるためです。同じパーティションキーに対して厳密にシーケンス番号が大きくなるようにするには、[PutRecord の例](#kinesis-using-sdk-java-putrecord-example)のサンプルコードに示しているように、`SequenceNumberForOrdering` パラメータを使用します。

 `SequenceNumberForOrdering` を使用するかどうかにかかわらず、inesis Data Streams が `GetRecords` の呼び出しを通じて受け取るレコードは厳密にシーケンス番号順になります。

**注記**  
シーケンス番号は、同じストリーム内の一連のデータのインデックスとして使用することはできません。一連のデータを論理的に区別するには、パーティションキーを使用するか、データセットごとに個別のストリームを作成します。

パーティションキーはストリーム内のデータをグループ化するために使用されます。データレコードはそのパーティションキーに基づいてストリーム内でシャードに割り当てられます。具体的には、Kinesis Data Streams ではパーティションキー (および関連するデータ) を特定のシャードにマッピングするハッシュ関数への入力として、パーティションキーを使用します。

 このハッシュメカニズムの結果として、パーティションキーが同じすべてのデータレコードは、ストリーム内で同じシャードにマッピングされます。ただし、パーティションキーの数がシャードの数を超えている場合、一部のシャードにパーティションキーが異なるレコードが格納されることがあります。設計の観点から、すべてのシャードが適切に使用されるようにするには、シャードの数 (`setShardCount` の `CreateStreamRequest` メソッドで指定) を一意のパーティションキーの数よりも大幅に少なくする必要があります。また、1 つのパーティションキーへのデータの流量をシャードの容量より大幅に小さくする必要があります。

#### PutRecord の例
<a name="kinesis-using-sdk-java-putrecord-example"></a>

以下のコードでは、2 つのパーティションキーに配分される 10 件のデータレコードを作成し、`myStreamName` という名前のストリームに格納しています。

```
for (int j = 0; j < 10; j++) 
{
  PutRecordRequest putRecordRequest = new PutRecordRequest();
  putRecordRequest.setStreamName( myStreamName );
  putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() ));
  putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 ));  
  putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord );
  PutRecordResult putRecordResult = client.putRecord( putRecordRequest );
  sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber();
}
```

上記のコード例では、`setSequenceNumberForOrdering` を使用して、各パーティションキー内で順番が厳密に増えるようにしています。このパラメータを効果的に使用するには、現在のレコードの `SequenceNumberForOrdering` (レコード *n*) を前のレコード (レコード *n-1*) のシーケンス番号に設定します。ストリームに追加されたレコードのシーケンス番号を取得するには、`getSequenceNumber` の結果に対して `putRecord` を呼び出します。

`SequenceNumberForOrdering` パラメーターを指定すると、同じパーティションキーのシーケンス番号が厳密に大きくなります。`SequenceNumberForOrdering`では、複数のパーティションキーにわたるレコードの順序付けは用意されていません。

# AWS Glue Schema Registry を使用してデータを操作する
<a name="kinesis-integration-glue-schema-registry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue スキーマレジストリを使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを改善できます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、 AWS Java SDK で利用可能な `PutRecords` および `PutRecord` Kinesis Data Streams API を使用することです。

PutRecords および PutRecord Kinesis Data Streams APIs、ユースケース: Amazon Kinesis Data Streams と Glue スキーマレジストリの統合の「Kinesis Data Streams APIs を使用したデータの操作」セクションを参照してください。 [ Amazon Kinesis AWS](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)

# Kinesis エージェントを使用して Amazon Kinesis Data Streams に書き込む
<a name="writing-with-agents"></a>

Kinesis エージェントはスタンドアロンの Java ソフトウェアアプリケーションであり、データを収集して Kinesis Data Streams に送信する簡単な方法です。このエージェントは一連のファイルを継続的に監視し、新しいデータをストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミング処理のモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。

デフォルトでは、レコードは改行文字 (`'\n'`) に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます ([エージェント設定を指定する](#agent-config-settings)を参照)。

このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、ストリームに安全にデータを送信します。

**Topics**
+ [Kinesis Agent の前提条件を完了する](#prereqs)
+ [エージェントをダウンロードしてインストールする](#download-install)
+ [エージェントを設定して開始する](#config-start)
+ [エージェント設定を指定する](#agent-config-settings)
+ [複数のファイルディレクトリを監視し、複数のストリームに書き込む](#sim-writes)
+ [エージェントを使用してデータを事前処理する](#pre-processing)
+ [エージェント CLI コマンドを使用する](#cli-commands)
+ [よくある質問](#agent-faq)

## Kinesis Agent の前提条件を完了する
<a name="prereqs"></a>
+ オペレーティングシステムは Amazon Linux AMI バージョン 2015.09 以降、または Red Hat Enterprise Linux バージョン 7 以降でなければなりません。
+ Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。
+ 次のいずれかの方法を使用して AWS 認証情報を管理します。
  + EC2 インスタンスを起動する際に IAM ロールを指定します。
  + エージェントを設定するときに AWS 認証情報を指定します ([awsAccessKeyId](#awsAccessKeyId) と [awsSecretAccessKey](#awsSecretAccessKey) を参照）。
  + を編集`/etc/sysconfig/aws-kinesis-agent`して、リージョンと AWS アクセスキーを指定します。
  + EC2 インスタンスが別の AWS アカウントにある場合は、Kinesis Data Streams サービスへのアクセスを提供する IAM ロールを作成し、エージェントを設定するときにそのロールを指定します ([assumeRoleARN](#assumeRoleARN)」と[assumeRoleExternalId](#assumeRoleExternalId)」を参照）。前述の方法のいずれかを使用して、このロールを引き受けるアクセス許可を持つ他のアカウントのユーザーの AWS 認証情報を指定します。
+ 指定する IAM ロールまたは AWS 認証情報には、エージェントがストリームにデータを送信するための Kinesis Data Streams [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) オペレーションを実行するアクセス許可が必要です。エージェントの CloudWatch モニタリングを有効にしている場合は、CloudWatch [PutMetricData](https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_PutMetricData.html) オペレーションを実行する許可も必要になります。詳細については、[IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御](controlling-access.md)、[Amazon CloudWatch による Kinesis Data Streams エージェントのヘルスに監視する](agent-health.md)、および[CloudWatch アクセスコントロール](https://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/UsingIAM.html)を参照してください。

## エージェントをダウンロードしてインストールする
<a name="download-install"></a>

最初に、インスタンスに接続します。詳細については、「*Amazon EC2 ユーザーガイド*」の「[Linux インスタンスへの接続](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.html)」を参照してください。接続できない場合は、「Amazon EC2 ユーザーガイド」の「[インスタンスへの接続に関するトラブルシューティング](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html)」を参照してください。**

**Amazon Linux AMI を使用してエージェントを設定する**  
次のコマンドを使用して、エージェントをダウンロードしてインストールします。

```
sudo yum install –y aws-kinesis-agent
```

**Red Hat Enterprise Linux を使用してエージェントを設定する**  
次のコマンドを使用して、エージェントをダウンロードしてインストールします。

```
sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
```

**GitHubを使用してエージェントを設定する**

1. エージェントを [awlabs/amazon-kinesis-agent](https://github.com/awslabs/amazon-kinesis-agent) からダウンロードします。

1. ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。

   ```
   sudo ./setup --install
   ```

**Docker コンテナにエージェントをセットアップするには**  
Kinesis Agent は、[amazonlinux](https://docs.aws.amazon.com/AmazonECR/latest/userguide/amazon_linux_container_image.html) コンテナベースを使ってコンテナで実行することもできます。次の Docker ファイルを使用し、`docker build` を実行します。

```
FROM amazonlinux

RUN yum install -y aws-kinesis-agent which findutils
COPY agent.json /etc/aws-kinesis/agent.json

CMD ["start-aws-kinesis-agent"]
```

## エージェントを設定して開始する
<a name="config-start"></a>

**エージェントを設定して開始するには**

1. (デフォルトのファイルアクセス許可を使用している場合、スーパーユーザーとして) 設定ファイル (`/etc/aws-kinesis/agent.json`) を開き、編集します。

   この設定ファイルで、エージェントがデータを集めるファイル (`"filePattern"`) とエージェントがデータを送信するストリーム (`"kinesisStream"`) を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認する点に注意してください。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、どのファイルを追跡してストリームに送信するかを判断します。新規ファイルの作成やファイルの交換を 1 秒あたりに一度以上頻繁に交換すると、エージェントはそれらを適切に区別できません。

   ```
   { 
      "flows": [
           { 
               "filePattern": "/tmp/app.log*", 
               "kinesisStream": "yourkinesisstream"
           } 
      ] 
   }
   ```

1. エージェントを手動で開始する:

   ```
   sudo service aws-kinesis-agent start
   ```

1. (オプション) システムスタートアップ時にエージェントを開始するように設定します。

   ```
   sudo chkconfig aws-kinesis-agent on
   ```

エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定されたストリームにデータを送信します。エージェント活動は、`/var/log/aws-kinesis-agent/aws-kinesis-agent.log` に記録されます。

## エージェント設定を指定する
<a name="agent-config-settings"></a>

エージェントは、2つの必須設定、`filePattern` と `kinesisStream`、さらに追加機能として任意の設定をサポートしています。必須およびオプションの設定を `/etc/aws-kinesis/agent.json` で指定できます。

設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。

```
sudo service aws-kinesis-agent stop
sudo service aws-kinesis-agent start
```

または、次のコマンドを使用できます。

```
sudo service aws-kinesis-agent restart
```

全般設定は次のとおりです。


| 構成設定 | 説明 | 
| --- | --- | 
| <a name="assumeRoleARN"></a>assumeRoleARN |  ユーザーが引き受けるロールの ARN。詳細については、IAM *ユーザーガイド*の[「IAM ロールを使用した AWS アカウント間のアクセスの委任](https://docs.aws.amazon.com/IAM/latest/UserGuide/tutorial_cross-account-with-roles.html)」を参照してください。  | 
| <a name="assumeRoleExternalId"></a>assumeRoleExternalId |  ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、*IAM ユーザーガイド*の[外部 ID の使用方法](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-user_externalid.html)を参照してください。  | 
| <a name="awsAccessKeyId"></a>awsAccessKeyId |  AWS デフォルトの認証情報を上書きする アクセスキー ID。この設定は、他のすべての認証情報プロバイダーに優先されます。  | 
| <a name="awsSecretAccessKey"></a>awsSecretAccessKey |  AWS デフォルトの認証情報を上書きするシークレットキー。この設定は、他のすべての認証情報プロバイダーに優先されます。  | 
| cloudwatch.emitMetrics |  エージェントがメトリクスを CloudWatch に出力できるようにします (true に設定された場合)。 デフォルト: true  | 
| cloudwatch.endpoint |  CloudWatch のリージョンのエンドポイントです。 デフォルト: `monitoring.us-east-1.amazonaws.com`  | 
| kinesis.endpoint |  Kinesis Data Streams のリージョンのエンドポイントです。 デフォルト: `kinesis.us-east-1.amazonaws.com`  | 

フロー設定は次のとおりです。


| 構成設定 | 説明 | 
| --- | --- | 
| dataProcessingOptions |  ストリームに送信される前に解析された各レコードに適用されるの処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、[エージェントを使用してデータを事前処理する](#pre-processing)を参照してください。  | 
| kinesisStream |  [必須] ストリームの名前。  | 
| filePattern |  [必須] エージェントによって取得されるために一致する必要があるディレクトリとファイルパターン。このパターンに一致するすべてのファイルは、読み取り権限を `aws-kinesis-agent-user` に付与する必要があります。ファイルを含むディレクトリには、読み取りと実行権限を `aws-kinesis-agent-user` に付与する必要があります。  | 
| initialPosition |  ファイルの解析が開始される最初の位置。有効な値は、`START_OF_FILE` および `END_OF_FILE` です。 デフォルト: `END_OF_FILE`  | 
| maxBufferAgeMillis |  エージェントがストリームに送信する前にデータをバッファーする最大時間 (ミリ秒)。 値の範囲: 1,000～900,000 (1 秒～15 分) デフォルト: 60,000 (1 分)  | 
| maxBufferSizeBytes |  エージェントがストリームに送信する前にデータをバッファーする最大サイズ (バイト)。 値の範囲: 1～4,194,304 (4 MB) デフォルト: 4,194,304 (4 MB)  | 
| maxBufferSizeRecords |  エージェントがストリームに送信する前にデータをバッファーするレコードの最大数。 値の範囲: 1 ～ 500 デフォルト: 500  | 
| minTimeBetweenFilePollsMillis |  エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。 値の範囲: 1 以上 デフォルト: 100  | 
| multiLineStartPattern |  レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。  | 
| partitionKeyOption |  パーティションのキーを生成する方法。有効な値は `RANDOM` (ランダムに生成される整数) と `DETERMINISTIC` (データから計算されるハッシュ値) です。 デフォルト: `RANDOM`  | 
| skipHeaderLines |  モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。 値の範囲: 0 以上 デフォルト: 0 (ゼロ)  | 
| truncatedRecordTerminator |  レコードのサイズが Kinesis Data Streams レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB) デフォルト: `'\n'` (改行)  | 

## 複数のファイルディレクトリを監視し、複数のストリームに書き込む
<a name="sim-writes"></a>

複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。以下の設定例では、エージェントは 2 つのファイルディレクトリをモニタリングし、それぞれ Kinesis ストリームおよび Firehose 配信ストリームにデータを送信します。Kinesis Data Streams と Firehose に異なるエンドポイントを指定できるため、Kinesis ストリームと Firehose 配信ストリームが同じリージョンに存在する必要はありません。

```
{
    "cloudwatch.emitMetrics": true,
    "kinesis.endpoint": "https://your/kinesis/endpoint", 
    "firehose.endpoint": "https://your/firehose/endpoint", 
    "flows": [
        {
            "filePattern": "/tmp/app1.log*", 
            "kinesisStream": "yourkinesisstream"
        }, 
        {
            "filePattern": "/tmp/app2.log*",
            "deliveryStream": "yourfirehosedeliverystream" 
        }
    ] 
}
```

Firehose でのエージェントの使用の詳細については、「[Kinesis エージェントを使用した Amazon Data Firehose への書き込み](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-agents.html)」を参照してください。

## エージェントを使用してデータを事前処理する
<a name="pre-processing"></a>

エージェントはストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに `dataProcessingOptions` 設定を追加することで、この機能を有効にできます。1 つ以上の処理オプションを追加でき、また指定されている順序で実行されます。

エージェントは、リストされた次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。[Kinesis エージェント](https://github.com/awslabs/amazon-kinesis-agent)からエージェントをダウンロードできます。処理オプション

`SINGLELINE`  
改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。  

```
{
    "optionName": "SINGLELINE"
}
```

`CSVTOJSON`  
区切り形式から JSON 形式にレコードを変換します。  

```
{
    "optionName": "CSVTOJSON",
    "customFieldNames": [ "field1", "field2", ... ],
    "delimiter": "yourdelimiter"
}
```  
`customFieldNames`  
[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、`["f1", "f2"]` を指定した場合は、レコードv1、v2は `{"f1":"v1","f2":"v2"}` に変換されます。  
`delimiter`  
レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。

`LOGTOJSON`  
ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は、**Apache Common Log**、**Apache Combined Log**、**Apache Error Log**、および **RFC3164 Syslog** です。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "logformat",
    "matchPattern": "yourregexpattern",
    "customFieldNames": [ "field1", "field2", … ]
}
```  
`logFormat`  
[必須] ログエントリ形式。以下の値を指定できます。  
+ `COMMONAPACHELOG` — Apache Common Log 形式。各ログエントリは、デフォルトで次のパターン`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}`になります。
+ `COMBINEDAPACHELOG` — Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターン`%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}`になります。
+ `APACHEERRORLOG` — Apache Error Log 形式。各ログエントリは、デフォルトで次のパターン`[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}`になります。
+ `SYSLOG` — FC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターン`%{timestamp} %{hostname} %{program}[%{processid}]: %{message}`になります。  
`matchPattern`  
ログエントリから値を取得するために使用する正規表現パターン。この設定は、ログエントリが定義されたログ形式の一つとして存在していない場合に使用されます。この設定を使用する場合は、`customFieldNames` を指定する必要があります。  
`customFieldNames`  
JSON キー値のペアでキーとして使用されるカスタムフィールド名。`matchPattern` から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。

**Example : LOGTOJSON 設定**  <a name="example-logtojson"></a>
JSON形式に変換された Apache Common Log エントリの `LOGTOJSON` 設定の一つの例を次に示します。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG"
}
```
変換前:  

```
64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291
```
変換後:  

```
{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
```

**Example : カスタムフィールドがある LOGTOJSON 設定**  <a name="example-logtojson-custom-fields"></a>
こちらは `LOGTOJSON` 設定の別の例です。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"]
}
```
この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。  

```
{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
```

**Example : Apache Common Log エントリの変換**  <a name="example-apache-common-log-entry"></a>
次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "dataProcessingOptions": [
                {
                    "optionName": "LOGTOJSON",
                    "logFormat": "COMMONAPACHELOG"
                }
            ]
        }
    ] 
}
```

**Example : 複数行レコードの変換**  <a name="example-convert-multiline"></a>
次のフロー設定は、最初の行が`[SEQUENCE=`で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された `customFieldNames` 値にマッピングされ、JSON 形式の単一行レコードを形成します。  

```
{ 
    "flows": [
        {
            "filePattern": "/tmp/app.log*", 
            "kinesisStream": "my-stream",
            "multiLineStartPattern": "\\[SEQUENCE=",
            "dataProcessingOptions": [
                {
                    "optionName": "SINGLELINE"
                },
                {
                    "optionName": "CSVTOJSON",
                    "customFieldNames": [ "field1", "field2", "field3" ],
                    "delimiter": "\\t"
                }
            ]
        }
    ] 
}
```

**Example : 一致パターンで LOGTOJSON 設定**  <a name="example-logtojson-match-pattern"></a>
こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの `LOGTOJSON` 設定の一例です。  

```
{
    "optionName": "LOGTOJSON",
    "logFormat": "COMMONAPACHELOG",
    "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})",
    "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"]
}
```
変換前:  

```
123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200
```
変換後:  

```
{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}
```

## エージェント CLI コマンドを使用する
<a name="cli-commands"></a>

システムスタートアップ時のエージェントの自動的開始: 

```
sudo chkconfig aws-kinesis-agent on
```

エージェントのステータスの確認: 

```
sudo service aws-kinesis-agent status
```

エージェントの停止: 

```
sudo service aws-kinesis-agent stop
```

この場所からエージェントのログファイルを読む:

```
/var/log/aws-kinesis-agent/aws-kinesis-agent.log
```

エージェントのアンインストール:

```
sudo yum remove aws-kinesis-agent
```

## よくある質問
<a name="agent-faq"></a>

### Windows 用の Kinesis Agent はありますか?
<a name="agent-faq-1"></a>

[Windows 用 Kinesis Agent](https://docs.aws.amazon.com/kinesis-agent-windows/latest/userguide/what-is-kinesis-agent-windows.html) は Linux プラットフォーム用 Kinesis Agent とは異なるソフトウェアです。

### Kinesis Agent の速度が低下したり、`RecordSendErrors` が増加したりするのはなぜですか?
<a name="agent-faq-2"></a>

通常、これは Kinesis のスロットリングが原因です。Kinesis Data Streams の `WriteProvisionedThroughputExceeded` メトリクス、または Firehose 配信ストリームの `ThrottledRecords` メトリクスをチェックします。これらのメトリクスが 0 を超えている場合は、ストリームの上限を引き上げる必要があることを意味します。詳細については、「[Kinesis Data Stream limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)」と「[Amazon Firehose Delivery Streams](https://docs.aws.amazon.com/firehose/latest/dev/limits.html)」を参照してください。

スロットリングが原因ではないことがわかったら、Kinesis Agent が大量の小規模ファイルをテーリングするように設定されているかどうかを確認してください。Kinesis Agent が新しいファイルをテーリングするときには遅延が発生するため、少量の大きなファイルをテーリングするようにします。ログファイルを大きなファイルに統合してみてください。

### `java.lang.OutOfMemoryError` の例外が発生するのはなぜですか?
<a name="agent-faq-4"></a>

Kinesis Agent に、現在のワークロードを処理するための十分なメモリがないためです。`/usr/bin/start-aws-kinesis-agent` で `JAVA_START_HEAP` と `JAVA_MAX_HEAP` を増やしてエージェントを再起動してみてください。

### `IllegalStateException : connection pool shut down` の例外が発生するのはなぜですか?
<a name="agent-faq-5"></a>

Kinesis エージェントに、現在のワークロードを処理するための十分な接続がないためです。`/etc/aws-kinesis/agent.json` の一般的なエージェント設定で `maxConnections` と `maxSendingThreads` を増やしてみてください。これらのフィールドのデフォルト値は、使用可能なランタイムプロセッサの 12 倍です。高度なエージェント設定については、「[AgentConfiguration.java](https://github.com/awslabs/amazon-kinesis-agent/blob/master/src/com/amazon/kinesis/streaming/agent/config/AgentConfiguration.java)」を参照してください。

### Kinesis Agent に関する別の問題をデバッグする方法を教えてください。
<a name="agent-faq-6"></a>

`DEBUG` レベルログは `/etc/aws-kinesis/log4j.xml` で有効にできます。

### Kinesis Agent はどのように設定するとよいですか?
<a name="agent-faq-7"></a>

`maxBufferSizeBytes` の値が小さいほど、Kinesis Agent がデータを送信する頻度が高くなります。そのため、レコードの配信時間が短縮されますが、Kinesis への 1 秒あたりのリクエスト数も増えます。

### Kinesis Agent が重複レコードを送信するのはなぜですか?
<a name="agent-faq-8"></a>

これはファイルテーリングの設定ミスが原因です。各 `fileFlow’s filePattern` が それぞれ 1 つのファイルのみと一致するようにします。また、使用されている `logrotate` モードが `copytruncate` モードになっている場合にも発生することがあります。重複を避けるため、モードをデフォルトか作成モードに変更してみてください。重複レコードの処理に関する詳細は、「[Handling Duplicate Records](https://docs.aws.amazon.com/streams/latest/dev/kinesis-record-processor-duplicates.html)」を参照してください。

# 他の AWS サービスを使用して Kinesis Data Streams に書き込む
<a name="using-other-services"></a>

次の AWS サービスは、Amazon Kinesis Data Streams と直接統合して、Kinesis Data Streams にデータを書き込むことができます。関心のある各サービスの情報を確認し、提供されたリファレンスを参照してください。

**Topics**
+ [を使用して Kinesis Data Streams に書き込む AWS Amplify](using-other-services-amplify.md)
+ [Amazon Aurora を使用して Kinesis Data Stream に書き込む](using-other-services-aurora.md)
+ [Amazon CloudFront を使用して Kinesis Data Stream に書き込む](using-other-services-CloudFront.md)
+ [Amazon CloudWatch Logs を使用して Kinesis Data Streams に書き込む](using-other-services-cw-logs.md)
+ [Amazon Connect を使用して Kinesis Data Streams に書き込む](using-other-services-connect.md)
+ [を使用して Kinesis Data Streams に書き込む AWS Database Migration Service](using-other-services-migration.md)
+ [Amazon DynamoDB を使用して Kinesis Data Streams に書き込む](using-other-services-ddb.md)
+ [Amazon EventBridge を使用して Kinesis Data Streams に書き込む](using-other-services-eventbridges.md)
+ [を使用して Kinesis Data Streams に書き込む AWS IoT Core](using-other-services-iot-core.md)
+ [Amazon Relational Database Service を使用して Kinesis Data Streams に書き込む](using-other-services-rds.md)
+ [Amazon Pinpoint を使用して Kinesis Data Streams に書き込む](using-other-services-pinpoint.md)
+ [Amazon Quantum Ledger Database (Amazon QLDB) を使用して Kinesis Data Streams に書き込む](using-other-services-quantum-ledger.md)

# を使用して Kinesis Data Streams に書き込む AWS Amplify
<a name="using-other-services-amplify"></a>

Amazon Kinesis Data Streams を使用して、 AWS Amplify を使用して構築されたモバイルアプリケーションからのデータを、リアルタイムでの処理のためにストリーミングすることができます。その後、リアルタイムダッシュボードの構築、例外のキャプチャとアラートの生成、推奨事項の促進、およびビジネスや運用に関するその他のリアルタイムでの判断を行うことができます。また、Amazon Simple Storage Service、Amazon DynamoDB、Amazon Redshift など他のサービスにデータを送信することもできます。

詳細については、AWS Amplify Developer Center で [Using Amazon Kinesis](https://docs.amplify.aws/react/build-a-backend/more-features/analytics/streaming-data/) を参照してください。**

# Amazon Aurora を使用して Kinesis Data Stream に書き込む
<a name="using-other-services-aurora"></a>

Amazon Kinesis Data Streams を使用して Amazon Aurora DB クラスター上のアクティビティをモニタリングできます。Aurora DB クラスターは、データベースアクティビティストリームを使用することで、アクティビティを Amazon Kinesis データストリームにリアルタイムでプッシュします。その後、これらのアクティビティを消費し、監査して、アラートを生成する、コンプライアンス管理用のアプリケーションを構築できます。また、Amazon Amazon Firehose を使用してデータを保存することもできます。

詳細については、「Amazon Aurora デベロッパーガイド」の「[データベースアクティビティストリーム](https://docs.aws.amazon.com/AmazonRDS/latest/AuroraUserGuide/DBActivityStreams.html)」を参照してください。

# Amazon CloudFront を使用して Kinesis Data Stream に書き込む
<a name="using-other-services-CloudFront"></a>

CloudFront のリアルタイムログで Amazon Kinesis Data Streams を使用して、ディストリビューションに対して行われたリクエストに関する情報をリアルタイムで取得することができます。独自の [Kinesis データストリームコンシューマー](https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html)を構築することも、Amazon Data Firehose を使用してログデータを Amazon S3、Amazon Redshift、Amazon OpenSearch Service、またはサードパーティーのログ処理サービスに送信することもできます。

詳細については、「*Amazon CloudFront デベロッパーガイド*」の「[リアルタイムログ](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html)」を参照してください。

# Amazon CloudWatch Logs を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-cw-logs"></a>

CloudWatch サブスクリプションを使用して Amazon CloudWatch Logs からのログイベントのリアルタイムフィードにアクセスし、カスタム処理、分析、および他のシステムへのロードを行うために、Kinesis データストリームに配信することができます。

詳細については、「Amazon CloudWatch Logs ユーザーガイド」の「[サブスクリプションを使用したログデータのリアルタイム処理](https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Subscriptions.html)」を参照してください。

# Amazon Connect を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-connect"></a>

Kinesis Data Streams を使用して、Amazon Connect インスタンスからコンタクトレコードとエージェントイベントをリアルタイムでエクスポートできます。Amazon Connect Customer Profiles からのデータストリーミングを有効にして、新しいプロファイルの作成や既存のプロファイルへの変更に関する Kinesis データストリームへの更新情報を自動的に受け取ることもできます。

その後、コンシューマーアプリケーションを構築して、データをリアルタイムで処理し、分析することができます。例えば、コンタクトレコードやカスタマープロファイルデータを使用することで、最新情報を用いて CRM やマーケティング自動化ツールなどのソースシステムデータを最新の状態に保つことができます。エージェントイベントデータを使用すれば、エージェント情報やイベントを表示するダッシュボードを作成したり、特定のエージェントアクティビティに関するカスタム通知をトリガーしたりすることができます。

詳細については、「Amazon Connect 管理者ガイド」の「[インスタンスのデータストリーミング](https://docs.aws.amazon.com/connect/latest/adminguide/data-streaming.html)」、「[リアルタイムエクスポートの設定](https://docs.aws.amazon.com/connect/latest/adminguide/set-up-real-time-export.html)」、および「[エージェントのイベントストリーム](https://docs.aws.amazon.com/connect/latest/adminguide/agent-event-streams.html)」を参照してください。

# を使用して Kinesis Data Streams に書き込む AWS Database Migration Service
<a name="using-other-services-migration"></a>

を使用して AWS Database Migration Service 、Kinesis データストリームにデータを移行できます。その後、データレコードをリアルタイムで処理するコンシューマーアプリケーションを構築できます。また、Amazon Simple Storage Service、Amazon DynamoDB、および Amazon Redshift などのその他のダウンストリームサービスに、データを簡単に送信することもできます

詳細については、「AWS Database Migration Service ユーザーガイド」の「[ゲートウェイを使用したデータの取り込み](https://docs.aws.amazon.com/dms/latest/userguide/CHAP_Target.Kinesis.html)」を参照してください。

# Amazon DynamoDB を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-ddb"></a>

Amazon Kinesis Data Streams を使用して Amazon DynamoDB への変更をキャプチャできます。Kinesis Data Streams は、DynamoDB テーブルの項目レベルの変更をキャプチャーし、それらを Kinesis Data Streams にレプリケートします。コンシューマーアプリケーションは、このストリームにアクセスして項目レベルの変更をリアルタイムで確認し、これらの変更をダウンストリームに配信したり、内容に基づいてアクションを実行したりすることができます。

詳細については、「Amazon DynamoDB デベロッパーガイド」の「[Kinesis Data Streams の DynamoDB との連携について](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/kds.html)」を参照してください。

# Amazon EventBridge を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-eventbridges"></a>

Kinesis Data Streams を使用すると、EventBridge の AWS API コール[イベント](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-events.html)をストリームに送信し、コンシューマーアプリケーションを構築し、大量のデータを処理できます。また、Kinesis Data Streams を EventBridge Pipes 内のターゲットとして使用して、オプションのフィルタリングと強化を行った後で、利用可能なソースの 1 つからストリームにレコードを配信することもできます。

詳細については、「Amazon EventBridge ユーザーガイド」の「[Amazon Kinesis ストリームにイベントを送信する](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-relay-events-kinesis-stream.html)」と「[EventBridge Pipes](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)」を参照してください。

# を使用して Kinesis Data Streams に書き込む AWS IoT Core
<a name="using-other-services-iot-core"></a>

IoT ルールアクションを使用して AWS AWS IoT Core の MQTT メッセージからリアルタイムでデータを書き込むことができます。その後、書き込まれたデータを処理し、内容を分析してアラートを生成するとともに、データを分析アプリケーションや他の AWS サービスに配信するアプリケーションを構築できます。

詳細については、「AWS IoT デベロッパーガイド」の「[Kinesis Data Streams](https://docs.aws.amazon.com/iot/latest/developerguide/kinesis-rule-action.html)」を参照してください。

# Amazon Relational Database Service を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-rds"></a>

Amazon Kinesis Data Streams を使用して、Amazon RDS インスタンス上のアクティビティをモニタリングできます。Amazon RDS は、データベースアクティビティストリームを使用することで、アクティビティを Kinesis データストリームにリアルタイムでプッシュします。その後、これらのアクティビティを消費し、監査して、アラートを生成する、コンプライアンス管理用のアプリケーションを構築できます。また、Amazon Data Firehose を使用してデータを保存することもできます。

詳細については、「Amazon RDS デベロッパーガイド」の「[データベースアクティビティストリーム](https://docs.aws.amazon.com/AmazonRDS/latest/UserGuide/DBActivityStreams.html)」を参照してください。

# Amazon Pinpoint を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-pinpoint"></a>

Amazon Pinpoint は、Amazon Kinesis Data Streams にイベントデータを送信するように設定することができます。Amazon Pinpoint は、キャンペーン、ジャーニー、トランザクション用の Eメールや SMS メッセージのイベントデータを送信することができます。その後、データを分析アプリケーションに取り込むか、イベントの内容に基づいてアクションを実行する独自のコンシューマーアプリケーションを構築することができます。

詳細については、「Amazon Pinpoint Developer Guide」の「[Streaming Events](https://docs.aws.amazon.com/pinpoint/latest/developerguide/event-streams.html)」を参照してください。

# Amazon Quantum Ledger Database (Amazon QLDB) を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-quantum-ledger"></a>

ジャーナルにコミットされたすべてのドキュメントリビジョンをキャプチャして、このデータを Amazon Kinesis Data Streams にリアルタイムで配信するストリームを Amazon QLDB で作成することができます。QLDB ストリーミングは、台帳のジャーナルから Kinesis データストリームリソースへの連続的なデータのフローです。その後、Kinesis ストリーミングプラットフォーム、または Kinesis Client Library を使用して、ストリームを消費し、データレコードを処理して、データコンテンツを分析することができます。QLDB ストリームは、`control`、`block summary`、および `revision details` の 3 つのタイプで Kinesis Data Streams にデータを書き込みます。

詳細については、「Amazon QLDB Developer Guide」の「[Streams](https://docs.aws.amazon.com/qldb/latest/developerguide/streams.html)」を参照してください。

# サードパーティーの統合を使用して Kinesis Data Streams に書き込む
<a name="using-other-services-third-party"></a>

Kinesis Data Streams には、Kinesis Data Streams と統合する以下のサードパーティーオプションのいずれかを使用してデータを書き込むことができます。詳細を確認し、関連ドキュメントのリソースとリンクを検索するオプションを選択します。

**Topics**
+ [Apache Flink](using-other-services-flink.md)
+ [Fluentd](using-other-services-Fluentd.md)
+ [Debezium](using-other-services-Debezium.md)
+ [Oracle GoldenGate](using-other-services-Oracle-GoldenGate.md)
+ [Kafka Connect](using-other-services-kafka-connect.md)
+ [Adobe Experience](using-other-services-adobe.md)
+ [Striim](using-other-services-Striim.md)

# Apache Flink
<a name="using-other-services-flink"></a>

Apache Flink は、制限なしおよび制限付きのデータストリームでのステートフル計算のためのフレームワークかつ分散処理エンジンです。Apache Flink から Kinesis Data Streams への書き込みの詳細については、「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/datastream/kinesis/)」を参照してください。

# Fluentd
<a name="using-other-services-Fluentd"></a>

Fluentd は、統合ロギングレイヤーのためのオープンソースデータコレクターです。Fluentd から Kinesis Data Streams への書き込みの詳細については、「[Stream Processing with Kinesis](https://docs.fluentd.org/how-to-guides/kinesis-stream)」を参照してください。

# Debezium
<a name="using-other-services-Debezium"></a>

Debezium は、変更データキャプチャのためのオープンソースの分散型プラットフォームです。Debezium から Kinesis Data Streams への書き込みの詳細については、「[Streaming MySQL Data Changes to Amazon Kinesis](https://debezium.io/blog/2018/08/30/streaming-mysql-data-changes-into-kinesis/)」を参照してください。

# Oracle GoldenGate
<a name="using-other-services-Oracle-GoldenGate"></a>

Oracle GoldenGate は、1 つのデータベースから別のデータベースへのデータのレプリケーション、フィルタリング、および変換を可能にするソフトウェア製品です。Oracle GoldenGate から Kinesis Data Streams への書き込みの詳細については、「[Data replication to Kinesis Data Stream using Oracle GoldenGate](https://blogs.oracle.com/dataintegration/post/data-replication-to-aws-kinesis-data-stream-using-oracle-goldengate)」を参照してください。

# Kafka Connect
<a name="using-other-services-kafka-connect"></a>

Kafka Connect は、Apache Kafka と他のシステムの間でデータをスケーラブルかつ確実にストリーミングするためのツールです。Apache Kafka から Kinesis Data Streams へのデータの書き込みの詳細については、「[Kinesis kafka connector](https://github.com/awslabs/kinesis-kafka-connector)」を参照してください。

# Adobe Experience
<a name="using-other-services-adobe"></a>

Adobe Experience Platform は、組織があらゆるシステムからの顧客データを一元化して標準化することを可能にします。その後、データサイエンスと機械学習を適用して、充実感のあるパーソナライズされたエクスペリエンスの設計と提供を劇的に向上させます。Adobe Experience Platform から Kinesis Data Streams へのデータの書き込みの詳細については、[Amazon Kinesis connection](https://experienceleague.adobe.com/docs/experience-platform/destinations/catalog/cloud-storage/amazon-kinesis.html?lang=en) の作成方法を参照してください。

# Striim
<a name="using-other-services-Striim"></a>

Striim は、リアルタイムでのデータの収集、フィルタリング、変換、強化、集約、分析、および配信のための完全なエンドツーエンドのインメモリプラットフォームです。Striim から Kinesis Data Streams にデータを書き込む方法の詳細については、「[Kinesis Writer](https://www.striim.com/docs/en/kinesis-writer.html)」を参照してください。

# Amazon Kinesis Data Streams プロデューサーのトラブルシューティング
<a name="troubleshooting-producers"></a>

**Topics**
+ [プロデューサーアプリケーションの書き込みの速度が予想よりも遅い](#producer-writing-at-slower-rate)
+ [不正な KMS マスターキーアクセス許可エラーが表示される](#unauthorized-kms-producer)
+ [プロデューサーのその他の一般的な問題のトラブルシューティング](#misc-troubleshooting-producer)

## プロデューサーアプリケーションの書き込みの速度が予想よりも遅い
<a name="producer-writing-at-slower-rate"></a>

**Topics**
+ [サービスの制限を超過している](#service-limits-exceeded)
+ [プロデューサーを最適化したい](#producer-optimization)
+ [`flushSync()` オペレーションの誤用](#misuse-tag)

### サービスの制限を超過している
<a name="service-limits-exceeded"></a>

サービスの制限を超過している 呼び出しによって制限が異なることに注意して、[クォータと制限](service-sizes-and-limits.md) を確認してください。たとえば、書き込みと読み取りのシャードレベルの制限は最もよく知られていますが、以下のようなストリームレベルの制限もあります。
+ [CreateStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_CreateStream.html)
+ [DeleteStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeleteStream.html)
+ [ListStreams](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListStreams.html)
+ [GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)
+ [MergeShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_MergeShards.html)
+ [DescribeStream](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStream.html)
+ [DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)

`CreateStream`、`DeleteStream`、`ListStreams`、 `GetShardIterator`、`MergeShards` のオペレーションは、1 秒あたり 5 個の呼び出しに制限されます。`DescribeStream` オペレーションは、1 秒あたり 10 個の呼び出しに制限されます。`DescribeStreamSummary` オペレーションは、1 秒あたり 20 個の呼び出しに制限されます。

このような呼び出しが原因でない場合は、選択したパーティションキーを使用してすべてのシャードに *put* オペレーションを均等に分散できること、どのパーティションキーもサービスの制限に達していないことを確認します。これには、ピークスループットを測定して、ストリームのシャードの数を考慮する必要があります。ストリーム管理の詳細については、[Kinesis Data Streams を作成して管理する](working-with-streams.md)を参照してください。

**ヒント**  
シングルレコードオペレーション [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) では、スループットスロットリングの計算結果がキロバイト単位に四捨五入されます。マルチレコードオペレーション [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) では、各セルのレコードの累計が四捨五入されます。たとえば、`PutRecords` は 1.1 KB になる 600 レコードのリクエストをスロットリングしません。

### プロデューサーを最適化したい
<a name="producer-optimization"></a>

プロデューサーの最適化を始める前に、次の重要なタスクを完了しておく必要があります。最初に、レコードのサイズと 1 秒あたりのレコード数で必要となるスループットピークを特定します。次に、制限要素としてのストリーム容量を除外します ([サービスの制限を超過している](#service-limits-exceeded))。ストリーム容量を除外している場合は、以下のプロデューサーの 2 つの一般的なタイプのトラブルシューティングのヒントと最適化のガイドラインを使用します。

**ラージプロデューサー**

ラージプロデューサーは、通常オンプレミスサーバーまたは Amazon EC2 インスタンスから実行されます。ラージプロデューサーからより高いスループットを必要とするお客様は、通常レコードあたりのレイテンシーに注意を払います。レイテンシーを処理する戦略として、お客様がレコードをマイクロバッチ/バッファできる場合は、[Amazon Kinesis Producer Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-producers-with-kpl.html) (高度な集約ロジックがある) を使用するか、マルチレコードオペレーション [PutRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecords.html) を使用するか、レコードをより大きいファイルに集約してからシングルレコードオペレーション [PutRecord](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) を使用します。バッチ/バッファを使用できない場合は、複数のスレッドを使用して Kinesis Data Streams サービスに同時に書き込みます。 AWS SDK for Java およびその他の SDKsには、ごくわずかなコードでこれを行うことができる非同期クライアントが含まれています。

**スモールプロデューサー**

スモールプロデューサーは、通常モバイルアプリケーション、IoT デバイス、またはウェブクライアントです。モバイルアプリの場合は、 AWS Mobile SDKs で `PutRecords`オペレーションまたは Kinesis Recorder を使用することをお勧めします。詳細については、 AWS Mobile SDK for Android 「 入門ガイド」および AWS Mobile SDK for iOS 「 入門ガイド」を参照してください。モバイルアプリケーションは、本来断続的な接続を処理する必要があり、`PutRecords` のようなバッチ put タイプを必要とします。何らかの理由でバッチを使用できない場合は、上記のラージプロデューサーの情報を参照してください。プロデューサーがブラウザの場合、生成されるデータの量は通常非常に小さなものとなります。ただし、アプリケーションの重要なパスに *put* オペレーションを配置することはお勧めしません。

### `flushSync()` オペレーションの誤用
<a name="misuse-tag"></a>

`flushSync()` を誤って使用すると、書き込みパフォーマンスに大きな悪影響を及ぼす可能性があります。`flushSync()` オペレーションは、KPL アプリケーション終了時にバッファされたすべてのレコードが送信されることを保証するための、シャットダウン時専用の操作です。このオペレーションを書き込みのたびに実行するように実装している場合、1 回の書き込みごとに約 500 ミリ秒 の大きな遅延が追加されます。書き込みパフォーマンスの不要な遅延を避けるため、`flushSync()` はアプリケーションのシャットダウン時のみ実行されるように実装されていることを確認してください。

## 不正な KMS マスターキーアクセス許可エラーが表示される
<a name="unauthorized-kms-producer"></a>

このエラーは、プロデューサーアプリケーションが KMS マスターキーに対するアクセス許可なしで、暗号化されたストリームに書き込みを行うときに発生します。KMS キーにアクセスする許可をアプリケーションに割り当てるには、[KMS でのキーポリシーの使用](https://docs.aws.amazon.com/kms/latest/developerguide/key-policies.html)および[AWS KMS での IAM ポリシーの使用](https://docs.aws.amazon.com/kms/latest/developerguide/iam-policies.html)を参照してください。

## プロデューサーのその他の一般的な問題のトラブルシューティング
<a name="misc-troubleshooting-producer"></a>
+ [Kinesis データストリームで 500 内部サーバーエラーが返されるのはなぜですか?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-500-error/)
+ [Flink から Kinesis Data Streams に書き込むときのタイムアウトエラーのトラブルシューティング方法を教えてください。](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-flink-timeout/)
+ [Kinesis Data Streams のスロットリングエラーのトラブルシューティング方法を教えてください。](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling-errors/)
+ [Kinesis Data Streams がスロットリングされるのはなぜですか?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-throttling/)
+ [KPL を使用して、データレコードを Kinesis Data Streams に入れるにはどうすればよいですか?](https://aws.amazon.com/premiumsupport/knowledge-center/kinesis-data-stream-kpl/)

# Kinesis Data Streams プロデューサーを最適化する
<a name="advanced-producers"></a>

表示される特定の動作に応じて、Amazon Kinesis Data Streams プロデューサーをさらに最適化できます。以下のトピックを確認して、解決策を特定します。

**Topics**
+ [KPL の再試行とレート制限の動作をカスタマイズする](kinesis-producer-adv-retries-rate-limiting.md)
+ [KPL 集約にベストプラクティスを適用する](kinesis-producer-adv-aggregation.md)

# KPL の再試行とレート制限の動作をカスタマイズする
<a name="kinesis-producer-adv-retries-rate-limiting"></a>

KPL `addUserRecord()` オペレーションを使用して Amazon Kinesis Producer Library (KPL) ユーザーレコードを追加すると、レコードはタイムスタンプが付けられて、`RecordMaxBufferedTime` 設定パラメータで期限が設定されたバッファに追加されます。このタイムスタンプと期限の組み合わせにより、バッファの優先順位が設定されます。レコードは、次の条件に基づいてバッファからフラッシュされます。
+ バッファの優先度
+ 集約設定
+ 収集設定

バッファの動作に影響を与える集約および収集の設定パラメータは次のとおりです。
+ `AggregationMaxCount`
+ `AggregationMaxSize`
+ `CollectionMaxCount`
+ `CollectionMaxSize`

フラッシュされたレコードは、Kinesis Data Streams API オペレーション `PutRecords` の呼び出しを使用して Amazon Kinesis Data Streams レコードとして Kinesis Data Streams に送信されます。`PutRecords` オペレーションはストリームにリクエストを送信しますが、すべての失敗または部分的な失敗を示す場合があります。失敗したレコードは、自動的に KPL バッファに戻されます。新しい期限は、次の 2 つの値のうち小さい方に基づいて設定されます。
+ 現在の `RecordMaxBufferedTime` 設定の半分
+ レコードの有効期限値

この戦略では、再試行する KPL ユーザーレコードをそれ以降の Kinesis Data Streams API コールに含めることができ、Kinesis Data Streams レコードの有効期限値を適用しながら、スループットを改善し、複雑さを減らすことができます。バックオフアルゴリズムがないため、これは比較的積極的な再試行戦略です。過剰な再試行による大量送信は、次のセクションで説明するレート制限により防止できます。

## レート制限
<a name="kinesis-producer-adv-retries-rate-limiting-rate-limit"></a>

KPL にはレート制限機能があり、1 つのプロデューサーからの送信されるシャード単位のスループットを制限できます。レート制限は、Kinesis Data Streams のレコードとバイトに別々のバケットを使用するトークンバケットアルゴリズムを使用して実装されています。Kinesis Data Streams への書き込みが成功するたびに、特定のしきい値に達するまで、各バケットに 1 つまたは複数のトークンが追加されます。このしきい値は設定できますが、デフォルトでは実際のシャード制限より 50 パーセント大きく設定され、単一のプロデューサーによるシャードの飽和が許されています。

この制限を小さくすることにより、過剰な再試行による大量送信を抑制できます。ただし、ベストプラクティスは、各プロデューサーについて、最大スループットまで積極的に再試行することと、ストリームの容量を拡大し、適切なパーティションキー戦略を実装することにより、結果的に過剰と判断されたスロットリングを適切に処理することです。

# KPL 集約にベストプラクティスを適用する
<a name="kinesis-producer-adv-aggregation"></a>

結果として得られた Amazon Kinesis Data Streams レコードのシーケンス番号方式は同じままですが、集約された Kinesis Data Streams レコードに含まれる Amazon Kinesis Producer Library (KPL) ユーザーレコードのインデックス作成は 0 (ゼロ) から始まります。ただし、シーケンス番号に依存しない方法で KPL ユーザーレコードを一意に識別する限り、集約 (KPL ユーザーレコードの Kinesis Data Streams レコードへの集約) とその後の集約解除 ( Kinesis Data Streams レコードの KPL ユーザーレコードへの集約解除) で自動的に考慮されるため、このようにインデックス作成が 0 (ゼロ) から始まることをコード上は無視してかまいません。これは、コンシューマーが KCL または AWS SDK を使用しているかどうかにかかわらず適用されます。この集約機能を使用するには、コンシューマーが AWS SDK で提供されている API を使用して記述されている場合、KPL の Java 部分をビルドにプルする必要があります。

KPL ユーザーレコードの一意な識別子としてシーケンス番号を使用する場合、`Record` および `UserRecord` に提供されている、契約に順守した `public int hashCode()` および `public boolean equals(Object obj)` オペレーションを使用して、KPL ユーザーレコードの比較を有効にすることをお勧めします。さらに、KPL ユーザーレコードのサブシーケンス番号を調べる必要がある場合は、そのユーザーレコードを `UserRecord` インスタンスにキャストして、そのサブシーケンス番号を取得することができます。

詳細については、[コンシューマーの集約解除を実装する](kinesis-kpl-consumer-deaggregation.md)を参照してください。