

# DynamoDB Streams에 대한 변경 데이터 캡처
<a name="Streams"></a>

 DynamoDB Streams는 DynamoDB 테이블에서 시간 순서에 따라 항목 수준 수정을 캡처하고 이 정보를 최대 24시간 동안 로그에 저장합니다. 로그와 데이터 항목은 변경 전후 거의 실시간으로 나타나므로 애플리케이션에서 이러한 로그와 데이터에 액세스할 수 있습니다.

 유휴 시 암호화는 DynamoDB Streams의 데이터를 암호화합니다. 자세한 내용은 [DynamoDB 저장 데이터 암호화](EncryptionAtRest.md) 섹션을 참조하세요.

*DynamoDB 스트림*은 DynamoDB 테이블 항목의 변경 사항에 대한 정렬된 정보 흐름입니다. 테이블에서 스트림을 활성화하면 DynamoDB가 테이블 데이터 항목의 모든 수정에 대한 정보를 캡처합니다.

애플리케이션에서 테이블 항목을 생성, 업데이트 또는 삭제할 때마다 DynamoDB Streams는 수정된 항목의 기본 키 속성을 사용하여 스트림 레코드를 작성합니다. *스트림 레코드*에는 DynamoDB 테이블의 단일 항목에 대한 데이터 수정 정보가 포함되어 있습니다. 수정된 항목의 "이전" 및 "이후" 이미지 등과 같이 스트림 레코드에서 추가 정보를 캡처하도록 스트림을 구성할 수 있습니다.

DynamoDB Streams는 다음을 보장할 수 있습니다.
+ 각 스트림 기록은 스트림에서 한 번만 나타납니다.
+ DynamoDB 테이블에서 수정된 각 항목의 스트림 레코드는 항목의 실제 수정과 동일한 순서로 표시됩니다.

DynamoDB Streams는 거의 실시간으로 스트림 레코드를 작성하므로 이러한 스트림을 소비하고 내용을 바탕으로 조치를 취할 수 있는 애플리케이션을 빌드할 수 있습니다.

