

# DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리
<a name="Streams.KCLAdapter"></a>

Amazon Kinesis 어댑터 사용은 Amazon DynamoDB의 스트림을 소비할 때 권장되는 방법입니다. DynamoDB Streams API는 Kinesis Data Streams와 유사합니다. 두 서비스 모두 데이터 스트림이 샤드로 구성되어 있습니다. 샤드란 스트림 레코드의 컨테이너입니다. 두 서비스의 API에는 `ListStreams`, `DescribeStream`, `GetShards` 및 `GetShardIterator` 작업이 포함되어 있습니다. (이러한 DynamoDB Streams 작업은 Kinesis Data Streams의 해당 작업과 유사하지만 100% 동일하지는 않습니다.)

DynamoDB Streams 사용자는 KCL에 있는 디자인 패턴을 활용하여 DynamoDB Streams 샤드와 스트림 레코드를 처리할 수 있습니다. 이렇게 하려면 DynamoDB Streams Kinesis 어댑터를 사용합니다. Kinesis 어댑터는 DynamoDB Streams의 레코드를 사용 및 처리하는 데 KCL을 사용할 수 있도록 Kinesis Data Streams 인터페이스를 구현합니다. DynamoDB Streams Kinesis 어댑터를 설정하고 설치하는 방법에 대한 지침은 [GitHub 리포지토리](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)를 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis Data Streams의 애플리케이션을 작성할 수 있습니다. KCL은 하위 수준의 Kinesis Data Streams API에 유용한 추상화를 제공하여 코딩을 단순화합니다. KCL에 대한 자세한 내용은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.

DynamoDB는 AWS SDK for Java v2.x와 함께 KCL 버전 3.x를 사용할 것을 권장합니다. AWS SDK for Java v1.x용 AWS SDK를 사용하는 현재 DynamoDB Streams Kinesis Adapter 버전 1.x는 [AWS SDK 및 도구 유지 관리 정책](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)에 따라 전환 기간 동안 의도한 대로 수명 주기 전체에 걸쳐 계속 완전히 지원됩니다.

**참고**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션을 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션하는 것이 좋습니다. 최신 KCL 버전을 찾으려면 GitHub의 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 페이지를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)을 참조하세요. KCL 1.x에서 KCL 3.x로의 마이그레이션에 대한 자세한 내용은 KCL 1.x에서 KCL 3.x로의 마이그레이션을 참조하세요.

다음 다이어그램은 이러한 라이브러리가 서로 상호 작용하는 방법을 보여 줍니다.

