

신중한 고려 끝에 Amazon Kinesis Data Analytics for SQL 애플리케이션을 중단하기로 결정했습니다.

1. **2025년 9월 1**일부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 버그 수정은 제공되지 않습니다. 곧 중단될 예정이므로 지원이 제한될 예정이기 때문입니다.

2. **2025년 10월 15**일부터 새 Kinesis Data Analytics for SQL 애플리케이션을 생성할 수 없습니다.

3. **2026년 1월 27**일부터 애플리케이션이 삭제됩니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 시작하거나 작동할 수 없게 됩니다. 그 시점부터 Amazon Kinesis Data Analytics for SQL에 대한 지원을 더 이상 이용할 수 없습니다. 자세한 내용은 [Amazon Kinesis Data Analytics for SQL 애플리케이션 단종](discontinuation.md) 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Kinesis Data Analytics for SQL 예
<a name="examples"></a>

이 섹션은 Amazon Kinesis Data Analytics에서 애플리케이션 생성 및 작동 예를 제공합니다. 여기에는 Kinesis Data Analytics 애플리케이션을 생성하고 그 결과를 테스트하는 데 도움이 되는 예 코드와 단계별 지침이 포함되어 있습니다.

 이 예를 살펴보기 전에 먼저 [Amazon Kinesis Data Analytics for SQL 애플리케이션: 작동 방식](how-it-works.md) 및 [Amazon Kinesis Data Analytics for SQL 애플리케이션 시작하기](getting-started.md) 섹션을 검토하는 것이 좋습니다.

**Topics**
+ [예: 데이터 변환](examples-transforming.md)
+ [예: 윈도우 및 집계](examples-window.md)
+ [예: 조인](examples-joins.md)
+ [예: 기계 학습](examples-machine.md)
+ [예: 알림 및 오류](examples-alerts.md)
+ [예: 솔루션 액셀러레이터](examples_solution.md)

# 예: 데이터 변환
<a name="examples-transforming"></a>

애플리케이션 코드가 Amazon Kinesis Data Analytics에서 분석을 수행하기 전에 수신 레코드를 재처리해야 하는 경우가 있습니다. 이러한 상황은 다양한 이유로 발생할 수 있는데, 레코드가 지원 레코드 형식에 부합하지 않아 애플리케이션 내 입력 스트림에서 열이 비정규화되는 경우가 한 예입니다.

이 섹션에서는 가용한 문자열 함수를 사용하여 데이터를 정규화하는 방법, 문자열에서 필요한 정보를 추출하는 방법 등에 관한 예를 제공합니다. 유용한 날짜 시간 함수에 대해서도 이 섹션에서 소개합니다.

## Lambda를 통한 스트림 전처리
<a name="examples-transforming-lambda"></a>

를 사용한 스트림 사전 처리에 대한 자세한 내용은 섹션을 AWS Lambda참조하세요[Lambda 함수를 사용하여 데이터 사전 처리](lambda-preprocessing.md).

