

신중한 고려 끝에 Amazon Kinesis Data Analytics for SQL 애플리케이션을 중단하기로 결정했습니다.

1. **2025년 9월 1**일부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 버그 수정은 제공되지 않습니다. 곧 중단될 예정이므로 지원이 제한될 예정이기 때문입니다.

2. **2025년 10월 15**일부터 새 Kinesis Data Analytics for SQL 애플리케이션을 생성할 수 없습니다.

3. **2026년 1월 27**일부터 애플리케이션이 삭제됩니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 시작하거나 작동할 수 없게 됩니다. 그 시점부터 Amazon Kinesis Data Analytics for SQL에 대한 지원을 더 이상 이용할 수 없습니다. 자세한 내용은 [Amazon Kinesis Data Analytics for SQL 애플리케이션 단종](discontinuation.md) 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# 예: 스트림에서 데이터 변칙을 감지하는 방법(RANDOM\$1CUT\$1FOREST 함수)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics는 수 열에 있는 값을 바탕으로 각 레코드의 이상 점수를 할당할 수 있는 함수(`RANDOM_CUT_FOREST`)를 제공합니다. 자세한 설명은 *Amazon Managed Service for Apache Flink SQL 참조*에서 [`RANDOM_CUT_FOREST` 함수](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)를 참조하십시오.

이 실습에서는 애플리케이션 코드를 작성해 변칙 점수를 애플리케이션의 스트리밍 소스에 있는 레코드에 할당합니다. 애플리케이션을 설정하려면 다음을 수행합니다:

1. **스트리밍 소스 설정** – 다음과 같이 샘플 `heartRate` 데이터 스트림을 설정하고 샘플 데이터를 작성합니다:

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   이 절차에서는 Python 스크립트를 통해 스트림을 채우는 방법을 알아봅니다. `heartRate` 값은 무작위로 생성되는데, `heartRate` 값이 60과 100 사이인 레코드가 99%이고 `heartRate` 값이 150과 200 사이인 레코드는 1%밖에 되지 않습니다. 따라서 `heartRate` 값이 150과 200 사이인 레코드는 변칙입니다.

1. **입력 구성** – 콘솔을 사용하여 Kinesis Data Analytics 애플리케이션을 생성하고, 스트리밍 소스를 애플리케이션 내 스트림(`SOURCE_SQL_STREAM_001`)에 매핑함으로써 애플리케이션 입력을 구성합니다. 애플리케이션을 시작하면 Kinesis Data Analytics이 지속적으로 스트리밍 소스를 읽어서 애플리케이션 내 스트림에 레코드를 삽입합니다.

1. **애플리케이션 코드 지정** – 이 예는 다음 애플리케이션 코드를 사용합니다:

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   코드가 `SOURCE_SQL_STREAM_001`에 있는 행을 읽고, 변칙 점수를 할당하고, 결과 행을 또 다른 애플리케이션 내 스트림(`TEMP_STREAM`)에 작성합니다. 그런 다음 애플리케이션 코드가 `TEMP_STREAM`에 있는 레코드를 정렬하고 결과를 또 다른 애플리케이션 내 스트림(`DESTINATION_SQL_STREAM`)에 저장합니다. 펌프를 사용하여 행을 애플리케이션 내 스트림에 삽입합니다. 자세한 설명은 [애플리케이션 내 스트림과 펌프](streams-pumps.md) 섹션을 참조하십시오.

1. **출력 구성** – `DESTINATION_SQL_STREAM`에 있는 데이터를 또 다른 Kinesis 데이터 스트림인 외부 대상에 유지하도록 애플리케이션 출력을 구성합니다. 각 레코드에 할당된 변칙 점수를 검토하고 어떤 점수가 변칙을 나타내는지 판단하는 것은 애플리케이션 외부에서 이루어집니다. AWS Lambda 함수를 사용하여 이러한 이상 점수를 처리하고 알림을 구성할 수 있습니다.

이 연습에서는 미국 동부(버지니아 북부)`us-east-1`)를 사용하여 이러한 스트림과 애플리케이션을 생성합니다. 다른 리전을 사용하는 경우 그에 따라 코드를 업데이트해야 합니다.