**Topics**
+ [DynamoDB Streams에 대한 엔드포인트](#Streams.Endpoints)
+ [스트림 활성화](#Streams.Enabling)
+ [스트림 판독 및 처리](#Streams.Processing)
+ [DynamoDB Streams 및 유지 시간(TTL)](time-to-live-ttl-streams.md)
+ [DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리](Streams.KCLAdapter.md)
+ [DynamoDB Streams 하위 수준 API: Java 예시](Streams.LowLevel.Walkthrough.md)
+ [DynamoDB Streams 및 AWS Lambda 트리거](Streams.Lambda.md)
+ [DynamoDB 스트림 및 Apache Flink](StreamsApacheFlink.xml.md)

## DynamoDB Streams에 대한 엔드포인트
<a name="Streams.Endpoints"></a>

AWS에서는 DynamoDB 및 DynamoDB Streams의 엔드포인트가 별개입니다. 데이터베이스 테이블 및 인덱스를 사용하려면 애플리케이션이 DynamoDB 엔드포인트에 액세스해야 합니다. DynamoDB Streams 레코드를 읽고 처리하려면 애플리케이션이 동일한 리전의 DynamoDB Streams 엔드포인트에 액세스해야 합니다.

DynamoDB Streams는 두 가지 엔드포인트 세트를 제공합니다. 스크립트는 다음과 같습니다.
+ **IPv4 전용 엔드포인트**: `streams.dynamodb.<region>.amazonaws.com` 명명 규칙이 있는 엔드포인트입니다.
+ **듀얼 스택 엔드포인트**: IPv4 및 IPv6 모두와 호환되고 `streams-dynamodb.<region>.api.aws` 명명 규칙을 따르는 새로운 엔드포인트입니다.

**참고**  
DynamoDB 및 DynamoDB Streams 리전 및 엔드포인트의 전체 목록은 **AWS 일반 참조의 [리전 및 엔드포인트](https://docs.aws.amazon.com/general/latest/gr/rande.html)를 참조하세요.

AWS SDK는 DynamoDB와 DynamoDB Streams의 클라이언트를 따로 제공합니다. 요구 사항에 따라 애플리케이션은 DynamoDB 엔드포인트, DynamoDB Streams 엔드포인트 또는 두 엔드포인트에 동시에 액세스할 수 있습니다. 두 엔드포인트에 모두 연결하려면 애플리케이션이 2개 클라이언트(DynamoDB용 클라이언트 하나와 DynamoDB Streams용 클라이언트 하나)를 인스턴스화해야 합니다.

## 스트림 활성화
<a name="Streams.Enabling"></a>

AWS CLI 또는 AWS SDK 중 하나를 사용하여 새 테이블을 생성할 때 테이블에서 스트림을 활성화할 수 있습니다. 또한 기존 테이블에서도 스트림을 활성화하거나 비활성화 할 수 있으며, 스트림 설정도 변경할 수 있습니다. DynamoDB Streams는 비동기식으로 작동하므로 스트림을 활성화하더라도 테이블의 성능에 영향을 미치지 않습니다.

DynamoDB Streams를 관리하는 가장 용이한 방법은 AWS Management Console을 사용하는 것입니다.

1. AWS Management Console에 로그인하고 [https://console.aws.amazon.com/dynamodb/](https://console.aws.amazon.com/dynamodb/)에서 DynamoDB 콘솔을 엽니다.

1. DynamoDB 콘솔 대시보드에서 **Tables(테이블)**를 선택하고 기존 테이블을 선택합니다.

1. **내보내기 및 스트림(Exports and streams)** 탭을 선택합니다.

1. **DynamoDB 스트림 세부 정보** 섹션에서 **켜기**를 선택합니다.

1. **DynamoDB 스트림 켜기** 페이지에서 테이블의 데이터가 수정될 때마다 스트림에 기록할 정보를 선택합니다.
   + **키 속성만(Key attributes only)** - 수정된 항목의 키 속성만을 표시합니다.
   + **새로운 이미지** - 항목의 수정 후 전체 모습을 보여 줍니다.
   + **이전 이미지** - 항목의 수정 전 전체 모습을 보여 줍니다.
   + **새 이미지와 이전 이미지** - 항목의 새 이미지와 이전 이미지를 모두 보여 줍니다.

   원하는 대로 설정되었으면 **스트림 켜기**를 선택합니다.

1. (선택 사항) 기존 스트림을 비활성화하려면 **DynamoDB 스트림 세부 정보** 아래의 **끄기**를 선택합니다.

또한 `CreateTable` 또는 `UpdateTable` API 작업을 사용하여 스트림을 활성화하거나 수정할 수도 있습니다. `StreamSpecification` 파라미터는 스트림 구성 방식을 결정합니다.
+ `StreamEnabled` - 테이블에서 스트림을 활성화(`true`)할지 비활성화(`false`)할지 여부를 지정합니다.
+ `StreamViewType` - 테이블의 데이터가 수정될 때마다 스트림에 쓸 정보를 지정합니다.
  + `KEYS_ONLY` - 수정된 항목의 키 속성만 표시합니다.
  + `NEW_IMAGE` - 항목의 수정 후 전체 모습을 보여 줍니다.
  + `OLD_IMAGE` - 항목의 수정 전 전체 모습을 보여 줍니다.
  + `NEW_AND_OLD_IMAGES` - 항목의 새 이미지와 이전 이미지를 모두 보여 줍니다.

언제든지 스트림을 활성화 또는 비활성화할 수 있습니다. 그러나 이미 스트림을 가지고 있는 테이블에서 스트림을 활성화하려고 하면 `ValidationException`이 수신됩니다. 또한 스트림을 가지고 있지 않은 테이블에서 스트림을 비활성화하려고 하면 `ValidationException`이 수신됩니다.

`StreamEnabled`를 `true`로 설정하면 DynamoDB가 배정된 스트림 서술자를 사용하여 새 스트림을 생성합니다. 테이블에서 스트림을 비활성화한 후 다시 활성화하면 다른 스트림 서술자를 갖는 새 스트림이 생성됩니다.

모든 스트림은 Amazon 리소스 이름(ARN)을 통해 고유 식별됩니다. 아래에는 이름이 `TestTable`인 DynamoDB 테이블에 위치한 스트림의 ARN이 예시되어 있습니다.

```
arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291
```

테이블의 가장 최근 스트림 서술자를 확인하려면 DynamoDB `DescribeTable` 요청을 실행한 뒤 응답에서 `LatestStreamArn` 요소를 찾습니다.

**참고**  
스트림을 설정한 후에는 `StreamViewType`을 편집할 수 없습니다. 스트림을 설정한 후 변경해야 하는 경우 현재 스트림을 비활성화하고 새 스트림을 생성해야 합니다.

## 스트림 판독 및 처리
<a name="Streams.Processing"></a>

스트림을 판독하고 처리하려면 애플리케이션이 DynamoDB Streams 엔드포인트에 연결되어 API 요청을 실행해야 합니다.

스트림은 *스트림 기록*으로 구성되어 있습니다. 각 스트림 레코드는 스트림이 속한 DynamoDB 테이블에서의 단일 데이터 수정을 나타냅니다. 각 스트림 레코드에는 레코드가 스트림에 게시되는 순서를 반영하는 시퀀스 번호가 할당됩니다.

스트림 기록은 그룹 또는 *샤드*로 구성되어 있습니다. 각 샤드는 다중 스트림 레코드 저장소 역할을 하며 이러한 기록 액세스 및 반복 처리에 필요한 정보를 담고 있습니다. 샤드 내 스트림 레코드는 24시간 후 자동으로 제거됩니다.

샤드는 한시적입니다. 필요에 따라 자동으로 생성되었다가 삭제됩니다. 또한 모든 샤드는 여러 개의 새로운 샤드로 분할될 수 있으며 이 프로세스도 자동으로 이루어집니다. (상위 샤드가 하위 하드를 하나만 가질 수 있다는 점도 유의하세요.) 상위 테이블에서 쓰기 활동이 매우 활발할 경우 애플리케이션이 여러 샤드로부터 동시에 레코드를 처리할 수 있도록 샤드가 분할될 수 있습니다.

스트림을 비활성화하면 열려 있는 샤드가 모두 닫힙니다. 스트림의 데이터는 24시간 동안 읽기 가능한 상태로 유지됩니다.

샤드에는 계보(상위 및 하위)가 있으므로 애플리케이션은 항상 상위 샤드를 하위 샤드보다 먼저 처리해야 합니다. 그래야 스트림 레코드도 올바른 순서로 처리됩니다. (DynamoDB Streams Kinesis 어댑터를 사용하는 경우 이 프로세스가 자동으로 처리됩니다. 애플리케이션이 올바른 순서로 샤드와 스트림 레코드를 처리하고 자동으로 새로운 또는 만료된 샤드, 그리고 애플리케이션이 실행되는 동안 분할된 샤드를 처리합니다. 자세한 내용은 [DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리](Streams.KCLAdapter.md) 섹션을 참조하세요.)

스트림, 스트림 샤드 및 샤드에 포함된 스트림 기록 간 관계가 다음 도표에 나타나 있습니다.

![\[DynamoDB Streams 구조. 데이터 수정을 나타내는 스트림 레코드는 샤드로 구성됩니다.\]](http://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/images/streams-terminology.png)


**참고**  
항목 안의 어떠한 데이터도 변경하지 않는 `PutItem` 또는 `UpdateItem` 작업을 수행하면 DynamoDB Streams는 해당 작업에 대하여 스트림 레코드를 작성하지 *않습니다*.

스트림에 액세스하여 그 안에 포함된 스트림 기록을 처리하는 과정은 다음과 같습니다.
+ 액세스하고자 하는 스트림의 고유 ARN을 선택합니다.
+ 처리하고자 하는 스트림 레코드가 포함된 샤드를 스트림에서 선택합니다.
+ 샤드에 액세스한 뒤 원하는 스트림 레코드를 조회합니다.

**참고**  
최대 2개의 프로세스까지 동일한 스트림의 샤드에서 동시에 읽을 수 있습니다. 샤드당 읽기 프로세스가 2개를 초과하면 병목이 발생할 수 있습니다.

DynamoDB Streams API에서는 애플리케이션 프로그램에서 사용하기 위한 다음과 같은 작업을 제공합니다.
+  `[ListStreams](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_ListStreams.html)` - 현재 계정 및 엔드포인트에 대한 스트림 서술자 목록을 반환합니다. 선택 사항으로 특정 테이블 이름에 대하여 스트림 서술자만 요청할 수 있습니다.
+ `[DescribeStream](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_DescribeStream.html)` — 스트림의 현재 상태, Amazon 리소스 이름(ARN), 샤드의 구성 및 해당되는 DynamoDB 테이블을 포함하여 스트림에 대한 정보를 반환합니다. 선택적으로 `ShardFilter` 필드를 사용하여 상위 샤드와 연결된 기존 하위 샤드를 검색할 수 있습니다.
+ `[GetShardIterator](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetShardIterator.html)` - 샤드 내 위치를 설명하는 *샤드 반복자*를 반환합니다. 반복자가 스트림의 가장 오래된 지점, 최신 지점 및 특정 지점에 대한 액세스를 제공하도록 요청할 수 있습니다.
+ `[GetRecords](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_streams_GetRecords.html)` - 지정된 샤드 내에서 스트림 레코드를 반환합니다. `GetShardIterator` 요청으로부터 반환된 샤드 반복자를 제공해야 합니다.

요청 및 응답 예제를 포함하여 이러한 API 작업에 대한 전체 설명은 [Amazon DynamoDB Streams API 참조](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Operations_Amazon_DynamoDB_Streams.html)를 참조하세요.

### 샤드 검색
<a name="Streams.ShardDiscovery"></a>



두 가지 강력한 방법을 사용하여 DynamoDB 스트림에서 새 샤드를 검색합니다. Amazon DynamoDB Streams 사용자는 다음 두 가지 효과적인 방법으로 새 샤드를 추적하고 식별할 수 있습니다.

**전체 스트림 토폴로지 폴링**  
`DescribeStream` API를 사용하여 스트림을 정기적으로 폴링합니다. 그러면 생성된 새 샤드를 포함하여 스트림의 모든 샤드가 반환됩니다. 시간 경과에 따른 결과를 비교하면 새로 추가된 샤드를 감지할 수 있습니다.

**하위 샤드 검색**  
`ShardFilter` 파라미터와 함께 `DescribeStream` API를 사용하여 샤드의 하위 집합을 찾습니다. 요청에 상위 샤드를 지정하면 DynamoDB Streams가 바로 밑에 있는 하위 샤드를 반환합니다. 이 접근 방식은 전체 스트림을 스캔하지 않고 샤드 계보만 추적하면 되는 경우에 유용합니다.  
DynamoDB Streams에서 데이터를 사용하는 애플리케이션은 `ShardFilter` 파라미터를 사용하여 닫힌 샤드를 읽는 것에서 하위 샤드로 효율적으로 전환할 수 있으므로 모든 닫힌 샤드와 열린 샤드에 대한 샤드 맵을 검색하고 통과하기 위해 `DescribeStream` API를 반복적으로 직접 호출하지 않아도 됩니다. 이렇게 하면 상위 샤드가 닫힌 후 하위 샤드를 빠르게 검색하여 스트림 처리 애플리케이션의 응답성과 비용 효과를 높일 수 있습니다.

두 방법 모두 DynamoDB Streams의 진화하는 구조를 계속해서 추적할 수 있으므로 중요한 데이터 업데이트 또는 샤드 수정을 놓치지 않을 수 있도록 합니다.

### DynamoDB Streams에 대한 데이터 보존 제한
<a name="Streams.DataRetention"></a>

DynamoDB Streams의 모든 데이터는 24시간 동안 유지됩니다. 특정 테이블에 대한 지난 24시간 동안의 활동을 조회하고 분석할 수 있습니다. 그러나 24시간이 지난 데이터는 언제든 트리밍(제거)될 수 있습니다.

테이블에서 스트림을 비활성화해도 스트림에 포함된 데이터는 24시간 동안 읽기 가능한 상태가 유지됩니다. 이 시긴 이후 데이터는 만료되며 스트림 기록은 자동으로 삭제됩니다. 기존 스트림을 수동으로 삭제하기 위한 메커니즘은 없습니다. 보유 제한이 만료(24시간)될 때까지 기다려야 하며, 모든 스트림 레코드가 삭제됩니다.

# DynamoDB Streams 및 유지 시간(TTL)
<a name="time-to-live-ttl-streams"></a>

테이블에서 Amazon DynamoDB Streams를 활성화하고 만료된 항목의 스트림 레코드를 처리하여 [유지 시간](TTL.md)(TTL)에 의해 삭제된 항목을 백업하거나 처리할 수 있습니다. 자세한 내용은 [스트림 판독 및 처리](Streams.md#Streams.Processing) 섹션을 참조하세요.

스트림 레코드에는 사용자 ID 필드 `Records[<index>].userIdentity`가 포함되어 있습니다.

만료 후 유지 시간(TTL) 프로세스에 의해 삭제된 항목에는 다음 필드가 있습니다.
+ `Records[<index>].userIdentity.type`

  `"Service"`
+ `Records[<index>].userIdentity.principalId`

  `"dynamodb.amazonaws.com"`

**참고**  
글로벌 테이블에서 TTL을 사용하는 경우 TTL이 수행된 리전에 `userIdentity` 필드가 설정됩니다. 삭제가 복제될 때 다른 리전에서는 이 필드가 설정되지 않습니다.

다음 JSON은 단일 스트림 레코드의 해당 부분을 보여 줍니다.

```
"Records": [
    {
        ...

        "userIdentity": {
            "type": "Service",
            "principalId": "dynamodb.amazonaws.com"
        }

        ...

    }
]
```

## DynamoDB Streams 및 Lambda를 사용하여 TTL 삭제 항목 보관
<a name="streams-archive-ttl-deleted-items"></a>

[DynamoDB 유지 시간(TTL)](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/TTL.html), [DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) 및 [AWS Lambda](https://aws.amazon.com/lambda/)를 결합하면 데이터 보관을 간소화하고 DynamoDB 스토리지 비용을 절감하며 코드 복잡성을 줄일 수 있습니다. Lambda를 스트림 소비자로 사용할 경우 많은 이점이 있습니다. 특히 Kinesis Client Library(KCL)와 같은 여타 소비자에 비해 비용이 절감됩니다. Lambda를 사용하여 이벤트를 소비할 때 DynamoDB 스트림에서 `GetRecords` API 호출에 대한 요금이 부과되지 않으며, Lambda는 스트림 이벤트에서 JSON 패턴을 식별하여 이벤트 필터링을 제공할 수 있습니다. 이벤트 패턴 콘텐츠 필터링을 통해 최대 5가지 필터를 정의하여 처리를 위해 Lambda로 전송되는 이벤트를 제어할 수 있습니다. 이렇게 함으로써 Lambda 함수의 호출 수가 감소하고 코드가 간소화되며 전체 비용이 절감됩니다.

DynamoDB Streams에는 `Create`, `Modify` 및 `Remove` 작업과 같은 모든 데이터 수정 사항이 포함되어 있지만 이로 인해 아카이브 Lambda 함수가 원치 않게 호출될 수 있습니다. 예를 들어 시간당 200만 개의 데이터 수정 사항이 스트림으로 유입되는 테이블이 있는데 이 중 5% 미만이 TTL 프로세스를 통해 만료되고 보관해야 하는 항목 삭제라고 가정해 보겠습니다. [Lambda 이벤트 소스 필터](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)를 사용하면 Lambda 함수가 시간당 10만 회만 호출합니다. 이벤트 필터링의 결과로, 이벤트 필터링 없이 수행될 200만 회의 호출 대신 필요한 호출에 대해서만 요금이 부과됩니다.

이벤트 필터링은 선택한 이벤트(DynamoDB 스트림)에서 읽기를 수행하고 Lambda 함수를 호출하는 리소스인 [Lambda 이벤트 소스 매핑](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html)에 적용됩니다. 다음 다이어그램에서는 스트림 및 이벤트 필터를 사용하여 Lambda 함수에 의해 유지 시간(TTL) 삭제 항목이 소비되는 방법을 볼 수 있습니다.

![\[TTL 프로세스를 통해 삭제된 항목은 스트림과 이벤트 필터를 사용하는 Lambda 함수를 시작합니다.\]](http://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/images/streams-lambda-ttl.png)


### DynamoDB 유지 시간(TTL) 이벤트 필터 패턴
<a name="ttl-event-filter-pattern"></a>

이벤트 소스 매핑 [필터 기준](https://docs.aws.amazon.com/lambda/latest/dg/API_FilterCriteria.html)에 다음 JSON을 추가하면 TTL 삭제 항목에 대해서만 Lambda 함수를 호출할 수 있습니다.

```
{
    "Filters": [
        {
            "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }
        }
    ]
}
```

### AWS Lambda 이벤트 소스 매핑 생성
<a name="create-event-source-mapping"></a>

다음 코드 조각을 사용하여 테이블의 DynamoDB 스트림에 연결할 수 있는 필터링된 이벤트 소스 매핑을 생성합니다. 각 코드 블록에는 이벤트 필터 패턴이 포함됩니다.

------
#### [ AWS CLI ]

```
aws lambda create-event-source-mapping \
--event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \
--batch-size 10 \
--enabled \
--function-name test_func \
--starting-position LATEST \
--filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
```

------
#### [ Java ]

```
LambdaClient client = LambdaClient.builder()
        .region(Region.EU_WEST_1)
        .build();

Filter userIdentity = Filter.builder()
        .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}")
        .build();

FilterCriteria filterCriteria = FilterCriteria.builder()
        .filters(userIdentity)
        .build();

CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder()
        .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000")
        .batchSize(10)
        .enabled(Boolean.TRUE)
        .functionName("test_func")
        .startingPosition("LATEST")
        .filterCriteria(filterCriteria)
        .build();

try{
    CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest);
    System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn());

}catch (ServiceException e){
    System.out.println(e.getMessage());
}
```

------
#### [ Node ]

```
const client = new LambdaClient({ region: "eu-west-1" });

const input = {
    EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000",
    BatchSize: 10,
    Enabled: true,
    FunctionName: "test_func",
    StartingPosition: "LATEST",
    FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] }
}

const command = new CreateEventSourceMappingCommand(input);

try {
    const results = await client.send(command);
    console.log(results);
} catch (err) {
    console.error(err);
}
```

------
#### [ Python ]

```
session = boto3.session.Session(region_name = 'eu-west-1')
client = session.client('lambda')

try:
    response = client.create_event_source_mapping(
        EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000',
        BatchSize=10,
        Enabled=True,
        FunctionName='test_func',
        StartingPosition='LATEST',
        FilterCriteria={
            'Filters': [
                {
                    'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"
                },
            ]
        }
    )
    print(response)
except Exception as e:
    print(e)
```

------
#### [ JSON ]

```
{
  "userIdentity": {
     "type": ["Service"],
     "principalId": ["dynamodb.amazonaws.com"]
   }
}
```

------

# DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리
<a name="Streams.KCLAdapter"></a>

Amazon Kinesis 어댑터 사용은 Amazon DynamoDB의 스트림을 소비할 때 권장되는 방법입니다. DynamoDB Streams API는 Kinesis Data Streams와 유사합니다. 두 서비스 모두 데이터 스트림이 샤드로 구성되어 있습니다. 샤드란 스트림 레코드의 컨테이너입니다. 두 서비스의 API에는 `ListStreams`, `DescribeStream`, `GetShards` 및 `GetShardIterator` 작업이 포함되어 있습니다. (이러한 DynamoDB Streams 작업은 Kinesis Data Streams의 해당 작업과 유사하지만 100% 동일하지는 않습니다.)

DynamoDB Streams 사용자는 KCL에 있는 디자인 패턴을 활용하여 DynamoDB Streams 샤드와 스트림 레코드를 처리할 수 있습니다. 이렇게 하려면 DynamoDB Streams Kinesis 어댑터를 사용합니다. Kinesis 어댑터는 DynamoDB Streams의 레코드를 사용 및 처리하는 데 KCL을 사용할 수 있도록 Kinesis Data Streams 인터페이스를 구현합니다. DynamoDB Streams Kinesis 어댑터를 설정하고 설치하는 방법에 대한 지침은 [GitHub 리포지토리](https://github.com/awslabs/dynamodb-streams-kinesis-adapter)를 참조하세요.

Kinesis Client Library(KCL)를 사용하여 Kinesis Data Streams의 애플리케이션을 작성할 수 있습니다. KCL은 하위 수준의 Kinesis Data Streams API에 유용한 추상화를 제공하여 코딩을 단순화합니다. KCL에 대한 자세한 내용은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.

DynamoDB는 AWS SDK for Java v2.x와 함께 KCL 버전 3.x를 사용할 것을 권장합니다. AWS SDK for Java v1.x용 AWS SDK를 사용하는 현재 DynamoDB Streams Kinesis Adapter 버전 1.x는 [AWS SDK 및 도구 유지 관리 정책](https://docs.aws.amazon.com/sdkref/latest/guide/maint-policy.html)에 따라 전환 기간 동안 의도한 대로 수명 주기 전체에 걸쳐 계속 완전히 지원됩니다.

**참고**  
Amazon Kinesis Client Library(KCL) 버전 1.x 및 2.x는 오래되었습니다. KCL 1.x는 2026년 1월 30일에 지원이 종료됩니다. 버전 1.x를 사용하는 KCL 애플리케이션을 2026년 1월 30일 이전에 최신 KCL 버전으로 마이그레이션하는 것이 좋습니다. 최신 KCL 버전을 찾으려면 GitHub의 [Amazon Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) 페이지를 참조하세요. 최신 KCL 버전에 대한 자세한 내용은 [Kinesis Client Library 사용](https://docs.aws.amazon.com/streams/latest/dev/kcl.html)을 참조하세요. KCL 1.x에서 KCL 3.x로의 마이그레이션에 대한 자세한 내용은 KCL 1.x에서 KCL 3.x로의 마이그레이션을 참조하세요.

다음 다이어그램은 이러한 라이브러리가 서로 상호 작용하는 방법을 보여 줍니다.

![\[DynamoDB Streams 레코드 처리를 위한 DynamoDB Streams, Kinesis Data Streams 및 KCL 간의 상호 작용입니다.\]](http://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/images/streams-kinesis-adapter.png)


DynamoDB Streams Kinesis 어댑터가 준비되어 있으면 DynamoDB Streams 엔드포인트로 원활하게 전달되는 API 호출을 통해 KCL 인터페이스 개발을 시작할 수 있습니다.

애플리케이션이 시작되면 KCL을 호출하여 작업자를 인스턴스화합니다. 작업자에게 애플리케이션의 구성 정보를 제공해야 합니다. 제공해야 하는 구성 정보에는 스트림 서술자와 AWS 자격 증명, 제공하는 레코드 프로세서 클래스의 이름 등이 있습니다. 레코드 프로세서에서 코드를 실행하면 작업자는 다음 작업을 수행합니다.
+ 스트림에 연결합니다
+ 스트림 내 샤드를 열거합니다.
+ 스트림 내에서 닫힌 상위 샤드의 하위 샤드를 확인하고 열거
+ 샤드 연결을 다른 작업자(있는 경우)와 조정합니다.
+ 관리하는 모든 샤드의 레코드 프로세서를 인스턴스화합니다
+ 스트림에서 레코드를 가져옵니다.
+ 처리량이 많을 때 GetRecords API 직접 호출 속도를 조정합니다(캐치업 모드가 구성된 경우).
+ 해당하는 레코드 프로세서로 레코드를 푸시합니다
+ 처리된 레코드에 대해 체크포인트를 수행합니다
+ 작업자 인스턴스 수가 변경되면 샤드-작업자 연결을 조정합니다
+ 샤드가 분할되면 샤드-작업자 연결을 조정합니다.

KCL 어댑터는 임시 처리량 증가를 처리하기 위한 직접 호출 속도 자동 조정 기능인 캐치업 모드를 지원합니다. 스트림 처리 지연이 구성 가능한 임계값(기본값 1분)을 초과하면 캐치업 모드를 통해 GetRecords API 직접 호출 빈도가 구성 가능한 값(기본값 3배)으로 조정되어 레코드를 더 빠르게 가져오게 되며, 지연이 감소하면 평소 상태로 돌아옵니다. 이는 DynamoDB 쓰기 활동이 기본 폴링 속도를 사용하는 소비자에게 과부하를 일으킬 수 있는 처리량이 많은 기간에 유용합니다. `catchupEnabled` 구성 파라미터(기본값 false)를 통해 캐치업 모드를 활성화할 수 있습니다.

**참고**  
여기에 나온 KCL 개념에 대한 설명은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.  
AWS Lambda에 스트림을 사용하는 방법에 대한 자세한 내용은 [DynamoDB Streams 및 AWS Lambda 트리거](Streams.Lambda.md) 섹션을 참조하세요.

# KCL 1.x에서 KCL 3.x로 마이그레이션
<a name="streams-migrating-kcl"></a>

## 개요
<a name="migrating-kcl-overview"></a>

이 가이드에서는 소비자 애플리케이션을 KCL 1.x에서 KCL 3.x로 마이그레이션하기 위한 지침을 제공합니다. KCL 1.x와 KCL 3.x의 아키텍처 차이로 인해 마이그레이션하려면 호환성을 보장하기 위해 여러 구성 요소를 업데이트해야 합니다.

KCL 1.x는 KCL 3.x와 다른 클래스 및 인터페이스를 사용합니다. 먼저 레코드 프로세서, 레코드 프로세서 팩토리 및 워커 클래스를 KCL 3.x 호환 형식으로 마이그레이션하고 KCL 1.x에서 KCL 3.x로 마이그레이션하는 단계를 따라야 합니다.

## 마이그레이션 단계
<a name="migration-steps"></a>

**Topics**
+ [1단계: 레코드 프로세서 마이그레이션](#step1-record-processor)
+ [2단계: 레코드 프로세서 팩토리 마이그레이션](#step2-record-processor-factory)
+ [3단계: 워커 마이그레이션](#step3-worker-migration)
+ [4단계: KCL 3.x 구성 개요 및 권장 사항](#step4-configuration-migration)
+ [5단계: KCL 2.x에서 KCL 3.x로 마이그레이션](#step5-kcl2-to-kcl3)

### 1단계: 레코드 프로세서 마이그레이션
<a name="step1-record-processor"></a>

다음은 KCL 1.x DynamoDB Streams Kinesis 어댑터에 구현된 레코드 프로세서를 보여주는 예제입니다.

```
package com.amazonaws.kcl;

import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException;
import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;

public class StreamsRecordProcessor implements IRecordProcessor, IShutdownNotificationAware {
    @Override
    public void initialize(InitializationInput initializationInput) {
        //
        // Setup record processor
        //
    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        for (Record record : processRecordsInput.getRecords()) {
            String data = new String(record.getData().array(), Charset.forName("UTF-8"));
            System.out.println(data);
            if (record instanceof RecordAdapter) {
                // record processing and checkpointing logic
            }
        }
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) {
        if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) {
            try {
                shutdownInput.getCheckpointer().checkpoint();
            } catch (ShutdownException | InvalidStateException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @Override
    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
        try {
            checkpointer.checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow exception
            //
            e.printStackTrace();
        }
    }
}
```

**RecordProcessor 클래스를 마이그레이션하려면**

1. 다음과 같이 인터페이스를 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor` 및 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware`에서 `com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor`로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
   // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware;
   
   import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
   ```

1. `initialize` 및 `processRecords` 메서드에 대한 import 문을 업데이트합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput;
   import software.amazon.kinesis.lifecycle.events.InitializationInput;
   
   // import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
   import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
   ```

1. `shutdownRequested` 메서드를 새 메서드인 `leaseLost`, `shardEnded`, `shutdownRequested`으로 바꿉니다.

   ```
   //    @Override
   //    public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) {
   //        //
   //        // This is moved to shardEnded(...) and shutdownRequested(ShutdownReauestedInput)
   //        //
   //        try {
   //            checkpointer.checkpoint();
   //        } catch (ShutdownException | InvalidStateException e) {
   //            //
   //            // Swallow exception
   //            //
   //            e.printStackTrace();
   //        }
   //    }
   
       @Override
       public void leaseLost(LeaseLostInput leaseLostInput) {
   
       }
   
       @Override
       public void shardEnded(ShardEndedInput shardEndedInput) {
           try {
               shardEndedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   
       @Override
       public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
           try {
               shutdownRequestedInput.checkpointer().checkpoint();
           } catch (ShutdownException | InvalidStateException e) {
               //
               // Swallow the exception
               //
               e.printStackTrace();
           }
       }
   ```

다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;
import software.amazon.dynamodb.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.dynamodb.streamsadapter.adapter.DynamoDBStreamsKinesisClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import software.amazon.awssdk.services.dynamodb.model.Record;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    @Override
    public void initialize(InitializationInput initializationInput) {
        
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput processRecordsInput) {
        for (DynamoDBStreamsKinesisClientRecord record: processRecordsInput.records())
            Record ddbRecord = record.getRecord();
            // processing and checkpointing logic for the ddbRecord
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            //
            // Swallow the exception
            //
            e.printStackTrace();
        }
    }
}
```

**참고**  
DynamoDB Streams Kinesis Adapter는 이제 SDKv2 Record 모델을 사용합니다. SDKv2에서 복합 `AttributeValue` 객체(`BS`, `NS`, `M`, `L`, `SS`)는 null을 반환하지 않습니다. `hasBs()`, `hasNs()`, `hasM()`, `hasL()`, `hasSs()` 메서드를 사용하여 이러한 값이 존재하는지 확인합니다.

### 2단계: 레코드 프로세서 팩토리 마이그레이션
<a name="step2-record-processor-factory"></a>

레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.

```
package com.amazonaws.codesamples;

import software.amazon.dynamodb.AmazonDynamoDB;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements IRecordProcessorFactory {
    
    @Override
    public IRecordProcessor createProcessor() {
        return new StreamsRecordProcessor(dynamoDBClient, tableName);
    }
}
```

**`RecordProcessorFactory`를 마이그레이션하려면**
+ 구현된 인터페이스를 다음과 같이 `com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory`에서 `software.amazon.kinesis.processor.ShardRecordProcessorFactory`로 변경합니다.

  ```
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  import software.amazon.kinesis.processor.ShardRecordProcessor;
  
  // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
  
  // public class TestRecordProcessorFactory implements IRecordProcessorFactory {
  public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {
  
  Change the return signature for createProcessor.
  
  // public IRecordProcessor createProcessor() {
  public ShardRecordProcessor shardRecordProcessor() {
  ```

다음은 3.0의 레코드 프로세서 팩토리의 예입니다.

```
package com.amazonaws.codesamples;

import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsRecordProcessorFactory implements ShardRecordProcessorFactory {

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor();
    }
}
```

### 3단계: 워커 마이그레이션
<a name="step3-worker-migration"></a>

KCL 버전 3.0에서는 **Scheduler**라는 새로운 클래스가 **Worker** 클래스를 대체합니다. 다음은 KCL 1.x 워커의 예입니다.

```
final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...)
final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory();
final Worker worker = StreamsWorkerFactory.createDynamoDbStreamsWorker(
        recordProcessorFactory,
        workerConfig,
        adapterClient,
        amazonDynamoDB,
        amazonCloudWatchClient);
```

**작업자를 마이그레이션하려면**

1. `import` 클래스에 대한 `Worker` 문을 `Scheduler` 및 `ConfigsBuilder` 클래스에 대한 가져오기 문으로 변경합니다.

   ```
   // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
   import software.amazon.kinesis.coordinator.Scheduler;
   import software.amazon.kinesis.common.ConfigsBuilder;
   ```

1. `StreamTracker`를 가져오고 `StreamsWorkerFactory`의 가져오기를 `StreamsSchedulerFactory`로 변경합니다.

   ```
   import software.amazon.kinesis.processor.StreamTracker;
   // import software.amazon.dynamodb.streamsadapter.StreamsWorkerFactory;
   import software.amazon.dynamodb.streamsadapter.StreamsSchedulerFactory;
   ```

1. 애플리케이션을 시작할 위치를 선택합니다. `TRIM_HORIZON` 또는 `LATEST`일 수 있습니다.

   ```
   import software.amazon.kinesis.common.InitialPositionInStream;
   import software.amazon.kinesis.common.InitialPositionInStreamExtended;
   ```

1. `StreamTracker` 인스턴스를 만듭니다.

   ```
   StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(
           streamArn,
           InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
   );
   ```

1. `AmazonDynamoDBStreamsAdapterClient` 객체를 생성합니다.

   ```
   import software.amazon.dynamodb.streamsadapter.AmazonDynamoDBStreamsAdapterClient; 
   import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
   import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
   
   ...
   
   AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();
   
   AmazonDynamoDBStreamsAdapterClient adapterClient = new AmazonDynamoDBStreamsAdapterClient(
           credentialsProvider, awsRegion);
   ```

1. `ConfigsBuilder` 객체를 생성합니다.

   ```
   import software.amazon.kinesis.common.ConfigsBuilder;
   
   ...
   ConfigsBuilder configsBuilder = new ConfigsBuilder(
                   streamTracker,
                   applicationName,
                   adapterClient,
                   dynamoDbAsyncClient,
                   cloudWatchAsyncClient,
                   UUID.randomUUID().toString(),
                   new StreamsRecordProcessorFactory());
   ```

1. 다음 예제와 같이 `ConfigsBuilder`를 사용하여 `Scheduler`를 생성합니다.

   ```
   import java.util.UUID;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
   import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   
   import software.amazon.kinesis.common.KinesisClientUtil;
   import software.amazon.kinesis.coordinator.Scheduler;
   
   ...
   
                   
   DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
   CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
   
                   
   DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(adapterClient);
   pollingConfig.idleTimeBetweenReadsInMillis(idleTimeBetweenReadsInMillis);
   
   // Use ConfigsBuilder to configure settings
   RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
   retrievalConfig.retrievalSpecificConfig(pollingConfig);
   
   CoordinatorConfig coordinatorConfig = configsBuilder.coordinatorConfig();
   coordinatorConfig.clientVersionConfig(CoordinatorConfig.ClientVersionConfig.CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X);
                   
   Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                   configsBuilder.checkpointConfig(),
                   coordinatorConfig,
                   configsBuilder.leaseManagementConfig(),
                   configsBuilder.lifecycleConfig(),
                   configsBuilder.metricsConfig(),
                   configsBuilder.processorConfig(),
                   retrievalConfig,
                   adapterClient
           );
   ```

**중요**  
`CLIENT_VERSION_CONFIG_COMPATIBLE_WITH_2X` 설정은 DynamoDB Streams Kinesis Adapter KCL v2와 v3가 아닌 KCL v3와 KCL v1 간의 호환성을 유지합니다.

### 4단계: KCL 3.x 구성 개요 및 권장 사항
<a name="step4-configuration-migration"></a>

KCL 1.x 이후에 도입된 KCL 3.x 관련 구성에 대한 자세한 설명은 [KCL 구성](https://docs.aws.amazon.com//streams/latest/dev/kcl-configuration.html) 및 [KCL 마이그레이션 클라이언트 구성](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration.html#client-configuration)을 참조하세요.

**중요**  
KCL 3.x 및 이후 버전에서는 `checkpointConfig`, `coordinatorConfig`, `leaseManagementConfig`, `metricsConfig`, `processorConfig`, `retrievalConfig` 객체를 직접 생성하는 대신 `ConfigsBuilder`를 사용하여 구성을 설정하는 것이 좋습니다. `ConfigsBuilder`는 KCL 애플리케이션을 구성하는 데 더 유연하고 유지 관리가 용이한 방법을 제공합니다.

#### KCL 3.x에서 업데이트 기본값을 사용한 구성
<a name="kcl3-configuration-overview"></a>

`billingMode`  
KCL 버전 1.x에서는 `billingMode`의 기본값이 `PROVISIONED`로 설정됩니다. 그러나 KCL 버전 3.x에서는 기본 `billingMode`가 `PAY_PER_REQUEST`(온디맨드 모드)입니다. 사용량에 따라 용량을 자동으로 조정하려면 리스 테이블에 온디맨드 용량 모드를 사용하는 것이 좋습니다. 리스 테이블에 프로비저닝된 용량을 사용하는 방법에 대한 지침은 [프로비저닝된 용량 모드를 사용하는 리스 테이블의 모범 사례](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-lease-table.html)를 참조하세요.

`idleTimeBetweenReadsInMillis`  
KCL 버전 1.x에서는 `idleTimeBetweenReadsInMillis`의 기본값이 1,000(또는 1초)으로 설정됩니다. KCL 버전 3.x에서는 i`dleTimeBetweenReadsInMillis`의 기본값이 1,500(또는 1.5초)으로 설정되지만 Amazon DynamoDB Streams Kinesis Adapter는 이 기본값을 1,000(또는 1초)으로 재정의합니다.

#### KCL 3.x의 새로운 구성
<a name="kcl3-new-configs"></a>

`leaseAssignmentIntervalMillis`  
이 구성은 새로 검색된 샤드가 처리를 시작하기 전의 시간 간격을 정의하며, 1.5 × `leaseAssignmentIntervalMillis`로 계산됩니다. 이 설정을 명시적으로 구성하지 않으면 시간 간격은 기본적으로 1.5 × `failoverTimeMillis`로 설정됩니다. 새 샤드를 처리하려면 리스 테이블을 스캔하고 리스 테이블에서 글로벌 보조 인덱스(GSI)를 쿼리해야 합니다. `leaseAssignmentIntervalMillis`를 낮추면 이러한 스캔 및 쿼리 작업의 빈도가 증가하여 DynamoDB 비용이 증가합니다. 새 샤드 처리 지연을 최소화하려면 이 값을 2,000(또는 2초)으로 설정하는 것이 좋습니다.

`shardConsumerDispatchPollIntervalMillis`  
이 구성은 상태 전환을 트리거하기 위해 샤드 소비자가 연속 폴링하는 간격을 정의합니다. KCL 버전 1.x에서 이 동작은 구성 가능한 설정으로 노출되지 않은 `idleTimeInMillis` 파라미터에 의해 제어되었습니다. KCL 버전 3.x에서는 KCL 버전 1.x 설정에서 ` idleTimeInMillis`에 사용된 값과 일치하도록 이 구성을 설정하는 것이 좋습니다.

### 5단계: KCL 2.x에서 KCL 3.x로 마이그레이션
<a name="step5-kcl2-to-kcl3"></a>

원활한 전환과 최신 Kinesis Client Library(KCL) 버전과의 호환성을 보장하려면 [KCL 2.x에서 KCL 3.x로 업그레이드](https://docs.aws.amazon.com//streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-worker-metrics)하기 위한 마이그레이션 가이드의 지침 5\$18단계를 따르세요.

일반적인 KCL 3.x 문제 해결에 대한 내용은 [Troubleshooting KCL consumer applications](https://docs.aws.amazon.com//streams/latest/dev/troubleshooting-consumers.html)를 참조하세요.

# 이전 KCL 버전으로 롤백
<a name="kcl-migration-rollback"></a>

이 주제에서는 소비자 애플리케이션을 이전 KCL 버전으로 롤백하는 방법을 설명합니다. 롤백 프로세스는 다음 두 단계로 구성됩니다.

1. [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 실행합니다.

1. 이전 KCL 버전 코드를 재배포합니다.

## 1단계: KCL 마이그레이션 도구 실행
<a name="kcl-migration-rollback-step1"></a>

이전 KCL 버전으로 롤백해야 하는 경우 KCL 마이그레이션 도구를 실행해야 합니다. 이 도구는 다음 두 가지 중요한 작업을 수행합니다.
+ DynamoDB의 리스 테이블에서 워커 지표 테이블이라고 하는 메타데이터 테이블과 글로벌 보조 인덱스를 제거합니다. 이러한 아티팩트는 KCL 3.x에서 생성되지만 이전 버전으로 롤백하는 경우 필요하지 않습니다.
+ 모든 워커가 KCL 1.x와 호환되는 모드로 실행되고 이전 KCL 버전에서 사용된 로드 밸런싱 알고리즘을 사용하도록 합니다. KCL 3.x의 새 로드 밸런싱 알고리즘에 문제가 있는 경우 해당 문제를 즉시 완화합니다.

**중요**  
DynamoDB의 조정자 상태 테이블은 반드시 존재해야 하며 마이그레이션, 롤백, 롤포워드 프로세스 중에 삭제되어서는 안 됩니다.

**참고**  
소비자 애플리케이션의 모든 워커가 지정된 시간에 동일한 로드 밸런싱 알고리즘을 사용하는 것이 중요합니다. KCL 마이그레이션 도구를 사용하면 KCL 3.x 소비자 애플리케이션의 모든 워커가 KCL 1.x 호환 모드로 전환되므로 이전 KCL 버전으로 애플리케이션을 롤백하는 동안 모든 워커가 동일한 로드 밸런싱 알고리즘을 실행하게 됩니다.

[KCL GitHub 리포지토리](https://github.com/awslabs/amazon-kinesis-client/tree/master)의 스크립트 디렉터리에서 [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 다운로드할 수 있습니다. 조정자 상태 테이블, 워커 지표 테이블 및 리스 테이블에 쓸 수 있는 적절한 권한이 있는 워커 또는 호스트에서 스크립트를 실행합니다. KCL 소비자 애플리케이션에 대해 적절한 [IAM 권한](https://docs.aws.amazon.com/streams/latest/dev/kcl-iam-permissions.html)이 구성되어 있는지 확인합니다. 지정된 명령을 사용하여 KCL 애플리케이션당 한 번만 스크립트를 실행합니다.

```
python3 ./KclMigrationTool.py --region region --mode rollback [--application_name applicationName] [--lease_table_name leaseTableName] [--coordinator_state_table_name coordinatorStateTableName] [--worker_metrics_table_name workerMetricsTableName]
```

### 파라미터
<a name="kcl-migration-rollback-parameters"></a>

`--region`  
*리전*을 사용자의 AWS 리전으로 바꿉니다.

`--application_name`  
이 파라미터는 DynamoDB 메타데이터 테이블(리스 테이블, 조정자 상태 테이블 및 워커 지표 테이블)의 기본 이름을 사용하는 경우 필요합니다. 이러한 테이블에 사용자 지정 이름을 지정한 경우 이 파라미터를 생략할 수 있습니다. *applicationName*을 실제 KCL 애플리케이션 이름으로 바꿉니다. 이 도구는 사용자 지정 이름이 제공되지 않은 경우 이 이름을 사용하여 기본 테이블 이름을 파생합니다.

`--lease_table_name`  
이 파라미터는 KCL 구성에서 리스 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *leaseTableName*을 리스 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

`--coordinator_state_table_name`  
이 파라미터는 KCL 구성에서 조정자 상태 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *coordinatorStateTableName*을 조정자 상태 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

`--worker_metrics_table_name`  
이 파라미터는 KCL 구성에서 워커 지표 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *workerMetricsTableName*을 워커 지표 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

## 2단계: 이전 KCL 버전으로 코드 재배포
<a name="kcl-migration-rollback-step2"></a>

**중요**  
KCL 마이그레이션 도구에서 생성된 출력에 버전 2.x가 언급된 경우 KCL 버전 1.x를 의미하는 것으로 해석해야 합니다. 스크립트를 실행해도 전체 롤백이 수행되는 것은 아니며, 로드 밸런싱 알고리즘만 KCL 버전 1.x에서 사용된 알고리즘으로 전환됩니다.

롤백을 위해 KCL 마이그레이션 도구를 실행하면 다음 메시지 중 하나가 표시됩니다.

메시지 1  
"Rollback completed. Your application was running 2x compatible functionality. Please rollback to your previous application binaries by deploying the code with your previous KCL version."  
**필요한 작업:** 이는 워커가 KCL 1.x 호환 모드로 실행 중이었음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

메시지 2  
"Rollback completed. Your KCL Application was running 3x functionality and will rollback to 2x compatible functionality. If you don't see mitigation after a short period of time, please rollback to your previous application binaries by deploying the code with your previous KCL version."  
**필요한 작업:** 이는 워커가 KCL 3.x 모드로 실행 중이었고 KCL 마이그레이션 도구가 모든 워커를 KCL 1.x 호환 모드로 전환했음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

메시지 3  
"Application was already rolled back. Any KCLv3 resources that could be deleted were cleaned up to avoid charges until the application can be rolled forward with migration."  
**필요한 작업:** 이는 워커가 KCL 1.x 호환 모드로 실행되도록 이미 롤백되었음을 의미합니다. 이전 KCL 버전으로 워커에 코드를 재배포합니다.

# 롤백 후 KCL 3.x로 롤포워드
<a name="kcl-migration-rollforward"></a>

이 주제에서는 롤백 후 소비자 애플리케이션을 KCL 3.x로 롤포워드하는 방법을 설명합니다. 롤포워드가 필요한 경우 2단계 프로세스를 완료해야 합니다.

1. [KCL 마이그레이션 도구](https://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/scripts/KclMigrationTool.py)를 실행합니다.

1. KCL 3.x로 코드를 배포합니다.

## 1단계: KCL 마이그레이션 도구 실행
<a name="kcl-migration-rollforward-step1"></a>

다음 명령으로 KCL 마이그레이션 도구를 실행하여 KCL 3.x로 롤포워드합니다.

```
python3 ./KclMigrationTool.py --region region --mode rollforward [--application_name applicationName] [--coordinator_state_table_name coordinatorStateTableName]
```

### 파라미터
<a name="kcl-migration-rollforward-parameters"></a>

`--region`  
*리전*을 사용자의 AWS 리전으로 바꿉니다.

`--application_name`  
조정자 상태 테이블에 기본 이름을 사용하는 경우 이 파라미터가 필요합니다. 조정자 상태 테이블에 사용자 지정 이름을 지정한 경우 이 파라미터를 생략할 수 있습니다. *applicationName*을 실제 KCL 애플리케이션 이름으로 바꿉니다. 이 도구는 사용자 지정 이름이 제공되지 않은 경우 이 이름을 사용하여 기본 테이블 이름을 파생합니다.

`--coordinator_state_table_name`  
이 파라미터는 KCL 구성에서 조정자 상태 테이블에 사용자 지정 이름을 설정한 경우에 필요합니다. 기본 테이블 이름을 사용하는 경우 이 파라미터를 생략할 수 있습니다. *coordinatorStateTableName*을 조정자 상태 테이블에 지정한 사용자 지정 테이블 이름으로 바꿉니다.

롤포워드 모드로 마이그레이션 도구를 실행한 후 KCL은 KCL 3.x에 필요한 다음과 같은 DynamoDB 리소스를 생성합니다.
+ 리스 테이블의 글로벌 보조 인덱스
+ 워커 지표 테이블

## 2단계: KCL 3.x로 코드 배포
<a name="kcl-migration-rollforward-step2"></a>

롤포워드를 위해 KCL 마이그레이션 도구를 실행한 후 KCL 3.x로 워커에 코드를 배포합니다. 마이그레이션을 완료하려면 [8단계: 마이그레이션 완료](https://docs.aws.amazon.com/streams/latest/dev/kcl-migration-from-2-3.html#kcl-migration-from-2-3-finish)를 참조하세요.

# 연습: DynamoDB Streams Kinesis 어댑터
<a name="Streams.KCLAdapter.Walkthrough"></a>

이번 섹션에서는 Amazon Kinesis Client Library와 Amazon DynamoDB Streams Kinesis 어댑터를 사용하는 Java 애플리케이션에 대해 살펴보겠습니다. 이 애플리케이션은 한 테이블의 쓰기 작업이 두 번째 테이블에도 적용되면서 두 테이블의 내용이 동기화를 유지하는 데이터 복제의 예로 설명됩니다. 소스 코드는 [전체 프로그램: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.CompleteProgram.md) 섹션을 참조하세요.

이 프로그램에서는 다음 작업을 수행합니다.

1. `KCL-Demo-src`와 `KCL-Demo-dst`라는 이름의 DynamoDB 테이블 2개를 생성합니다. 두 테이블 모두 스트림이 활성화되어 있습니다.

1. 항목을 추가, 업데이트 및 삭제하여 원본 테이블을 업데이트합니다. 이렇게 하면 데이터가 테이블의 스트림으로 기록됩니다.

1. 스트림에서 레코드를 읽고 DynamoDB 요청으로 재작성한 다음 대상 테이블에 요청을 적용합니다.

1. 원본 테이블과 대상 테이블을 스캔하여 내용이 동일한지 확인합니다.

1. 두 테이블을 삭제합니다.

이러한 단계는 다음 섹션에서 설명하며, 전체 애플리케이션은 연습 끝에 나와 있습니다.

**Topics**
+ [1단계: DynamoDB 테이블 생성](#Streams.KCLAdapter.Walkthrough.Step1)
+ [2단계: 소스 테이블의 업데이트 활동 생성](#Streams.KCLAdapter.Walkthrough.Step2)
+ [3단계: 스트림 처리](#Streams.KCLAdapter.Walkthrough.Step3)
+ [4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인](#Streams.KCLAdapter.Walkthrough.Step4)
+ [5단계: 정리](#Streams.KCLAdapter.Walkthrough.Step5)
+ [전체 프로그램: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.CompleteProgram.md)

## 1단계: DynamoDB 테이블 생성
<a name="Streams.KCLAdapter.Walkthrough.Step1"></a>

첫 번째 단계에서 두 개의 DynamoDB 테이블(소스 테이블과 대상 테이블)을 생성합니다. 원본 테이블 스트림의 `StreamViewType`은 `NEW_IMAGE`입니다. 이 말은 원본 테이블 항목이 변경될 때마다 항목의 "사후" 이미지가 스트림에 기록된다는 것을 의미합니다. 이러한 방식으로 스트림이 테이블의 모든 쓰기 작업을 추적합니다.

다음은 두 테이블 생성에 사용된 코드를 보여주는 예제입니다.

```
java.util.List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>();
attributeDefinitions.add(new AttributeDefinition().withAttributeName("Id").withAttributeType("N"));

java.util.List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>();
keySchema.add(new KeySchemaElement().withAttributeName("Id").withKeyType(KeyType.HASH)); // Partition
                                                                                         // key

ProvisionedThroughput provisionedThroughput = new ProvisionedThroughput().withReadCapacityUnits(2L)
    .withWriteCapacityUnits(2L);

StreamSpecification streamSpecification = new StreamSpecification();
streamSpecification.setStreamEnabled(true);
streamSpecification.setStreamViewType(StreamViewType.NEW_IMAGE);
CreateTableRequest createTableRequest = new CreateTableRequest().withTableName(tableName)
    .withAttributeDefinitions(attributeDefinitions).withKeySchema(keySchema)
    .withProvisionedThroughput(provisionedThroughput).withStreamSpecification(streamSpecification);
```

## 2단계: 소스 테이블의 업데이트 활동 생성
<a name="Streams.KCLAdapter.Walkthrough.Step2"></a>

다음 단계는 원본 테이블의 쓰기 작업입니다. 이 작업을 하면 원본 테이블의 스트림 역시 거의 실시간으로 업데이트됩니다.

이 애플리케이션은 데이터 기록을 위해 `PutItem`, `UpdateItem` 및 `DeleteItem` API 작업을 호출하는 메서드를 사용하여 헬퍼 클래스를 정의합니다. 다음은 이러한 메서드의 사용 방법을 나타낸 코드 예제입니다.

```
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "101", "test1");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "101", "test2");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "101");
StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName, "102", "demo3");
StreamsAdapterDemoHelper.updateItem(dynamoDBClient, tableName, "102", "demo4");
StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName, "102");
```

## 3단계: 스트림 처리
<a name="Streams.KCLAdapter.Walkthrough.Step3"></a>

이제 프로그램이 스트림을 처리합니다. DynamoDB Streams Kinesis 어댑터가 KCL과 DynamoDB Streams 엔드포인트 사이에서 투명 계층의 역할을 하기 때문에 코드에서 하위 수준 DynamoDB Streams를 호출할 필요 없이 KCL을 최대한 이용할 수 있습니다. 프로그램이 실행하는 작업은 다음과 같습니다.
+ KCL 인터페이스 정의를 준수하는 메서드인 `StreamsRecordProcessor`, `initialize` 및 `processRecords`을 사용하여 레코드 프로세서 클래스인 `shutdown`를 정의합니다. `processRecords` 메서드에는 원본 테이블의 스트림에서 데이터를 읽어 대상 테이블에 기록하는 데 필요한 로직이 저장됩니다.
+ 레코드 프로세서 클래스의 클래스 팩토리(`StreamsRecordProcessorFactory`)를 정의합니다. Java 프로그램이 KCL을 사용하려면 이 팩토리가 필요합니다.
+ 새로운 KCL `Worker`를 인스턴스화하여 클래스 팩토리와 연동시킵니다.
+ 레코드 처리가 완료되면 `Worker`를 종료합니다.

선택적으로 스트림 KCL 어댑터 구성에서 캐치업 모드를 활성화하면 스트림 처리 지연이 1분(기본값)을 초과할 경우 GetRecords API 직접 호출 속도가 자동으로 3배(기본값)로 조정되어 스트림 소비자가 테이블의 높은 처리량 급증을 처리하는 데 도움이 됩니다.

KCL 인터페이스 정의에 대한 자세한 내용은 *Amazon Kinesis Data Streams 개발자 안내서*의 [Kinesis Client Library를 사용하여 소비자 개발](https://docs.aws.amazon.com/kinesis/latest/dev/developing-consumers-with-kcl.html)을 참조하세요.

다음은 `StreamsRecordProcessor`의 메인 루프를 나타낸 코드 예제입니다. `case` 문은 스트림 레코드에 표시되는 `OperationType`에 따라 어떤 작업을 실행할지 결정합니다.

```
for (Record record : records) {
    String data = new String(record.getData().array(), Charset.forName("UTF-8"));
    System.out.println(data);
    if (record instanceof RecordAdapter) {
                software.amazon.dynamodb.model.Record streamRecord = ((RecordAdapter) record)
                    .getInternalObject();

                switch (streamRecord.getEventName()) {
                    case "INSERT":
                    case "MODIFY":
                        StreamsAdapterDemoHelper.putItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getNewImage());
                        break;
                    case "REMOVE":
                        StreamsAdapterDemoHelper.deleteItem(dynamoDBClient, tableName,
                            streamRecord.getDynamodb().getKeys().get("Id").getN());
                }
    }
    checkpointCounter += 1;
    if (checkpointCounter % 10 == 0) {
        try {
            checkpointer.checkpoint();
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}
```

## 4단계: 양 테이블에 동일한 콘텐츠가 있는지 확인
<a name="Streams.KCLAdapter.Walkthrough.Step4"></a>

이 시점에서는 원본 테이블과 대상 테이블의 내용이 동기화 상태를 유지합니다. 애플리케이션이 두 테이블에 대해 `Scan` 요청을 하여 내용이 실제로 동일한지 확인합니다.

`DemoHelper` 클래스에는 하위 수준 `ScanTable` API를 호출하는 `Scan` 메서드가 포함되어 있습니다. 다음 예제는 이 작업을 수행하는 방법을 보여줍니다.

```
if (StreamsAdapterDemoHelper.scanTable(dynamoDBClient, srcTable).getItems()
    .equals(StreamsAdapterDemoHelper.scanTable(dynamoDBClient, destTable).getItems())) {
    System.out.println("Scan result is equal.");
}
else {
    System.out.println("Tables are different!");
}
```

## 5단계: 정리
<a name="Streams.KCLAdapter.Walkthrough.Step5"></a>

데모가 완료되면 애플리케이션이 원본 테이블과 대상 테이블을 삭제합니다. 다음 코드 예제를 참조하세요. 하지만 테이블이 삭제된 후에도 스트림은 최대 24시간까지 사용 가능하며, 이 시간이 지나면 자동 삭제됩니다.

```
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(srcTable));
dynamoDBClient.deleteTable(new DeleteTableRequest().withTableName(destTable));
```

# 전체 프로그램: DynamoDB Streams Kinesis 어댑터
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram"></a>

다음은 [연습: DynamoDB Streams Kinesis 어댑터](Streams.KCLAdapter.Walkthrough.md)에서 설명한 작업을 실행하는 전체 Java 프로그램입니다. 프로그램을 실행하면 다음과 비슷한 출력 화면이 보여야 합니다.

```
Creating table KCL-Demo-src
Creating table KCL-Demo-dest
Table is active.
Creating worker for stream: arn:aws:dynamodb:us-west-2:111122223333:table/KCL-Demo-src/stream/2015-05-19T22:48:56.601
Starting worker...
Scan result is equal.
Done.
```

**중요**  
 이 프로그램을 실행하려면 클라이언트 애플리케이션이 정책을 사용하여 DynamoDB 및 Amazon CloudWatch에 액세스할 수 있어야 합니다. 자세한 내용은 [DynamoDB에 대한 자격 증명 기반 정책](security_iam_service-with-iam.md#security_iam_service-with-iam-id-based-policies) 섹션을 참조하세요.

소스 코드는 4개의 `.java` 파일로 구성됩니다. 이 프로그램을 빌드하려면 Amazon Kinesis Client Library(KCL) 3.x 및 AWS SDK for Java v2를 전이적 종속성으로 포함하는 다음 종속성을 추가합니다.

------
#### [ Maven ]

```
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>dynamodb-streams-kinesis-adapter</artifactId>
    <version>2.1.0</version>
</dependency>
```

------
#### [ Gradle ]

```
implementation 'com.amazonaws:dynamodb-streams-kinesis-adapter:2.1.0'
```

------

소스 파일 준비:
+ `StreamsAdapterDemo.java`
+ `StreamsRecordProcessor.java`
+ `StreamsRecordProcessorFactory.java`
+ `StreamsAdapterDemoHelper.java`

## StreamsAdapterDemo.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemo"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.AmazonDynamoDBStreamsAdapterClient;
import com.amazonaws.services.dynamodbv2.streamsadapter.StreamsSchedulerFactory;
import com.amazonaws.services.dynamodbv2.streamsadapter.polling.DynamoDBStreamsPollingConfig;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.kinesis.common.ConfigsBuilder;
import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.coordinator.Scheduler;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;
import software.amazon.kinesis.processor.StreamTracker;
import software.amazon.kinesis.retrieval.RetrievalConfig;

public class StreamsAdapterDemo {

    private static DynamoDbAsyncClient dynamoDbAsyncClient;
    private static CloudWatchAsyncClient cloudWatchAsyncClient;
    private static AmazonDynamoDBStreamsAdapterClient amazonDynamoDbStreamsAdapterClient;

    private static String tablePrefix = "KCL-Demo";
    private static String streamArn;

    private static Region region = Region.US_EAST_1;
    private static AwsCredentialsProvider credentialsProvider = DefaultCredentialsProvider.create();

    public static void main( String[] args ) throws Exception {
        System.out.println("Starting demo...");
        dynamoDbAsyncClient = DynamoDbAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        cloudWatchAsyncClient = CloudWatchAsyncClient.builder()
                .credentialsProvider(credentialsProvider)
                .region(region)
                .build();
        amazonDynamoDbStreamsAdapterClient = new AmazonDynamoDBStreamsAdapterClient(credentialsProvider, region);

        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";

        setUpTables();

        StreamTracker streamTracker = StreamsSchedulerFactory.createSingleStreamTracker(streamArn,
                InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON));

        ShardRecordProcessorFactory shardRecordProcessorFactory =
                new StreamsAdapterDemoProcessorFactory(dynamoDbAsyncClient, destTable);

        ConfigsBuilder configsBuilder = new ConfigsBuilder(
                streamTracker,
                "streams-adapter-demo",
                amazonDynamoDbStreamsAdapterClient,
                dynamoDbAsyncClient,
                cloudWatchAsyncClient,
                "streams-demo-worker",
                shardRecordProcessorFactory
        );

        DynamoDBStreamsPollingConfig pollingConfig = new DynamoDBStreamsPollingConfig(amazonDynamoDbStreamsAdapterClient);
        RetrievalConfig retrievalConfig = configsBuilder.retrievalConfig();
        retrievalConfig.retrievalSpecificConfig(pollingConfig);

        System.out.println("Creating scheduler for stream " + streamArn);
        Scheduler scheduler = StreamsSchedulerFactory.createScheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                retrievalConfig,
                amazonDynamoDbStreamsAdapterClient
        );

        System.out.println("Starting scheduler...");
        Thread t = new Thread(scheduler);
        t.start();

        Thread.sleep(250000);

        System.out.println("Stopping scheduler...");
        scheduler.shutdown();
        t.join();

        if (StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, srcTable).items()
                .equals(StreamsAdapterDemoHelper.scanTable(dynamoDbAsyncClient, destTable).items())) {
            System.out.println("Scan result is equal.");
        } else {
            System.out.println("Tables are different!");
        }

        System.out.println("Done.");
        cleanupAndExit(0);
    }

    private static void setUpTables() {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        streamArn = StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, srcTable);
        StreamsAdapterDemoHelper.createTable(dynamoDbAsyncClient, destTable);

        awaitTableCreation(srcTable);

        performOps(srcTable);
    }

    private static void awaitTableCreation(String tableName) {
        Integer retries = 0;
        Boolean created = false;
        while (!created && retries < 100) {
            DescribeTableResponse result = StreamsAdapterDemoHelper.describeTable(dynamoDbAsyncClient, tableName);
            created = result.table().tableStatusAsString().equals("ACTIVE");
            if (created) {
                System.out.println("Table is active.");
                return;
            } else {
                retries++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // do nothing
                }
            }
        }
        System.out.println("Timeout after table creation. Exiting...");
        cleanupAndExit(1);
    }

    private static void performOps(String tableName) {
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "101", "test1");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "101", "test2");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "101");
        StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName, "102", "demo3");
        StreamsAdapterDemoHelper.updateItem(dynamoDbAsyncClient, tableName, "102", "demo4");
        StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName, "102");
    }

    private static void cleanupAndExit(Integer returnValue) {
        String srcTable = tablePrefix + "-src";
        String destTable = tablePrefix + "-dest";
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(srcTable).build());
        dynamoDbAsyncClient.deleteTable(DeleteTableRequest.builder().tableName(destTable).build());
        System.exit(returnValue);
    }
}
```

## StreamsRecordProcessor.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessor"></a>

```
package com.amazonaws.codesamples;

import com.amazonaws.services.dynamodbv2.streamsadapter.adapter.DynamoDBStreamsClientRecord;
import com.amazonaws.services.dynamodbv2.streamsadapter.model.DynamoDBStreamsProcessRecordsInput;
import com.amazonaws.services.dynamodbv2.streamsadapter.processor.DynamoDBStreamsShardRecordProcessor;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.kinesis.exceptions.InvalidStateException;
import software.amazon.kinesis.exceptions.ShutdownException;
import software.amazon.kinesis.lifecycle.events.InitializationInput;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

public class StreamsRecordProcessor implements DynamoDBStreamsShardRecordProcessor {

    private Integer checkpointCounter;

    private final DynamoDbAsyncClient dynamoDbAsyncClient;
    private final String tableName;

    public StreamsRecordProcessor(DynamoDbAsyncClient dynamoDbAsyncClient, String tableName) {
        this.dynamoDbAsyncClient = dynamoDbAsyncClient;
        this.tableName = tableName;
    }

    @Override
    public void initialize(InitializationInput initializationInput) {
        this.checkpointCounter = 0;
    }

    @Override
    public void processRecords(DynamoDBStreamsProcessRecordsInput dynamoDBStreamsProcessRecordsInput) {
        for (DynamoDBStreamsClientRecord record: dynamoDBStreamsProcessRecordsInput.records()) {
            String data = new String(record.data().array(), StandardCharsets.UTF_8);
            System.out.println(data);
            Record streamRecord = record.getRecord();

            switch (streamRecord.eventName()) {
                case INSERT:
                case MODIFY:
                    StreamsAdapterDemoHelper.putItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().newImage());
                case REMOVE:
                    StreamsAdapterDemoHelper.deleteItem(dynamoDbAsyncClient, tableName,
                            streamRecord.dynamodb().keys().get("Id").n());
            }
            checkpointCounter += 1;
            if (checkpointCounter % 10 == 0) {
                try {
                    dynamoDBStreamsProcessRecordsInput.checkpointer().checkpoint();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {
        System.out.println("Lease Lost");
    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            e.printStackTrace();
        }
    }
}
```

## StreamsRecordProcessorFactory.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsRecordProcessorFactory"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.processor.ShardRecordProcessorFactory;

public class StreamsAdapterDemoProcessorFactory implements ShardRecordProcessorFactory {
    private final String tableName;
    private final DynamoDbAsyncClient dynamoDbAsyncClient;

    public StreamsAdapterDemoProcessorFactory(DynamoDbAsyncClient asyncClient, String tableName) {
        this.tableName = tableName;
        this.dynamoDbAsyncClient = asyncClient;
    }

    @Override
    public ShardRecordProcessor shardRecordProcessor() {
        return new StreamsRecordProcessor(dynamoDbAsyncClient, tableName);
    }
}
```

## StreamsAdapterDemoHelper.java
<a name="Streams.KCLAdapter.Walkthrough.CompleteProgram.StreamsAdapterDemoHelper"></a>

```
package com.amazonaws.codesamples;

import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.OnDemandThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ResourceInUseException;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanResponse;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.StreamViewType;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StreamsAdapterDemoHelper {

    /**
     * @return StreamArn
     */
    public static String createTable(DynamoDbAsyncClient client, String tableName) {
        List<AttributeDefinition> attributeDefinitions = new ArrayList<>();
        attributeDefinitions.add(AttributeDefinition.builder()
                .attributeName("Id")
                .attributeType("N")
                .build());

        List<KeySchemaElement> keySchema = new ArrayList<>();
        keySchema.add(KeySchemaElement.builder()
                .attributeName("Id")
                .keyType(KeyType.HASH) // Partition key
                .build());

        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType(StreamViewType.NEW_IMAGE)
                .build();

        CreateTableRequest createTableRequest = CreateTableRequest.builder()
                .tableName(tableName)
                .attributeDefinitions(attributeDefinitions)
                .keySchema(keySchema)
                .billingMode(BillingMode.PAY_PER_REQUEST)
                .streamSpecification(streamSpecification)
                .build();

        try {
            System.out.println("Creating table " + tableName);
            CreateTableResponse result = client.createTable(createTableRequest).join();
            return result.tableDescription().latestStreamArn();
        } catch (Exception e) {
            if (e.getCause() instanceof ResourceInUseException) {
                System.out.println("Table already exists.");
                return describeTable(client, tableName).table().latestStreamArn();
            }
            throw e;
        }
    }

    public static DescribeTableResponse describeTable(DynamoDbAsyncClient client, String tableName) {
        return client.describeTable(DescribeTableRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static ScanResponse scanTable(DynamoDbAsyncClient dynamoDbClient, String tableName) {
        return dynamoDbClient.scan(ScanRequest.builder()
                        .tableName(tableName)
                        .build())
                .join();
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("Id", AttributeValue.builder().n(id).build());
        item.put("attribute-1", AttributeValue.builder().s(val).build());

        putItem(dynamoDbClient, tableName, item);
    }

    public static void putItem(DynamoDbAsyncClient dynamoDbClient, String tableName,
                               Map<String, AttributeValue> items) {
        PutItemRequest putItemRequest = PutItemRequest.builder()
                .tableName(tableName)
                .item(items)
                .build();
        dynamoDbClient.putItem(putItemRequest).join();
    }

    public static void updateItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id, String val) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        Map<String, String> expressionAttributeNames = new HashMap<>();
        expressionAttributeNames.put("#attr2", "attribute-2");

        Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
        expressionAttributeValues.put(":val", AttributeValue.builder().s(val).build());

        UpdateItemRequest updateItemRequest = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .updateExpression("SET #attr2 = :val")
                .expressionAttributeNames(expressionAttributeNames)
                .expressionAttributeValues(expressionAttributeValues)
                .build();

        dynamoDbClient.updateItem(updateItemRequest).join();
    }

    public static void deleteItem(DynamoDbAsyncClient dynamoDbClient, String tableName, String id) {
        Map<String, AttributeValue> key = new HashMap<>();
        key.put("Id", AttributeValue.builder().n(id).build());

        DeleteItemRequest deleteItemRequest = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(key)
                .build();
        dynamoDbClient.deleteItem(deleteItemRequest).join();
    }
}
```

# DynamoDB Streams 하위 수준 API: Java 예시
<a name="Streams.LowLevel.Walkthrough"></a>

**참고**  
이 페이지의 코드는 완전한 코드가 아니며, Amazon DynamoDB Streams 사용 시나리오 중 일부만 처리합니다. DynamoDB에서 스트림 레코드를 사용하기 위해 권장되는 방법은 [DynamoDB Streams Kinesis 어댑터를 사용하여 스트림 레코드 처리](Streams.KCLAdapter.md) 섹션에 기술된 것처럼 Amazon Kinesis 어댑터를 통해 Kinesis Client Library(KCL)를 사용하는 것입니다.

이 섹션에는 실행 중인 DynamoDB Streams를 보여 주는 Java 프로그램이 나와 있습니다. 이 프로그램에서는 다음 작업을 수행합니다.

1. 스트림을 활성화하여 DynamoDB 테이블을 생성합니다.

1. 이 테이블의 스트림 설정을 설명합니다.

1. 테이블의 데이터를 수정합니다.

1. 스트림의 샤드를 설명합니다.

1. 샤드의 스트림 레코드를 읽습니다.

1. 하위 샤드를 가져오고 레코드를 계속 읽습니다.

1. 그리고 정리합니다.

프로그램을 실행하면 다음과 비슷한 출력 화면이 나타납니다.

```
Testing Streams Demo
Creating an Amazon DynamoDB table TestTableForStreams with a simple primary key: Id
Waiting for TestTableForStreams to be created...
Current stream ARN for TestTableForStreams: arn:aws:dynamodb:us-east-2:123456789012:table/TestTableForStreams/stream/2018-03-20T16:49:55.208
Stream enabled: true
Update view type: NEW_AND_OLD_IMAGES

Performing write activities on TestTableForStreams
Processing item 1 of 100
Processing item 2 of 100
Processing item 3 of 100
...
Processing item 100 of 100
Shard: {ShardId: shardId-1234567890-...,SequenceNumberRange: {StartingSequenceNumber: 100002572486797508907,},}
    Shard iterator: EjYFEkX2a26eVTWe...
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2000001584047545833909, SizeBytes=22, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, NewImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=New Item!), Id=AttributeValue(S=4)}, SequenceNumber=2100003604869767892701, SizeBytes=55, StreamViewType=NEW_AND_OLD_IMAGES)
        StreamRecord(ApproximateCreationDateTime=2025-04-09T13:11:58Z, Keys={Id=AttributeValue(S=4)}, OldImage={Message=AttributeValue(S=This is an updated item), Id=AttributeValue(S=4)}, SequenceNumber=2200001099771112898434, SizeBytes=36, StreamViewType=NEW_AND_OLD_IMAGES)
...
Deleting the table...
Table StreamsDemoTable deleted.
Demo complete
```

**Example 예제**  

```
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import software.amazon.awssdk.core.waiters.WaiterResponse;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeAction;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.AttributeValueUpdate;
import software.amazon.awssdk.services.dynamodb.model.BillingMode;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.CreateTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
import software.amazon.awssdk.services.dynamodb.model.DeleteTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeStreamResponse;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableResponse;
import software.amazon.awssdk.services.dynamodb.model.DynamoDbException;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsRequest;
import software.amazon.awssdk.services.dynamodb.model.GetRecordsResponse;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.dynamodb.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.Record;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.Shard;
import software.amazon.awssdk.services.dynamodb.model.ShardFilter;
import software.amazon.awssdk.services.dynamodb.model.ShardFilterType;
import software.amazon.awssdk.services.dynamodb.model.ShardIteratorType;
import software.amazon.awssdk.services.dynamodb.model.StreamSpecification;
import software.amazon.awssdk.services.dynamodb.model.TableDescription;
import software.amazon.awssdk.services.dynamodb.model.UpdateItemRequest;
import software.amazon.awssdk.services.dynamodb.streams.DynamoDbStreamsClient;
import software.amazon.awssdk.services.dynamodb.waiters.DynamoDbWaiter;

public class StreamsLowLevelDemo {


    public static void main(String[] args) {
        final String usage = "Testing Streams Demo";
        try {
            System.out.println(usage);

            String tableName = "StreamsDemoTable";
            String key = "Id";
            System.out.println("Creating an Amazon DynamoDB table " + tableName + " with a simple primary key: " + key);
            Region region = Region.US_WEST_2;
            DynamoDbClient ddb = DynamoDbClient.builder()
                    .region(region)
                    .build();

            DynamoDbStreamsClient ddbStreams = DynamoDbStreamsClient.builder()
                    .region(region)
                    .build();
            DescribeTableRequest describeTableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
            TableDescription tableDescription = null;
            try{
                tableDescription = ddb.describeTable(describeTableRequest).table();
            }catch (Exception e){
                System.out.println("Table " + tableName + " does not exist.");
                tableDescription = createTable(ddb, tableName, key);
            }

            // Print the stream settings for the table
            String streamArn = tableDescription.latestStreamArn();
           
            StreamSpecification streamSpec = tableDescription.streamSpecification();
            System.out.println("Current stream ARN for " + tableDescription.tableName() + ": " +
                   streamArn);
            System.out.println("Stream enabled: " + streamSpec.streamEnabled());
            System.out.println("Update view type: " + streamSpec.streamViewType());
            System.out.println();
            // Generate write activity in the table
            System.out.println("Performing write activities on " + tableName);
            int maxItemCount = 100;
            for (Integer i = 1; i <= maxItemCount; i++) {
                System.out.println("Processing item " + i + " of " + maxItemCount);
                // Write a new item
                putItemInTable(key, i, tableName, ddb);
                // Update the item
                updateItemInTable(key, i, tableName, ddb);
                // Delete the item
                deleteDynamoDBItem(key, i, tableName, ddb);
            }

            // Process Stream
            processStream(streamArn, maxItemCount, ddb, ddbStreams, tableName);

            // Delete the table
            System.out.println("Deleting the table...");
            DeleteTableRequest deleteTableRequest = DeleteTableRequest.builder()
                    .tableName(tableName)
                    .build();
            ddb.deleteTable(deleteTableRequest);
            System.out.println("Table " + tableName + " deleted.");
            System.out.println("Demo complete");
            ddb.close();
        } catch (Exception e) {
            System.out.println("Error: " + e.getMessage());
        }
    }

    private static void processStream(String streamArn, int maxItemCount, DynamoDbClient ddb, DynamoDbStreamsClient ddbStreams, String tableName) {
        // Get all the shard IDs from the stream. Note that DescribeStream returns
        // the shard IDs one page at a time.
        String lastEvaluatedShardId = null;
        do {
            DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder()
                    .streamArn(streamArn)
                    .exclusiveStartShardId(lastEvaluatedShardId).build();
            DescribeStreamResponse describeStreamResponse = ddbStreams.describeStream(describeStreamRequest);

            List<Shard> shards = describeStreamResponse.streamDescription().shards();

            // Process each shard on this page

            fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, shards);

            // If LastEvaluatedShardId is set, then there is
            // at least one more page of shard IDs to retrieve
            lastEvaluatedShardId = describeStreamResponse.streamDescription().lastEvaluatedShardId();

        } while (lastEvaluatedShardId != null);

    }

    private static void fetchShardsAndReadRecords(String streamArn, int maxItemCount, DynamoDbStreamsClient ddbStreams, List<Shard> shards) {
        for (Shard shard : shards) {
            String shardId = shard.shardId();
            System.out.println("Shard: " + shard);

            // Get an iterator for the current shard
            GetShardIteratorRequest shardIteratorRequest = GetShardIteratorRequest.builder()
                    .streamArn(streamArn).shardId(shardId)
                    .shardIteratorType(ShardIteratorType.TRIM_HORIZON).build();

            GetShardIteratorResponse getShardIteratorResult = ddbStreams.getShardIterator(shardIteratorRequest);

            String currentShardIter = getShardIteratorResult.shardIterator();

            // Shard iterator is not null until the Shard is sealed (marked as READ_ONLY).
            // To prevent running the loop until the Shard is sealed, we process only the
            // items that were written into DynamoDB and then exit.
            int processedRecordCount = 0;
            while (currentShardIter != null && processedRecordCount < maxItemCount) {
                // Use the shard iterator to read the stream records
                GetRecordsRequest getRecordsRequest = GetRecordsRequest.builder()
                        .shardIterator(currentShardIter).build();
                GetRecordsResponse getRecordsResult = ddbStreams.getRecords(getRecordsRequest);
                List<Record> records = getRecordsResult.records();
                for (Record record : records) {
                    System.out.println("        " + record.dynamodb());
                }
                processedRecordCount += records.size();
                currentShardIter = getRecordsResult.nextShardIterator();
            }
            if (currentShardIter == null){
                System.out.println("Shard has been fully processed. Shard iterator is null.");
                System.out.println("Fetch the child shard to continue processing instead of bulk fetching all shards");
                DescribeStreamRequest describeStreamRequestForChildShards = DescribeStreamRequest.builder()
                        .streamArn(streamArn)
                        .shardFilter(ShardFilter.builder()
                                .type(ShardFilterType.CHILD_SHARDS)
                                .shardId(shardId).build())
                        .build();
                DescribeStreamResponse describeStreamResponseChildShards = ddbStreams.describeStream(describeStreamRequestForChildShards);
                fetchShardsAndReadRecords(streamArn, maxItemCount, ddbStreams, describeStreamResponseChildShards.streamDescription().shards());
            }
        }
    }

    private static void putItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());
        item.put("Message", AttributeValue.builder()
                .s("New Item!")
                .build());
        PutItemRequest request = PutItemRequest.builder()
                .tableName(tableName)
                .item(item)
                .build();
        ddb.putItem(request);
    }

    private static void updateItemInTable(String key, Integer i, String tableName, DynamoDbClient ddb) {

        HashMap<String, AttributeValue> itemKey = new HashMap<>();
        itemKey.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());


        HashMap<String, AttributeValueUpdate> updatedValues = new HashMap<>();
        updatedValues.put("Message", AttributeValueUpdate.builder()
                .value(AttributeValue.builder().s("This is an updated item").build())
                .action(AttributeAction.PUT)
                .build());

        UpdateItemRequest request = UpdateItemRequest.builder()
                .tableName(tableName)
                .key(itemKey)
                .attributeUpdates(updatedValues)
                .build();
        ddb.updateItem(request);
    }

    public static void deleteDynamoDBItem(String key, Integer i, String tableName, DynamoDbClient ddb) {
        HashMap<String, AttributeValue> keyToGet = new HashMap<>();
        keyToGet.put(key, AttributeValue.builder()
                .s(i.toString())
                .build());

        DeleteItemRequest deleteReq = DeleteItemRequest.builder()
                .tableName(tableName)
                .key(keyToGet)
                .build();
        ddb.deleteItem(deleteReq);
    }

    public static TableDescription createTable(DynamoDbClient ddb, String tableName, String key) {
        DynamoDbWaiter dbWaiter = ddb.waiter();
        StreamSpecification streamSpecification = StreamSpecification.builder()
                .streamEnabled(true)
                .streamViewType("NEW_AND_OLD_IMAGES")
                .build();
        CreateTableRequest request = CreateTableRequest.builder()
                .attributeDefinitions(AttributeDefinition.builder()
                        .attributeName(key)
                        .attributeType(ScalarAttributeType.S)
                        .build())
                .keySchema(KeySchemaElement.builder()
                        .attributeName(key)
                        .keyType(KeyType.HASH)
                        .build())
                .billingMode(BillingMode.PAY_PER_REQUEST) //  DynamoDB automatically scales based on traffic.
                .tableName(tableName)
                .streamSpecification(streamSpecification)
                .build();

        TableDescription newTable;
        try {
            CreateTableResponse response = ddb.createTable(request);
            DescribeTableRequest tableRequest = DescribeTableRequest.builder()
                    .tableName(tableName)
                    .build();
                    
            System.out.println("Waiting for " + tableName + " to be created...");

            // Wait until the Amazon DynamoDB table is created.
            WaiterResponse<DescribeTableResponse> waiterResponse = dbWaiter.waitUntilTableExists(tableRequest);
            waiterResponse.matched().response().ifPresent(System.out::println);
            newTable = response.tableDescription();
            return newTable;

        } catch (DynamoDbException e) {
            System.err.println(e.getMessage());
            System.exit(1);
        }
        return null;
    }



}
```

# DynamoDB Streams 및 AWS Lambda 트리거
<a name="Streams.Lambda"></a>

Amazon DynamoDB는 DynamoDB Streams의 이벤트에 자동으로 응답하는 코드 조각인 *트리거*를 만들 수 있도록 AWS Lambda와 통합되어 있습니다. 트리거를 사용하면 DynamoDB 테이블의 데이터 수정에 응답하는 애플리케이션을 빌드할 수 있습니다.

**Topics**
+ [자습서 \$11: 필터를 사용하여 AWS CLI로 AWS Lambda 및 Amazon DynamoDB에서 모든 이벤트 처리](Streams.Lambda.Tutorial.md)
+ [자습서 \$12: 필터를 사용하여 DynamoDB 및 Lambda에서 일부 이벤트 처리](Streams.Lambda.Tutorial2.md)
+ [DynamoDB Streams를 Lambda와 함께 사용하는 모범 사례](Streams.Lambda.BestPracticesWithDynamoDB.md)

테이블에서 DynamoDB Streams를 활성화할 경우 스트림 Amazon 리소스 이름(ARN)을 사용자가 작성하는 AWS Lambda 함수에 연결할 수 있습니다. 그러면 해당 DynamoDB 테이블에 대한 모든 변형 작업을 스트림의 항목으로 캡처할 수 있습니다. 예를 들어, 테이블의 항목이 수정될 때 새 레코드가 해당 테이블의 스트림에 즉시 나타나도록 트리거를 설정할 수 있습니다.

**참고**  
하나의 DynamoDB 스트림에 3개 이상의 Lambda 함수를 구독하는 경우 읽기 제한이 발생할 수 있습니다.

[AWS Lambda](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html) 서비스는 초당 4번 새 레코드에 대한 스트림을 폴링합니다. 새 스트림 레코드를 사용할 수 있게 되면 Lambda 함수가 동기식으로 호출됩니다. 동일한 DynamoDB 스트림에 최대 2개의 Lambda 함수를 구독할 수 있습니다. 동일한 DynamoDB 스트림에 3개 이상의 Lambda 함수를 구독하는 경우 읽기 제한이 발생할 수 있습니다.

Lambda 함수는 알림을 보내거나 워크플로를 시작하거나 사용자가 지정하는 기타 여러 작업을 수행할 수 있습니다. 각 스트림 레코드를 Amazon S3 File Gateway(Amazon S3)와 같은 영구 스토리지에 간단하게 복사하는 Lambda 함수를 작성하여 테이블의 쓰기 작업에 대한 영구 감사 추적을 만들 수 있습니다. `GameScores` 테이블에 쓰는 모바일 게임 앱이 있다고 가정해 보겠습니다. `TopScore` 테이블의 `GameScores` 속성이 업데이트될 때마다 해당하는 스트림 레코드가 테이블 스트림에 기록됩니다. 그런 다음 이 이벤트는 소셜 미디어 네트워크에 축하 메시지를 게시하는 Lambda 함수를 트리거합니다. 이 함수는 `GameScores`에 업데이트되지 않거나 `TopScore` 속성을 수정하지 않는 모든 스트림 레코드를 무시하도록 작성할 수도 있습니다.

함수가 오류를 반환하면 Lambda는 성공적으로 처리되거나 데이터가 만료될 때까지 배치(batch)를 재시도합니다. 또한 더 작은 배치로 재시도하고, 재시도 횟수를 제한하고, 너무 오래된 레코드를 폐기하고, 기타 옵션을 사용하도록 Lambda를 구성할 수 있습니다.

성능 모범 사례에 따라 Lambda 함수는 수명이 짧아야 합니다. 불필요한 처리 지연을 방지하기 위해 복잡한 로직도 실행하지 않아야 합니다. 특히 고속 스트림의 경우 장기 실행 중인 동기식 Lambda보다 비동기식 사후 처리 단계 함수 워크플로를 트리거하는 것이 좋습니다.

 DynamoDB 스트림에 리소스 기반 정책을 구성하여 Lambda 함수에 대한 교차 계정 읽기 액세스 권한을 부여함으로써 여러 AWS 계정에서 Lambda 트리거를 사용할 수 있습니다. 교차 계정 액세스를 허용하도록 스트림을 구성하는 방법을 자세히 알아보려면 DynamoDB 개발자 가이드의 [교차 계정 AWS Lambda 함수를 통한 액세스 공유](rbac-cross-account-access.md#shared-access-cross-acount-lambda)를 참조하세요.

AWS Lambda에 대한 자세한 내용은 [AWS Lambda개발자 안내서](https://docs.aws.amazon.com/lambda/latest/dg/) 단원을 참조하세요.

# 자습서 \$11: 필터를 사용하여 AWS CLI로 AWS Lambda 및 Amazon DynamoDB에서 모든 이벤트 처리
<a name="Streams.Lambda.Tutorial"></a>

 

이 자습서에서는 DynamoDB 테이블의 스트림을 처리하기 위해 AWS Lambda 트리거를 생성합니다.

**Topics**
+ [1단계: 스트림을 활성화하여 DynamoDB 테이블 생성](#Streams.Lambda.Tutorial.CreateTable)
+ [2단계: Lambda 실행 역할 생성](#Streams.Lambda.Tutorial.CreateRole)
+ [3단계: Amazon SNS 주제 생성](#Streams.Lambda.Tutorial.SNSTopic)
+ [4단계: Lambda 함수 생성 및 테스트](#Streams.Lambda.Tutorial.LambdaFunction)
+ [5단계: 트리거 생성 및 테스트](#Streams.Lambda.Tutorial.CreateTrigger)

이 자습서의 시나리오는 간단한 소셜 네트워크인 Woofer입니다. Woofer 사용자는 다른 Woofer 사용자에게 전달되는 *바크*(간단한 문자 메시지)를 이용해 의사소통합니다. 다음 다이어그램은 이 애플리케이션에 대한 구성 요소 및 워크플로우를 보여줍니다.

![\[DynamoDB 테이블, 스트림 레코드, Lambda 함수 및 Amazon SNS 주제의 Woofer 애플리케이션 워크플로입니다.\]](http://docs.aws.amazon.com/ko_kr/amazondynamodb/latest/developerguide/images/StreamsAndTriggers.png)


1. 사용자는 DynamoDB 테이블(`BarkTable`)에 항목을 씁니다. 테이블에서 각 항목은 바크를 나타냅니다.

1. 작성된 새 스트림 레코드는 새 항목이 `BarkTable`에 추가되었다는 것을 반영합니다.

1. 새 스트림 레코드에서 AWS Lambda 함수(`publishNewBark`)를 트리거합니다.

1. 스트림 레코드가 새 항목이 `BarkTable`에 추가되었음을 나타내는 경우 Lambda 함수는 스트림 레코드에서 데이터를 읽고 Amazon Simple Notification Service(Amazon SNS)의 주제에 메시지를 게시합니다.

1. Amazon SNS 주제의 구독자는 메시지를 수신합니다. (본 자습서에서 이메일 주소는 구독자에 해당됩니다.)

**시작하기 전**  
이 자습서에서는 AWS Command Line Interface AWS CLI을 사용합니다. 아직 완료하지 않았다면 [AWS Command Line Interface 사용 설명서](https://docs.aws.amazon.com/cli/latest/userguide/)의 지침에 따라 AWS CLI를 설치 및 구성하세요.

## 1단계: 스트림을 활성화하여 DynamoDB 테이블 생성
<a name="Streams.Lambda.Tutorial.CreateTable"></a>

이 단계에서는 Woofer 사용자의 모든 바크를 저장할 DynamoDB 테이블(`BarkTable`)을 생성합니다. 기본 키는 `Username`(파티션 키)와 `Timestamp`(정렬 키)로 구성됩니다. 이 두 가지 속성 모두 문자열 유형입니다.

`BarkTable`에서는 스트림이 활성화되어 있습니다. 이 자습서에서 나중에 AWS Lambda 함수를 스트림에 연결하여 트리거를 생성할 수 있습니다.

1. 테이블을 생성하려면 다음 명령을 입력합니다.

   ```
   aws dynamodb create-table \
       --table-name BarkTable \
       --attribute-definitions AttributeName=Username,AttributeType=S AttributeName=Timestamp,AttributeType=S \
       --key-schema AttributeName=Username,KeyType=HASH  AttributeName=Timestamp,KeyType=RANGE \
       --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5 \
       --stream-specification StreamEnabled=true,StreamViewType=NEW_AND_OLD_IMAGES
   ```

1. 출력에서 `LatestStreamArn`를 찾습니다.

   ```
   ...
   "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

   `region` 및 `accountID`를 기록해 둡니다. 이 두 항목은 자습서의 다른 단계에서 필요합니다.

## 2단계: Lambda 실행 역할 생성
<a name="Streams.Lambda.Tutorial.CreateRole"></a>

이 단계에서는 AWS Identity and Access Management(IAM) 역할(`WooferLambdaRole`\$1을 만들어 권한을 할당합니다. 이 역할은 [4단계: Lambda 함수 생성 및 테스트](#Streams.Lambda.Tutorial.LambdaFunction)에서 만드는 Lambda 함수에서 사용합니다.

또한 해당 역할에 대한 정책도 생성합니다. 이 정책에는 Lambda 함수에서 런타임에 필요로 하는 모든 권한이 포함되어 있습니다.

1. 다음 콘텐츠를 가진 `trust-relationship.json`이라는 파일을 생성합니다:

------
#### [ JSON ]

****  

   ```
   {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "lambda.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }
   ```

------

1. 다음 명령을 입력하여 `WooferLambdaRole`을 생성합니다.

   ```
   aws iam create-role --role-name WooferLambdaRole \
       --path "/service-role/" \
       --assume-role-policy-document file://trust-relationship.json
   ```

1. 다음 콘텐츠를 가진 `role-policy.json`이라는 파일을 생성합니다: (`region` 및 `accountID`를 AWS 리전 및 계정 ID로 바꿉니다.)

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents"
               ],
               "Resource": "arn:aws:logs:us-east-1:111122223333:*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "dynamodb:DescribeStream",
                   "dynamodb:GetRecords",
                   "dynamodb:GetShardIterator",
                   "dynamodb:ListStreams"
               ],
               "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/BarkTable/stream/*"
           },
           {
               "Effect": "Allow",
               "Action": [
                   "sns:Publish"
               ],
               "Resource": [
                   "*"
               ]
           }
       ]
   }
   ```

------

   정책은 `WooferLambdaRole`이 다음을 수행할 수 있도록 해주는 네 가지 문을 가지고 있습니다.
   + Lambda 함수 실행(`publishNewBark`). 이 자습서에서는 나중에 함수를 생성합니다.
   + Amazon CloudWatch Logs 액세스. Lambda 함수는 런타임에 CloudWatch Logs에 진단을 씁니다.
   + `BarkTable`에 대한 DynamoDB 스트림에서 데이터 읽기
   + Amazon SNS에 메시지 게시

1. 다음 명령을 입력하여 정책을 `WooferLambdaRole`에 연결합니다.

   ```
   aws iam put-role-policy --role-name WooferLambdaRole \
       --policy-name WooferLambdaRolePolicy \
       --policy-document file://role-policy.json
   ```

## 3단계: Amazon SNS 주제 생성
<a name="Streams.Lambda.Tutorial.SNSTopic"></a>

이 단계에서는 Amazon SNS 주제(`wooferTopic`)를 생성하고 해당 주제에 대한 이메일 주소를 구독합니다. Lambda 함수에서는 이 주제를 사용하여 Woofer 사용자의 새 바크를 게시합니다.

1. 다음 명령을 입력하여 새 Amazon SNS 주제를 생성합니다.

   ```
   aws sns create-topic --name wooferTopic
   ```

1. 다음 명령을 입력해 `wooferTopic`에 대한 이메일 주소를 구독합니다. (`region` 및 `accountID`를 AWS 리전 및 계정 ID로 바꾸고, `example@example.com`을 유효한 이메일 주소로 바꿉니다.)

   ```
   aws sns subscribe \
       --topic-arn arn:aws:sns:region:accountID:wooferTopic \
       --protocol email \
       --notification-endpoint example@example.com
   ```

1. Amazon SNS에서 사용자 이메일 주소로 확인 메시지를 보냅니다. 구독 프로세스를 완료하는 메시지와 연결된 **구독 확인** 링크를 클릭합니다.

## 4단계: Lambda 함수 생성 및 테스트
<a name="Streams.Lambda.Tutorial.LambdaFunction"></a>

이 단계에서는 AWS Lambda 함수(`publishNewBark`)를 만들어 `BarkTable`에서 스트림 레코드를 처리합니다.

`publishNewBark` 함수는 `BarkTable`의 새 항목에 해당하는 스트림 이벤트만을 처리합니다. 함수는 해당 이벤트에서 데이터를 읽은 다음 Amazon SNS를 호출하여 게시합니다.

1. 다음 콘텐츠를 가진 `publishNewBark.js`이라는 파일을 생성합니다: `region` 및 `accountID`를 AWS 리전 및 계정 ID로 바꿉니다.

   ```
   'use strict';
   var AWS = require("aws-sdk");
   var sns = new AWS.SNS();
   
   exports.handler = (event, context, callback) => {
   
       event.Records.forEach((record) => {
           console.log('Stream record: ', JSON.stringify(record, null, 2));
   
           if (record.eventName == 'INSERT') {
               var who = JSON.stringify(record.dynamodb.NewImage.Username.S);
               var when = JSON.stringify(record.dynamodb.NewImage.Timestamp.S);
               var what = JSON.stringify(record.dynamodb.NewImage.Message.S);
               var params = {
                   Subject: 'A new bark from ' + who,
                   Message: 'Woofer user ' + who + ' barked the following at ' + when + ':\n\n ' + what,
                   TopicArn: 'arn:aws:sns:region:accountID:wooferTopic'
               };
               sns.publish(params, function(err, data) {
                   if (err) {
                       console.error("Unable to send message. Error JSON:", JSON.stringify(err, null, 2));
                   } else {
                       console.log("Results from sending message: ", JSON.stringify(data, null, 2));
                   }
               });
           }
       });
       callback(null, `Successfully processed ${event.Records.length} records.`);
   };
   ```

1. `publishNewBark.js`가 포함된 zip 파일을 만듭니다. zip 명령줄 유틸리티를 사용하는 경우 다음과 같은 명령을 입력하여 이를 수행할 수 있습니다.

   ```
   zip publishNewBark.zip publishNewBark.js
   ```

1. Lambda 함수를 만들 때 [2단계: Lambda 실행 역할 생성](#Streams.Lambda.Tutorial.CreateRole)에서 생성한 `WooferLambdaRole`의 Amazon 리소스 이름(ARN)을 지정합니다. 이 ARN을 가져오려면 다음 명령을 입력합니다.

   ```
   aws iam get-role --role-name WooferLambdaRole
   ```

   출력에서 `WooferLambdaRole`의 ARN을 찾습니다.

   ```
   ...
   "Arn": "arn:aws:iam::region:role/service-role/WooferLambdaRole"
   ...
   ```

   Lambda 함수를 생성하려면 다음 명령을 입력합니다. *roleARN*를 `WooferLambdaRole`의 ARN으로 바꿉니다.

   ```
   aws lambda create-function \
       --region region \
       --function-name publishNewBark \
       --zip-file fileb://publishNewBark.zip \
       --role roleARN \
       --handler publishNewBark.handler \
       --timeout 5 \
       --runtime nodejs16.x
   ```

1. 이제, `publishNewBark`을 테스트하여 제대로 작동하는지 확인합니다. 이렇게 하려면 DynamoDB Streams의 실제 레코드와 유사한 내용을 입력합니다.

   다음 콘텐츠를 가진 `payload.json`이라는 파일을 생성합니다: `region` 및 `accountID`를 AWS 리전 및 계정 ID로 바꿉니다.

   ```
   {
       "Records": [
           {
               "eventID": "7de3041dd709b024af6f29e4fa13d34c",
               "eventName": "INSERT",
               "eventVersion": "1.1",
               "eventSource": "aws:dynamodb",
               "awsRegion": "region",
               "dynamodb": {
                   "ApproximateCreationDateTime": 1479499740,
                   "Keys": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "NewImage": {
                       "Timestamp": {
                           "S": "2016-11-18:12:09:36"
                       },
                       "Message": {
                           "S": "This is a bark from the Woofer social network"
                       },
                       "Username": {
                           "S": "John Doe"
                       }
                   },
                   "SequenceNumber": "13021600000000001596893679",
                   "SizeBytes": 112,
                   "StreamViewType": "NEW_IMAGE"
               },
               "eventSourceARN": "arn:aws:dynamodb:region:account ID:table/BarkTable/stream/2016-11-16T20:42:48.104"
           }
       ]
   }
   ```

   `publishNewBark` 함수를 테스트하려면 다음 명령을 입력합니다.

   ```
   aws lambda invoke --function-name publishNewBark --payload file://payload.json --cli-binary-format raw-in-base64-out output.txt
   ```

   테스트가 성공적이면 다음 출력이 생성됩니다.

   ```
   {
       "StatusCode": 200,
       "ExecutedVersion": "$LATEST"
   }
   ```

   또한 `output.txt` 파일에는 다음 텍스트가 포함됩니다.

   ```
   "Successfully processed 1 records."
   ```

   몇 분 안에 새 이메일 메시지가 전송됩니다.
**참고**  
AWS Lambda는 진단 정보를 Amazon CloudWatch Logs에 씁니다. Lambda 함수에서 오류가 발생하면 문제 해결을 위해 이 진단 정보를 사용할 수 있습니다.  
[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)에서 CloudWatch 콘솔을 엽니다.
탐색 창에서 **로그**를 선택합니다.
다음과 같은 로그 그룹을 선택합니다.`/aws/lambda/publishNewBark`
최신 로그 스트림을 선택하여 함수의 출력(오류 사항 포함)을 봅니다.

## 5단계: 트리거 생성 및 테스트
<a name="Streams.Lambda.Tutorial.CreateTrigger"></a>

[4단계: Lambda 함수 생성 및 테스트](#Streams.Lambda.Tutorial.LambdaFunction)에서 Lambda 함수를 테스트하여 제대로 실행되는지 확인합니다. 이 단계에서는 Lambda 함수(`publishNewBark`)를 이벤트 소스(`BarkTable` 스트림)과 연결하여 *트리거*를 생성합니다.

1. 트리거를 생성할 때 `BarkTable` 스트림의 ARN을 지정해야 합니다. 이 ARN을 가져오려면 다음 명령을 입력합니다.

   ```
   aws dynamodb describe-table --table-name BarkTable
   ```

   출력에서 `LatestStreamArn`를 찾습니다.

   ```
   ...
    "LatestStreamArn": "arn:aws:dynamodb:region:accountID:table/BarkTable/stream/timestamp
   ...
   ```

1. 다음 명령을 입력하여 트리거를 생성합니다. `streamARN`를 실제 스트림 ARN으로 바꿉니다.

   ```
   aws lambda create-event-source-mapping \
       --region region \
       --function-name publishNewBark \
       --event-source streamARN  \
       --batch-size 1 \
       --starting-position TRIM_HORIZON
   ```

1. 트리거를 테스트합니다. 다음 명령을 입력하여 `BarkTable`에 항목을 추가합니다.

   ```
   aws dynamodb put-item \
       --table-name BarkTable \
       --item Username={S="Jane Doe"},Timestamp={S="2016-11-18:14:32:17"},Message={S="Testing...1...2...3"}
   ```

   몇 분 이내에 새 이메일 메시지를 수신합니다.

1. DynamoDB 콘솔을 열어서 `BarkTable`에 몇 개 항목을 추가합니다. `Username` 및 `Timestamp`에 대한 속성값을 지정해야 합니다. (필수 사항은 아니지만 `Message`에 대한 값을 지정할 수도 있습니다.) 추가한 `BarkTable`의 각 항목에 대해 새 이메일 메시지를 수신하게 됩니다.

   Lambda 함수는 `BarkTable`에 추가된 새 항목만을 처리합니다. 테이블에 업데이트되거나 삭제한 항목에 대해 이 함수는 별도의 작업을 하지 않습니다.

**참고**  
AWS Lambda는 진단 정보를 Amazon CloudWatch Logs에 씁니다. Lambda 함수에서 오류가 발생하면 문제 해결을 위해 이 진단 정보를 사용할 수 있습니다.  
[https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/)에서 CloudWatch 콘솔을 엽니다.
탐색 창에서 **로그**를 선택합니다.
다음과 같은 로그 그룹을 선택합니다.`/aws/lambda/publishNewBark`
최신 로그 스트림을 선택하여 함수의 출력(오류 사항 포함)을 봅니다.

# 자습서 \$12: 필터를 사용하여 DynamoDB 및 Lambda에서 일부 이벤트 처리
<a name="Streams.Lambda.Tutorial2"></a>

이 자습서에서는 DynamoDB 테이블의 스트림에서 일부 이벤트만 처리하기 위해 AWS Lambda 트리거를 생성합니다.

**Topics**
+ [종합 - CloudFormation](#Streams.Lambda.Tutorial2.Cloudformation)
+ [종합 - CDK](#Streams.Lambda.Tutorial2.CDK)

[Lambda 이벤트 필터링](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html)을 사용하면 필터 표현식을 사용하여 Lambda가 어떤 이벤트를 처리를 위해 함수로 보내는지를 제어할 수 있습니다. DynamoDB Streams당 최대 5개의 서로 다른 필터를 구성할 수 있습니다. 일괄 처리 기간을 사용하는 경우 Lambda는 각 새 이벤트에 필터 기준을 적용하여 현재 배치에 포함되어야 하는지 확인합니다.

필터는 `FilterCriteria`라는 구조를 통해 적용됩니다. `FilterCriteria`의 3가지 주요 속성은 `metadata properties`, `data properties` 및 `filter patterns`입니다.

다음은 DynamoDB Streams 이벤트의 예제 구조입니다.

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

`metadata properties`는 이벤트 객체의 필드입니다. DynamoDB Streams의 경우 `metadata properties`은 `dynamodb` 또는 `eventName`과 같은 필드입니다.

`data properties`은 이벤트 본문의 필드입니다. `data properties`을 필터링하려면 적절한 키 내의 `FilterCriteria`에 해당 속성을 포함해야 합니다. DynamoDB 이벤트 소스의 경우 데이터 키는 `NewImage` 또는 `OldImage`입니다.

마지막으로 필터 규칙은 특정 속성에 적용할 필터 표현식을 정의합니다. 여기 몇 가지 예가 있습니다:


| 비교 연산자 | 예제 | 규칙 구문(부분적) | 
| --- | --- | --- | 
|  Null  |  제품 유형이 null입니다.  |  `{ "product_type": { "S": null } } `  | 
|  비어 있음  |  제품 이름이 비어 있습니다.  |  `{ "product_name": { "S": [ ""] } } `  | 
|  같음  |  주가 플로리다와 같습니다.  |  `{ "state": { "S": ["FL"] } } `  | 
|  및  |  제품 주는 플로리다와 같고 제품 범주는 초콜릿과 같습니다.  |  `{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } } `  | 
|  또는  |  제품 주는 플로리다 또는 캘리포니아입니다.  |  `{ "state": { "S": ["FL","CA"] } } `  | 
|  Not  |  제품 주는 플로리다가 아닙니다.  |  `{"state": {"S": [{"anything-but": ["FL"]}]}}`  | 
|  존재함  |  홈메이드 제품이 있습니다.  |  `{"homemade": {"S": [{"exists": true}]}}`  | 
|  존재하지 않음  |  홈메이드 제품이 존재하지 않습니다  |  `{"homemade": {"S": [{"exists": false}]}}`  | 
|  로 시작함  |  PK는 COMPANY로 시작합니다.  |  `{"PK": {"S": [{"prefix": "COMPANY"}]}}`  | 

Lambda 함수에 대해 최대 5개의 이벤트 필터링 패턴을 지정할 수 있습니다. 5개의 이벤트 각각은 논리적 OR로 평가됩니다. 따라서 `Filter_One` 및 `Filter_Two`라는 두 개의 필터를 구성하면 Lambda 함수가 `Filter_One` 또는 `Filter_Two`를 실행합니다.

**참고**  
[Lambda 이벤트 필터링](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventfiltering.html) 페이지에는 숫자 값을 필터링하고 비교하는 몇 가지 옵션이 있지만 DynamoDB 필터 이벤트의 경우 DynamoDB의 숫자가 문자열로 저장되기 때문에 적용되지 않습니다. 예를 들어 ` "quantity": { "N": "50" }`의 경우, `"N"` 속성 때문에 숫자임을 알 수 있습니다.

## 종합 - CloudFormation
<a name="Streams.Lambda.Tutorial2.Cloudformation"></a>

이벤트 필터링 기능을 실제로 보여주기 위해 CloudFormation 템플릿 샘플을 소개합니다. 이 템플릿은 Amazon DynamoDB Streams가 활성화된 파티션 키 PK 및 정렬 키 SK가 있는 간단한 DynamoDB 테이블을 생성합니다. 그러면 Amazon Cloudwatch에 로그를 쓰고 Amazon DynamoDB 스트림에서 이벤트를 읽을 수 있는 람다 함수와 간단한 Lambda 실행 역할이 생성됩니다. 또한 DynamoDB Streams와 Lambda 함수 사이에 이벤트 소스 매핑을 추가하므로 Amazon DynamoDB Streams에 이벤트가 있을 때마다 함수를 실행할 수 있습니다.

```
AWSTemplateFormatVersion: "2010-09-09"

