

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon Kinesis Producer Library (KPL) を使用してプロデューサーを開発する
<a name="developing-producers-with-kpl"></a>

Amazon Kinesis Data Streams プロデューサーは、ユーザーデータレコードを Kinesis data stream に配置する (*データの取り込み*とも呼ばれます) アプリケーションです。Amazon Kinesis Producer Library (KPL) を使用すると、プロデューサーアプリケーションの開発が簡素化され、デベロッパーは Kinesis Data Streams に対する優れた書き込みスループットを実現できます。

Amazon CloudWatch で KPL をモニタリングできます。詳細については、「[Amazon CloudWatch を使用した Kinesis Client Library を監視する](monitoring-with-kpl.md)」を参照してください。

**Topics**
+ [KPL のロールを確認する](#developing-producers-with-kpl-role)
+ [KPL を使用するメリットを実現する](#developing-producers-with-kpl-advantage)
+ [KPL を使用しないタイミングを理解する](#developing-producers-with-kpl-when)
+ [KPL をインストールする](kinesis-kpl-dl-install.md)
+ [KPL 0.x から KPL 1.x に移行する](kpl-migration-1x.md)
+ [KPL の Amazon Trust Services (ATS) 証明書への移行](kinesis-kpl-upgrades.md)
+ [KPL でサポートされるプラットフォーム](kinesis-kpl-supported-plats.md)
+ [KPL の主要な概念](kinesis-kpl-concepts.md)
+ [KPL をプロデューサーコードと統合する](kinesis-kpl-integration.md)
+ [KPL を使用して Kinesis Data Stream に書き込む](kinesis-kpl-writing.md)
+ [Amazon Kinesis Producer Library を設定する](kinesis-kpl-config.md)
+ [コンシューマーの集約解除を実装する](kinesis-kpl-consumer-deaggregation.md)
+ [Amazon Data Firehose で KPL を使用する](kpl-with-firehose.md)
+ [Schema Registry で KPL AWS Glue を使用する](kpl-with-schemaregistry.md)
+ [KPL プロキシ設定を設定する](kpl-proxy-configuration.md)
+ [KPL バージョンライフサイクルポリシー](kpl-version-lifecycle-policy.md)

**注記**  
KPL は、最新バージョンにアップグレードすることが推奨されます。KPL は新しいリリースに伴って定期的に更新されています。これには、最新の依存関係パッチ、セキュリティパッチ、バグ修正、および下位互換性のある新機能が含まれます。詳細については、[https://github.com/awslabs/amazon-kinesis-producer/releases/](https://github.com/awslabs/amazon-kinesis-producer/releases/) を参照してください。

## KPL のロールを確認する
<a name="developing-producers-with-kpl-role"></a>

KPL は使いやすく、Kinesis Data Streams への書き込みに役立つ、高度な設定可能ライブラリです。これは、プロデューサーアプリケーションのコードと Kinesis Data Streams API アクション間の仲介として機能します。KPL は次の主要なタスクを実行します。
+ 自動的で設定可能な再試行メカニズムにより 1 つ以上の Kinesis Data Streams へ書き込む
+ レコードを収集し、`PutRecords` を使用して、リクエストごとに複数シャードへ複数レコードを書き込む
+ ユーザーレコードを集約し、ペイロードサイズを増加させ、スループットを改善する
+ コンシューマーで [Kinesis Client Library](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html) (KCL) とシームレスに統合して、バッチ処理されたレコードを集約解除する
+ Amazon CloudWatch メトリクスをユーザーに代わって送信し、プロデューサーのパフォーマンスを確認可能にする

KPL は [AWS SDK](https://aws.amazon.com/tools/) で使用できる Kinesis Data Streams API とは異なることに注意してください。Kinesis Data Streams API では Kinesis Data Streams の多くの機能 (ストリームの作成、リシャーディング、レコードの入力と取得など) を管理できます。KPL はデータの取り込みに特化した抽象化レイヤーを提供します。Kinesis Data Streams API の詳細については、[Amazon Kinesis API リファレンス](https://docs.aws.amazon.com/kinesis/latest/APIReference/)を参照してください。

## KPL を使用するメリットを実現する
<a name="developing-producers-with-kpl-advantage"></a>

Kinesis Data Streams プロデューサーの開発に KPL を使用する主な利点を以下に示します。

KPL は、同期または非同期のユースケースで使用できます。同期動作を使用する特別な理由がないかぎり、非同期インターフェイスの優れたパフォーマンスを使用することを推奨します。これら 2 つのユースケースの詳細とコード例については、[KPL を使用して Kinesis Data Stream に書き込む](kinesis-kpl-writing.md)を参照してください。

 **パフォーマンスのメリット**   
KPL は、高性能のプロデューサーの構築に役立ちます。Amazon EC2 インスタンスをプロキシとして使用し、100 バイトのイベントを数百または数千の低電力デバイスから収集して、レコードを Kinesis Data Streams に書き込む場合を考えてみます。これらの EC2 インスタンスはそれぞれ、毎秒数千イベントをデータストリームに書き込む必要があります。必要なスループットを実現するには、お客様の側で、再試行ロジックとレコード集約解除に加え、バッチ処理やマルチスレッドなどの複雑なロジックをプロデューサーに実装する必要があります。KPL が、これらのタスクをすべて実行します。

 **コンシューマー側の使いやすさ**   
コンシューマー側のデベロッパーが Java で KCL を使用する場合、追加作業なしで KPL が統合されます。KCL で、複数の KPL ユーザーレコードで構成されている集約された Kinesis Data Streams レコードを取得するときは、自動的に KPL が呼び出され、個々のユーザーレコードが抽出され、ユーザーに返されます。  
KCL を使用せずに API オペレーション `GetRecords` を直接使用するコンシューマー側のデベロッパーの場合、KPL Java ライブラリを使用して個々のユーザーレコードを抽出して、これらのレコードをユーザーに返すことができます。

 **プロデューサーのモニタリング**   
Amazon CloudWatch と KPL を使用して、Kinesis Data Streams プロデューサーを収集、モニタリング、分析できます。KPL は、スループット、エラー、およびその他のメトリクスをユーザーに代わって CloudWatch に送信し、ストリーム、シャード、またはプロデューサーレベルでモニタリングするように設定できます。

 **非同期アーキテクチャ**   
KPL は、レコードを Kinesis Data Streams に送信する前にそれらのレコードをバッファ処理する場合があるため、ランタイムを続行する前にレコードがサーバーに到着したことを確認するために、発信者アプリケーションを強制的にブロックし待機させることはしません。レコードを KPL に配置する呼び出しは、必ずすぐに処理が戻り、レコードの送信やサーバーからの応答の受信を待ちません。代わりに、レコードを Kinesis Data Streams に送信した結果を後で受信するための `Future` オブジェクトが作成されます。これは AWS SDK の非同期クライアントと同じ動作です。

## KPL を使用しないタイミングを理解する
<a name="developing-producers-with-kpl-when"></a>

KPL では、ライブラリ内で最大 `RecordMaxBufferedTime` まで追加の処理遅延が生じる場合があります (ユーザーが設定可能)。`RecordMaxBufferedTime` の値が大きいほど、パッキング効率とパフォーマンスが向上します。この追加的な遅延を許容できないアプリケーションは、 AWS SDK を直接使用することが必要になる場合があります。Kinesis Data Streams で AWS SDK を使用する方法の詳細については、「」を参照してください[で Amazon Kinesis Data Streams API を使用してプロデューサーを開発する AWS SDK for Java](developing-producers-with-sdk.md)。`RecordMaxBufferedTime` やその他のユーザー設定可能な KPLのプロパティの詳細については、[Amazon Kinesis Producer Library を設定する](kinesis-kpl-config.md) を参照してください。

# KPL をインストールする
<a name="kinesis-kpl-dl-install"></a>

Amazon では、macOS、Windows、最新の Linux ディストリビューション向けに C\$1\$1 Amazon Kinesis Producer Library (KPL) のビルド済みバイナリを提供しています (サポートされているプラッフォームの詳細については、次のセクションを参照してください)。これらのバイナリは、Java の .jar ファイルの一部としてパッケージ化されており、Maven を使用してパッケージをインストールする場合、自動的に呼び出され、使用されます。KPL と KCL の最新バージョンを確認するには、次の Maven 検索リンクをご利用ください。
+ [KPL](https://search.maven.org/#search|ga|1|amazon-kinesis-producer)
+ [KCL](https://search.maven.org/#search|ga|1|amazon-kinesis-client)

Linux のバイナリは、GNU コンパイラコレクション (GCC) でコンパイルされ、Linux の libstdc\$1\$1 に静的にリンクされています。これらのバイナリは、glibc バージョン 2.5 以降を含むすべての 64 ビット Linux ディストリビューションで動作することが推定されています。

以前のバージョンの Linux ディストリビューションのユーザーは、GitHub のソースとともに提供されるビルド手順で KPL をビルドできます。KPL を GitHub からダウンロードするには、[Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer) を参照してください。

**重要**  
Amazon Kinesis Producer Library (KPL) 0.x は、2026 年 1 月 30 日にサポートが終了します。2026 年 1 月 30 日より前に、バージョン 0.x を使用して KPL アプリケーションを最新の KPL バージョンに移行することを**強くお勧めします**。最新の KPL バージョンについては、[Github の KPL ページ](https://github.com/awslabs/amazon-kinesis-producer)を参照してください。KPL 0.x から KPL 1.x への移行については、[KPL 0.x から KPL 1.x に移行する](kpl-migration-1x.md) を参照してください。

# KPL 0.x から KPL 1.x に移行する
<a name="kpl-migration-1x"></a>

このトピックでは、コンシューマーを KPL 0.x から KPL 1.x に移行する方法について、段階的に説明します。KPL 1.x では、以前のバージョンとのインターフェイス互換性を維持しながら、 AWS SDK for Java 2.x のサポートが導入されています。KPL 1.x へ移行する際、データ処理の中核となるロジックを更新する必要はありません。

1. **次の前提条件を満たしていることを確認する**
   + Java Development Kit (JDK) 8 以降
   + AWS SDK for Java 2.x
   + 依存関係管理用の Maven または Gradle

1. **依存関係を追加する**

   Maven を使用している場合は、以下の依存関係を pom.xml ファイルに追加します。必ず groupId を `com.amazonaws` から `software.amazon.kinesis` に更新し、バージョンを `1.x.x` から最新の KPL バージョンに更新してください。

   ```
   <dependency>
       <groupId>software.amazon.kinesis</groupId>
       <artifactId>amazon-kinesis-producer</artifactId>
       <version>1.x.x</version> <!-- Use the latest version -->
   </dependency>
   ```

   Gradle を使用している場合は、以下を `build.gradle` ファイルに追加します。必ず `1.x.x` を最新の KPCL バージョンに置き換えてください。

   ```
   implementation 'software.amazon.kinesis:amazon-kinesis-producer:1.x.x'
   ```

   最新の KPL バージョンは、[Maven Central Repository](https://central.sonatype.com/search?q=amazon-kinesis-producer) で確認できます。

1. **KPL の import ステートメントを更新する**

   KPL 1.x は AWS SDK for Java 2.x を使用し、 で始まる更新されたパッケージ名を使用します。これは`software.amazon.kinesis`、 で始まる以前の KPL のパッケージ名とは異なります`com.amazonaws.services.kinesis`。

   `com.amazonaws.services.kinesis` の import ステートメントを `software.amazon.kinesis` に置き換えます。次の表に、置き換える必要がある import ステートメントを示します。  
**Import の置き換え**    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/streams/latest/dev/kpl-migration-1x.html)

1. ** AWS 認証情報プロバイダークラスのインポートステートメントを更新する**

   KPL 1.x に移行するときは、 AWS SDK for Java 1.x に基づく KPL アプリケーションコードのインポートのパッケージとクラスを、 AWS SDK for Java 2.x に基づく対応するパッケージとクラスに更新する必要があります。KPL アプリケーションでよく使用される import ステートメントは、認証情報プロバイダークラスにあります。[認証情報プロバイダーの変更の完全なリストについては、2.x 移行ガイドのドキュメントの](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)「認証情報プロバイダーの変更」を参照してください。 AWS SDK for Java 以下は、KPL アプリケーションで一般的に必要となる import ステートメントの変更例です。

   **KPL 0.x の import ステートメント**

   ```
   import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
   ```

   **KPL 1.x の import ステートメント**

   ```
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   ```

    AWS SDK for Java 1.x に基づいて他の認証情報プロバイダーをインポートする場合は、それらを AWS SDK for Java 2.x に相当するものに更新する必要があります。 AWS SDK for Java 1.x からクラス/パッケージをインポートしなかった場合は、このステップを無視できます。

1. **KPL 設定内の認証情報プロバイダー設定を更新する**

   KPL 1.x の認証情報プロバイダー設定には、 AWS SDK for Java 2.x の認証情報プロバイダーが必要です。デフォルトの認証情報プロバイダーを上書き`KinesisProducerConfiguration`して の AWS SDK for Java 1.x の認証情報プロバイダーを渡す場合は、 AWS SDK for Java 2.x 認証情報プロバイダーで更新する必要があります。[認証情報プロバイダーの変更の一覧については、2.x 移行ガイドのドキュメントの](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/migration-client-credentials.html)「認証情報プロバイダーの変更」を参照してください。 AWS SDK for Java KPL 設定でデフォルトの認証情報プロバイダーを上書きしていない場合は、この手順は無視できます。

   たとえば、KPL のデフォルト認証情報プロバイダーを次のコードで上書きしている場合です。

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // SDK v1 default credentials provider
   config.setCredentialsProvider(new DefaultAWSCredentialsProviderChain());
   ```

    AWS SDK for Java 2.x の認証情報プロバイダーを使用するためには、これらを次のコードに更新する必要があります。

   ```
   KinesisProducerConfiguration config = new KinesisProducerConfiguration();
   // New SDK v2 default credentials provider
   config.setCredentialsProvider(DefaultCredentialsProvider.create());
   ```

# KPL の Amazon Trust Services (ATS) 証明書への移行
<a name="kinesis-kpl-upgrades"></a>

2018 年 2 月 9 日の午前 9:00 (太平洋標準時) に、Amazon Kinesis Data Streams は ATS 証明書をインストールしました。Kinesis Producer Library (KPL) を使用して、Amazon Kinesis Data Streams にレコードを継続して書き込むには、KPL のインストールを[バージョン 0.12.6](http://search.maven.org/#artifactdetails|com.amazonaws|amazon-kinesis-producer|0.12.6|jar) 以降にアップグレードする必要があります。この変更は、すべての AWS リージョンに影響します。

ATS への移行の詳細については、[AWS「独自の認証機関への移行を準備する方法](https://aws.amazon.com/blogs/security/how-to-prepare-for-aws-move-to-its-own-certificate-authority/)」を参照してください。

問題が発生し、技術サポートが必要な場合は、 AWS サポートセンターで[サポートケースを作成](https://console.aws.amazon.com/support/v1#/case/create)してください。

# KPL でサポートされるプラットフォーム
<a name="kinesis-kpl-supported-plats"></a>

Amazon Kinesis Producer Library (KPL) は、C\$1\$1 で書かれており、メインユーザープロセスの子プロセスとして実行されます。プリコンパイルされている 64 ビットのネイティブバイナリは、Java ベースにバンドルされており、Java wrapper によって管理されます。

次のオペレーティングシステムでは、追加ライブラリをインストールすることなく Java のパッケージを実行できます。
+ カーネルバージョン 2.6.18 (2006 年 9 月) の Linux ディストリビューション以降
+ Apple iOS X 10.9 以降
+ Windows Server 2008 以降
**重要**  
Windows Server 2008 以降は、バージョン 0.14.0 までのすべての KPL バージョンでサポートされています。  
Windows プラットフォームは、KPL バージョン 0.14.0 以降ではサポートされていません。

KPL は、64 ビット版のみであることに注意してください。

## ソースコード
<a name="kinesis-kpl-supported-plats-source-code"></a>

KPL のインストールで提供されるバイナリがお客様の環境に適さない場合は、KPL のコアが C\$1\$1 のモジュールとして書き込まれます。C\$1\$1 モジュールと Java インターフェイスのソースコードは、Amazon パブリックライセンスの下で公開され、GitHub の [Amazon Kinesis Producer Library](https://github.com/awslabs/amazon-kinesis-producer) で入手できます。KPL は、最近の規格に準拠した C\$1\$1 コンパイラと JRE を使用できるすべてのプラットフォームで使用できますが、Amazon では、サポートされるプラットフォームの一覧にないプラットフォームを正式にはサポートしません。

# KPL の主要な概念
<a name="kinesis-kpl-concepts"></a>

以下のセクションでは、Amazon Kinesis Producer Library (KPL) を理解し、その利点を引き出すために必要な概念と用語について説明します。

**Topics**
+ [レコード](#kinesis-kpl-concepts-records)
+ [バッチ処理](#kinesis-kpl-concepts-batching)
+ [集計](#kinesis-kpl-concepts-aggretation)
+ [収集](#kinesis-kpl-concepts-collection)

## レコード
<a name="kinesis-kpl-concepts-records"></a>

このガイドでは、*KPL ユーザーレコード*と *Kinesis Data Streams レコード*を区別します。修飾語を付けずに*レコード*という用語を使用する場合は、*KPL ユーザーレコード*を意味します。Kinesis Data Streams レコードを意味するときは、明示的に *Kinesis Data Streams レコード*と表現します。

KPL ユーザーレコードは、ユーザーにとって特定の意味のあるデータの BLOB です。たとえば、ウェブサイトの UI イベントまたはウェブサーバーのログエントリを表す JSON BLOB がそれに該当します。

Kinesis Data Streams レコードは、Kinesis Data Streams サービス API で定義される`Record` データ構造のインスタンスです。これには、パーティションキー、シーケンス番号、データの BLOB が含まれています。

## バッチ処理
<a name="kinesis-kpl-concepts-batching"></a>

*バッチ処理*は、各項目に対して単一のアクションを繰り返し実行する代わりに、複数の項目に対してそのアクションを実行することを意味します。

ここでは、項目はレコードに対応し、アクションはレコードを Kinesis Data Streams に送信することに対応します。バッチ処理を使用しない場合、各レコードを別々の Kinesis Data Streams レコードに配置し、それぞれを Kinesis Data Streams に送信するたびに HTTP リクエストを実行します。バッチ処理では、各 HTTP リクエストにより、1 つではなく複数のレコードを処理できます。

KPL では、2 種類のバッチ処理がサポートされます。
+ *集約* - 複数のレコードを単一の Kinesis Data Streams レコードに保存します。
+ *収集* - API オペレーション `PutRecords` を使用して、Kinesis Data Streams 内の 1 つ以上のシャードに複数の Kinesis Data Streams レコードを送信します。

2 種類の KPL バッチ処理は、共存できるように設計されており、互いに独立して有効または無効にできます。デフォルトでは、どちらも有効です。

## 集計
<a name="kinesis-kpl-concepts-aggretation"></a>

*集約*は、複数レコードを 1 つの Kinesis Data Streams レコードに保存することを意味します。集約を使用すると、API コールごとに送信されるレコード数を増やすことができ、効率的にプロデューサーのスループットを高めることができます。

Kinesis Data Streams シャードは、1 秒あたり最大で 1,000 レコードまたは 1 MB のスループットをサポートします。1 秒あたりの Kinesis Data Streams レコードの制限により、お客様のレコードは 1 KB 未満に制限されます。レコードの集約を使用すると、複数のレコードを単一の Kinesis Data Streams レコードに結合できます。そのため、お客様はシャードあたりのスループットを改善することができます。

リージョンが us-east-1 の 1 つのシャードで、1 つが 512 バイトのレコードを 1 秒あたり 1,000 レコードの一定割合で処理する場合を考えます。KPL 集約を使用すると、1,000 レコードを 10 Kinesis Data Streams レコードに詰めることができ、RPS を 10 に減らすことができます (それぞれ 50 KB)。

## 収集
<a name="kinesis-kpl-concepts-collection"></a>

*収集*は、各 Kinesis Data Streams レコードをそれぞれの HTTP リクエストで送信するのではなく、複数の Kinesis Data Streams レコードをバッチ処理し、API オペレーション `PutRecords` を呼び出して単一の HTTP リクエストでそれらを送信することを意味します。

これにより、個別の HTTP リクエストを多数実行するオーバーヘッドが減るため、収集を使用しない場合に比べスループットが向上します。実際、`PutRecords` 自体が、この目的のために設計されています。

収集は、Kinesis Data Streams レコードのグループを使用している点で集約と異なります。収集された Kinesis Data Streams レコードには、ユーザーの複数のレコードをさらに含めることができます。この関係は、次のように図示できます。

```
record 0 --|
record 1   |        [ Aggregation ]
    ...    |--> Amazon Kinesis record 0 --|
    ...    |                              |
record A --|                              |
                                          |
    ...                   ...             |
                                          |
record K --|                              |
record L   |                              |      [ Collection ]
    ...    |--> Amazon Kinesis record C --|--> PutRecords Request
    ...    |                              |
record S --|                              |
                                          |
    ...                   ...             |
                                          |
record AA--|                              |
record BB  |                              |
    ...    |--> Amazon Kinesis record M --|
    ...    |
record ZZ--|
```

# KPL をプロデューサーコードと統合する
<a name="kinesis-kpl-integration"></a>

Amazon Kinesis Producer Library (KPL) は独立したプロセスで実行され、IPC を使用して親ユーザープロセスと通信します。このアーキテクチャは、[マイクロサービス](http://en.wikipedia.org/wiki/Microservices)と呼ばれる場合があり、次の 2 つの主な理由からこれが選択されます。

**1) KPL がクラッシュしても、ユーザープロセスはクラッシュしません**  
プロセスには Kinesis Data Streams と無関係なタスクが含まれている場合があり、KPL がクラッシュしてもオペレーションを続行できます。また、親ユーザープロセスが KPL を再起動し、完全に機能する状態に復旧することもできます (この機能は、正式なラッパーに含まれています)。

メトリクスを Kinesis Data Streams に送信するウェブサーバーがその例です。このサーバーは、Kinesis Data Streams 部分が動作を停止してもページの提供を続行できます。そのため、KPL のバグが原因でサーバー全体がクラッシュすると、不要なサービス停止が発生します。

**2) 任意のクライアントをサポートできます**  
正式にサポートされている言語以外の言語を使用するお客様もいます。これらのお客様も KPL を簡単に使用できます。

## 推奨される使用状況
<a name="kinesis-kpl-integration-usage"></a>

使用状況の異なるユーザーに推奨される設定を次の表に示します。この表を参考に、KPL を使用できるかどうか、どのように使用できるかを判断できます。集約が有効な場合、コンシューマー側で集約解除を使用してレコードを抽出する必要があることにも注意してください。


| プロデューサー側の言語 | コンシューマー側の言語 | KCL バージョン | チェックポイントロジック | KPL の使用可否 | 注意 | 
| --- | --- | --- | --- | --- | --- | 
| Java 以外 | \$1 | \$1 | \$1 | 不可 | 該当なし | 
| Java | Java | Java SDK を直接使用 | 該当なし | 可 | 集約を使用する場合、GetRecords を呼び出した後に、提供された集約解除ライブラリを使用する必要があります。 | 
| Java | Java 以外 | SDK を直接使用 | 該当なし | 可 | 集約を無効にする必要があります。 | 
| Java | Java | 1.3.x | 該当なし | 可 | 集約を無効にする必要があります。 | 
| Java | Java  | 1.4.x | 引数なしでチェックポイントを呼び出す | 可 | なし | 
| Java | Java | 1.4.x | 明示的なシーケンス番号を使用してチェックポイントを呼び出す | 可 | 集約を無効にするかコードを変更し、チェックポイント作成用の拡張されたシーケンス番号を使用します。 | 
| Java | Java 以外  | 1.3.x \$1 複数言語デーモン \$1 言語固有のラッパー | 該当なし | 可 | 集約を無効にする必要があります。 | 

# KPL を使用して Kinesis Data Stream に書き込む
<a name="kinesis-kpl-writing"></a>

以下のセクションでは、最も基本的なプロデューサーから完全に非同期なコードまで順にサンプルコードを示します。

## 最低限のプロデューサーコード
<a name="kinesis-kpl-writing-code"></a>

次のコードは、最小限の機能するプロデューサーを書くために必要なものがすべて含まれています。Amazon Kinesis Producer Library (KPL) ユーザーレコードはバックグラウンドで処理されます。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 結果に対する同期的な応答
<a name="kinesis-kpl-writing-synchronous"></a>

前のコード例では、 ユーザーレコードが成功したかどうかをチェックしませんでした。KPL は、失敗に対処するために必要な再試行を実行します。ただし、結果を確認する必要がある場合は、次の例 (分かりやすくするため前の例を使用しています) のように、`addUserRecord` から返される `Future` オブジェクトを使用して結果を確認します。

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 結果に対する非同期的な応答
<a name="kinesis-kpl-writing-asynchronous"></a>

前の例では、`get()` オブジェクトに対して `Future` を呼び出しているため、ランタイムがブロックされます。ランタイムのブロックを避ける必要がある場合には、次の例に示すように非同期コールバックを使用できます。

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```

# Amazon Kinesis Producer Library を設定する
<a name="kinesis-kpl-config"></a>

デフォルト設定のままで、ほとんどのユースケースに問題なく使用できますが、デフォルト設定の一部を変更することで、ニーズに合わせて `KinesisProducer` の動作を調整することができます。それには、`KinesisProducerConfiguration` クラスのインスタンスを `KinesisProducer` コンストラクタに渡します。たとえば、次のようにします。

```
KinesisProducerConfiguration config = new KinesisProducerConfiguration()
        .setRecordMaxBufferedTime(3000)
        .setMaxConnections(1)
        .setRequestTimeout(60000)
        .setRegion("us-west-1");
        
final KinesisProducer kinesisProducer = new KinesisProducer(config);
```

プロパティファイルから設定をロードすることもできます。

```
KinesisProducerConfiguration config = KinesisProducerConfiguration.fromPropertiesFile("default_config.properties");
```

ユーザープロセスがアクセスできる任意のパスとファイル名に置き換えることができます。さらに、このようにして作成した `KinesisProducerConfiguration` インスタンスに対して設定メソッドを呼び出して、設定をカスタマイズできます。

プロパティファイルでは、PascalCase 内の名前を使用してパラメータを指定する必要があります。その名前は、`KinesisProducerConfiguration` クラスの設定メソッドで使用されるものと一致します。例: 

```
RecordMaxBufferedTime = 100
MaxConnections = 4
RequestTimeout = 6000
Region = us-west-1
```

設定パラメータの使用方法と値の制限の詳細については、[sample configuration properties file on GitHub](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties)を参照してください。

`KinesisProducer` の初期化後に、使用した `KinesisProducerConfiguration` インスタンスを変更しても何の変化もないことに注意してください。現在、`KinesisProducer` は動的設定をサポートしていません。

# コンシューマーの集約解除を実装する
<a name="kinesis-kpl-consumer-deaggregation"></a>

KCL は、リリース 1.4.0 から KPL ユーザーレコードの自動集計解除をサポートしています。以前のバージョンの KCL で書かれたコンシューマーアプリケーションのコードは、KCL を更新した後、コードを何も修正せずにコンパイルできます。ただし、プロデューサー側で KPL の集約を使用している場合、チェックポイントが多少関係してきます。集約されたレコード内のすべてのサブレコードは同じシーケンス番号を持っているため、サブレコード間の区別が必要な場合、チェックポイントを使用して追加のデータを保存する必要があります。この追加データは、*サブシーケンス番号*と呼ばれます。

**Topics**
+ [以前のバージョンの KCL から移行する](#kinesis-kpl-consumer-deaggregation-migration)
+ [KPL の集約解除のための KCL の拡張を使用する](#kinesis-kpl-consumer-deaggregation-extensions)
+ [GetRecords を直接使用する](#kinesis-kpl-consumer-deaggregation-getrecords)

## 以前のバージョンの KCL から移行する
<a name="kinesis-kpl-consumer-deaggregation-migration"></a>

集約を使用している場合でも、既存のチェックポイント呼び出しを変更する必要はありません。Kinesis Data Streams に保存されているすべてのレコードを正しく取得できることが保証されています。以下で説明する特定のユースケースをサポートするために、現在 KCL には、2 つの新しいチェックポイントオペレーションが用意されています。

既存のコードが KPL サポート以前の KCL 用に書かれていて、チェックポイントオペレーションが引数なしで呼び出される場合、そのコードの動作は、バッチ内にある最後の KPL ユーザーレコードのシーケンス番号に対するチェックポイントの作成と同等です。シーケンス番号文字列を使用してチェックポイントオペレーションを呼び出す場合は、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチの指定されたシーケンス番号に対するチェックポイントの作成と同等です。

引数なしで新しいKCL チェックポイントオペレーション `checkpoint()` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、バッチ内の最後の `Record` 呼び出しのシーケンス番号に対するチェックポイントの作成と意味的に同等です。

新しい KCL チェックポイントオペレーション `checkpoint(Record record)` を呼び出すことは、暗黙的なサブシーケンス番号 0 (ゼロ) を伴う、指定された `Record` のシーケンス番号に対するチェックポイントの作成と意味的に同等です。`Record` 呼び出しが実際には `UserRecord` である場合、`UserRecord` のシーケンス番号とサブシーケンス番号にチェックポイントが作成されます。

新しい KCL チェックポイントオペレーション `checkpoint(String sequenceNumber, long subSequenceNumber)` を呼び出すと、指定されたシーケンス番号とサブシーケンス番号に明示的にチェックポイントが作成されます。

いずれの場合も、チェックポイントが Amazon DynamoDB チェックポイントテーブルに保存された後は、アプリケーションがクラッシュして再起動した場合、KCL により、レコードの取得が正常に再開されます。さらにレコードがシーケンス内に含まれている場合は、最後にチェックポイントが作成されたシーケンス番号が付けられているレコード内の次のサブシーケンス番号のレコードから取得が開始されます。前のシーケンス番号のレコードにある最後のサブシーケンス番号が、最新のチェックポイントに含まれている場合、その次のシーケンス番号が付けられているレコードから取得が開始されます。

次のセクションでは、レコードのスキップや重複を避けるために必要な、コンシューマーのシーケンスとサブシーケンスのチェックポイントの詳細について説明します。コンシューマーのレコード処理を停止し再起動するときに、レコードのスキップや重複が重要でない場合は、変更せずに既存のコードを実行してかまいません。

## KPL の集約解除のための KCL の拡張を使用する
<a name="kinesis-kpl-consumer-deaggregation-extensions"></a>

KPL の集約解除ではサブシーケンスチェックポイントを使用できます。サブシーケンスチェックポイントを使いやすくするために、`UserRecord` クラスが KCL に追加されています。

```
public class UserRecord extends Record {     
    public long getSubSequenceNumber() {
    /* ... */
    }      
    @Override 
    public int hashCode() {
    /* contract-satisfying implementation */ 
    }      
    @Override 
    public boolean equals(Object obj) {
    /* contract-satisfying implementation */ 
    } 
}
```

このクラスは、現在 `Record` の代わりに使用されています。これは `Record` のサブクラスであるため、既存のコードは影響を受けません。`UserRecord` クラスは、実際のサブレコードと通常の集約されていないレコードの両方を表します。集約されていないレコードは、サブレコードを 1 つだけ含む集約されたレコードと考えることができます。

さらに、2 つの新しいオペレーションが `IRecordProcessorCheckpointer` に追加されています。

```
public void checkpoint(Record record); 
public void checkpoint(String sequenceNumber, long subSequenceNumber);
```

サブシーケンス番号チェックポイントの使用を開始するには、次の変更を行います。次のフォームコードを変更します。

```
checkpointer.checkpoint(record.getSequenceNumber());
```

新しいフォームコードは次のようになります。

```
checkpointer.checkpoint(record);
```

サブシーケンスチェックポイントでは、`checkpoint(Record record)` フォームを使用することをお勧めします。ただし、チェックポイントの作成で使用する文字列にすでに `sequenceNumbers` を保存している場合は、次の例に示すように、`subSequenceNumber` も保存する必要があります。

```
String sequenceNumber = record.getSequenceNumber(); 
long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber();  // ... do other processing  
checkpointer.checkpoint(sequenceNumber, subSequenceNumber);
```

この実装では `UserRecord` を必ず使用するため、`UserRecord` から `Record` へのキャストは必ず成功します。シーケンス番号の計算を実行する必要がない場合、この方法はお勧めしません。

KPL ユーザーレコードの処理中に、CL は、サブシーケンス番号を Amazon DynamoDB に各行の追加フィールドとして書き込みます。以前のバージョンの KCL では、チェックポイントを再開するときに `AFTER_SEQUENCE_NUMBER` を使用してレコードを取得していました。KPL サポートを含む現在の KCL では、代わりに `AT_SEQUENCE_NUMBER` を使用します。チェックポイントが作成されたシーケンス番号のレコードを取得するとき、チェックポイントが作成されたサブシーケンス番号がチェックされ、サブレコードが必要に応じて削除されます (最後のサブレコードにチェックポイントが作成されている場合、すべてのサブレコードが削除されます)。ここでも、集約されていないレコードは、単一のサブレコードを含む集約されたレコードと考えることができ、集約されたレコードと集約されていないレコードの両方で同じアルゴリズムを使用できます。

## GetRecords を直接使用する
<a name="kinesis-kpl-consumer-deaggregation-getrecords"></a>

KCL の使用を選択せずに、API オペレーション `GetRecords` を直接呼び出して Kinesis Data Streams レコードを取得することもできます。これらの取得したレコードを元の KPL ユーザーレコードに解凍するには、`UserRecord.java` にある次の静的なオペレーションの 1 つを呼び出します。

```
public static List<Record> deaggregate(List<Record> records)

public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)
```

最初のオペレーションでは、`startingHashKey` のデフォルト値 `0` (ゼロ) と `endingHashKey` のデフォルト値 `2^128 -1` を使用します。

これらの各オペレーションは、Kinesis Data Streams レコードの指定されたリストを KPL ユーザーレコードのリストに集約解除します。KPL ユーザーレコードの明示的なハッシュキーまたはパーティションキーが `startingHashKey` と `endingHashKey` の範囲 (境界を含む) 外にある場合、これらのユーザーレコードは、返されるレコードのリストから破棄されます。

# Amazon Data Firehose で KPL を使用する
<a name="kpl-with-firehose"></a>

Kinesis Producer Library (KPL) を使用して Kinesis データストリームにデータを書き込む場合、集約を使用してその Kinesis データストリームに書き込むレコードを結合できます。その後そのデータストリームを Firehose 配信ストリームのソースとして使用する場合、Firehose はレコードの集約を解除してから送信先に配信します。データを変換するように配信ストリームを設定する場合、Firehose はレコードの集約を解除してから AWS Lambdaに配信します。詳細については、「[Writing to Amazon Firehose Using Kinesis Data Streams](https://docs.aws.amazon.com/firehose/latest/dev/writing-with-kinesis-streams.html)」を参照してください。

# Schema Registry で KPL AWS Glue を使用する
<a name="kpl-with-schemaregistry"></a>

Kinesis データストリームを AWS Glue Schema Registry と統合できます。 AWS Glue スキーマレジストリを使用すると、スキーマを一元的に検出、制御、および進化させながら、生成されたデータが登録されたスキーマによって継続的に検証されるようにできます。スキーマ は、データレコードの構造と形式を定義します。スキーマは、信頼性の高いデータの公開、利用、または保存のための仕様をバージョニングしたものです。 AWS Glue Schema Registry を使用すると、ストリーミングアプリケーション内のend-to-endのデータ品質とデータガバナンスを向上させることができます。詳細については、[AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)を参照してください。この統合を設定する方法の 1 つは、Java で KPL および Kinesis Client Library (KCL) ライブラリを使用することです。

**重要**  
現在、Kinesis Data Streams と AWS Glue スキーマレジストリの統合は、Java に実装された KPL プロデューサーを使用する Kinesis データストリームでのみサポートされています。多言語サポートは提供されていません。

KPL を使用して Kinesis Data Streams とスキーマレジストリの統合を設定する方法の詳細については、[「ユースケース: Amazon Kinesis Data Streams と AWS Glue スキーマレジストリの統合」の「KPL/KCL ライブラリを使用したデータの操作](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kds)」セクションを参照してください。

# KPL プロキシ設定を設定する
<a name="kpl-proxy-configuration"></a>

インターネットに直接接続できないアプリケーションの場合、すべての AWS SDK クライアントは HTTP または HTTPS プロキシの使用をサポートしています。一般的なエンタープライズ環境では、すべてのアウトバウンドネットワークトラフィックがプロキシサーバーを経由する必要があります。アプリケーションが Kinesis プロデューサーライブラリ (KPL) を使用してプロキシサーバーを使用する AWS 環境でデータを収集して に送信する場合、アプリケーションには KPL プロキシ設定が必要です。KPL は、Kinesis SDK AWS 上に構築された高レベルのライブラリです。これは、ネイティブプロセスとラッパーに分割されています。ネイティブプロセスがレコードの処理ジョブと送信ジョブのすべてを実行する一方で、ラッパーはネイティブプロセスの管理と、ネイティブプロセスとの通信を実行します。詳細については、「[Implementing Efficient and Reliable Producers with the Amazon Kinesis Producer Library](https://aws.amazon.com/blogs/big-data/implementing-efficient-and-reliable-producers-with-the-amazon-kinesis-producer-library/)」を参照してください。

ラッパーは Java で記述され、ネイティブプロセスは Kinesis SDK を使用して C\$1\$1 で記述されます。KPL バージョン 0.14.7 以降では、すべてのプロキシ設定をネイティブプロセスに渡すことができる、Java ラッパー内のプロキシ設定がサポートされるようになりました。詳細については、[https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7](https://github.com/awslabs/amazon-kinesis-producer/releases/tag/v0.14.7) を参照してください。

KPL アプリケーションへのプロキシ設定の追加には、以下のコードを使用できます。

```
KinesisProducerConfiguration configuration = new KinesisProducerConfiguration();
// Next 4 lines used to configure proxy 
configuration.setProxyHost("10.0.0.0"); // required
configuration.setProxyPort(3128); // default port is set to 443
configuration.setProxyUserName("username"); // no default 
configuration.setProxyPassword("password"); // no default

KinesisProducer kinesisProducer = new KinesisProducer(configuration);
```

# KPL バージョンライフサイクルポリシー
<a name="kpl-version-lifecycle-policy"></a>

このトピックでは、Amazon Kinesis Producer Library (KPL) のバージョンライフサイクルポリシーの概要を説明します。 では、新機能と機能強化、バグ修正、セキュリティパッチ、依存関係の更新をサポートするために、KPL バージョンの新しいリリース AWS を定期的に提供しています。最新の機能、セキュリティ更新、基本的な依存関係に対応するため、KPL を常に最新バージョンへ更新しておくことをお勧めします。サポートが終了した KPL バージョンを継続使用することは**お勧めしません**。

主要な KPL バージョンのライフサイクルは、次の 3 つのフェーズで構成されます。
+ **一般提供 (GA)** – このフェーズでは、メジャーバージョンが完全にサポートされています。 は、Kinesis Data Streams の新機能や API アップデートのサポート、バグやセキュリティの修正を含む、定期的なマイナーバージョンとパッチバージョンのリリース AWS を提供します。
+ **メンテナンスモード** – パッチバージョンのリリース AWS を制限して、重大なバグ修正とセキュリティの問題にのみ対処します。Kinesis Data Streams の新機能や API に対する更新は、このメジャーバージョンには行われません。
+ **サポート終了** – そのメジャーバージョンに対して更新やリリースは一切提供されません。以前に公開されたリリースは引き続き公開パッケージマネージャーから入手でき、コードは GitHub に残ります。サポートが終了したバージョンの使用は、ユーザーの裁量で行われます。最新のメジャーバージョンにアップグレードすることをお勧めします。


| メジャーバージョン | 現在のフェーズ | リリース日 | メンテナンスモード日 | サポート終了日 | 
| --- | --- | --- | --- | --- | 
| KPL 0.x | メンテナンスモード | 2015-06-02 | 2025-04-17 | 2026-01-30 | 
| KPL 1.x | 一般提供 | 2024-12-15 | -- | -- | 