

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# KPL を使用して Kinesis Data Stream に書き込む
<a name="kinesis-kpl-writing"></a>

以下のセクションでは、最も基本的なプロデューサーから完全に非同期なコードまで順にサンプルコードを示します。

## 最低限のプロデューサーコード
<a name="kinesis-kpl-writing-code"></a>

次のコードは、最小限の機能するプロデューサーを書くために必要なものがすべて含まれています。Amazon Kinesis Producer Library (KPL) ユーザーレコードはバックグラウンドで処理されます。

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 結果に対する同期的な応答
<a name="kinesis-kpl-writing-synchronous"></a>

前のコード例では、 ユーザーレコードが成功したかどうかをチェックしませんでした。KPL は、失敗に対処するために必要な再試行を実行します。ただし、結果を確認する必要がある場合は、次の例 (分かりやすくするため前の例を使用しています) のように、`addUserRecord` から返される `Future` オブジェクトを使用して結果を確認します。

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 結果に対する非同期的な応答
<a name="kinesis-kpl-writing-asynchronous"></a>

前の例では、`get()` オブジェクトに対して `Future` を呼び出しているため、ランタイムがブロックされます。ランタイムのブロックを避ける必要がある場合には、次の例に示すように非同期コールバックを使用できます。

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```