Description: Sample application that presents AWS Lambda event source filtering 
with Amazon DynamoDB Streams.

Resources:
  StreamsSampleDDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: "PK"
          AttributeType: "S"
        - AttributeName: "SK"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "PK"
          KeyType: "HASH"
        - AttributeName: "SK"
          KeyType: "RANGE"
      StreamSpecification:
        StreamViewType: "NEW_AND_OLD_IMAGES"
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 5

  LambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17",		 	 	 
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17",		 	 	 
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: arn:aws:logs:*:*:*
              - Effect: Allow
                Action:
                  - dynamodb:DescribeStream
                  - dynamodb:GetRecords
                  - dynamodb:GetShardIterator
                  - dynamodb:ListStreams
                Resource: !GetAtt StreamsSampleDDBTable.StreamArn

  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST

  ProcessEventLambda:
    Type: AWS::Lambda::Function
    Properties:
      Runtime: python3.7
      Timeout: 300
      Handler: index.handler
      Role: !GetAtt LambdaExecutionRole.Arn
      Code:
        ZipFile: |
          import logging

          LOGGER = logging.getLogger()
          LOGGER.setLevel(logging.INFO)

          def handler(event, context):
            LOGGER.info('Received Event: %s', event)
            for rec in event['Records']:
              LOGGER.info('Record: %s', rec)