**Topics**
+ [Lambda를 통한 스트림 전처리](#examples-transforming-lambda)
+ [예: 문자열 값 변환](examples-transforming-strings.md)
+ [예: DateTime 값 변환](app-string-datetime-manipulation.md)
+ [예: 복수의 데이터 유형 변환](app-tworecordtypes.md)

# 예: 문자열 값 변환
<a name="examples-transforming-strings"></a>

Amazon Kinesis Data Analytics는 스트리밍 소스 상의 레코드에 대해 JSON 및 CSV와 같은 형식을 지원합니다. 자세한 설명은 [RecordFormat](API_RecordFormat.md) 섹션을 참조하십시오. 그런 다음 이들 레코드를 입력 구성에 따라 애플리케이션 내 스트림에 있는 행에 매핑합니다. 자세한 설명은 [애플리케이션 입력 구성](how-it-works-input.md) 섹션을 참조하십시오. 입력 구성에서는 스트리밍 소스에 있는 레코드 필드를 애플리케이션 내 스트림에 있는 열로 매핑하는 방식을 지정합니다.

이 매핑은 스트리밍 소스 상의 레코드가 지원 형식에 따를 때 작동하며, 애플리케이션 내 스트림이 정규화된 데이터로 채워집니다. 그러나 스트리밍 소스상의 데이터가 지원 표준에 부합하지 않는 경우에는 어떻게 되겠습니까? 예를 들어, 스트리밍 소스에 클릭스트림 데이터, IoT 센서 및 애플리케이션 로그와 같은 데이터가 포함되어 있다면 어떻게 되겠습니까?

아래 예를 참조하십시오:
+ 스트리밍 소스에 애플리케이션 로그가 포함된 경우 – 애플리케이션 로그는 표준 Apache 로그 형식에 따르고 JSON 형식을 사용하여 스트림에 기록됩니다.

  ```
  {
     "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0"
  }
  ```

  표준 Apache 로그 형식에 관한 자세한 정보는 Apache 웹 사이트의 [Log Files](https://httpd.apache.org/docs/2.4/logs.html)를 참조하십시오.

   
+ 스트리밍 소스에 반정형 데이터가 포함된 경우 – 다음 예에 두 가지 레코드가 소개되어 있습니다. `Col_E_Unstructured` 필드 값은 일련의 CSV(쉼표로 분리된 값)입니다. 5개 열 가운데 처음 4개는 문자열 유형의 값이고 마지막 열에는 CSV가 포함됩니다.

  ```
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  ```
+ 스트리밍 소스의 레코드에는 URL이 포함되는데 분석을 위해서는 URL 도메인 명칭의 일부가 필요합니다.

  ```
  { "referrer" : "http://www.amazon.com"}
  { "referrer" : "http://www.stackoverflow.com" }
  ```

그와 같은 경우, 정규화된 데이터를 포함하는 애플리케이션 내 스트림 생성을 위해서는 일반적으로 다음과 같은 2단계의 프로세스가 요구됩니다.

1. 비정형 필드를 생성되는 애플리케이션 내 입력 스트림의 `VARCHAR(N)` 유형 열에 매핑하도록 애플리케이션을 구성합니다.

1. 애플리케이션 코드에서 문자열 함수를 사용하여 이 단일 열을 복수 열로 분할한 다음 행을 또 다른 애플리케이션 내 스트림에 저장합니다. 애플리케이션 코드가 생성하는 이 애플리케이션 내 스트림은 정규화된 데이터를 가지게 됩니다. 그런 다음 애플리케이션 내 스트림에 있는 데이터에 대해 분석을 수행할 수 있습니다.

Amazon Kinesis Data Analytics는 문자열 열 작업을 수행할 수 있도록 다음과 같은 문자열 연산, 표준 SQL 함수, SQL 표준 확장을 제공합니다: 
+ **문자열 연산자** – `LIKE`와`SIMILAR` 같은 연산자는 문자열 비교에 유용합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [문자열 연산자](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-operators.html)를 참조하십시오.
+ **SQL 함수** – 다음 함수는 개별 문자열 조작에 유용합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [문자열 및 검색 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-and-search-functions.html)를 참조하십시오.
  + `CHAR_LENGTH` – 문자열의 길이를 정합니다.
  + `INITCAP` – 입력 문자열에서 공백으로 구분된 각 단어의 첫 번째 문자가 대문자로 변환되고 나머지 문자는 모두 소문자로 변환된 버전을 반환합니다.
  + `LOWER/UPPER` – 문자열을 대문자 또는 소문자로 변환합니다.
  + `OVERLAY` – 첫 번째 문자열(원본 문자열) 인수 중 일부를 두 번째 문자열(대체 문자열) 인수로 대체합니다.
  + `POSITION` – 다른 문자열 내에서 문자열을 검색합니다.
  + `REGEX_REPLACE` – 하위 문자열을 다른 하위 문자열로 대체합니다.
  + `SUBSTRING` – 특정 위치에서 시작하는 소스 문자열의 일부를 추출합니다.
  + `TRIM` – 소스 문자열의 시작 또는 끝부분에서 지정된 문자의 인스턴스를 제거합니다.
+ **SQL 확장** – 로그 및 URI와 같은 비정형 문자열 작업 수행에 유용합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [로그 파싱 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-pattern-matching-functions.html)를 참조하십시오.
  + `FAST_REGEX_LOG_PARSER` – regex parser와 비슷하게 작동하지만, 보다 빠른 결과를 얻기 위해 몇 가지 경로를 단축합니다. 예를 들어, 빠른 regex parser는 첫 번째 일치에서 중지합니다(*지연 시맨틱*).
  + `FIXED_COLUMN_LOG_PARSE` – 고정 너비 필드를 구문 분석하고 지정된 SQL 유형으로 자동으로 변환합니다.
  + `REGEX_LOG_PARSE` – 기본적인 Java 정규식 패턴을 기준으로 문자열 구문 분석을 수행합니다.
  + `SYS_LOG_PARSE` – UNIX/Linux 시스템 로그에서 흔히 발견되는 항목을 처리합니다.
  + `VARIABLE_COLUMN_LOG_PARSE` – 입력 문자열을 구분 기호 문자 또는 문자열에 의해 분리되는 필드로 분할합니다.
  + `W3C_LOG_PARSE` – Apache 로그의 신속한 포맷을 위해 사용할 수 있습니다.

이들 함수의 예는 다음 주제를 참조하십시오:

**Topics**
+ [예: 문자열의 일부분 추출(SUBSTRING 함수)](examples-transforming-strings-substring.md)
+ [예: Regex(REGEX\$1REPLACE 함수)를 사용하여 하위 문자열 대체](examples-transforming-strings-regexreplace.md)
+ [예: 정규식(REGEX\$1LOG\$1PARSE 함수) 기반 로그 문자열 구문 분석](examples-transforming-strings-regexlogparse.md)
+ [예: 웹 로그 구문 분석(W3C\$1LOG\$1PARSE 함수)](examples-transforming-strings-w3clogparse.md)
+ [예: 문자열을 복수의 필드로 분할(VARIABLE\$1COLUMN\$1LOG\$1PARSE 함수)](examples-transforming-strings-variablecolumnlogparse.md)

# 예: 문자열의 일부분 추출(SUBSTRING 함수)
<a name="examples-transforming-strings-substring"></a>

이 예는 `SUBSTRING` 함수를 사용하여 Amazon Kinesis Data Analytics의 문자열을 변환합니다. `SUBSTRING` 함수는 특정 위치에서 시작하는 소스 문자열의 일부를 추출합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [SUBSTRING](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-substring.html)을 참조하십시오.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다.

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 데이터 스트림을 스트리밍 소스로 사용합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 한 열(`REFERRER`)로 애플리케이션 내 스키마를 유추합니다.

![\[REFERRER 열에 있는 URL 목록이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/referrer-10.png)


그런 다음 `SUBSTRING` 함수를 지닌 애플리케이션 코드를 사용하여 URL 문자열 구문 분석을 수행하고 회사 명칭을 검색합니다. 그러면 다음과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터가 포함된 실시간 분석 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/referrer-20.png)


**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-transforming-strings-substring-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-transforming-strings-substring-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-strings-substring-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 로그 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 다음의 Python 코드를 실행하여 샘플 로그 레코드를 채웁니다. 이 단순한 코드는 동일한 로그 레코드를 스트림에 연속적으로 씁니다.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-strings-substring-2"></a>

이후 다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성 옵션을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마는 한 열만 지닙니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) 
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: Regex(REGEX\$1REPLACE 함수)를 사용하여 하위 문자열 대체
<a name="examples-transforming-strings-regexreplace"></a>

이 예는 `REGEX_REPLACE` 함수를 사용하여 Amazon Kinesis Data Analytics의 문자열을 변환합니다. `REGEX_REPLACE`는 하위 문자열을 대체 하위 문자열로 교체합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [REGEX\$1REPLACE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-replace.html)를 참조하십시오.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다: 

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 데이터 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 한 열(REFERRER)로 애플리케이션 내 스키마를 유추합니다.

![\[REFERRER 열에 있는 URL 목록이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/referrer-10.png)


그런 다음 `REGEX_REPLACE` 함수를 지닌 애플리케이션 코드를 사용하여 URL을 변환하고 `http://` 대신 `https://`를 사용합니다. 다음과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[ROWTIME, ingest_time 및 referrer 열이 포함된 결과 데이터 표를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_regex_replace.png)


**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-transforming-strings-regexreplace-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-transforming-strings-regexreplace-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-strings-regexreplace-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 로그 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 다음의 Python 코드를 실행하여 샘플 로그 레코드를 채웁니다. 이 단순한 코드는 동일한 로그 레코드를 스트림에 연속적으로 씁니다.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-strings-regexreplace-2"></a>

이후 다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성 옵션을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마는 한 열만 지닙니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 정규식(REGEX\$1LOG\$1PARSE 함수) 기반 로그 문자열 구문 분석
<a name="examples-transforming-strings-regexlogparse"></a>

이 예는 `REGEX_LOG_PARSE` 함수를 사용하여 Amazon Kinesis Data Analytics의 문자열을 변환합니다. `REGEX_LOG_PARSE`는 기본 Java 정규식 패턴을 기준으로 문자열을 파싱합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [REGEX\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-log-parse.html)를 참조하십시오.

이 예에서는 다음 레코드를Amazon Kinesis 스트림에 기록합니다: 

```
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 데이터 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 한 열(LOGENTRY)로 애플리케이션 내 스키마를 유추합니다.

![\[LOGENTRY 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_regex_log_parse_0.png)


그런 다음 `REGEX_LOG_PARSE` 함수를 지닌 애플리케이션 코드를 사용하여 로그 문자열 구문 분석을 수행하고 데이터 요소를 검색합니다. 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[ROWTIME, LOGENTRY, MATCH1 및 MATCH2 열이 포함된 결과 데이터 표를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_regex_log_parse_1.png)


**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-transforming-strings-regexlogparse-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-transforming-strings-regexlogparse-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-strings-regexlogparse-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 로그 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 다음의 Python 코드를 실행하여 샘플 로그 레코드를 채웁니다. 이 단순한 코드는 동일한 로그 레코드를 스트림에 연속적으로 씁니다.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
           '"GET /index.php HTTP/1.1" 200 125 "-" '
           '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-strings-regexlogparse-2"></a>

이후 다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. [**Create application**]을 선택하고 애플리케이션 명칭을 지정합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성 옵션을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마는 한 열만 지닙니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24));
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
          FROM 
               (SELECT STREAM LOGENTRY,
                   REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
                   FROM SOURCE_SQL_STREAM_001) AS T;
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 웹 로그 구문 분석(W3C\$1LOG\$1PARSE 함수)
<a name="examples-transforming-strings-w3clogparse"></a>

이 예는 `W3C_LOG_PARSE` 함수를 사용하여 Amazon Kinesis Data Analytics의 문자열을 변환합니다. `W3C_LOG_PARSE`를 사용하여 Apache 로그의 형식을 빠르게 지정할 수 있습니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [W3C\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-w3c-log-parse.html)를 참조하십시오.

이 예에서는 로그 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다. 다음은 로그의 예입니다:

```
{"Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pba.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] "GET /icons/apache_pbb.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] "GET /icons/apache_pbc.gif HTTP/1.1" 304 0"}
...
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 데이터 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 한 열(로그)로 애플리케이션 내 스키마를 유추합니다.

![\[로그 열이 포함된 애플리케이션 내 스키마가 포함된 형식이 있는 스트림 샘플 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/log-10.png)


그런 다음 `W3C_LOG_PARSE` 함수를 지닌 애플리케이션 코드를 사용하여 로그 구문 분석을 수행하고, 다음과 같이 별도의 열에 다양한 로그 필드로 또 다른 애플리케이션 내 스트림을 생성합니다.

![\[애플리케이션 내 스트림이 포함된 실시간 분석 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/log-20.png)


**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-transforming-strings-w3clogparse-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-transforming-strings-w3clogparse-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-strings-w3clogparse-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 로그 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 다음의 Python 코드를 실행하여 샘플 로그 레코드를 채웁니다. 이 단순한 코드는 동일한 로그 레코드를 스트림에 연속적으로 씁니다.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
           '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-strings-w3clogparse-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성 옵션을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마는 한 열만 지닙니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
      column1 VARCHAR(16),
      column2 VARCHAR(16),
      column3 VARCHAR(16),
      column4 VARCHAR(16),
      column5 VARCHAR(16),
      column6 VARCHAR(16),
      column7 VARCHAR(16));
      
      CREATE OR REPLACE PUMP "myPUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
              SELECT STREAM
                  l.r.COLUMN1,
                  l.r.COLUMN2,
                  l.r.COLUMN3,
                  l.r.COLUMN4,
                  l.r.COLUMN5,
                  l.r.COLUMN6,
                  l.r.COLUMN7
              FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON')
                    FROM "SOURCE_SQL_STREAM_001") AS l(r);
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 문자열을 복수의 필드로 분할(VARIABLE\$1COLUMN\$1LOG\$1PARSE 함수)
<a name="examples-transforming-strings-variablecolumnlogparse"></a>

이 예에서는 `VARIABLE_COLUMN_LOG_PARSE` 함수를 이용해 Kinesis Data Analytics의 문자열을 조작합니다. `VARIABLE_COLUMN_LOG_PARSE`는 입력 문자열을 구분 기호 문자 또는 문자열에 의해 분리되는 필드로 분할합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [VARIABLE\$1COLUMN\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-variable-column-log-parse.html)를 참조하십시오.

이 예에서는 반정형 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다. 예 레코드는 다음과 같습니다:

```
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 스트림을 스트리밍 소스로 사용합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 네 개의 열로 애플리케이션 내 스키마를 유추합니다.

![\[4개의 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/unstructured-10.png)


그런 다음 `VARIABLE_COLUMN_LOG_PARSE` 함수를 지닌 애플리케이션 코드를 사용하여 CSV 구문 분석을 수행하고, 다음과 같이 또 다른 애플리케이션 내 스트림에 정규화된 행을 삽입합니다.



![\[애플리케이션 내 스트림이 포함된 실시간 분석 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/unstructured-20.png)


**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-transforming-strings-variablecolumnlogparse-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-transforming-strings-variablecolumnlogparse-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-strings-variablecolumnlogparse-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 로그 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 다음의 Python 코드를 실행하여 샘플 로그 레코드를 채웁니다. 이 단순한 코드는 동일한 로그 레코드를 스트림에 연속적으로 씁니다.

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

   

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-strings-variablecolumnlogparse-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성 옵션을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 참고로 유추된 스키마는 한 열만 지닙니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다:

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
                  "column_A" VARCHAR(16),
                  "column_B" VARCHAR(16),
                  "column_C" VARCHAR(16),
                  "COL_1" VARCHAR(16),             
                  "COL_2" VARCHAR(16),            
                  "COL_3" VARCHAR(16));
      
      CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM  t."Col_A", t."Col_B", t."Col_C",
                        t.r."COL_1", t.r."COL_2", t.r."COL_3"
         FROM (SELECT STREAM 
                 "Col_A", "Col_B", "Col_C",
                 VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
                                           'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)',
                                           ',') AS r 
               FROM "SOURCE_SQL_STREAM_001") as t;
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: DateTime 값 변환
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics는 열을 타임스탬프로 변환하는 것을 지원합니다. 예를 들어 `GROUP BY` 절의 일부인 자체 타임스탬프를 `ROWTIME` 열에 더하여 또 다른 시간 기반 윈도우로 사용할 수 있습니다. Kinesis Data Analytics는 날짜 및 시간 필드 작업을 위한 작업 및 SQL 함수를 제공합니다.
+ **날짜 및 시간 연산자** – 날짜, 시간 및 간격 데이터 유형에 대한 산술 연산을 수행할 수 있습니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*의 [날짜, 타임스탬프 및 간격 연산자](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html)를 참조하십시오.

   
+ **SQL 함수** – 여기에는 다음이 포함됩니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*의 [날짜 및 시간 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)를 참조하십시오.
  + `EXTRACT()` – 날짜, 시간, 타임스탬프 또는 간격 표현식에서 필드 하나를 추출합니다.
  + `CURRENT_TIME` – 쿼리가 실행될 때의 시간을 반환합니다(UTC).
  + `CURRENT_DATE` – 쿼리가 실행될 때의 날짜를 반환합니다(UTC).
  + `CURRENT_TIMESTAMP` – 쿼리가 실행될 때의 타임스탬프를 반환합니다(UTC).
  + `LOCALTIME` – Kinesis Data Analytics가 실행 중인 환경에서 정의된 대로 쿼리가 실행될 때의 현재 시간(UTC)을 반환합니다.
  + `LOCALTIMESTAMP` – Kinesis Data Analytics가 실행 중인 환경에 의해 정의되는 대로 현재 타임스탬프를 반환합니다(UTC).

     
+ **SQL 확장** – 여기에는 다음이 포함됩니다. 자세한 설명은 [Amazon Managed Service for Apache Flink SQL 참조](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)에서 [날짜 및 시간 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html)와 *날짜 시간 변환 함수*를 참조하십시오.
  + `CURRENT_ROW_TIMESTAMP` – 스트림 상의 각 행에 대해 새 타임스탬프를 반환합니다.
  + `TSDIFF` – 두 타임스탬프 간의 차이를 밀리초 단위로 반환합니다.
  + `CHAR_TO_DATE` – 문자열을 날짜로 변환합니다.
  + `CHAR_TO_TIME` – 문자열을 시간으로 변환합니다.
  + `CHAR_TO_TIMESTAMP` – 문자열을 타임스탬프로 변환합니다.
  + `DATE_TO_CHAR` – 날짜를 문자열로 변환합니다.
  + `TIME_TO_CHAR` – 시간을 문자열로 변환합니다.
  + `TIMESTAMP_TO_CHAR` – 타임스탬프를 문자열로 변환합니다.

위의 SQL 함수 중 대부분은 형식을 사용하여 열을 변환합니다. 형식은 유연합니다. 예를 들어, 형식 `yyyy-MM-dd hh:mm:ss`를 지정하여 입력 문자열 `2009-09-16 03:15:24`를 타임스탬프로 변환할 수 있습니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*의 [Char To Timestamp(Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html)를 참조하십시오.

## 예: 날짜 변환
<a name="examples-transforming-dates"></a>

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다.

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(`EVENT_TIME` 및 `TICKER`)로 애플리케이션 내 스키마를 유추합니다.

![\[이벤트 시간 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


그런 다음 SQL 함수를 지닌 애플리케이션 코드를 사용하여 `EVENT_TIME` 타임스탬프 필드를 다양한 방법으로 변환합니다. 그러면 다음 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### 1단계: Kinesis 데이터 스트림 생성
<a name="examples-transforming-dates-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 이벤트 시간 및 티커 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림을 생성합니다.

1. 다음의 Python 코드를 실행하여 스트림을 샘플 데이터로 채웁니다. 이 간단한 코드는 지속적으로 임의 티커 기호 및 현재 타임스탬프가 포함된 레코드를 스트림에 씁니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### 2단계: Amazon Kinesis Data Analytics 애플리케이션 생성
<a name="examples-transforming-dates-2"></a>

다음과 같이 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. IAM 역할 생성을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

   1. **Edit Schema(스키마 편집)**를 선택합니다. **EVENT\$1TIME** 열의 **열 유형**을 `TIMESTAMP`으로 변경합니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

   

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001"
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 복수의 데이터 유형 변환
<a name="app-tworecordtypes"></a>

 추출, 변환 및 로드(ETL) 애플리케이션의 공통적인 요건은 복수의 레코드 유형을 한 스트리밍 소스에서 처리하는 것입니다. Kinesis Data Analytics 애플리케이션을 생성하여 이러한 유형의 스트리밍 소스를 처리할 수 있습니다. 프로세스는 다음과 같습니다.

1. 먼저 스트리밍 소스를 다른 모든 Kinesis Data Analytics 애플리케이션과 유사한 애플리케이션 내 입력 스트림에 매핑합니다.

1. 그런 다음 애플리케이션 코드에서 SQL 문을 작성하여 애플리케이션 내 입력 스트림에서 특정 유형의 열을 가져옵니다. 그런 다음 이를 별도의 애플리케이션 내 스트림에 삽입합니다. (애플리케이션 코드에서 추가 애플리케이션 내 스트림을 생성할 수 있습니다.)

이 실습에서는 두 가지 유형의 레코드(`Order` 및 `Trade` 유형)를 수신하는 스트리밍 소스를 가집니다. 두 가지 유형은 주식 주문과 해당 거래입니다. 각 주문에 대해 거래는 0 이상이 될 수 있습니다. 각 유형의 예 레코드가 다음에 나와 있습니다.

[**Order record**]

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

[**Trade record**]

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

를 사용하여 애플리케이션을 생성하면 생성된 인애플리케이션 입력 스트림에 대해 다음과 같은 추론된 스키마가 AWS Management Console콘솔에 표시됩니다. 기본적으로 콘솔은 이 애플리케이션 내 스트림을 `SOURCE_SQL_STREAM_001`로 명명합니다.

![\[형식이 있는 애플리케이션 내 스트림 샘플을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/two-record-types-10.png)


구성을 저장하면 Amazon Kinesis Data Analytics가 스트리밍 소스로부터 데이터를 연속적으로 읽고 애플리케이션 내 스트림에 행을 삽입합니다. 이제 애플리케이션 내 스트림에 있는 데이터에 대해 분석을 수행할 수 있습니다.

이 예의 애플리케이션 코드에서 먼저 두 가지 추가 애플리케이션 내 스트림(`Order_Stream` 및 `Trade_Stream`)을 생성합니다. 그런 다음 레코드 유형에 따라 `SOURCE_SQL_STREAM_001` 스트림으로부터 행을 필터링하고 펌프를 사용하여 새로 생성된 스트림에 삽입합니다. 이 코딩 패턴에 대한 정보는 [애플리케이션 코드](how-it-works-app-code.md) 섹션을 참조하십시오.

1. 주문 및 거래 행을 별도의 애플리케이션 내 스트림에 필터링합니다.

   1. `SOURCE_SQL_STREAM_001`에 있는 주문 레코드를 필터링하고 주문을 `Order_Stream`에 저장합니다.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM oid, otype,oticker, oprice, recordtype 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Order';
      ```

   1. `SOURCE_SQL_STREAM_001`에 있는 거래 레코드를 필터링하고 주문을 `Trade_Stream`에 저장합니다.

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM tid, toid, tprice, tticker, recordtype
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Trade';
      ```

1. 이제 이들 스트림에 대한 추가 분석을 수행할 수 있습니다. 이 예에서는 1분 시간 범위([텀블링 윈도우](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/tumbling-window-concepts.html))에 대해 티커별 거래 수를 집계하여 결과를 또 다른 스트림(`DESTINATION_SQL_STREAM`)에 저장합니다.

   ```
   --do some analytics on the Trade_Stream and Order_Stream. 
   -- To see results in console you must write to OPUT_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   결과는 다음과 같을 것입니다.  
![\[SQL 결과 탭의 결과를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/two-record-types-20.png)

**Topics**
+ [1단계: 데이터 준비](tworecordtypes-prepare.md)
+ [2단계: 애플리케이션 만들기](tworecordtypes-create-app.md)

**다음 단계**  
[1단계: 데이터 준비](tworecordtypes-prepare.md)

# 1단계: 데이터 준비
<a name="tworecordtypes-prepare"></a>

이 섹션에서는 Kinesis 데이터 스트림을 생성한 다음 주문 및 거래 레코드를 스트림에 채워 넣습니다. 이는 다음 단계에서 생성할 애플리케이션의 스트리밍 소스입니다.

**Topics**
+ [1.1단계: 스트리밍 소스 생성](#tworecordtypes-prepare-create-stream)
+ [1.2단계: 스트리밍 소스 채우기](#tworecordtypes-prepare-populate-stream)

## 1.1단계: 스트리밍 소스 생성
<a name="tworecordtypes-prepare-create-stream"></a>

콘솔 또는 AWS CLI을 사용하여 Kinesis 데이터 스트림을 생성할 수 있습니다. 이 예에서는 `OrdersAndTradesStream`을 스트림 명칭으로 가정합니다.
+ **콘솔 사용** -에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다. [**Data Streams**]를 선택한 다음 샤드가 하나인 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.
+ **사용 AWS CLI** - 다음 Kinesis `create-stream` AWS CLI 명령을 사용하여 스트림을 생성합니다.

  ```
  $ aws kinesis create-stream \
  --stream-name OrdersAndTradesStream \
  --shard-count 1 \
  --region us-east-1 \
  --profile adminuser
  ```

## 1.2단계: 스트리밍 소스 채우기
<a name="tworecordtypes-prepare-populate-stream"></a>

다음의 Python 스크립트를 실행하여 샘플 레코드를 `OrdersAndTradesStream`에 채웁니다. 다른 명칭의 스트림을 생성한 경우 Python 코드를 적절히 업데이트합니다.

1. Python 및 `pip`를 설치합니다.

   Python 설치에 관한 정보는 [Python](https://www.python.org/) 웹사이트를 참조하십시오.

   pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 [Installation](https://pip.pypa.io/en/stable/installing/)을 참조하십시오.

1. 다음 Python 코드를 실행합니다. 코드에 있는 `put-record` 명령이 JSON 레코드를 스트림에 작성합니다.

   ```
    
   import json
   import random
   import boto3
   
   STREAM_NAME = "OrdersAndTradesStream"
   PARTITION_KEY = "partition_key"
   
   
   def get_order(order_id, ticker):
       return {
           "RecordType": "Order",
           "Oid": order_id,
           "Oticker": ticker,
           "Oprice": random.randint(500, 10000),
           "Otype": "Sell",
       }
   
   
   def get_trade(order_id, trade_id, ticker):
       return {
           "RecordType": "Trade",
           "Tid": trade_id,
           "Toid": order_id,
           "Tticker": ticker,
           "Tprice": random.randint(0, 3000),
       }
   
   
   def generate(stream_name, kinesis_client):
       order_id = 1
       while True:
           ticker = random.choice(["AAAA", "BBBB", "CCCC"])
           order = get_order(order_id, ticker)
           print(order)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
           )
           for trade_id in range(1, random.randint(0, 6)):
               trade = get_trade(order_id, trade_id, ticker)
               print(trade)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(trade),
                   PartitionKey=PARTITION_KEY,
               )
           order_id += 1
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**다음 단계**  
 [2단계: 애플리케이션 만들기](tworecordtypes-create-app.md)

# 2단계: 애플리케이션 만들기
<a name="tworecordtypes-create-app"></a>

이 섹션에서는 Kinesis Data Analytics 애플리케이션을 생성합니다. 그런 다음 이전 섹션에서 생성한 스트리밍 소스를 애플리케이션 내 입력 스트림에 매핑하는 입력 구성을 추가함으로써 애플리케이션을 업데이트합니다.

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택합니다. 이 예에서는 애플리케이션 명칭 **ProcessMultipleRecordTypes**을(를) 사용합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   1. [1단계: 데이터 준비](tworecordtypes-prepare.md) 단계에서 만든 스트림을 선택합니다.

   1. IAM 역할 생성을 선택합니다.

   1. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 허브에서 [**Go to SQL editor**]를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. [**Save and run SQL**]을 선택합니다. **Real-time analytics(실시간 분석)** 탭을 선택하여 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증합니다.

   

**다음 단계**  
또 다른 Kinesis 스트림이나 Firehose 데이터 전송 스트림과 같은 외부 대상에 결과를 유지하도록 애플리케이션 출력을 구성할 수 있습니다.

# 예: 윈도우 및 집계
<a name="examples-window"></a>

이 섹션에서는 윈도우 형식 및 집계 쿼리를 사용하는 Amazon Kinesis Data Analytics 애플리케이션 예를 소개합니다. (자세한 설명은 [윈도우 모드 쿼리](windowed-sql.md) 섹션을 참조하십시오.) 각 예에서는 Kinesis Data Analytics 애플리케이션을 설정하기 위한 단계별 지침과 예 코드를 제공합니다.

**Topics**
+ [예: 스태거 윈도우](examples-window-stagger.md)
+ [예: ROWTIME을 사용하는 텀블링 윈도우](examples-window-tumbling-rowtime.md)
+ [예: 이벤트 타임스탬프를 사용하는 텀블링 윈도우](examples-window-tumbling-event.md)
+ [예: 가장 자주 발생하는 값 검색(TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [예: 쿼리에서 부분적 결과 집계](examples-window-partialresults.md)

# 예: 스태거 윈도우
<a name="examples-window-stagger"></a>

키와 일치하는 데이터가 도착할 때 윈도우 모드 쿼리가 고유 파티션 키에 대한 개별 윈도우를 처리하기 시작한다면, 이 윈도우는 *스태거 윈도우*라고 부릅니다. 자세한 설명은 [스태거 윈도우](stagger-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예는 EVENT\$1TIME 열과 TICKER 열을 사용하여 스태거 윈도우를 생성합니다. 스소 스트림은 1분 이내에 도착하지만 분 값이 다를 수도 있는(예: `18:41:xx`) 같은 EVENT\$1TIME 및 TICKER 값을 가진 레코드 6개 그룹으로 구성됩니다.

이 예에서는 다음 레코드를 다음 시간에 Kinesis 데이터 스트림에 기록합니다. 이 스크립트는 시간을 스트림에 쓰지 않지만 애플리케이션에서 레코드를 수집하는 시간은 `ROWTIME` 필드에 기록됩니다.

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(`EVENT_TIME` 및 `TICKER`)로 애플리케이션 내 스키마를 유추합니다.

![\[가격 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


`COUNT` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_stagger.png)


다음 절차에서는 EVENT\$1TIME 및 TICKER를 기반으로 스태거 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-stagger-window-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-stagger-window-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-stagger-window-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Producer Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순 코드는 같은 무작위 `EVENT_TIME`과 티커 기호를 이용하는 레코드 6개 집단을 스트림에 1분 이상 계속 작성합니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-stagger-window-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

   1. **Edit Schema(스키마 편집)**를 선택합니다. **EVENT\$1TIME** 열의 **열 유형**을 `TIMESTAMP`으로 변경합니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: ROWTIME을 사용하는 텀블링 윈도우
<a name="examples-window-tumbling-rowtime"></a>

윈도우 모드 쿼리가 비중첩 방식으로 각 윈도우를 처리하는 경우 이 윈도우를 *텀블링 윈도우*라고 합니다. 자세한 설명은 [텀블링 윈도우(그룹별 집계)](tumbling-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예는 `ROWTIME` 열을 사용하여 텀플링 윈도우를 생성합니다. `ROWTIME` 열은 시간을 나타내며 애플리케이션이 레코드를 읽었습니다.

이 예에서는 다음 레코드를 Kinesis 데이터 스트림에 기록합니다.

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(`TICKER` 및 `PRICE`)로 애플리케이션 내 스키마를 유추합니다.

![\[가격 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


`MIN` 및 `MAX` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


다음 절차에서는 ROWTIME을 기반으로 텀플링 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-tumbling-window-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-tumbling-window-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-tumbling-window-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-tumbling-window-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 이벤트 타임스탬프를 사용하는 텀블링 윈도우
<a name="examples-window-tumbling-event"></a>

윈도우 모드 쿼리가 비중첩 방식으로 각 윈도우를 처리하는 경우 이 윈도우를 *텀블링 윈도우*라고 합니다. 자세한 설명은 [텀블링 윈도우(그룹별 집계)](tumbling-window-concepts.md) 섹션을 참조하십시오. 이 Amazon Kinesis Data Analytics 예에서는 스트리밍 데이터에 포함된 사용자 생성 타임스탬프인 이벤트 타임스탬프를 사용하는 텀블링 윈도우를 보여줍니다. 여기서는 애플리케이션이 레코드를 수신할 때 Kinesis Data Analytics가 생성하는 타임스탬프인 ROWTIME를 사용하는 대신에 이 접근 방식을 사용합니다. 이벤트가 애플리케이션에 수신된 시간 대신에 이벤트가 발생한 시간을 기반으로 집계를 생성하려는 경우 스트리밍 데이터에서 이벤트 타임스탬프를 사용합니다. 이 예에서는 `ROWTIME` 값이 1분마다 집계를 트리거하며 레코드가 `ROWTIME` 및 포함된 이벤트 시간별로 모두 집계됩니다.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다. 이벤트가 발생한 시간부터 레코드가 Kinesis Data Analytics에 수집된 시간까지 지연을 생성할 수 있는 처리 및 전송 지연을 시뮬레이션하기 위해 이전에 `EVENT_TIME` 값이 5초로 설정되어 있습니다.

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스에서 샘플 레코드를 읽고 다음과 같이 세 개의 열(`EVENT_TIME`, `TICKER` 및 `PRICE`)로 애플리케이션 내 스키마를 유추합니다.

![\[이벤트 시간, 티커 및 가격 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


`MIN` 및 `MAX` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


다음 절차에서는 이벤트 시간을 기반으로 텀플링 윈도우의 입력 스트림에서 값을 집계하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-window-tumbling-event-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-window-tumbling-event-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-window-tumbling-event-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 유지합니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-window-tumbling-event-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 세 개의 열이 있습니다.

   1. **Edit Schema(스키마 편집)**를 선택합니다. **EVENT\$1TIME** 열의 **열 유형**을 `TIMESTAMP`으로 변경합니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 가장 자주 발생하는 값 검색(TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

이 Amazon Kinesis Data Analytics 예에서는 `TOP_K_ITEMS_TUMBLING` 함수를 사용하여 텀블링 윈도우에서 가장 자주 발생하는 값을 검색하는 방법을 보여줍니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [`TOP_K_ITEMS_TUMBLING` 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html)를 참조하십시오.

`TOP_K_ITEMS_TUMBLING` 함수는 수만 또는 수십만 개 이상의 키를 집계할 때 리소스 사용을 줄이려는 경우에 유용합니다. 이 함수는 `GROUP BY` 및 `ORDER BY` 절을 사용하여 집계하는 것과 동일한 결과를 생성합니다.

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다: 

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



그런 다음 Kinesis 데이터 스트림을 스트리밍 소스로 AWS Management Console사용하여에서 Kinesis Data Analytics 애플리케이션을 생성합니다. 검색 프로세스는 스트리밍 소스에서 샘플 레코드를 읽고 다음과 같이 하나의 열(`TICKER`)을 사용하여 애플리케이션 내 스키마를 유추합니다.

![\[티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


`TOP_K_VALUES_TUMBLING` 함수가 포함된 애플리케이션 코드를 사용하여 데이터의 윈도우 모드 집계를 생성합니다. 그런 다음 아래 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.



![\[애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_topk.png)


다음 절차에서는 입력 스트림에서 가장 자주 발생하는 값을 검색하는 Kinesis Data Analytics 애플리케이션을 생성합니다.

**Topics**
+ [1단계: Kinesis 데이터 스트림 생성](#examples-window-topkitems-1)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](#examples-window-topkitems-2)

## 1단계: Kinesis 데이터 스트림 생성
<a name="examples-window-topkitems-1"></a>

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 레코드를 채웁니다:

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

1. **Create Kinesis stream(Kinesis 스트림 생성)**을 선택한 다음 샤드가 하나 있는 스트림을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

1. 프로덕션 환경에서 Kinesis 데이터 스트림에 레코드를 기록하려면 [Kinesis Client Library](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) 또는 [Kinesis Data Streams API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)를 사용하는 것이 좋습니다. 이 예에서는 간단한 설명을 위해 다음 Python 스크립트를 사용하여 레코드를 생성합니다. 코드를 실행하여 샘플 티커 레코드를 채웁니다. 이 단순한 코드는 임의의 티커 레코드를 스트림에 연속적으로 씁니다. 이후 단계에서 애플리케이션 스키마를 생성할 수 있도록 스크립트를 실행 중 상태로 둡니다.

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="examples-window-topkitems-2"></a>

다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:

1. [https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Managed Service for Apache Flink 콘솔을 엽니다.

1. **애플리케이션 생성**을 선택하고 애플리케이션 명칭을 입력한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Connect streaming data(스트리밍 데이터 연결)**를 선택하여 소스에 연결합니다.

1. **Connect to source(소스에 연결)** 페이지에서 다음을 수행합니다.

   

   1. 이전 섹션에서 생성한 스트림을 선택합니다.

   1. **Discover schema(스키마 발견)**를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 열이 한 개 있습니다.

   1. [**Save schema and update stream samples**]를 선택합니다. 콘솔에서 스키마를 저장한 이후 **종료**를 선택합니다.

   1. [**Save and continue**]를 선택합니다.

1. 애플리케이션 세부 정보 페이지에서 **Go to SQL editor(SQL 편집기로 이동)**를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 **Yes, start application(예, 애플리케이션 시작)**을 선택합니다.

1. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

   1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다:

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. [**Save and run SQL**]을 선택합니다.

      **Real-time analytics(실시간 분석)** 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.

# 예: 쿼리에서 부분적 결과 집계
<a name="examples-window-partialresults"></a>

Amazon Kinesis 데이터 스트림에 처리 시간이 정확히 일치하지 않는 이벤트 시간이 있는 레코드가 포함되어 있는 경우, 텀블링 윈도의 결과 선택에는 창에 도착했다고 되어 잇지만 실제로는 그렇지 아니한 레코드가 포함됩니다. 이 경우 텀블링 윈도우에는 원하는 결과의 일부분만 포함됩니다. 이 문제를 해결하려면 다음과 같은 여러 가지 접근 방식을 사용할 수 있습니다.
+ 텀블링 윈도우만 사용하고, upsert를 사용하여 데이터베이스 또는 데이터 웨어하우스를 통한 사후 처리에서 부분적 결과를 집계합니다. 이 접근 방법은 애플리케이션을 처리하는 데 효율적입니다. 집계 연산자(`sum`, `min`, `max` 등)에 대해 지연 데이터를 무기한 처리합니다. 이 접근 방식의 단점은 데이터베이스 계층에서 추가 애플리케이션 로직을 개발하고 유지 관리해야 한다는 것입니다.
+ 부분적 결과를 조기에 생성하지만 슬라이딩 윈도우 기간 동안 전체 결과를 계속 생성하는 텀블링 및 슬라이딩 윈도우를 사용합니다. 이 접근 방식은 데이터베이스 계층에서 추가 애플리케이션 로직을 추가할 필요가 없도록 upsert 대신 덮어쓰기를 사용하여 만기 데이터를 처리합니다. 이 접근 방식의 단점은 이것이 더 많은 Kinesis 처리 단위(KPU)를 사용하지만 여전히 두 개의 결과를 생성하므로 일부 사용 사례에는 효과적이지 않을 수 있다는 것입니다.

텀블링 및 슬라이딩 윈도우에 대한 자세한 설명은 [윈도우 모드 쿼리](windowed-sql.md) 섹션을 참조하십시오.

다음 절차에서는 텀블링 윈도우 집계가 최종 결과를 생성하기 위해 결합해야 하는 두 개의 부분적 결과(`CALC_COUNT_SQL_STREAM` 애플리케이션 내 스트림으로 전송됨)를 생성합니다. 그런 다음 애플리케이션은 두 개의 부분적 결과를 결합하는 두 번째 집계(`DESTINATION_SQL_STREAM` 애플리케이션 내 스트림으로 전송됨)를 생성합니다.

**이벤트 시간을 사용하여 부분적 결과를 집계하는 애플리케이션을 생성하려면**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Analytics(데이터 분석)**를 선택합니다. 교재 [Amazon Kinesis Data Analytics for SQL 애플리케이션 시작하기](getting-started.md)의 설명에 따라 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. SQL 편집기에서 애플리케이션 코드를 다음으로 바꿉니다.

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   애플리케이션 코드의 `SELECT`문이 `SOURCE_SQL_STREAM_001`에서 주가 변동이 1%보다 큰 행을 필터링하여 펌프를 사용하여 또 다른 애플리케이션 내 스트림 `CHANGE_STREAM`에 삽입합니다.

1. [**Save and run SQL**]을 선택합니다.

첫 번째 펌프는 다음과 비슷한 `CALC_COUNT_SQL_STREAM`에 스트림을 출력합니다. 결과 집합은 불완전합니다.

![\[부분적 결과를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_partial_0.png)


그런 다음 두 번째 펌프는 전체 결과 집합이 포함된 `DESTINATION_SQL_STREAM`에 스트림을 출력합니다.

![\[전체 결과를 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex_partial_1.png)


# 예: 조인
<a name="examples-joins"></a>

이 섹션에서는 조인 쿼리를 사용하는 Kinesis Data Analytics 애플리케이션 예를 보여드립니다. 각 예는 Kinesis Data Analytics 애플리케이션의 설정 및 테스트를 위한 단계별 지침과 코드를 제공합니다.

**Topics**
+ [예: 참조 데이터를 Kinesis Data Analytics 애플리케이션에 추가](app-add-reference-data.md)

# 예: 참조 데이터를 Kinesis Data Analytics 애플리케이션에 추가
<a name="app-add-reference-data"></a>

이 연습에서는 참조 데이터를 기존 Kinesis Data Analytics 애플리케이션에 추가합니다. 참조 데이터에 대한 정보는 다음 주제를 참조하십시오:
+ [Amazon Kinesis Data Analytics for SQL 애플리케이션: 작동 방식](how-it-works.md)
+ [애플리케이션 입력 구성](how-it-works-input.md)

이 연습에서는 Kinesis Data Analytics [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습에서 생성한 애플리케이션에 참조 데이터를 추가합니다. 참조 데이터는 각 티커 기호에 회사 명칭을 부여합니다; 예:

```
Ticker, Company
AMZN,Amazon
ASD, SomeCompanyA
MMB, SomeCompanyB
WAS,  SomeCompanyC
```

우선 [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습의 단계를 완료하여 스타터 애플리케이션을 생성합니다. 그런 다음 이러한 단계를 따라 참조 데이터를 설정하고 애플리케이션에 추가합니다.

1. **데이터 준비**
   + Amazon Simple Storage Service (Amazon S3) 의 객체로 저장합니다.
   + Kinesis Data Analytics가 사용자를 대신하여 Amazon S3 객체를 읽을 수 있도록 IAM 역할을 생성합니다.

1. **참조 데이터 소스를 애플리케이션에 추가합니다. ** 

   Kinesis Data Analytics는 Amazon S3 객체를 읽고 애플리케이션 코드에서 쿼리할 수 있는 애플리케이션 내 참조 표를 생성합니다.

1. **코드를 테스트합니다.**

   애플리케이션 코드에서 조인 쿼리를 작성하여 애플리케이션 내 스트림을 애플리케이션 내 참조 표와 조인시켜 회사 명칭에 각 티커 기호를 부여합니다.

**Topics**
+ [1단계: 준비](#add-refdata-prepare)
+ [2단계: 참조 데이터 원본을 애플리케이션 구성에 추가](#add-refdata-create-iamrole)
+ [3단계: 애플리케이션 내 참조 표 쿼리 테스트](#add-refdata-test)

## 1단계: 준비
<a name="add-refdata-prepare"></a>

이 섹션에서는 샘플 참조 데이터를 Amazon S3 버킷에 객체로 저장합니다. Kinesis Data Analytics가 사용자를 대신하여 객체를 읽을 수 있도록 IAM 역할을 생성합니다.

### 참조 데이터를 Amazon S3 객체로 저장
<a name="prepare-create-s3object"></a>

이 단계에서는 샘플 참조 데이터를 Amazon S3 객체로 저장합니다.

1. 텍스트 편집기를 열고 다음 데이터를 추가한 다음 파일을 `TickerReference.csv`로 저장합니다.

   ```
   Ticker, Company
   AMZN,Amazon
   ASD, SomeCompanyA
   MMB, SomeCompanyB
   WAS,  SomeCompanyC
   ```

   

1. `TickerReference.csv` 파일을 S3 버킷에 업로드합니다. *Amazon Simple Storage Service 사용자 가이드*에서 [Amazon S3로 객체 업로드](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UploadingObjectsintoAmazonS3.html)를 참조하십시오.

### IAM 역할 생성
<a name="prepare-create-iamrole"></a>

다음으로, Kinesis Data Analytics가 사용자를 대신하여 Amazon S3 객체를 읽을 수 있도록 IAM 역할을 생성합니다.

1.  AWS Identity and Access Management (IAM)에서 라는 IAM 역할을 생성합니다**KinesisAnalytics-ReadS3Object**. 이 역할을 생성하려면 *IAM 사용자 가이드*에서 [Amazon 서비스를 위한 역할 생성 (AWS Management Console)](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html#roles-creatingrole-service-console)의 지침을 따르십시오.

   IAM 콘솔에서 다음을 지정합니다:
   + **역할 유형 선택**에서 **AWS Lambda**을 선택합니다. 역할을 생성한 후 신뢰 정책을 변경하여 Kinesis Data Analytics(그렇지 않음 AWS Lambda)가 역할을 수임하도록 허용합니다.
   + [**Attach Policy**] 페이지에서 정책을 연결하지 않습니다.

1. IAM 역할 정책 업데이트:

   

   1. IAM 콘솔에서 생성한 역할을 선택합니다.

   1. **신뢰 관계** 탭에서 신뢰 정책을 업데이트하여 Kinesis Data Analytics에게 역할을 맡을 권한을 부여합니다. 신뢰 정책은 다음과 같습니다:

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": {
              "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
          }
        ]
      }
      ```

------

      

   1. **허락** 탭에서 **AmazonS3ReadOnlyAccess**라고 하는 Amazon 관리형 정책을 연결합니다. 이렇게 하면 Amazon S3 객체를 읽을 수 있는 역할 권한이 부여됩니다. 이 정책은 다음과 같습니다.

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:Get*",
              "s3:List*"
            ],
            "Resource": "*"
          }
        ]
      }
      ```

------

## 2단계: 참조 데이터 원본을 애플리케이션 구성에 추가
<a name="add-refdata-create-iamrole"></a>

이 단계에서는 애플리케이션 구성에 참조 데이터 원본을 추가합니다. 시작하려면 다음 정보가 필요합니다.
+ S3 버킷 명칭 및 객체 키 명칭
+ IAM 역할 Amazon 리소스 이름(ARN)

1. 애플리케이션의 기본 페이지에서 **Connect reference data(참조 데이터 연결)**를 선택합니다.

1. **참조 데이터 리소스 연결** 페이지에서 참조 데이터 객체가 포함된 Amazon S3 버킷을 선택하고 객체의 키 명칭을 입력합니다.

1. **애플리케이션 내 참조 표 명칭**에 **CompanyName**을 입력합니다.

1. **Access to chosen resources(선택한 리소스에 액세스)** 섹션에서 **Choose from IAM roles that Kinesis Analytics can assume(Kinesis Analytics가 수임할 수 있는 IAM 역할 중에 선택)**을 선택하고, 이전 섹션에서 생성한 **KinesisAnalytics-ReadS3Object** IAM 역할을 선택합니다.

1. **Discover schema(스키마 발견)**를 선택합니다. 콘솔에서 참조 데이터에 있는 두 개의 열이 감지됩니다.

1. [**Save and close**]를 선택합니다.

## 3단계: 애플리케이션 내 참조 표 쿼리 테스트
<a name="add-refdata-test"></a>

이제 애플리케이션 내 참조 표 `CompanyName`을 쿼리할 수 있습니다. 참조 정보를 활용하여 티커 가격 데이터를 참조 표에 조인하면 애플리케이션을 보강할 수 있습니다. 결과는 회사 명칭을 표시합니다.

1. 애플리케이션 코드를 다음으로 대체합니다. 쿼리가 애플리케이션 내 입력 스트림을 애플리케이션 내 참조 표와 조인시킵니다. 애플리케이션 코드가 결과를 또 다른 애플리케이션 내 스트림 `DESTINATION_SQL_STREAM`에 작성합니다.

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM ticker_symbol, "c"."Company", sector, change, price
      FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c"
      ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
   ```

1. 애플리케이션 출력이 [**SQLResults**] 탭에 표시되는지 확인합니다. 일부 행이 회사 명칭을 표시하는지 확인합니다(샘플 참조 데이터가 모든 회사 명칭을 가지고 있지는 않습니다).

# 예: 기계 학습
<a name="examples-machine"></a>

이 섹션에서는 기계 학습 쿼리를 사용하는 Amazon Kinesis Data Analytics 애플리케이션 예를 소개합니다. 기계 학습 쿼리는 스트림의 데이터 기록에 의존하여 데이터의 복잡한 분석을 수행하고 비정상적인 패턴을 찾습니다. 이 예에서는 Kinesis Data Analytics 애플리케이션 설정 및 테스트 방법을 단계적으로 설명합니다.

**Topics**
+ [예: 스트림에서 데이터 변칙을 감지하는 방법(RANDOM\$1CUT\$1FOREST 함수)](app-anomaly-detection.md)
+ [예: 데이터 변칙 감지 및 설명(RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 함수)](app-anomaly-detection-with-explanation.md)
+ [예: 스트림에서 핫스팟 감지(HOTSPOTS 함수)](app-hotspots-detection.md)

# 예: 스트림에서 데이터 변칙을 감지하는 방법(RANDOM\$1CUT\$1FOREST 함수)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics는 수 열에 있는 값을 바탕으로 각 레코드의 이상 점수를 할당할 수 있는 함수(`RANDOM_CUT_FOREST`)를 제공합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [`RANDOM_CUT_FOREST` 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)를 참조하십시오.

이 실습에서는 애플리케이션 코드를 작성해 변칙 점수를 애플리케이션의 스트리밍 소스에 있는 레코드에 할당합니다. 애플리케이션을 설정하려면 다음을 수행합니다:

1. **스트리밍 소스 설정** – 다음과 같이 샘플 `heartRate` 데이터 스트림을 설정하고 샘플 데이터를 작성합니다:

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   이 절차에서는 Python 스크립트를 통해 스트림을 채우는 방법을 알아봅니다. `heartRate` 값은 무작위로 생성되는데, `heartRate` 값이 60과 100 사이인 레코드가 99%이고 `heartRate` 값이 150과 200 사이인 레코드는 1%밖에 되지 않습니다. 따라서 `heartRate` 값이 150과 200 사이인 레코드는 변칙입니다.

1. **입력 구성** – 콘솔을 사용하여 Kinesis Data Analytics 애플리케이션을 생성하고, 스트리밍 소스를 애플리케이션 내 스트림(`SOURCE_SQL_STREAM_001`)에 매핑함으로써 애플리케이션 입력을 구성합니다. 애플리케이션을 시작하면 Kinesis Data Analytics이 지속적으로 스트리밍 소스를 읽어서 애플리케이션 내 스트림에 레코드를 삽입합니다.

1. **애플리케이션 코드 지정** – 이 예는 다음 애플리케이션 코드를 사용합니다:

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   코드가 `SOURCE_SQL_STREAM_001`에 있는 행을 읽고, 변칙 점수를 할당하고, 결과 행을 또 다른 애플리케이션 내 스트림(`TEMP_STREAM`)에 작성합니다. 그런 다음 애플리케이션 코드가 `TEMP_STREAM`에 있는 레코드를 정렬하고 결과를 또 다른 애플리케이션 내 스트림(`DESTINATION_SQL_STREAM`)에 저장합니다. 펌프를 사용하여 행을 애플리케이션 내 스트림에 삽입합니다. 자세한 설명은 [애플리케이션 내 스트림과 펌프](streams-pumps.md) 섹션을 참조하십시오.

1. **출력 구성** – `DESTINATION_SQL_STREAM`에 있는 데이터를 또 다른 Kinesis 데이터 스트림인 외부 대상에 유지하도록 애플리케이션 출력을 구성합니다. 각 레코드에 할당된 변칙 점수를 검토하고 어떤 점수가 변칙을 나타내는지 판단하는 것은 애플리케이션 외부에서 이루어집니다. AWS Lambda 함수를 사용하여 이러한 이상 점수를 처리하고 알림을 구성할 수 있습니다.

이 연습에서는 미국 동부(버지니아 북부)`us-east-1`)를 사용하여 이러한 스트림과 애플리케이션을 생성합니다. 다른 리전을 사용하는 경우 그에 따라 코드를 업데이트해야 합니다.

**Topics**
+ [1단계: 준비](app-anomaly-prepare.md)
+ [단계 2: 애플리케이션 만들기](app-anom-score-create-app.md)
+ [3단계: 애플리케이션 출력 구성](app-anomaly-create-ka-app-config-destination.md)
+ [4단계: 출력 확인](app-anomaly-verify-output.md)

**다음 단계**  
[1단계: 준비](app-anomaly-prepare.md)

# 1단계: 준비
<a name="app-anomaly-prepare"></a>

이 연습에 사용할 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 Kinesis 데이터 스트림 두 개를 생성해야 합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

**Topics**
+ [1.1단계: 입력 및 출력 데이터 스트림 생성](#app-anomaly-create-two-streams)
+ [1.2단계: 샘플 레코드를 입력 스트림에 작성](#app-anomaly-write-sample-records-inputstream)

## 1.1단계: 입력 및 출력 데이터 스트림 생성
<a name="app-anomaly-create-two-streams"></a>

이 섹션에서는 2개의 Kinesis 스트림을 생성합니다: `ExampleInputStream` 및 `ExampleOutputStream`. AWS Management Console 또는 AWS CLI을(를) 사용하여 이러한 스트림을 만들 수 있습니다.
+ 

**콘솔을 사용하려면**

  1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

  1. **데이터 스트림 생성**을 선택합니다. 샤드가 하나인 스트림(`ExampleInputStream`이라고 함)을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

  1. 이전 단계를 반복하여 샤드가 하나인 스트림(`ExampleOutputStream`이라고 함)을 생성합니다.
+ 

**를 사용하려면 AWS CLI**

  1. 다음 Kinesis `create-stream` AWS CLI 명령을 사용하여 첫 번째 스트림()을 생성합니다`ExampleInputStream`.

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 동일한 명령을 실행하여 스트림 명칭을 `ExampleOutputStream`으로 변경합니다. 이 명령은 두 번째 스트림을 생성하고 애플리케이션은 이를 사용하여 출력을 작성합니다.

## 1.2단계: 샘플 레코드를 입력 스트림에 작성
<a name="app-anomaly-write-sample-records-inputstream"></a>

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 이러한 레코드를 `ExampleInputStream` 스트림에 작성합니다.

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. Python 및 `pip`를 설치합니다.

   Python 설치에 관한 정보는 [Python](https://www.python.org/) 웹사이트를 참조하십시오.

   pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 [Installation](https://pip.pypa.io/en/stable/installing/)을 참조하십시오.

1. 다음 Python 코드를 실행합니다. 코드에 있는 `put-record` 명령이 JSON 레코드를 스트림에 작성합니다.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**다음 단계**  
[단계 2: 애플리케이션 만들기](app-anom-score-create-app.md)

# 단계 2: 애플리케이션 만들기
<a name="app-anom-score-create-app"></a>

이 섹션에서는 다음과 같이 Amazon Kinesis Data Analytics 애플리케이션을 생성합니다:
+ [1단계: 준비](app-anomaly-prepare.md)에서 스트리밍 소스로 생성한 Kinesis 데이터 스트림을 사용하도록 애플리케이션 입력을 구성합니다.
+ 콘솔에서 **변칙 감지** 템플릿을 사용합니다.

**애플리케이션을 생성하는 방법**

1. Kinesis Data Analytics **시작하기** 연습에서의 1, 2 및 3단계를 따르십시오 ([단계 3.1: 애플리케이션 만들기](get-started-create-app.md) 참조).
   + 소스 구성에서 다음을 수행합니다:
     + 이전 섹션에서 생성한 스트리밍 소스를 지정합니다.
     + 콘솔이 스키마를 유추한 후에 스키마를 편집하고 `heartRate` 열 유형을 `INTEGER`로 설정합니다.

       대부분의 심박수 값은 정상이며 검색 프로세스는 이 열에 `TINYINT` 유형을 할당할 가능성이 높습니다. 그러나 높은 심박수를 나타내는 값은 백분율이 매우 낮습니다. 이러한 높은 값이 `TINYINT` 유형에 부합하지 않을 경우 Kinesis Data Analytics는 해당 행을 오류 스트림으로 전송합니다. 생성된 모든 심박수 데이터를 수용할 수 있도록 데이터 유형을 `INTEGER`로 업데이트합니다.
   + 콘솔에서 **변칙 감지** 템플릿을 사용합니다. 그런 다음 템플릿 코드를 업데이트하여 적절한 열 명칭을 부여합니다.

1. 열 명칭을 부여하여 애플리케이션 코드를 업데이트합니다. 결과로 얻은 애플리케이션 코드는 다음과 같이 표시됩니다(이 코드를 복사하여 SQL 편집기에 붙여넣습니다).

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. Kinesis Data Analytics 콘솔에서 SQL 코드를 실행하고 결과를 검토합니다:  
![\[애플리케이션 내 스트림의 결과 데이터가 포함된 실시간 분석 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**다음 단계**  
[3단계: 애플리케이션 출력 구성](app-anomaly-create-ka-app-config-destination.md)

# 3단계: 애플리케이션 출력 구성
<a name="app-anomaly-create-ka-app-config-destination"></a>

[단계 2: 애플리케이션 만들기](app-anom-score-create-app.md) 완료 이후 스트리밍 소스로부터 심박수 데이터를 읽고 변칙 점수를 각각에 할당하는 애플리케이션 코드를 가지고 있습니다.

이제 애플리케이션 내 스트림에서 외부 목적지인 또 다른 Kinesis 데이터 스트림(`OutputStreamTestingAnomalyScores`)으로 애플리케이션 결과를 전송할 수 있습니다. 변칙 점수를 분석하여 어떤 심박수가 변칙인지 판단할 수 있습니다. 이 애플리케이션을 더욱 확장하여 알림을 만들 수 있습니다.

다음 단계에 따라 애플리케이션 출력을 구성합니다.



1. Amazon Kinesis Data Analytics 콘솔을 엽니다. SQL 편집기에서 [**Destination**]을 선택하거나 애플리케이션 대시보드에서 [**Add a destination**]을 선택합니다.

1. **Connect to destination(대상 주소에 연결)** 페이지에서 이전 섹션을 통해 생성한 `OutputStreamTestingAnomalyScores` 스트림을 선택합니다.

   이제 애플리케이션 내 스트림 `DESTINATION_SQL_STREAM`에 애플리케이션이 작성하는 모든 레코드를 Amazon Kinesis Data Analytics가 유지하는 외부 대상이 생성되었습니다.

1. 선택적으로 `OutputStreamTestingAnomalyScores` 스트림을 모니터링하고 알림을 보내 AWS Lambda 도록를 구성할 수 있습니다. 지침은 [Lambda 함수를 사용하여 데이터 사전 처리](lambda-preprocessing.md) 섹션을 참조하십시오. 알림을 설정하지 않으면 [4단계: 출력 확인](app-anomaly-verify-output.md)에 설명된 대로, Kinesis Data Analytics이 외부 목적지인 Kinesis 데이터 스트림 `OutputStreamTestingAnomalyScores`으로 기록하는 레코드를 검토할 수 있습니다.

**다음 단계**  
[4단계: 출력 확인](app-anomaly-verify-output.md)

# 4단계: 출력 확인
<a name="app-anomaly-verify-output"></a>

[3단계: 애플리케이션 출력 구성](app-anomaly-create-ka-app-config-destination.md)에서 애플리케이션 출력을 구성한 이후 다음 AWS CLI 명령을 사용하여 애플리케이션이 작성한 대상 스트림 내의 레코드를 읽습니다.

1. `get-shard-iterator` 명령을 실행하여 출력 스트림 상의 데이터에 대한 포인터를 확보합니다.

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   다음 예 응답에서와 같이 샤드 반복자 값을 지닌 응답을 얻습니다:

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   샤드 반복자 값을 복사합니다.

1.  AWS CLI `get-records` 명령을 실행합니다.

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   이 명령은 다음 레코드 세트를 가져오는 후속 `get-records` 명령에서 사용할 수 있는 레코드 페이지와 또 다른 샤드 반복자를 반환합니다.

# 예: 데이터 변칙 감지 및 설명(RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 함수)
<a name="app-anomaly-detection-with-explanation"></a>

Amazon Kinesis Data Analytics은 수치 열에 있는 값을 바탕으로 각 레코드에 이상 점수를 할당하는`RANDOM_CUT_FOREST_WITH_EXPLANATION` 함수를 제공합니다. 또한 함수는 변칙에 대한 설명을 제공합니다. *자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조*의 [RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPONSION](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html)을 참조하십시오.

이 연습에서는 애플리케이션 코드를 작성하여 애플리케이션의 스트리밍 소스에 있는 레코드의 변칙 점수를 가져옵니다. 또한 각 변칙에 대한 설명을 얻습니다.

**Topics**
+ [1단계: 데이터 준비](app-anomaly-with-ex-prepare.md)
+ [2단계: 분석 애플리케이션 생성](app-anom-with-exp-create-app.md)
+ [3단계: 결과 검사](examine-results-with-exp.md)

**첫 단계**  
[1단계: 데이터 준비](app-anomaly-with-ex-prepare.md)

# 1단계: 데이터 준비
<a name="app-anomaly-with-ex-prepare"></a>

이 [예](app-anomaly-detection-with-explanation.md)의 경우 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 애플리케이션의 스트리밍 소스로 사용할 Kinesis 데이터 스트림을 생성합니다. 또한 Python 코드를 실행하여 시뮬레이션된 혈압 데이터를 스트림에 작성합니다.

**Topics**
+ [1.1단계: Kinesis 데이터 스트림 생성](#app-anomaly-create-two-streams)
+ [1.2단계: 샘플 레코드를 입력 스트림에 작성](#app-anomaly-write-sample-records-inputstream)

## 1.1단계: Kinesis 데이터 스트림 생성
<a name="app-anomaly-create-two-streams"></a>

이 섹션에서는 `ExampleInputStream`이라는 Kinesis 데이터 스트림을 생성합니다. AWS Management Console 또는를 사용하여이 데이터 스트림을 생성할 수 있습니다 AWS CLI.
+ 콘솔을 사용하려면:

  1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

  1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다. 그런 다음 **Kinesis 스트림 생성**을 선택합니다.

  1. 명칭에는 **ExampleInputStream**을(를) 입력합니다. 샤드 수에는 **1**을(를) 입력합니다.
+ 또는 AWS CLI 를 사용하여 데이터 스트림을 생성하려면 다음 명령을 실행합니다.

  ```
  $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
  ```

## 1.2단계: 샘플 레코드를 입력 스트림에 작성
<a name="app-anomaly-write-sample-records-inputstream"></a>

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 생성한 데이터 스트림에 작성합니다.

1. Python 및 pip를 설치합니다.

   Python 설치에 관한 자세한 설명은 [Python](https://www.python.org/)을 참조하십시오.

   pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 자세한 설명은 pip 설명서의 [설치](https://pip.pypa.io/en/stable/installing/)를 참조하십시오.

1. 다음 Python 코드를 실행합니다. 리전은 이 예에서 사용할 리전으로 변경할 수 있습니다. 코드에 있는 `put-record` 명령이 JSON 레코드를 스트림에 작성합니다.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class PressureType(Enum):
       low = "LOW"
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_blood_pressure(pressure_type):
       pressure = {"BloodPressureLevel": pressure_type.value}
       if pressure_type == PressureType.low:
           pressure["Systolic"] = random.randint(50, 80)
           pressure["Diastolic"] = random.randint(30, 50)
       elif pressure_type == PressureType.normal:
           pressure["Systolic"] = random.randint(90, 120)
           pressure["Diastolic"] = random.randint(60, 80)
       elif pressure_type == PressureType.high:
           pressure["Systolic"] = random.randint(130, 200)
           pressure["Diastolic"] = random.randint(90, 150)
       else:
           raise TypeError
       return pressure
   
   
   def generate(stream_name, kinesis_client):
       while True:
           rnd = random.random()
           pressure_type = (
               PressureType.low
               if rnd < 0.005
               else PressureType.high
               if rnd > 0.995
               else PressureType.normal
           )
           blood_pressure = get_blood_pressure(pressure_type)
           print(blood_pressure)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(blood_pressure),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

**다음 단계**  
[2단계: 분석 애플리케이션 생성](app-anom-with-exp-create-app.md)

# 2단계: 분석 애플리케이션 생성
<a name="app-anom-with-exp-create-app"></a>

이 섹션에서는 Amazon Kinesis Data Analytics 애플리케이션을 생성하고 [1단계: 데이터 준비](app-anomaly-with-ex-prepare.md)에서 스트리밍 소스로 만든 Kinesis 데이터 스트림을 사용하도록 구성합니다. 그런 다음 `RANDOM_CUT_FOREST_WITH_EXPLANATION` 함수를 사용하는 애플리케이션 코드를 실행합니다.

**애플리케이션을 생성하는 방법**

1. [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. 탐색 창에서 **Data Analytics(데이터 분석)**를 선택한 다음 **애플리케이션 생성**을 선택합니다.

1. 애플리케이션 명칭 및 설명(선택 사항)을 입력하고 [**Create application**]을 선택합니다.

1. **스트리밍 데이터 연결**을 선택한 다음 목록에서 **ExampleInputStream**을 선택합니다.

1. [**Discover schema**]를 선택하고 `Systolic` 및 `Diastolic`이 `INTEGER` 열로 나타나는지 확인하십시오. 다른 유형이 있으면 [**Edit schema**]를 선택하고 유형 `INTEGER`를 두 유형 모두에 할당합니다.

1. [**Real time analytics**]에서 [**Go to SQL editor**]를 선택합니다. 메시지가 표시되면 애플리케이션을 실행하도록 선택합니다.

1. 다음 코드를 SQL 편집기에 붙여넣은 다음 [**Save and run SQL**]을 선택합니다.

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   -- Compute an anomaly score with explanation for each record in the input stream
   -- using RANDOM_CUT_FOREST_WITH_EXPLANATION
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION 
         FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

**다음 단계**  
[3단계: 결과 검사](examine-results-with-exp.md)

# 3단계: 결과 검사
<a name="examine-results-with-exp"></a>

이 [예](app-anomaly-detection-with-explanation.md)에 대한 SQL 코드를 실행하면 먼저 변칙 점수가 0인 행이 표시됩니다. 이는 초기 학습 단계 중에 발생합니다. 그런 다음 다음과 비슷한 결과를 얻게 됩니다:

```
ROWTIME SYSTOLIC DIASTOLIC BLOODPRESSURELEVEL ANOMALY_SCORE ANOMALY_EXPLANATION
27:49.0	101      66        NORMAL             0.711460417   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0922","ATTRIBUTION_SCORE":"0.3792"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"0.0210","ATTRIBUTION_SCORE":"0.3323"}}
27:50.0	144      123       HIGH               3.855851061   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}
27:50.0	113      69        NORMAL             0.740069409   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0549","ATTRIBUTION_SCORE":"0.3750"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0394","ATTRIBUTION_SCORE":"0.3650"}}
27:50.0	105      64        NORMAL             0.739644157   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0245","ATTRIBUTION_SCORE":"0.3667"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0524","ATTRIBUTION_SCORE":"0.3729"}}
27:50.0	100      65        NORMAL             0.736993425   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0203","ATTRIBUTION_SCORE":"0.3516"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0454","ATTRIBUTION_SCORE":"0.3854"}}
27:50.0	108      69        NORMAL             0.733767202   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0974","ATTRIBUTION_SCORE":"0.3961"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0189","ATTRIBUTION_SCORE":"0.3377"}}
```
+ `RANDOM_CUT_FOREST_WITH_EXPLANATION` 함수의 알고리즘에서는 `Systolic` 및 `Diastolic` 열이 숫자이며 이들 열을 입력으로 사용합니다.
+ `BloodPressureLevel` 열은 텍스트 데이터를 포함하므로 알고리즘에 의해 고려되지 않습니다. 이 열은 시각적으로 보조하기 위한 것으로, 이 예에서 정상, 최고 및 최저 혈압 레벨을 신속하게 파악하는 데 도움이 됩니다.
+ `ANOMALY_SCORE` 열에서 점수가 더 높은 레코드는 보다 더 변칙적입니다. 이 샘플 결과 세트의 두 번째 레코드는 변칙 점수가 3.855851061로, 가장 변칙적입니다.
+ 알고리즘에 의해 고려된 각 숫자 열이 변칙 점수에 영향을 미치는 정도를 이해하려면 `ANOMALY_SCORE` 열에서 `ATTRIBUTION_SCORE`라는 JSON 필드를 참조하십시오. 이 샘플 결과 세트의 두 번째 행의 경우 `Systolic` 및 `Diastolic` 행은 1.7447:2.1111 비율로 변칙에 영향을 미칩니다. 즉 변칙 점수에 대한 설명의 45%는 수축 값에 의한 것이고 나머지 속성은 이완 값에 의한 것입니다.
+ 이 샘플의 두 번째 행에 표시된 점의 변칙성을 확인하려면 `DIRECTION`이라는 JSON 필드를 참조하십시오. 이 경우 이완 및 수축 값 모두 `HIGH`로 표시됩니다. 이러한 방향의 정확성에 대한 신뢰도를 확인하려면 `STRENGTH`라는 JSON 필드를 참조하십시오. 이 예에서 알고리즘은 이완 값이 높다는 것을 보다 더 확신합니다. 실제로 정상적인 확장 판독 값은 보통 60에서 80 사이이며 123은 예상보다 훨씬 높습니다.

# 예: 스트림에서 핫스팟 감지(HOTSPOTS 함수)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics는 데이터에서 상대적으로 밀도가 높은 리전에 대한 정보를 찾아 반환할 수 있는 `HOTSPOTS` 함수를 제공합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [핫스팟](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)을 참조하십시오.

이 실습에서는 애플리케이션 코드를 작성해 애플리케이션의 스트리밍 소스에서 핫스팟을 찾습니다. 애플리케이션을 설정하려면 다음 단계를 수행합니다.

1. **스트리밍 소스 설정** – 다음과 같이 Kinesis 스트림을 설정하고 샘플 좌표 데이터를 작성합니다:

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   이 예에서는 Python 스크립트를 통해 스트림을 채우는 방법을 알아봅니다. `x` 및 `y` 값은 무작위로 생성되며, 특정 위치 주변에서 일부 기록이 클러스터링됩니다.

   `is_hot` 필드가 제공되어 스크립트가 의도적으로 핫스팟의 일부로서 값을 생성했는지 여부를 나타냅니다. 이는 핫스팟 감지 함수가 제대로 작동하고 있는지 평가하는 데 도움이 될 수 있습니다.

1. **애플리케이션 생성** – AWS Management Console를 사용하여 Kinesis Data Analytics 애플리케이션을 생성합니다. 스트리밍 소스를 애플리케이션 내 스트림(`SOURCE_SQL_STREAM_001`)으로 매핑하여 애플리케이션 입력을 구성합니다. 애플리케이션을 시작하면 Kinesis Data Analytics가 지속적으로 스트리밍 소스를 읽고 애플리케이션 내 스트림에 레코드를 삽입합니다.

   이 연습에서는 애플리케이션에 대해 다음 코드를 사용합니다.

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   코드가 `SOURCE_SQL_STREAM_001`에 있는 행을 읽고, 중요 핫스팟에 대해 이를 분석하고, 결과 데이터를 또 다른 애플리케이션 내 스트림(`DESTINATION_SQL_STREAM`)에 작성합니다. 펌프를 사용하여 행을 애플리케이션 내 스트림에 삽입합니다. 자세한 설명은 [애플리케이션 내 스트림과 펌프](streams-pumps.md) 섹션을 참조하십시오.

1. **출력 구성** – 애플리케이션이 외부 목적지인 또 다른 Kinesis 데이터 스트림으로 데이터를 보내도록 애플리케이션 출력을 구성합니다. 핫스팟 점수를 검토하고 어떤 점수가 핫스팟 발생(알림이 발생해야 함)을 나타내는지 판단합니다. AWS Lambda 함수를 사용하여 핫스팟 정보를 추가로 처리하고 알림을 구성할 수 있습니다.

1. **출력 확인** – 이 예에는 출력 스트림으로부터 데이터를 읽고 이를 그래픽으로 표현하는 JavaScript 애플리케이션이 포함되어 있습니다. 따라서 애플리케이션이 실시간으로 생성하는 핫스팟을 볼 수 있습니다.



이 연습에서는 미국 서부(오레곤)(`us-west-2`)을 사용하여 이러한 스트림과 애플리케이션을 생성합니다. 다른 리전을 사용하는 경우 그에 따라 코드를 업데이트합니다.

**Topics**
+ [1단계: 입력 및 출력 스트림 생성](app-hotspots-prepare.md)
+ [2단계: Kinesis Data Analytics 애플리케이션 생성](app-hotspot-create-app.md)
+ [3단계: 애플리케이션 출력 구성](app-hotspots-create-ka-app-config-destination.md)
+ [4단계: 애플리케이션 출력 확인](app-hotspots-verify-output.md)

# 1단계: 입력 및 출력 스트림 생성
<a name="app-hotspots-prepare"></a>

[핫스팟 예](app-hotspots-detection.md)를 위해 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 먼저 Kinesis 데이터 스트림 2개를 생성합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

**Topics**
+ [1.1단계: Kinesis 데이터 스트림 생성](#app-hotspots-create-two-streams)
+ [1.2단계: 샘플 레코드를 입력 스트림에 작성](#app-hotspots-write-sample-records-inputstream)

## 1.1단계: Kinesis 데이터 스트림 생성
<a name="app-hotspots-create-two-streams"></a>

이 섹션에서는 다음 2개의 Kinesis 데이터 스트림을 생성합니다: `ExampleInputStream` 및 `ExampleOutputStream`.

콘솔 또는 AWS CLI을(를) 사용하여 데이터 스트림을 생성합니다.
+ 콘솔을 사용하여 데이터 스트림을 생성:

  1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

  1. 탐색 창에서 **Data Streams(데이터 스트림)**를 선택합니다.

  1. **Kinesis 스트림 생성**을 선택한 다음 샤드가 하나인 스트림(`ExampleInputStream`이라고 함)을 생성합니다.

  1. 이전 단계를 반복하여 샤드가 하나인 스트림(`ExampleOutputStream`이라고 함)을 생성합니다.
+  AWS CLI을(를) 사용하여 데이터 스트림 생성:
  + 다음 Kinesis `create-stream` AWS CLI 명령을 사용하여 스트림(`ExampleInputStream` 및 `ExampleOutputStream`)을 생성합니다. 애플리케이션이 출력을 작성하기 위해 사용할 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 `ExampleOutputStream`으로 변경합니다.

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 1.2단계: 샘플 레코드를 입력 스트림에 작성
<a name="app-hotspots-write-sample-records-inputstream"></a>

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 `ExampleInputStream` 스트림에 작성합니다.

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. Python 및 `pip`를 설치합니다.

   Python 설치에 관한 정보는 [Python](https://www.python.org/) 웹사이트를 참조하십시오.

   pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 [Installation](https://pip.pypa.io/en/stable/installing/)을 참조하십시오.

1. 다음 Python 코드를 실행합니다. 이 코드는 다음을 수행합니다.
   + (X, Y) 평면 어딘가에 잠재적 핫스팟을 생성합니다.
   + 각 핫스팟마다 1,000포인트 세트를 생성합니다. 이 포인트에서 20%가 핫스팟 주변에 클러스터링됩니다. 나머지는 전체 공간 내에 무작위로 생성됩니다.
   + `put-record` 명령은 JSON 레코드를 스트림에 작성합니다.
**중요**  
이 파일에는 귀하의 AWS 자격 증명이 포함되어 있으므로 이 파일을 웹 서버에 업로드하지 마십시오.

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**다음 단계**  
[2단계: Kinesis Data Analytics 애플리케이션 생성](app-hotspot-create-app.md)

# 2단계: Kinesis Data Analytics 애플리케이션 생성
<a name="app-hotspot-create-app"></a>

[핫스팟 예](app-hotspots-detection.md) 섹션에서 다음과 같이 Kinesis Data Analytics 애플리케이션을 생성합니다:
+ [1 단계](app-hotspots-prepare.md)에서 스트리밍 소스로 생성한 Kinesis 데이터 스트림을 사용하도록 애플리케이션 입력을 구성합니다.
+  AWS Management Console에서 제공된 애플리케이션 코드를 사용합니다.

**애플리케이션을 생성하는 방법**

1. [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습 ([단계 3.1: 애플리케이션 만들기](get-started-create-app.md) 참조) 의 1, 2, 3단계에 따라 Kinesis Data Analytics 애플리케이션을 생성합니다.

   소스 구성에서 다음을 수행합니다:
   + [1단계: 입력 및 출력 스트림 생성](app-hotspots-prepare.md) 섹션에서 생성한 스트리밍 소스를 지정합니다.
   + 콘솔에서 스키마를 유추한 이후 스키마를 편집합니다. `x` 및 `y`열 유형이 `DOUBLE`로 설정되고 `IS_HOT` 열 유형이 `VARCHAR`로 설정되어야 합니다.

1. 다음 애플리케이션 코드를 사용합니다(이 코드를 복사하여 SQL 편집기에 붙여넣을 수 있습니다).

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. SQL 코드를 실행하고 결과를 검토합니다.  
![\[rowtime, hotspot 및 hotspot_heat를 보여주는 SQL 코드 결과입니다.\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**다음 단계**  
[3단계: 애플리케이션 출력 구성](app-hotspots-create-ka-app-config-destination.md)

# 3단계: 애플리케이션 출력 구성
<a name="app-hotspots-create-ka-app-config-destination"></a>

이 시점에 [핫스팟 예](app-hotspots-detection.md)에서 Amazon Kinesis Data Analytics 애플리케이션 코드를 보유하여 스트리밍 소스로부터 중요 핫스팟을 발견하고 히트 점수를 각각에게 할당합니다.

이제 애플리케이션 내 스트림에서 외부 목적지인 또 다른 Kinesis 데이터 스트림 (`ExampleOutputStream`)으로 에플리케이션 결과를 보낼 수 있습니다. 그런 다음 핫스팟 점수를 분석하고 핫스팟 히트에 대한 적절한 임계점을 판단할 수 있습니다. 이 애플리케이션을 더욱 확장하여 알림을 만들 수 있습니다.

**애플리케이션 출력을 구성하려면**

1. [ https://console.aws.amazon.com/kinesisanalytics](https://console.aws.amazon.com/kinesisanalytics)에서 Kinesis Data Analytics 콘솔을 엽니다.

1. SQL 편집기에서 [**Destination**]을 선택하거나 애플리케이션 대시보드에서 [**Add a destination**]을 선택합니다.

1. **Add a destination(대상 주소 추가)** 페이지에서 **Select from your streams(나의 스트림에서 선택)**을 선택합니다. 그런 다음 이전 섹션에서 생성한 `ExampleOutputStream` 스트림을 선택합니다.

   이제 애플리케이션 내 스트림 `DESTINATION_SQL_STREAM`에 애플리케이션이 작성하는 모든 레코드를 Amazon Kinesis Data Analytics가 유지하는 외부 대상이 생성되었습니다.

1. 선택적으로 `ExampleOutputStream` 스트림을 모니터링하고 알림을 보내 AWS Lambda 도록를 구성할 수 있습니다. 자세한 내용은 [Lambda 함수를 출력으로 사용](how-it-works-output-lambda.md) 단원을 참조하십시오. [4단계: 애플리케이션 출력 확인](app-hotspots-verify-output.md)에서 설명한 대로 Kinesis Data Analytics가 외부 목적지인 Kinesis 스트림 `ExampleOutputStream`에 기록하는 레코드를 검토할 수 있습니다.

**다음 단계**  
[4단계: 애플리케이션 출력 확인](app-hotspots-verify-output.md)

# 4단계: 애플리케이션 출력 확인
<a name="app-hotspots-verify-output"></a>

[핫스팟 예](app-hotspots-detection.md)의 이번 섹션에서 Scalable Vector Graphics(SVG) 제어에서 핫스팟 정보를 표시하는 웹 애플리케이션을 설정합니다.

1. 다음 콘텐츠를 가진 `index.html`이라는 파일을 생성합니다:

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 명칭이 `hotspots_viewer.js`인 동일한 디렉터리에 다음 내용을 포함한 파일을 생성합니다. 제공된 변수에 자격 증명 및 출력 스트림 명칭을 입력합니다.

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 실행 중인 첫 번째 섹션의 Python 코드를 통해 웹 브라우저에서 `index.html`을 엽니다. 다음과 같이 핫스팟 정보가 페이지에 표시됩니다.

     
![\[핫스팟 정보를 표시하는 Scalable Vector Graphics 다이어그램\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)

# 예: 알림 및 오류
<a name="examples-alerts"></a>

이 섹션에서는 알림 및 오류를 사용하는 Kinesis Data Analytics 애플리케이션 예를 제공합니다. 각 예에는 Kinesis Data Analytics 애플리케이션을 설정 및 테스트에 도움이 되는 단계별 지침과 코드를 제공합니다.

**Topics**
+ [예: 간단한 알림 생성](app-simple-alerts.md)
+ [예: 조정된 알림 생성](app-throttled-alerts.md)
+ [예: 애플리케이션 내 오류 스트림 탐색](app-explore-error-stream.md)

# 예: 간단한 알림 생성
<a name="app-simple-alerts"></a>

이 Kinesis Data Analytics 애플리케이션에서는 데모 스트림에 대해 생성된 애플리케이션 내 스트림 상에서 쿼리가 연속적으로 실행됩니다. 자세한 설명은 [연속 쿼리](continuous-queries-concepts.md) 섹션을 참조하십시오.

임의의 행이 1%보다 큰 주가 변동을 보이는 경우, 해당 행은 또 다른 애플리케이션 내 스트림에 삽입됩니다. 실습에서 결과를 외부 대상에 유지하도록 애플리케이션 출력을 구성할 수 있습니다. 그런 다음 결과를 추가로 조사할 수 있습니다. 예를 들어 AWS Lambda 함수를 사용하여 레코드를 처리하고 알림을 보낼 수 있습니다.

**간단한 알림 애플리케이션을 만드는 방법**

1. Kinesis Data Analytics [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)에 설명된 대로 분석 애플리케이션을 생성하십시오.

1. Kinesis Data Analytics의 SQL 편집기에서 애플리케이션 코드를 다음으로 바꿉니다: 

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
   ```

   애플리케이션 코드의 `SELECT` 문은 1%보다 큰 주가 변동에 대해 `SOURCE_SQL_STREAM_001`의 행을 필터링합니다. 그런 다음 펌프를 사용하여 다른 애플리케이션 내 스트림 `DESTINATION_SQL_STREAM`에 이러한 행을 삽입합니다. 펌프를 사용하여 행을 애플리케이션 내 스트림에 삽입하는 방법을 설명하는 코딩 패턴에 관한 자세한 설명은 [애플리케이션 코드](how-it-works-app-code.md) 섹션을 참조하십시오.

1. [**Save and run SQL**]을 선택합니다.

1. 대상을 추가합니다. 이렇게 하려면 SQL 편집기에서 **대상 주소** 탭을 선택하거나 애플리케이션 세부 정보 페이지에서 **Add a destination(대상 추가)**을 선택합니다.

   1. SQL 편집기에서 **대상 주소** 탭을 선택한 다음 **Connect to a destination(대상에 연결)**을 선택합니다.

      **Connect to destination(대상에 연결)** 페이지에서 **Create New(새로 생성)**를 선택합니다.

   1. [**Go to Kinesis Streams**]를 선택합니다.

   1. Amazon Kinesis Data Streams 콘솔에서 샤드가 하나인 새로운 Kinesis 스트림을 생성합니다(예: `gs-destination`). 스트림 상태가 [**ACTIVE**]가 될 때까지 기다립니다.

   1. Kinesis Data Analytics 콘솔로 돌아가십시오. **Connect to destination(대상에 연결)** 페이지에서 앞서 생성한 스트림을 선택합니다.

      스트림이 표시되지 않으면 페이지를 새로 고칩니다.

   1. [**Save and continue**]를 선택합니다.

   이제 외부 목적지인 Kinesis 데이터 스트림이 생겼으며 Kinesis Data Analytics이 애플리케이션 내 스트림에 애플리케이션 출력을 지속합니다.

1. 생성한 Kinesis 스트림을 모니터링하고 Lambda 함수를 호출 AWS Lambda 하도록를 구성합니다.

   지침은 [Lambda 함수를 사용하여 데이터 사전 처리](lambda-preprocessing.md) 섹션을 참조하십시오.

# 예: 조정된 알림 생성
<a name="app-throttled-alerts"></a>

이 Kinesis Data Analytics 애플리케이션에서는 데모 스트림에 대해 생성된 애플리케이션 내 스트림 상에서 쿼리가 연속적으로 실행됩니다. 자세한 설명은 [연속 쿼리](continuous-queries-concepts.md) 섹션을 참조하십시오. 임의의 행이 1%보다 큰 주가 변동을 보이는 경우, 해당 행은 또 다른 애플리케이션 내 스트림에 삽입됩니다. 애플리케이션은 알림을 조정하고, 주가가 변동하는 즉시 알림이 전송됩니다. 하지만 주식 기호당 1분에 1개 이상의 알림이 애플리케이션 내 스트림으로 전송되지 않습니다.

**조정된 알림 애플리케이션을 생성하려면**

1. Kinesis Data Analytics [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습에 설명된 대로 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. Kinesis Data Analytics의 SQL 편집기에서 애플리케이션 코드를 다음으로 바꿉니다: 

   ```
   CREATE OR REPLACE STREAM "CHANGE_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "change_pump" AS 
      INSERT INTO "CHANGE_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
         
   -- ** Trigger Count and Limit **
   -- Counts "triggers" or those values that evaluated true against the previous where clause
   -- Then provides its own limit on the number of triggers per hour per ticker symbol to what
   -- is specified in the WHERE clause
   
   CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM (
      ticker_symbol VARCHAR(4), 
      change REAL,
      trigger_count INTEGER);
   
   CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAM
   SELECT STREAM ticker_symbol, change, trigger_count
   FROM (
       SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_count
       FROM "CHANGE_STREAM"
       --window to perform aggregations over last minute to keep track of triggers
       WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING)
   )
   WHERE trigger_count >= 1;
   ```

   애플리케이션 코드의 `SELECT`문이 `SOURCE_SQL_STREAM_001`에서 주가 변동이 1%보다 큰 행을 필터링하여 펌프를 사용하여 또 다른 애플리케이션 내 스트림 `CHANGE_STREAM`에 삽입합니다.

   그런 다음 애플리케이션은 조정된 알림에 대해 `TRIGGER_COUNT_STREAM`이라는 두 번째 스트림을 생성합니다. 두 번째 쿼리는 레코드가 허용될 때마다 앞으로 건너뛰는 윈도우에서 레코드를 선택하여 1분에 주식 티커당 하나의 레코드만 스트림에 작성되도록 합니다.

1. [**Save and run SQL**]을 선택합니다.

위 예는 다음과 비슷한 스트림을 `TRIGGER_COUNT_STREAM`에 출력합니다.

![\[티커 기호, 백분율 변경 및 트리거 횟수 열을 포함하는 출력 스트림을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/ex-throttle-alerts.png)


# 예: 애플리케이션 내 오류 스트림 탐색
<a name="app-explore-error-stream"></a>

Amazon Kinesis Data Analytics은 생성된 각 애플리케이션에 대해 애플리케이션 내 오류 스트림을 제공합니다. 애플리케이션이 처리할 수 없는 행은 모두 이 오류 스트림으로 전송됩니다. 조사를 위해 오류 스트림 데이터를 외부 대상에 유지하는 것을 고려할 수도 있습니다.

콘솔에서 다음 실습을 수행합니다. 이 예에서 검색 프로세스에서 유추된 스키마를 편집하여 오류를 입력 구성에 추가하고 오류 스트림에 전송된 행을 확인합니다.

**Topics**
+ [구문 분석 오류 추가](#intro-error-parse-error)
+ [0으로 나누기 오류 추가](#intro-error-divide-zero)

## 구문 분석 오류 추가
<a name="intro-error-parse-error"></a>

이 실습에서는 구문 분석 오류를 추가합니다.

1. Kinesis Data Analytics [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습에 설명된 대로 Kinesis Data Analytics 애플리케이션을 생성합니다.

1. 애플리케이션 세부 정보 페이지에서 **스트리밍 데이터 연결**을 선택합니다.

1. 시작하기 실습을 수행했으면 계정에 데모 스트림(`kinesis-analytics-demo-stream`)이 있을 것입니다. **Connect to source(소스에 연결)** 페이지에서 이 데모 스트림을 선택합니다.

1. Kinesis Data Analytics이 데모 스트림에서 샘플을 취하여 생성된 애플리케이션 내 입력 스트림에 대한 스키마를 유추합니다. 콘솔의 [**Formatted stream sample**] 탭에서 유추된 스키마와 샘플 데이터를 확인할 수 있습니다.

1. 다음으로 스키마를 편집하고 열 유형을 수정하여 구문 분석 오류를 추가합니다. **Edit schema(스키마 편집)**를 선택합니다.

1. `TICKER_SYMBOL` 열 유형을 `VARCHAR(4)`에서 `INTEGER`로 변경합니다.

   생성된 애플리케이션 내 스키마의 열 유형이 유효하지 않다면, Kinesis Data Analytics이 애플리케이션 내 스트림으로 데이터를 가져올 수 없습니다. 대신 행을 오류 스트림에 전송합니다.

1. [**Save schema**]를 선택합니다.

1. [**Refresh schema samples**]를 선택합니다.

   [**Formatted stream**] 샘플에 행이 없다는 점에 유의하십시오. 그러나 [**Error stream**] 탭에서는 오류 메시지와 함께 데이터를 보여 줍니다. [**Error stream**] 탭에서는 애플리케이션 내 오류 스트림으로 전송된 데이터를 보여 줍니다.

   열 데이터 유형을 변경했기 때문에 Kinesis Data Analytics가 애플리케이션 내 입력 스트림에서 데이터를 가져올 수 없었습니다. 대신 데이터를 오류 스트림으로 전송했습니다.

## 0으로 나누기 오류 추가
<a name="intro-error-divide-zero"></a>

이 연습에서는 애플리케이션 코드를 업데이트하여 런타임 오류(0으로 나누기)를 추가합니다. Amazon Kinesis Data Analytics는 당초 결과를 쓰여 질 애플리케이션 내 스트림 대신`DESTINATION_SQL_STREAM` 애플리케이션 내 오류 스트림으로 결과 행을 전송합니다.



1. Kinesis Data Analytics [시작하기](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html) 연습에 설명된 대로 Kinesis Data Analytics 애플리케이션을 생성합니다.

   다음과 같이 [**Real-time analytics**] 탭에서 결과를 확인합니다.

   Sour

1. 애플리케이션 코드에서 `SELECT` 문을 업데이트하여 0으로 나누기를 추가합니다. 예: 

   ```
   SELECT STREAM ticker_symbol, sector, change, (price / 0) as ProblemColumn
   FROM "SOURCE_SQL_STREAM_001"
   WHERE sector SIMILAR TO '%TECH%';
   ```

   

1. 애플리케이션을 실행합니다.

   0으로 나누기 런타임 오류가 발생하기 때문에 결과를 `DESTINATION_SQL_STREAM`에 작성하지 않고 Kinesis Data Analytics는 행을 애플리케이션 내 오류 스트림으로 전송합니다. **Real-time analytics(실시간 분석)** 탭에서 오류 스트림을 선택하면 애플리케이션 내 오류 스트림에서 행을 볼 수 있습니다.

# 예: 솔루션 액셀러레이터
<a name="examples_solution"></a>

[AWS 솔루션 사이트에](https://aws.amazon.com/solutions/)는 전체 스트리밍 데이터 솔루션을 빠르게 생성하는 데 사용할 수 있는 AWS CloudFormation 템플릿이 있습니다.

다음의 템플릿을 사용할 수 있습니다:

## AWS 계정 활동에 대한 실시간 인사이트
<a name="examples_solution_activity"></a>

이 솔루션은 AWS 계정()에 대한 리소스 액세스 및 사용량 지표를 실시간으로 기록하고 시각화합니다. 자세한 내용은 [AWS 계정 활동에 대한 실시간 인사이트](https://docs.aws.amazon.com/solutions/latest/real-time-insights-account-activity/welcome.html)를 참조하세요.

## Kinesis Data Analytics를 사용한 실시간 AWS IoT 디바이스 모니터링
<a name="examples_solution_iot"></a>

이 솔루션은 IoT 기기 연결 및 활동 데이터를 실시간으로 수집, 처리, 분석 및 가시화합니다. 자세한 내용은 [Kinesis Data Analytics를 사용한 실시간 AWS IoT 디바이스 모니터링을](https://docs.aws.amazon.com/solutions/latest/real-time-iot-device-monitoring-with-kinesis/welcome.html) 참조하세요.

## Kinesis Data Analytics를 사용한 실시간 웹 Analytics
<a name="examples_solution_web"></a>

이 솔루션은 웹 사이트 클릭스트림 데이터를 실시간으로 수집, 처리, 분석 및 가시화합니다. 자세한 설명은 [Kinesis Data Analytics을 통한 실시간 웹 분석](https://docs.aws.amazon.com/solutions/latest/real-time-web-analytics-with-kinesis/welcome.html)을 참조하십시오.

## Amazon Connected Vehicle 솔루션
<a name="examples_solution_vehicle"></a>

이 솔루션은 차량의 IoT 데이터를 실시간으로 수집, 처리, 분석 및 가시화합니다. 자세한 설명은 [Amazon Connected Vehicle 솔루션](https://docs.aws.amazon.com//solutions/latest/connected-vehicle-solution/welcome.html) 섹션을 참조하십시오.