

신중한 고려 끝에 Amazon Kinesis Data Analytics for SQL 애플리케이션을 중단하기로 결정했습니다.

1. **2025년 9월 1**일부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 버그 수정은 제공되지 않습니다. 곧 중단될 예정이므로 지원이 제한될 예정이기 때문입니다.

2. **2025년 10월 15**일부터 새 Kinesis Data Analytics for SQL 애플리케이션을 생성할 수 없습니다.

3. **2026년 1월 27**일부터 애플리케이션이 삭제됩니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 시작하거나 작동할 수 없게 됩니다. 그 시점부터 Amazon Kinesis Data Analytics for SQL에 대한 지원을 더 이상 이용할 수 없습니다. 자세한 내용은 [Amazon Kinesis Data Analytics for SQL 애플리케이션 단종](discontinuation.md) 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 예: 윈도우 및 집계
<a name="examples-window"></a>

이 섹션에서는 윈도우 형식 및 집계 쿼리를 사용하는 Amazon Kinesis Data Analytics 애플리케이션 예를 소개합니다. (자세한 설명은 [윈도우 모드 쿼리](windowed-sql.md) 섹션을 참조하십시오.) 각 예에서는 Kinesis Data Analytics 애플리케이션을 설정하기 위한 단계별 지침과 예 코드를 제공합니다.