Outputs:
  StreamsSampleDDBTable:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.Arn
  StreamARN:
    Description: DynamoDB Table ARN created for this example
    Value: !GetAtt StreamsSampleDDBTable.StreamArn
```

이 클라우드 구성 템플릿을 배포한 후 다음과 같은 Amazon DynamoDB 항목을 삽입할 수 있습니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

이 클라우드 형성 템플릿에 인라인으로 포함된 간단한 람다 함수 덕분에 다음과 같이 람다 함수에 대한 Amazon CloudWatch 로그 그룹의 이벤트를 볼 수 있습니다.

```
{
  "eventID": "c9fbe7d0261a5163fcb6940593e41797",
  "eventName": "INSERT",
  "eventVersion": "1.1",
  "eventSource": "aws:dynamodb",
  "awsRegion": "us-east-2",
  "dynamodb": {
    "ApproximateCreationDateTime": 1664559083.0,
    "Keys": {
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" }
    },
    "NewImage": {
      "quantity": { "N": "50" },
      "company_id": { "S": "1000" },
      "fabric": { "S": "Florida Chocolates" },
      "price": { "N": "15" },
      "stores": { "N": "5" },
      "product_id": { "S": "1000" },
      "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" },
      "PK": { "S": "COMPANY#1000" },
      "state": { "S": "FL" },
      "type": { "S": "" }
    },
    "SequenceNumber": "700000000000888747038",
    "SizeBytes": 174,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  },
  "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209"
}
```

**필터링 예제**
+ **지정된 주와 일치하는 제품만**

이 예제에서는 플로리다(약어 'FL')에서 생산되는 모든 제품을 일치시키는 필터를 포함하도록 CloudFormation 템플릿을 수정합니다.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

스택을 재배포하고 나면 다음 DynamoDB 항목을 테이블에 추가할 수 있습니다. 이 예제의 제품은 캘리포니아에서 만들어진 제품이므로 Lambda 함수 로그에는 표시되지 않습니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK#1000",
 "company_id": "1000",
 "fabric": "Florida Chocolates",
 "price": 15,
 "product_id": "1000",
 "quantity": 50,
 "state": "CA",
 "stores": 5,
 "type": ""
}
```
+ **PK 및 SK의 일부 값으로 시작하는 항목만**

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