![\[DynamoDB Streams 레코드 처리를 위한 DynamoDB Streams, Kinesis Data Streams 및 KCL 간의 상호 작용입니다.\]](http://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


DynamoDB Streams Kinesis 어댑터가 준비되어 있으면 DynamoDB Streams 엔드포인트로 원활하게 전달되는 API 호출을 통해 KCL 인터페이스 개발을 시작할 수 있습니다.

애플리케이션이 시작되면 KCL을 호출하여 작업자를 인스턴스화합니다. 작업자에게 애플리케이션의 구성 정보를 제공해야 합니다. 제공해야 하는 구성 정보에는 스트림 서술자와 AWS 자격 증명, 제공하는 레코드 프로세서 클래스의 이름 등이 있습니다. 레코드 프로세서에서 코드를 실행하면 작업자는 다음 작업을 수행합니다.
+ 스트림에 연결합니다
+ 스트림 내 샤드를 열거합니다.
+ 스트림 내에서 닫힌 상위 샤드의 하위 샤드를 확인하고 열거
+ 샤드 연결을 다른 작업자(있는 경우)와 조정합니다.
+ 관리하는 모든 샤드의 레코드 프로세서를 인스턴스화합니다
+ 스트림에서 레코드를 가져옵니다.
+ 처리량이 많을 때 GetRecords API 직접 호출 속도를 조정합니다(캐치업 모드가 구성된 경우).
+ 해당하는 레코드 프로세서로 레코드를 푸시합니다
+ 처리된 레코드에 대해 체크포인트를 수행합니다
+ 작업자 인스턴스 수가 변경되면 샤드-작업자 연결을 조정합니다
+ 샤드가 분할되면 샤드-작업자 연결을 조정합니다.

KCL 어댑터는 임시 처리량 증가를 처리하기 위한 직접 호출 속도 자동 조정 기능인 캐치업 모드를 지원합니다. 스트림 처리 지연이 구성 가능한 임계값(기본값 1분)을 초과하면 캐치업 모드를 통해 GetRecords API 직접 호출 빈도가 구성 가능한 값(기본값 3배)으로 조정되어 레코드를 더 빠르게 가져오게 되며, 지연이 감소하면 평소 상태로 돌아옵니다. 이는 DynamoDB 쓰기 활동이 기본 폴링 속도를 사용하는 소비자에게 과부하를 일으킬 수 있는 처리량이 많은 기간에 유용합니다. `catchupEnabled` 구성 파라미터(기본값 false)를 통해 캐치업 모드를 활성화할 수 있습니다.

**참고**  
여기에 나온 KCL 개념에 대한 설명은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.  
AWS Lambda에 스트림을 사용하는 방법에 대한 자세한 내용은 [DynamoDB Streams 및 AWS Lambda 트리거](Streams.Lambda.md) 섹션을 참조하세요.

# KCL 1.x에서 KCL 3.x로 마이그레이션
<a name="streams-migrating-kcl"></a>

## 개요
<a name="migrating-kcl-overview"></a>

이 가이드에서는 소비자 애플리케이션을 KCL 1.x에서 KCL 3.x로 마이그레이션하기 위한 지침을 제공합니다. KCL 1.x와 KCL 3.x의 아키텍처 차이로 인해 마이그레이션하려면 호환성을 보장하기 위해 여러 구성 요소를 업데이트해야 합니다.

KCL 1.x는 KCL 3.x와 다른 클래스 및 인터페이스를 사용합니다. 먼저 레코드 프로세서, 레코드 프로세서 팩토리 및 워커 클래스를 KCL 3.x 호환 형식으로 마이그레이션하고 KCL 1.x에서 KCL 3.x로 마이그레이션하는 단계를 따라야 합니다.

## 마이그레이션 단계
<a name="migration-steps"></a>

**Topics**
+ [1단계: 레코드 프로세서 마이그레이션](#step1-record-processor)
+ [2단계: 레코드 프로세서 팩토리 마이그레이션](#step2-record-processor-factory)
+ [3단계: 워커 마이그레이션](#step3-worker-migration)
+ [4단계: KCL 3.x 구성 개요 및 권장 사항](#step4-configuration-migration)
+ [5단계: KCL 2.x에서 KCL 3.x로 마이그레이션](#step5-kcl2-to-kcl3)

### 1단계: 레코드 프로세서 마이그레이션
<a name="step1-record-processor"></a>

다음은 KCL 1.x DynamoDB Streams Kinesis 어댑터에 구현된 레코드 프로세서를 보여주는 예제입니다.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**RecordProcessor 클래스를 마이그레이션하려면**

1. 다음과 같이 인터페이스를 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 및 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware`에서 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. `initialize` 및 `processRecords` 메서드에 대한 import 문을 업데이트합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. `shutdownRequested` 메서드를 새 메서드인 `leaseLost`, `shardEnded`, `shutdownRequested`으로 바꿉니다.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**참고**  
DynamoDB Streams Kinesis Adapter는 이제 SDKv2 Record 모델을 사용합니다. SDKv2에서 복합 `AttributeValue` 객체(`BS`, `NS`, `M`, `L`, `SS`)는 null을 반환하지 않습니다. `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()` 메서드를 사용하여 이러한 값이 존재하는지 확인합니다.

### 2단계: 레코드 프로세서 팩토리 마이그레이션
<a name="step2-record-processor-factory"></a>

레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**`RecordProcessorFactory`를 마이그레이션하려면**
+ 구현된 인터페이스를 다음과 같이 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory`에서 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`로 변경합니다.

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

다음은 3.0의 레코드 프로세서 팩토리의 예입니다.

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 3단계: 워커 마이그레이션
<a name="step3-worker-migration"></a>

KCL 버전 3.0에서는 **Scheduler**라는 새로운 클래스가 **Worker** 클래스를 대체합니다. 다음은 KCL 1.x 워커의 예입니다.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**작업자를 마이그레이션하려면**

1. `import` 클래스에 대한 `Worker` 문을 `Scheduler` 및 `ConfigsBuilder` 클래스에 대한 가져오기 문으로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. `StreamTracker`를 가져오고 `StreamsWorkerFactory`의 가져오기를 `StreamsSchedulerFactory`로 변경합니다.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 애플리케이션을 시작할 위치를 선택합니다. `TRIM_HORIZON` 또는 `LATEST`일 수 있습니다.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. `StreamTracker` 인스턴스를 만듭니다.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. `AmazonDynamoDBStreamsAdapterClient` 객체를 생성합니다.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. `ConfigsBuilder` 객체를 생성합니다.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 다음 예제와 같이 `ConfigsBuilder`를 사용하여 `Scheduler`를 생성합니다.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**중요**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 설정은 DynamoDB Streams Kinesis Adapter KCL v2와 v3가 아닌 KCL v3와 KCL v1 간의 호환성을 유지합니다.

### 4단계: KCL 3.x 구성 개요 및 권장 사항
<a name="step4-configuration-migration"></a>

KCL 1.x 이후에 도입된 KCL 3.x 관련 구성에 대한 자세한 설명은 [KCL 구성](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) 및 [KCL 마이그레이션 클라이언트 구성](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)을 참조하세요.

**중요**  
KCL 3.x 및 이후 버전에서는 `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig`, `retrievalConfig` 객체를 직접 생성하는 대신 `ConfigsBuilder`를 사용하여 구성을 설정하는 것이 좋습니다. `ConfigsBuilder`는 KCL 애플리케이션을 구성하는 데 더 유연하고 유지 관리가 용이한 방법을 제공합니다.

#### KCL 3.x에서 업데이트 기본값을 사용한 구성
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL 버전 1.x에서는 `billingMode`의 기본값이 `PROVISIONED`로 설정됩니다. 그러나 KCL 버전 3.x에서는 기본 `billingMode`가 `PAY_PER_REQUEST`(온디맨드 모드)입니다. 사용량에 따라 용량을 자동으로 조정하려면 리스 테이블에 온디맨드 용량 모드를 사용하는 것이 좋습니다. 리스 테이블에 프로비저닝된 용량을 사용하는 방법에 대한 지침은 [프로비저닝된 용량 모드를 사용하는 리스 테이블의 모범 사례](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)를 참조하세요.

`idleTimeBetweenReadsInMillis`  
KCL 버전 1.x에서는 `idleTimeBetweenReadsInMillis`의 기본값이 1,000(또는 1초)으로 설정됩니다. KCL 버전 3.x에서는 i`dleTimeBetweenReadsInMillis`의 기본값이 1,500(또는 1.5초)으로 설정되지만 Amazon DynamoDB Streams Kinesis Adapter는 이 기본값을 1,000(또는 1초)으로 재정의합니다.

#### KCL 3.x의 새로운 구성
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
이 구성은 새로 검색된 샤드가 처리를 시작하기 전의 시간 간격을 정의하며, 1.5 × `leaseAssignmentIntervalMillis`로 계산됩니다. 이 설정을 명시적으로 구성하지 않으면 시간 간격은 기본적으로 1.5 × `failoverTimeMillis`로 설정됩니다. 새 샤드를 처리하려면 리스 테이블을 스캔하고 리스 테이블에서 글로벌 보조 인덱스(GSI)를 쿼리해야 합니다. `leaseAssignmentIntervalMillis`를 낮추면 이러한 스캔 및 쿼리 작업의 빈도가 증가하여 DynamoDB 비용이 증가합니다. 새 샤드 처리 지연을 최소화하려면 이 값을 2,000(또는 2초)으로 설정하는 것이 좋습니다.

`shardConsumerDispatchPollIntervalMillis`  
이 구성은 상태 전환을 트리거하기 위해 샤드 소비자가 연속 폴링하는 간격을 정의합니다. KCL 버전 1.x에서 이 동작은 구성 가능한 설정으로 노출되지 않은 `idleTimeInMillis` 파라미터에 의해 제어되었습니다. KCL 버전 3.x에서는 KCL 버전 1.x 설정에서 ` idleTimeInMillis`에 사용된 값과 일치하도록 이 구성을 설정하는 것이 좋습니다.

### 5단계: KCL 2.x에서 KCL 3.x로 마이그레이션
<a name="step5-kcl2-to-kcl3"></a>

원활한 전환과 최신 Kinesis Client Library(KCL) 버전과의 호환성을 보장하려면 [KCL 2.x에서 KCL 3.x로 업그레이드](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics)하기 위한 마이그레이션 가이드의 지침 5\$18단계를 따르세요.

일반적인 KCL 3.x 문제 해결에 대한 내용은 [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)를 참조하세요.

# 이전 KCL 버전으로 롤백
<a name="kcl-migration-rollback"></a>

이 주제에서는 소비자 애플리케이션을 이전 KCL 버전으로 롤백하는 방법을 설명합니다. 롤백 프로세스는 다음 두 단계로 구성됩니다.

1. [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 실행합니다.

1. 이전 KCL 버전 코드를 재배포합니다.

## 1단계: KCL 마이그레이션 도구 실행
<a name="kcl-migration-rollback-step1"></a>

이전 KCL 버전으로 롤백해야 하는 경우 KCL 마이그레이션 도구를 실행해야 합니다. 이 도구는 다음 두 가지 중요한 작업을 수행합니다.
+ DynamoDB의 리스 테이블에서 워커 지표 테이블이라고 하는 메타데이터 테이블과 글로벌 보조 인덱스를 제거합니다. 이러한 아티팩트는 KCL 3.x에서 생성되지만 이전 버전으로 롤백하는 경우 필요하지 않습니다.
+ 모든 워커가 KCL 1.x와 호환되는 모드로 실행되고 이전 KCL 버전에서 사용된 로드 밸런싱 알고리즘을 사용하도록 합니다. KCL 3.x의 새 로드 밸런싱 알고리즘에 문제가 있는 경우 해당 문제를 즉시 완화합니다.

**중요**  
DynamoDB의 조정자 상태 테이블은 반드시 존재해야 하며 마이그레이션, 롤백, 롤포워드 프로세스 중에 삭제되어서는 안 됩니다.

**참고**  
소비자 애플리케이션의 모든 워커가 지정된 시간에 동일한 로드 밸런싱 알고리즘을 사용하는 것이 중요합니다. KCL 마이그레이션 도구를 사용하면 KCL 3.x 소비자 애플리케이션의 모든 워커가 KCL 1.x 호환 모드로 전환되므로 이전 KCL 버전으로 애플리케이션을 롤백하는 동안 모든 워커가 동일한 로드 밸런싱 알고리즘을 실행하게 됩니다.

[KCL GitHub 리포지토리](https://github.com/awslabs/amazon-kinesis-client/tree/master)의 스크립트 디렉터리에서 [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 다운로드할 수 있습니다. 조정자 상태 테이블, 워커 지표 테이블 및 리스 테이블에 쓸 수 있는 적절한 권한이 있는 워커 또는 호스트에서 스크립트를 실행합니다. KCL 소비자 애플리케이션에 대해 적절한 [IAM 권한](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)이 구성되어 있는지 확인합니다. 지정된 명령을 사용하여 KCL 애플리케이션당 한 번만 스크립트를 실행합니다.

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### 파라미터
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*리전*을 사용자의 AWS 리전으로 바꿉니다.

`--application_name`  
이 파라미터는 DynamoDB 메타데이터 테이블(리스 테이블, 조정자 상태 테이블 및 워커 지표 테이블)의 기본 이름을 사용하는 경우 필요합니다. 이러한 테이블에 사용자 지정 이름을 지정한 경우 이 파라미터를 생략할 수 있습니다. *applicationName*을 실제 KCL 애플리케이션 이름으로 바꿉니다. 이 도구는 사용자 지정 이름이 제공되지 않은 경우 이 이름을 사용하여 기본 테이블 이름을 파생합니다.

`--lease_table_name`  
이 파라미터는 KCL 구성에서 리스 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *leaseTableName*을 리스 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

`--coordinator_state_table_name`  
이 파라미터는 KCL 구성에서 조정자 상태 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *coordinatorStateTableName*을 조정자 상태 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

`--worker_metrics_table_name`  
이 파라미터는 KCL 구성에서 워커 지표 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *workerMetricsTableName*을 워커 지표 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

## 2단계: 이전 KCL 버전으로 코드 재배포
<a name="kcl-migration-rollback-step2"></a>

**중요**  
KCL 마이그레이션 도구에서 생성된 출력에 버전 2.x가 언급된 경우 KCL 버전 1.x를 의미하는 것으로 해석해야 합니다. 스크립트를 실행해도 전체 롤백이 수행되는 것은 아니며, 로드 밸런싱 알고리즘만 KCL 버전 1.x에서 사용된 알고리즘으로 전환됩니다.

롤백을 위해 KCL 마이그레이션 도구를 실행하면 다음 메시지 중 하나가 표시됩니다.

메시지 1  
"Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version."  
**필요한 작업:** 이는 워커가 KCL 1.x 호환 모드로 실행 중이었음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

메시지 2  
"Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version."  
**필요한 작업:** 이는 워커가 KCL 3.x 모드로 실행 중이었고 KCL 마이그레이션 도구가 모든 워커를 KCL 1.x 호환 모드로 전환했음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

메시지 3  
"Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration."  
**필요한 작업:** 이는 워커가 KCL 1.x 호환 모드로 실행되도록 이미 롤백되었음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

# 롤백 후 KCL 3.x로 롤포워드
<a name="kcl-migration-rollforward"></a>

이 주제에서는 롤백 후 소비자 애플리케이션을 KCL 3.x로 롤포워드하는 방법을 설명합니다. 롤포워드가 필요한 경우 2단계 프로세스를 완료해야 합니다.

1. [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 실행합니다.

1. KCL 3.x로 코드를 배포합니다.

## 1단계: KCL 마이그레이션 도구 실행
<a name="kcl-migration-rollforward-step1"></a>

다음 명령으로 KCL 마이그레이션 도구를 실행하여 KCL 3.x로 롤포워드합니다.

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### 파라미터
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*리전*을 사용자의 AWS 리전으로 바꿉니다.

`--application_name`  
조정자 상태 테이블에 기본 이름을 사용하는 경우 이 파라미터가 필요합니다. 조정자 상태 테이블에 사용자 지정 이름을 지정한 경우 이 파라미터를 생략할 수 있습니다. *applicationName*을 실제 KCL 애플리케이션 이름으로 바꿉니다. 이 도구는 사용자 지정 이름이 제공되지 않은 경우 이 이름을 사용하여 기본 테이블 이름을 파생합니다.

`--coordinator_state_table_name`  
이 파라미터는 KCL 구성에서 조정자 상태 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *coordinatorStateTableName*을 조정자 상태 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

롤포워드 모드로 마이그레이션 도구를 실행한 후 KCL은 KCL 3.x에 필요한 다음과 같은 DynamoDB 리소스를 생성합니다.
+ 리스 테이블의 글로벌 보조 인덱스
+ 워커 지표 테이블

## 2단계: KCL 3.x로 코드 배포
<a name="kcl-migration-rollforward-step2"></a>

롤포워드를 위해 KCL 마이그레이션 도구를 실행한 후 KCL 3.x로 워커에 코드를 배포합니다. 마이그레이션을 완료하려면 [8단계: 마이그레이션 완료](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)를 참조하세요.

# 연습: DynamoDB Streams Kinesis 어댑터
<a name="Streams.KCLAdapter.Walkthrough"></a>

이번 섹션에서는 Amazon Kinesis Client Library와 Amazon DynamoDB Streams Kinesis 어댑터를 사용하는 Java 애플리케이션에 대해 살펴보겠습니다. 이 애플리케이션은 한 테이블의 쓰기 작업이 두 번째 테이블에도 적용되면서 두 테이블의 내용이 동기화를 유지하는 데이터 복제의 예로 설명됩니다. 소스 코드는 [전체 프로그램: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.CompleteProgram.md) 섹션을 참조하세요.

이 프로그램에서는 다음 작업을 수행합니다.

1. `KCL-Demo-src`와 `KCL-Demo-dst`라는 이름의 DynamoDB 테이블 2개를 생성합니다. 두 테이블 모두 스트림이 활성화되어 있습니다.

1. 항목을 추가, 업데이트 및 삭제하여 원본 테이블을 업데이트합니다. 이렇게 하면 데이터가 테이블의 스트림으로 기록됩니다.

1. 스트림에서 레코드를 읽고 DynamoDB 요청으로 재작성한 다음 대상 테이블에 요청을 적용합니다.

1. 원본 테이블과 대상 테이블을 스캔하여 내용이 동일한지 확인합니다.

1. 두 테이블을 삭제합니다.

이러한 단계는 다음 섹션에서 설명하며, 전체 애플리케이션은 연습 끝에 나와 있습니다.

**Topics**
+ [1단계: DynamoDB 테이블 생성](#Streams.KCLAdapter.Walkthrough.Step1)
+ [2단계: 소스 테이블의 업데이트 활동 생성](#Streams.KCLAdapter.Walkthrough.Step2)
+ [3단계: 스트림 처리](#Streams.KCLAdapter.Walkthrough.Step3)
+ [4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인](#Streams.KCLAdapter.Walkthrough.Step4)
+ [5단계: 정리](#Streams.KCLAdapter.Walkthrough.Step5)
+ [전체 프로그램: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 1단계: DynamoDB 테이블 생성
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

첫 번째 단계에서 두 개의 DynamoDB 테이블(소스 테이블과 대상 테이블)을 생성합니다. 원본 테이블 스트림의 `StreamViewType`은 `NEW_IMAGE`입니다. 이 말은 원본 테이블 항목이 변경될 때마다 항목의 "사후" 이미지가 스트림에 기록된다는 것을 의미합니다. 이러한 방식으로 스트림이 테이블의 모든 쓰기 작업을 추적합니다.

다음은 두 테이블 생성에 사용된 코드를 보여주는 예제입니다.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 2단계: 소스 테이블의 업데이트 활동 생성
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

다음 단계는 원본 테이블의 쓰기 작업입니다. 이 작업을 하면 원본 테이블의 스트림 역시 거의 실시간으로 업데이트됩니다.

이 애플리케이션은 데이터 기록을 위해 `PutItem`, `UpdateItem` 및 `DeleteItem` API 작업을 호출하는 메서드를 사용하여 헬퍼 클래스를 정의합니다. 다음은 이러한 메서드의 사용 방법을 나타낸 코드 예제입니다.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 3단계: 스트림 처리
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

이제 프로그램이 스트림을 처리합니다. DynamoDB Streams Kinesis 어댑터가 KCL과 DynamoDB Streams 엔드포인트 사이에서 투명 계층의 역할을 하기 때문에 코드에서 하위 수준 DynamoDB Streams를 호출할 필요 없이 KCL을 최대한 이용할 수 있습니다. 프로그램이 실행하는 작업은 다음과 같습니다.
+ KCL 인터페이스 정의를 준수하는 메서드인 `StreamsRecordProcessor`, `initialize` 및 `processRecords`을 사용하여 레코드 프로세서 클래스인 `shutdown`를 정의합니다. `processRecords` 메서드에는 원본 테이블의 스트림에서 데이터를 읽어 대상 테이블에 기록하는 데 필요한 로직이 저장됩니다.
+ 레코드 프로세서 클래스의 클래스 팩토리(`StreamsRecordProcessorFactory`)를 정의합니다. Java 프로그램이 KCL을 사용하려면 이 팩토리가 필요합니다.
+ 새로운 KCL `Worker`를 인스턴스화하여 클래스 팩토리와 연동시킵니다.
+ 레코드 처리가 완료되면 `Worker`를 종료합니다.

선택적으로 스트림 KCL 어댑터 구성에서 캐치업 모드를 활성화하면 스트림 처리 지연이 1분(기본값)을 초과할 경우 GetRecords API 직접 호출 속도가 자동으로 3배(기본값)로 조정되어 스트림 소비자가 테이블의 높은 처리량 급증을 처리하는 데 도움이 됩니다.

KCL 인터페이스 정의에 대한 자세한 내용은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.

다음은 `StreamsRecordProcessor`의 메인 루프를 나타낸 코드 예제입니다. `case` 문은 스트림 레코드에 표시되는 `OperationType`에 따라 어떤 작업을 실행할지 결정합니다.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

이 시점에서는 원본 테이블과 대상 테이블의 내용이 동기화 상태를 유지합니다. 애플리케이션이 두 테이블에 대해 `Scan` 요청을 하여 내용이 실제로 동일한지 확인합니다.

`DemoHelper` 클래스에는 하위 수준 `ScanTable` API를 호출하는 `Scan` 메서드가 포함되어 있습니다. 다음 예제는 이 작업을 수행하는 방법을 보여줍니다.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 5단계: 정리
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

데모가 완료되면 애플리케이션이 원본 테이블과 대상 테이블을 삭제합니다. 다음 코드 예제를 참조하세요. 하지만 테이블이 삭제된 후에도 스트림은 최대 24시간까지 사용 가능하며, 이 시간이 지나면 자동 삭제됩니다.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 전체 프로그램: DynamoDB Streams Kinesis 어댑터
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

다음은 [연습: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.md)에서 설명한 작업을 실행하는 전체 Java 프로그램입니다. 프로그램을 실행하면 다음과 비슷한 출력 화면이 보여야 합니다.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**중요**  
 이 프로그램을 실행하려면 클라이언트 애플리케이션이 정책을 사용하여 DynamoDB 및 Amazon CloudWatch에 액세스할 수 있어야 합니다. 자세한 내용은 [DynamoDB에 대한 자격 증명 기반 정책](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies) 섹션을 참조하세요.

소스 코드는 4개의 `.java` 파일로 구성됩니다. 이 프로그램을 빌드하려면 Amazon Kinesis Client Library(KCL) 3.x 및 AWS SDK for Java v2를 전이적 종속성으로 포함하는 다음 종속성을 추가합니다.

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

소스 파일 준비:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```