**Topics**
+ [예: 스태거 윈도우](examples-window-stagger.md)
+ [예: ROWTIME을 사용하는 텀블링 윈도우](examples-window-tumbling-rowtime.md)
+ [예: 이벤트 타임스탬프를 사용하는 텀블링 윈도우](examples-window-tumbling-event.md)
+ [예: 가장 자주 발생하는 값 검색(TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [예: 쿼리에서 부분적 결과 집계](examples-window-partialresults.md)

# 예: 스태거 윈도우
<a name="examples-window-stagger"></a>

키와 일치하는 데이터가 도착할 때 윈도우 모드 쿼리가 고유 파티션 키에 대한 개별 윈도우를 처리하기 시작한다면, 이 윈도우는 *스태거 윈도우*라고 부릅니다. 자세한 설명은 [스태거 윈도우](stagger-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예는 EVENT\$1TIME 열과 TICKER 열을 사용하여 스태거 윈도우를 생성합니다. 스소 스트림은 1분 이내에 도착하지만 분 값이 다를 수도 있는(예: `18:41:xx`) 같은 EVENT\$1TIME 및 TICKER 값을 가진 레코드 6개 그룹으로 구성됩니다.

이 예에서는 다음 레코드를 다음 시간에 Kinesis 데이터 스트림에 기록합니다. 이 스크립트는 시간을 스트림에 쓰지 않지만 애플리케이션에서 레코드를 수집하는 시간은 `ROWTIME` 필드에 기록됩니다.

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(`EVENT_TIME` 및 `TICKER`)로 애플리케이션 내 스키마를 유추합니다.

![\[가격 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


`COUNT` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_stagger.png)


다음 절차에서는 EVENT\$1TIME 및 TICKER를 기반으로 스태거 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-stagger-window-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-stagger-window-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-stagger-window-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순 코드는 같은 무작위 `EVENT_TIME`과 티커 기호를 이용하는 레코드 6개 집단을 스트림에 1분 이상 계속 작성합니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-stagger-window-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

   1. **Edit Schema(스키마 편집)**를 선택합니다. **EVENT\$1TIME** 열의 **열 유형**을 `TIMESTAMP`으로 변경합니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: ROWTIME을 사용하는 텀블링 윈도우
<a name="examples-window-tumbling-rowtime"></a>

윈도우 모드 쿼리가 비중첩 방식으로 각 윈도우를 처리하는 경우 이 윈도우를 *텀블링 윈도우*라고 합니다. 자세한 설명은 [텀블링 윈도우(그룹별 집계)](tumbling-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예는 `ROWTIME` 열을 사용하여 텀플링 윈도우를 생성합니다. `ROWTIME` 열은 시간을 나타내며 애플리케이션이 레코드를 읽었습니다.

이 예에서는 다음 레코드를 Kinesis 데이터 스트림에 기록합니다.

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(`TICKER` 및 `PRICE`)로 애플리케이션 내 스키마를 유추합니다.

![\[가격 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


`MIN` 및 `MAX` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


다음 절차에서는 ROWTIME을 기반으로 텀플링 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-tumbling-window-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-tumbling-window-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-tumbling-window-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-tumbling-window-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 이벤트 타임스탬프를 사용하는 텀블링 윈도우
<a name="examples-window-tumbling-event"></a>

윈도우 모드 쿼리가 비중첩 방식으로 각 윈도우를 처리하는 경우 이 윈도우를 *텀블링 윈도우*라고 합니다. 자세한 설명은 [텀블링 윈도우(그룹별 집계)](tumbling-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예에서는 스트리밍 데이터에 포함된 사용자 생성 타임스탬프인 이벤트 타임스탬프를 사용하는 텀블링 윈도우를 보여줍니다. 여기서는 애플리케이션이 레코드를 수신할 때 Kinesis Data Analytics가 생성하는 타임스탬프인 ROWTIME를 사용하는 대신에 이 접근 방식을 사용합니다. 이벤트가 애플리케이션에 수신된 시간 대신에 이벤트가 발생한 시간을 기반으로 집계를 생성하려는 경우 스트리밍 데이터에서 이벤트 타임스탬프를 사용합니다. 이 예에서는 `ROWTIME` 값이 1분마다 집계를 트리거하며 레코드가 `ROWTIME` 및 포함된 이벤트 시간별로 모두 집계됩니다.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다. 이벤트가 발생한 시간부터 레코드가 Kinesis Data Analytics에 수집된 시간까지 지연을 생성할 수 있는 처리 및 전송 지연을 시뮬레이션하기 위해 이전에 `EVENT_TIME` 값이 5초로 설정되어 있습니다.

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스에서 샘플 레코드를 읽고 다음과 같이 세 개의 열(`EVENT_TIME`, `TICKER` 및 `PRICE`)로 애플리케이션 내 스키마를 유추합니다.

![\[이벤트 시간, 티커 및 가격 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


`MIN` 및 `MAX` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


다음 절차에서는 이벤트 시간을 기반으로 텀플링 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-window-tumbling-event-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-window-tumbling-event-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-window-tumbling-event-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-window-tumbling-event-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 세 개의 열이 있습니다.

   1. **Edit Schema(스키마 편집)**를 선택합니다. **EVENT\$1TIME** 열의 **열 유형**을 `TIMESTAMP`으로 변경합니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 가장 자주 발생하는 값 검색(TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

이 Amazon Kinesis Data Analytics 예에서는 `TOP_K_ITEMS_TUMBLING` 함수를 사용하여 텀블링 윈도우에서 가장 자주 발생하는 값을 검색하는 방법을 보여줍니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [`TOP_K_ITEMS_TUMBLING` 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html)를 참조하십시오.

`TOP_K_ITEMS_TUMBLING` 함수는 수만 또는 수십만 개 이상의 키를 집계할 때 리소스 사용을 줄이려는 경우에 유용합니다. 이 함수는 `GROUP BY` 및 `ORDER BY` 절을 사용하여 집계하는 것과 동일한 결과를 생성합니다.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다: 

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스에서 샘플 레코드를 읽고 다음과 같이 하나의 열(`TICKER`)을 사용하여 애플리케이션 내 스키마를 유추합니다.

![\[티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


`TOP_K_VALUES_TUMBLING` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_topk.png)


다음 절차에서는 입력 스트림에서 가장 자주 발생하는 값을 검색하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-window-topkitems-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-window-topkitems-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-window-topkitems-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 둡니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-window-topkitems-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 열이 한 개 있습니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다:

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 쿼리에서 부분적 결과 집계
<a name="examples-window-partialresults"></a>

Amazon Kinesis 데이터 스트림에 처리 시간이 정확히 일치하지 않는 이벤트 시간이 있는 레코드가 포함되어 있는 경우, 텀블링 윈도의 결과 선택에는 창에 도착했다고 되어 잇지만 실제로는 그렇지 아니한 레코드가 포함됩니다. 이 경우 텀블링 윈도우에는 원하는 결과의 일부분만 포함됩니다. 이 문제를 해결하려면 다음과 같은 여러 가지 접근 방식을 사용할 수 있습니다.
+ 텀블링 윈도우만 사용하고, upsert를 사용하여 데이터베이스 또는 데이터 웨어하우스를 통한 사후 처리에서 부분적 결과를 집계합니다. 이 접근 방법은 애플리케이션을 처리하는 데 효율적입니다. 집계 연산자(`sum`, `min`, `max` 등)에 대해 지연 데이터를 무기한 처리합니다. 이 접근 방식의 단점은 데이터베이스 계층에서 추가 애플리케이션 로직을 개발하고 유지 관리해야 한다는 것입니다.
+ 부분적 결과를 조기에 생성하지만 슬라이딩 윈도우 기간 동안 전체 결과를 계속 생성하는 텀블링 및 슬라이딩 윈도우를 사용합니다. 이 접근 방식은 데이터베이스 계층에서 추가 애플리케이션 로직을 추가할 필요가 없도록 upsert 대신 덮어쓰기를 사용하여 만기 데이터를 처리합니다. 이 접근 방식의 단점은 이것이 더 많은 Kinesis 처리 단위(KPU)를 사용하지만 여전히 두 개의 결과를 생성하므로 일부 사용 사례에는 효과적이지 않을 수 있다는 것입니다.

텀블링 및 슬라이딩 윈도우에 대한 자세한 설명은 [윈도우 모드 쿼리](windowed-sql.md) 섹션을 참조하십시오.

다음 절차에서는 텀블링 윈도우 집계가 최종 결과를 생성하기 위해 결합해야 하는 두 개의 부분적 결과(`CALC_COUNT_SQL_STREAM` 애플리케이션 내 스트림으로 전송됨)를 생성합니다. 그런 다음 애플리케이션은 두 개의 부분적 결과를 결합하는 두 번째 집계(`DESTINATION_SQL_STREAM` 애플리케이션 내 스트림으로 전송됨)를 생성합니다.

**이벤트 시간을 사용하여 부분적 결과를 집계하는 애플리케이션을 생성하려면**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Analytics(데이터 분석)**를 선택합니다. 교재 [Amazon Kinesis Data Analytics for SQL 애플리케이션 시작하기](getting-started.md)의 설명에 따라 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. SQL 편집기에서 애플리케이션 코드를 다음으로 바꿉니다.

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   애플리케이션 코드의 `SELECT`문이 `SOURCE_SQL_STREAM_001`에서 주가 변동이 1%보다 큰 행을 필터링하여 펌프를 사용하여 또 다른 애플리케이션 내 스트림 `CHANGE_STREAM`에 삽입합니다.

1. [**Save and run SQL**]을 선택합니다.

첫 번째 펌프는 다음과 비슷한 `CALC_COUNT_SQL_STREAM`에 스트림을 출력합니다. 결과 집합은 불완전합니다.

![\[부분적 결과를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_partial_0.png)


그런 다음 두 번째 펌프는 전체 결과 집합이 포함된 `DESTINATION_SQL_STREAM`에 스트림을 출력합니다.

![\[전체 결과를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_partial_1.png)