```
EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

AND 조건을 사용하려면 패턴 내부에 조건이 있어야 하며 이 경우 PK 및 SK 키가 쉼표로 구분되어 동일한 표현식 안에 있습니다.

PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

```
  EventSourceDDBTableStream:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 1
      Enabled: True
      FilterCriteria:
        Filters:
          - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}'
          - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }'
      EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn
      FunctionName: !GetAtt ProcessEventLambda.Arn
      StartingPosition: LATEST
```

필터 섹션에 새 패턴을 도입하여 OR 조건이 추가되었음을 알 수 있습니다.

## 종합 - CDK
<a name="Streams.Lambda.Tutorial2.CDK"></a>

다음 샘플 CDK 프로젝트 구성 템플릿은 이벤트 필터링 기능을 안내합니다. 이 CDK 프로젝트로 작업하기 전에 [준비 스크립트 실행](https://docs.aws.amazon.com/cdk/v2/guide/work-with-cdk-python.html)을 포함한 [전제 조건을 설치](https://docs.aws.amazon.com/cdk/v2/guide/work-with.html)해야 합니다.

**CDK 프로젝트 만들기**

먼저 빈 디렉터리에서 `cdk init`를 호출하여 새 AWS CDK 프로젝트를 생성합니다.

```
mkdir ddb_filters
cd ddb_filters
cdk init app --language python
```

`cdk init` 명령은 프로젝트 폴더의 이름을 사용하여 클래스, 하위 폴더 및 파일을 포함한 프로젝트의 다양한 요소의 이름을 지정합니다. 폴더 이름의 하이픈은 밑줄로 변환됩니다. 그렇지 않으면 이름은 Python 식별자 형식을 따라야 합니다. 예를 들어 숫자로 시작하거나 공백을 포함해서는 안 됩니다.

새 프로젝트를 사용하려면 해당 가상 환경을 활성화하세요. 이렇게 하면 프로젝트의 종속성을 전역적으로 설치하는 대신 프로젝트 폴더에 로컬로 설치할 수 있습니다.

```
source .venv/bin/activate
python -m pip install -r requirements.txt
```

**참고**  
이를 Mac/Linux 명령으로 인식하여 가상 환경을 활성화할 수 있습니다. Python 템플릿에는 Windows에서 동일한 명령을 사용할 수 있도록 하는 배치 파일인 `source.bat`가 포함되어 있습니다. 기존 Windows 명령 `.venv\Scripts\activate.bat`도 작동합니다. AWS CDK Toolkit v1.70.0 이하를 사용하여 AWS CDK 프로젝트를 초기화한 경우 가상 환경은 .`.venv` 대신 `.env` 디렉터리에 있습니다.

**기본 인프라**

선호하는 텍스트 편집기로 `./ddb_filters/ddb_filters_stack.py` 파일을 엽니다. 이 파일은 AWS CDK 프로젝트를 생성할 때 자동으로 생성되었습니다.

다음으로 `_create_ddb_table` 및 `_set_ddb_trigger_function` 함수를 추가합니다. 이러한 함수는 프로비저닝 모드 온디맨드 모드에서 파티션 키 PK 및 정렬 키 SK가 있는 DynamoDB 테이블을 생성하며 Amazon DynamoDB Streams가 기본적으로 활성화되어 새 이미지와 이전 이미지를 표시합니다.

Lambda 함수는 `app.py` 파일 아래의 `lambda` 폴더에 저장됩니다. 이 파일은 나중에 생성됩니다. 여기에는 이 스택에서 생성된 Amazon DynamoDB 테이블의 이름이 될 환경 변수 `APP_TABLE_NAME`이 포함됩니다. 동일한 함수에서 Lambda 함수에 스트림 읽기 권한을 부여합니다. 마지막으로 DynamoDB Streams를 람다 함수의 이벤트 소스로 구독합니다.

`__init__` 메서드의 파일 끝에서 각 구성을 호출하여 스택에서 초기화합니다. 추가 구성 요소와 서비스가 필요한 더 큰 프로젝트의 경우 기본 스택 외부에서 이러한 구성을 정의하는 것이 가장 좋습니다.

```
import os
import json

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    aws_lambda as _lambda,
    aws_dynamodb as dynamodb,
)
from constructs import Construct


