

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# KPL을 사용하여 Kinesis 데이터 스트림에 쓰기
<a name="kinesis-kpl-writing"></a>

다음 단원에서는 가장 기본적인 생산자부터 완전 비동기식 코드까지 진행되는 샘플 코드를 보여줍니다.

## 베어본 생산자 코드
<a name="kinesis-kpl-writing-code"></a>

다음 코드만 있으면 작동되는 최소한의 생산자를 쓸 수 있습니다. Amazon Kinesis Producer Library(KPL) 사용자 레코드는 백그라운드에서 처리됩니다.

```
// KinesisProducer gets credentials automatically like 
// DefaultAWSCredentialsProviderChain. 
// It also gets region automatically from the EC2 metadata service. 
KinesisProducer kinesis = new KinesisProducer();  
// Put some records 
for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block       
    kinesis.addUserRecord("myStream", "myPartitionKey", data); 
}  
// Do other stuff ...
```

## 결과에 동기적으로 응답
<a name="kinesis-kpl-writing-synchronous"></a>

이전 예제에서 코드는 KPL 사용자 레코드가 성공했는지 여부를 확인하지 않습니다. KPL은 실패를 설명하는 데 필요한 재시도를 수행합니다. 하지만 결과를 확인하려면 다음 예제(컨텍스트에 맞게 표시된 이전 예제)와 같이 `addUserRecord`에서 반환되는`Future` 객체를 사용하여 검사할 수 있습니다.

```
KinesisProducer kinesis = new KinesisProducer();  

// Put some records and save the Futures 
List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); 
for (int i = 0; i < 100; i++) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));
    // doesn't block 
    putFutures.add(
        kinesis.addUserRecord("myStream", "myPartitionKey", data)); 
}  

// Wait for puts to finish and check the results 
for (Future<UserRecordResult> f : putFutures) {
    UserRecordResult result = f.get(); // this does block     
    if (result.isSuccessful()) {         
        System.out.println("Put record into shard " + 
                            result.getShardId());     
    } else {
        for (Attempt attempt : result.getAttempts()) {
            // Analyze and respond to the failure         
        }
    }
}
```

## 결과에 비동기적으로 응답
<a name="kinesis-kpl-writing-asynchronous"></a>

이전 예제에서는 `Future` 객체에서 `get()`을 호출하므로 런타임이 차단됩니다. 런타임을 차단하지 않으려면 다음 예제에 나온 대로 비동기 콜백을 사용할 수 있습니다.

```
KinesisProducer kinesis = new KinesisProducer();

FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() {     
    @Override public void onFailure(Throwable t) {
        /* Analyze and respond to the failure  */ 
    };     
    @Override public void onSuccess(UserRecordResult result) { 
        /* Respond to the success */ 
    };
};

for (int i = 0; i < 100; ++i) {
    ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8"));      
    ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data);     
    // If the Future is complete by the time we call addCallback, the callback will be invoked immediately.
    Futures.addCallback(f, myCallback); 
}
```