**Topics**
+ [

# 1단계: 준비
](app-anomaly-prepare.md)
+ [

# 단계 2: 애플리케이션 만들기
](app-anom-score-create-app.md)
+ [

# 3단계: 애플리케이션 출력 구성
](app-anomaly-create-ka-app-config-destination.md)
+ [

# 4단계: 출력 확인
](app-anomaly-verify-output.md)

**다음 단계**  
[1단계: 준비](app-anomaly-prepare.md)

# 1단계: 준비
<a name="app-anomaly-prepare"></a>

이 연습에 사용할 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 Kinesis 데이터 스트림 두 개를 생성해야 합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

**Topics**
+ [

## 1.1단계: 입력 및 출력 데이터 스트림 생성
](#app-anomaly-create-two-streams)
+ [

## 1.2단계: 샘플 레코드를 입력 스트림에 작성
](#app-anomaly-write-sample-records-inputstream)

## 1.1단계: 입력 및 출력 데이터 스트림 생성
<a name="app-anomaly-create-two-streams"></a>

이 섹션에서는 2개의 Kinesis 스트림을 생성합니다: `ExampleInputStream` 및 `ExampleOutputStream`. AWS Management Console 또는 AWS CLI을(를) 사용하여 이러한 스트림을 만들 수 있습니다.
+ 

**콘솔을 사용하려면**

  1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

  1. **데이터 스트림 생성**을 선택합니다. 샤드가 하나인 스트림(`ExampleInputStream`이라고 함)을 생성합니다. 자세한 설명은 *Amazon Kinesis Data Streams 개발자 가이드*의 [스트림 생성](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)을 참조하십시오.

  1. 이전 단계를 반복하여 샤드가 하나인 스트림(`ExampleOutputStream`이라고 함)을 생성합니다.
+ 

**를 사용하려면 AWS CLI**

  1. 다음 Kinesis `create-stream` AWS CLI 명령을 사용하여 첫 번째 스트림()을 생성합니다`ExampleInputStream`.

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 동일한 명령을 실행하여 스트림 명칭을 `ExampleOutputStream`으로 변경합니다. 이 명령은 두 번째 스트림을 생성하고 애플리케이션은 이를 사용하여 출력을 작성합니다.

## 1.2단계: 샘플 레코드를 입력 스트림에 작성
<a name="app-anomaly-write-sample-records-inputstream"></a>

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 이러한 레코드를 `ExampleInputStream` 스트림에 작성합니다.

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. Python 및 `pip`를 설치합니다.

   Python 설치에 관한 정보는 [Python](https://www.python.org/) 웹사이트를 참조하십시오.

   pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 [Installation](https://pip.pypa.io/en/stable/installing/)을 참조하십시오.

1. 다음 Python 코드를 실행합니다. 코드에 있는 `put-record` 명령이 JSON 레코드를 스트림에 작성합니다.

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**다음 단계**  
[단계 2: 애플리케이션 만들기](app-anom-score-create-app.md)

# 단계 2: 애플리케이션 만들기
<a name="app-anom-score-create-app"></a>

이 섹션에서는 다음과 같이 Amazon Kinesis Data Analytics 애플리케이션을 생성합니다:
+ [1단계: 준비](app-anomaly-prepare.md)에서 스트리밍 소스로 생성한 Kinesis 데이터 스트림을 사용하도록 애플리케이션 입력을 구성합니다.
+ 콘솔에서 **변칙 감지** 템플릿을 사용합니다.

**애플리케이션을 생성하는 방법**

1. Kinesis Data Analytics **시작하기** 연습에서의 1, 2 및 3단계를 따르십시오 ([단계 3.1: 애플리케이션 만들기](get-started-create-app.md) 참조).
   + 소스 구성에서 다음을 수행합니다:
     + 이전 섹션에서 생성한 스트리밍 소스를 지정합니다.
     + 콘솔이 스키마를 유추한 후에 스키마를 편집하고 `heartRate` 열 유형을 `INTEGER`로 설정합니다.

       대부분의 심박수 값은 정상이며 검색 프로세스는 이 열에 `TINYINT` 유형을 할당할 가능성이 높습니다. 그러나 높은 심박수를 나타내는 값은 백분율이 매우 낮습니다. 이러한 높은 값이 `TINYINT` 유형에 부합하지 않을 경우 Kinesis Data Analytics는 해당 행을 오류 스트림으로 전송합니다. 생성된 모든 심박수 데이터를 수용할 수 있도록 데이터 유형을 `INTEGER`로 업데이트합니다.
   + 콘솔에서 **변칙 감지** 템플릿을 사용합니다. 그런 다음 템플릿 코드를 업데이트하여 적절한 열 명칭을 부여합니다.

1. 열 명칭을 부여하여 애플리케이션 코드를 업데이트합니다. 결과로 얻은 애플리케이션 코드는 다음과 같이 표시됩니다(이 코드를 복사하여 SQL 편집기에 붙여넣습니다).

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. Kinesis Data Analytics 콘솔에서 SQL 코드를 실행하고 결과를 검토합니다:  
![\[애플리케이션 내 스트림의 결과 데이터가 포함된 실시간 분석 탭을 보여주는 콘솔 스크린샷\]](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**다음 단계**  
[3단계: 애플리케이션 출력 구성](app-anomaly-create-ka-app-config-destination.md)

# 3단계: 애플리케이션 출력 구성
<a name="app-anomaly-create-ka-app-config-destination"></a>

[단계 2: 애플리케이션 만들기](app-anom-score-create-app.md) 완료 이후 스트리밍 소스로부터 심박수 데이터를 읽고 변칙 점수를 각각에 할당하는 애플리케이션 코드를 가지고 있습니다.

이제 애플리케이션 내 스트림에서 외부 목적지인 또 다른 Kinesis 데이터 스트림(`OutputStreamTestingAnomalyScores`)으로 애플리케이션 결과를 전송할 수 있습니다. 변칙 점수를 분석하여 어떤 심박수가 변칙인지 판단할 수 있습니다. 이 애플리케이션을 더욱 확장하여 알림을 만들 수 있습니다.

다음 단계에 따라 애플리케이션 출력을 구성합니다.



1. Amazon Kinesis Data Analytics 콘솔을 엽니다. SQL 편집기에서 [**Destination**]을 선택하거나 애플리케이션 대시보드에서 [**Add a destination**]을 선택합니다.

1. **Connect to destination(대상 주소에 연결)** 페이지에서 이전 섹션을 통해 생성한 `OutputStreamTestingAnomalyScores` 스트림을 선택합니다.

   이제 애플리케이션 내 스트림 `DESTINATION_SQL_STREAM`에 애플리케이션이 작성하는 모든 레코드를 Amazon Kinesis Data Analytics가 유지하는 외부 대상이 생성되었습니다.

1. 선택적으로 `OutputStreamTestingAnomalyScores` 스트림을 모니터링하고 알림을 보내 AWS Lambda 도록를 구성할 수 있습니다. 지침은 [Lambda 함수를 사용하여 데이터 사전 처리](lambda-preprocessing.md) 섹션을 참조하십시오. 알림을 설정하지 않으면 [4단계: 출력 확인](app-anomaly-verify-output.md)에 설명된 대로, Kinesis Data Analytics이 외부 목적지인 Kinesis 데이터 스트림 `OutputStreamTestingAnomalyScores`으로 기록하는 레코드를 검토할 수 있습니다.

**다음 단계**  
[4단계: 출력 확인](app-anomaly-verify-output.md)

# 4단계: 출력 확인
<a name="app-anomaly-verify-output"></a>

[3단계: 애플리케이션 출력 구성](app-anomaly-create-ka-app-config-destination.md)에서 애플리케이션 출력을 구성한 이후 다음 AWS CLI 명령을 사용하여 애플리케이션이 작성한 대상 스트림 내의 레코드를 읽습니다.

1. `get-shard-iterator` 명령을 실행하여 출력 스트림 상의 데이터에 대한 포인터를 확보합니다.

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   다음 예 응답에서와 같이 샤드 반복자 값을 지닌 응답을 얻습니다:

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   샤드 반복자 값을 복사합니다.

1.  AWS CLI `get-records` 명령을 실행합니다.

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   이 명령은 다음 레코드 세트를 가져오는 후속 `get-records` 명령에서 사용할 수 있는 레코드 페이지와 또 다른 샤드 반복자를 반환합니다.