class DdbFiltersStack(Stack):

    def _create_ddb_table(self):
        dynamodb_table = dynamodb.Table(
            self,
            "AppTable",
            partition_key=dynamodb.Attribute(
                name="PK", type=dynamodb.AttributeType.STRING
            ),
            sort_key=dynamodb.Attribute(
                name="SK", type=dynamodb.AttributeType.STRING),
            billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST,
            stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES,
            removal_policy=cdk.RemovalPolicy.DESTROY,
        )

        cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name)
        return dynamodb_table

    def _set_ddb_trigger_function(self, ddb_table):
        events_lambda = _lambda.Function(
            self,
            "LambdaHandler",
            runtime=_lambda.Runtime.PYTHON_3_9,
            code=_lambda.Code.from_asset("lambda"),
            handler="app.handler",
            environment={
                "APP_TABLE_NAME": ddb_table.table_name,
            },
        )

        ddb_table.grant_stream_read(events_lambda)

        event_subscription = _lambda.CfnEventSourceMapping(
            scope=self,
            id="companyInsertsOnlyEventSourceMapping",
            function_name=events_lambda.function_name,
            event_source_arn=ddb_table.table_stream_arn,
            maximum_batching_window_in_seconds=1,
            starting_position="LATEST",
            batch_size=1,
        )

    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        ddb_table = self._create_ddb_table()
        self._set_ddb_trigger_function(ddb_table)
```

이제 Amazon CloudWatch에 로그를 출력하는 매우 간단한 람다 함수를 만들어 보겠습니다. 이를 위해 `lambda`라는 새 폴더를 만듭니다.

```
mkdir lambda
touch app.py
```

자주 사용하는 텍스트 편집기를 사용하여 다음 내용을 `app.py` 파일에 추가합니다.

```
import logging

LOGGER = logging.getLogger()
LOGGER.setLevel(logging.INFO)


def handler(event, context):
    LOGGER.info('Received Event: %s', event)
    for rec in event['Records']:
        LOGGER.info('Record: %s', rec)
```

`/ddb_filters/` 폴더에 있는지 확인하고 다음 명령을 입력하여 샘플 애플리케이션을 생성합니다.

```
cdk deploy
```

어느 시점에서 솔루션 배포 여부를 확인하라는 메시지가 표시됩니다. `Y`를 입력하여 변경 내용을 적용합니다.

```
├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │
└───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘

Do you wish to deploy these changes (y/n)? y

...

✨  Deployment time: 67.73s

Outputs:
DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP
Stack ARN:
arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6
```

변경 사항이 배포되면 AWS 콘솔을 열고 테이블에 항목 하나를 추가합니다.

```
{
 "PK": "COMPANY#1000",
 "SK": "PRODUCT#CHOCOLATE#DARK",
 "company_id": "1000",
 "type": "",
 "state": "FL",
 "stores": 5,
 "price": 15,
 "quantity": 50,
 "fabric": "Florida Chocolates"
}
```

이제 CloudWatch 로그에 이 항목의 모든 정보가 포함되어야 합니다.

**필터링 예제**
+ **지정된 주와 일치하는 제품만**

`ddb_filters/ddb_filters/ddb_filters_stack.py` 파일을 열고 "FL"과 동일한 모든 제품과 일치하는 필터를 포함하도록 수정합니다. 이는 45행의 `event_subscription` 바로 아래에서 수정할 수 있습니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```
+ **PK 및 SK의 일부 값으로 시작하는 항목만**

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
        ]
    },
```
+ **PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.**

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

```
event_subscription.add_property_override(
    property_path="FilterCriteria",
    value={
        "Filters": [
            {
                "Pattern": json.dumps(
                    {
                        {
                            "dynamodb": {
                                "Keys": {
                                    "PK": {"S": [{"prefix": "COMPANY"}]},
                                    "SK": {"S": [{"prefix": "PRODUCT"}]},
                                }
                            }
                        }
                    }
                )
            },
            {
                "Pattern": json.dumps(
                    {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}}
                )
            },
        ]
    },
)
```

필터 배열에 더 많은 요소를 추가하면 OR 조건이 추가됩니다.

**정리**

작업 디렉터리의 베이스에서 필터 스택을 찾고 `cdk destroy`를 실행합니다. 리소스 삭제를 확인하라는 메시지가 표시됩니다.

```
cdk destroy
Are you sure you want to delete: DdbFiltersStack (y/n)? y
```

# DynamoDB Streams를 Lambda와 함께 사용하는 모범 사례
<a name="Streams.Lambda.BestPracticesWithDynamoDB"></a>

AWS Lambda 함수는 다른 함수와 격리된 실행 환경인 *컨테이너* 내에서 실행됩니다. 함수를 처음 실행하면 AWS Lambda에서 컨테이너를 생성하고 함수 코드를 실행하기 시작합니다.

Lambda 함수에는 호출당 한 번 실행되는 *핸들러*가 있습니다. 핸들러에는 함수에 대한 주요 비즈니스 로직이 포함됩니다. 예를 들어, [4단계: Lambda 함수 생성 및 테스트](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction)에 표시된 Lambda 함수에는 DynamoDB 스트림의 레코드를 처리할 수 있는 핸들러가 있습니다.

또한 컨테이너가 생성되고 나서 AWS Lambda가 핸들러를 처음 실행하기 전에 한 번만 실행되는 초기화 코드도 제공할 수 있습니다. [4단계: Lambda 함수 생성 및 테스트](Streams.Lambda.Tutorial.md#Streams.Lambda.Tutorial.LambdaFunction)에 표시된 Lambda 함수에는 SDK for JavaScript in Node.js를 가져오고 Amazon SNS에 대한 클라이언트를 생성하는 초기화 코드가 있습니다. 이 객체는 핸들러 외부에서 한 번만 정의할 수 있습니다.

함수를 실행한 후, AWS Lambda는 이후 함수 호출에 컨테이너를 재사용하도록 선택할 수 있습니다. 이 경우에, 함수 핸들러에서 초기화 코드에 정의된 리소스를 재사용할 수도 있습니다. (AWS Lambda에서는 컨테이너의 보유 기간 또는 컨테이너의 재사용 여부를 제어할 수 없습니다.)

AWS Lambda를 사용하는 DynamoDB 트리거의 경우 다음 사항이 권장됩니다.
+ AWS 서비스 클라이언트는 핸들러가 아니라 초기화 코드에서 인스턴스화해야 합니다. 이렇게 하면 AWS Lambda에서 기존의 연결을 재사용해 컨테이너의 수명을 정할 수 있습니다.
+ 일반적으로, 연결을 명시적으로 관리하거나 연결 풀링을 구현할 필요가 없습니다. 이 작업은 AWS Lambda에서 수행되기 때문입니다.

DynamoDB 스트림의 Lambda 소비자는 정확히 한 번의 전송을 보장하지 않으며 가끔 중복이 발생할 수 있습니다. 중복 처리로 인해 예상치 못한 문제가 발생하지 않도록 Lambda 함수 코드가 멱등성을 갖도록 하세요.

자세한 내용은 *AWS Lambda 개발자 안내서*의 [AWS Lambda 함수 작업 모범 사례](https://docs.aws.amazon.com/lambda/latest/dg/best-practices.html)를 참조하세요.

# DynamoDB 스트림 및 Apache Flink
<a name="StreamsApacheFlink.xml"></a>

Apache Flink로 Amazon DynamoDB Streams 레코드를 사용할 수 있습니다. [Amazon Managed Service for Apache Flink](https://aws.amazon.com/managed-service-apache-flink/)를 통해 Apache Flink를 사용하여 스트리밍 데이터를 실시간으로 변환하고 분석할 수 있습니다. Apache Flink는 실시간 데이터를 처리하기 위한 오픈 소스 스트림 처리 프레임워크입니다. Apache Flink용 Amazon DynamoDB Streams 커넥터는 Apache Flink 워크로드의 구축 및 관리를 간소화하고 애플리케이션과 기타 AWS 서비스의 통합을 가능하게 합니다.

Amazon Managed Service for Apache Flink를 사용하면 로그 분석, 클릭스트림 분석, 사물 인터넷(IoT), 광고 기술, 게임 등을 위한 엔드 투 엔드 스트림 처리 애플리케이션을 빠르게 구축할 수 있습니다. 가장 일반적인 4가지 사용 사례는 스트리밍 추출, 변환, 로드(ETL), 이벤트 중심 애플리케이션, 응답형 실시간 분석 및 데이터 스트림의 대화형 쿼리입니다. Amazon DynamoDB Streams에서 Apache Flink에 쓰는 방법에 대한 자세한 내용은 [Amazon DynamoDB Streams Connector](https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/dynamodb/)를 참조하세요.