

신중한 고려 후 Amazon Kinesis Data Analytics for SQL 애플리케이션을 중단하기로 결정했습니다.

1. **2025년 9월 1**일부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 버그 수정은 제공되지 않습니다. 곧 중단될 예정이므로 지원이 제한될 예정이기 때문입니다.

2. **2025년 10월 15**일부터 새 Kinesis Data Analytics for SQL 애플리케이션을 생성할 수 없습니다.

3. **2026년 1월 27**일부터 애플리케이션이 삭제됩니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 시작하거나 작동할 수 없게 됩니다. 그 시점부터 Amazon Kinesis Data Analytics for SQL에 대한 지원을 더 이상 이용할 수 없습니다. 자세한 내용은 [Amazon Kinesis Data Analytics for SQL 애플리케이션 단종](discontinuation.md) 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Managed Service for Apache Flink Studio로 마이그레이션하는 예
<a name="migrating-to-kda-studio-overview"></a>

신중한 고려 끝에 Amazon Kinesis Data Analytics for SQL 애플리케이션을 단종하기로 결정했습니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 계획하고 마이그레이션하는 데 도움이 되도록 15개월에 걸쳐 오퍼링을 점진적으로 단종할 예정입니다. 중요한 날짜는 **2025년 9월 1일,** 202**5년 10월 15일**, **2026년 1월 27**일입니다.

1. **2025년 9월 1**일부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 버그 수정은 제공되지 않습니다. 곧 중단될 예정이므로 지원이 제한될 예정이기 때문입니다.

1. **2025년 10월 15일**부터는 새 Amazon Kinesis Data Analytics for SQL 애플리케이션을 생성할 수 없습니다.

1. **2026년 1월 27**일부터 애플리케이션이 삭제됩니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 시작하거나 작동할 수 없게 됩니다. 그 시점부터 Amazon Kinesis Data Analytics for SQL 애플리케이션에 대한 지원을 더 이상 이용할 수 없습니다. 자세한 내용은 [Amazon Kinesis Data Analytics for SQL 애플리케이션 단종](discontinuation.md)를 참조하세요.

[Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html)를 사용하는 것이 좋습니다. 사용 편의성과 고급 분석 기능을 결합하여 몇 분 만에 스트림 처리 애플리케이션을 빌드할 수 있습니다.

이 섹션에서는 Amazon Kinesis Data Analytics for SQL 애플리케이션 워크로드를 Managed Service for Apache Flink로 이동하는 데 도움이 되는 코드 및 아키텍처 예를 제공합니다.

자세한 내용은 [AWS 블로그 게시물: Migrate from Amazon Kinesis Data Analytics for SQL Applications to Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/)를 참조하세요.

## Managed Service for Apache Flink Studio에서 Kinesis Data Analytics for SQL 쿼리를 복제하는 방법
<a name="examples-migrating-to-kda-studio"></a>

워크로드를 Managed Service for Apache Flink Studio 또는 Managed Service for Apache Flink로 마이그레이션하기 위해 이 섹션에서는 일반적인 사용 사례에 사용할 수 있는 쿼리 번역을 제공합니다.

이러한 예를 살펴보기 전에 먼저 [Managed Service for Apache Flink가 포함된 Studio 노트북 사용](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html)을 검토하는 것이 좋습니다.

### Managed Service for Apache Flink Studio에서 Kinesis Data Analytics for SQL 쿼리를 다시 만들기
<a name="examples-recreating-queries"></a>

아래 옵션에서는 일반적인 SQL 기반 Kinesis Data Analytics 애플리케이션 쿼리를 Managed Service for Apache Flink Studio로 변환하는 것을 보여줍니다.

#### 다단계 애플리케이션
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
Query 1 - % flink.ssql DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;           
           
CREATE TABLE SOURCE_SQL_STREAM_001 (TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 

FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND ) 
   PARTITIONED BY (TICKER_SYMBOL) WITH (
      'connector' = 'kinesis',
      'stream' = 'kinesis-analytics-demo-stream',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 ( 
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
      'connector' = 'kinesis', 
      'stream' = 'IN_APP_STREAM_001', 
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'json',
      'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP, 
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE, 
   CHANGE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP, TICKER_SYMBOL VARCHAR(4), SECTOR VARCHAR(16), 
   PRICE DOUBLE, CHANGE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1', 
   'scan.stream.initpos' = 'LATEST',  
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_001 
      SELECT
         APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         SOURCE_SQL_STREAM_001;


Query 3 - % flink.ssql(type = 
update
) 
   INSERT INTO
      IN_APP_STREAM_02 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_001;


Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         INGEST_TIME,
         TICKER_SYMBOL,
         SECTOR,
         PRICE,
         CHANGE 
      FROM
         IN_APP_STREAM_02;
```

------

#### DateTime 값 변환
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),   
   EVENT_UNIX_TIMESTAMP INT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
   
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',   
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### 간단한 알림
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4), 
   CHANGE DOUBLE,       
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
   
Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
```

------

#### 조정된 알림
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers" or those values that evaluated true against the previous where clause
-- Then provides its own limit on the number of triggers per hour per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAMSELECT STREAM ticker_symbol,
   change,
   trigger_count 
FROM
   (
      SELECT
         STREAM ticker_symbol,
         change,
         COUNT(*) OVER W1 as trigger_countFROM "CHANGE_STREAM" --window to perform aggregations over last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
   )
WHERE
   trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(4),      
   CHANGE DOUBLE, PRICE DOUBLE,
   EVENT_TIME AS PROCTIME()) 
PARTITIONED BY (TICKER_SYMBOL) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');   
DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;
CREATE TABLE TRIGGER_COUNT_STREAM ( 
   TICKER_SYMBOL VARCHAR(4), 
   CHANGE DOUBLE, 
   TRIGGER_COUNT INT) 
PARTITIONED BY (TICKER_SYMBOL);

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
   WHERE
      (
         ABS(CHANGE / (PRICE - CHANGE)) * 100
      )
      > 1;
      
Query 3 - % flink.ssql(type = 
update
) 
   SELECT * 
   FROM(
         SELECT
            TICKER_SYMBOL,
            CHANGE,
            COUNT(*) AS TRIGGER_COUNT 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER_SYMBOL,
            CHANGE 
      )
   WHERE
      TRIGGER_COUNT > 1;
```

------

#### 쿼리에서 부분적 결과 집계
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001",
      "ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
      COUNT(*) AS "TickerCount "
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001". ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"." APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;
CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA 
FROM
   'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',   
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');
DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;
CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND,   
   TICKERCOUNT BIGINT NOT NULL ) PARTITIONED BY (TICKER) WITH ( 
      'connector' = 'kinesis',
      'stream' = 'CALC_COUNT_SQL_STREAM',
      'aws.region' = 'us-east-1',       
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND, 
   TICKERCOUNT BIGINT NOT NULL )
   PARTITIONED BY (TICKER) WITH ('connector' = 'kinesis', 
      'stream' = 'DESTINATION_SQL_STREAM',
      'aws.region' = 'us-east-1',
      'scan.stream.initpos' = 'LATEST',
      'format' = 'csv');

Query 2 - % flink.ssql(type = 
update
) 
   INSERT INTO
      CALC_COUNT_SQL_STREAM 
      SELECT
         TICKER,
         TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
         TICKERCOUNT 
      FROM
         (
            SELECT
               TICKER_SYMBOL AS TICKER,
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
               COUNT(*) AS TICKERCOUNT 
            FROM
               SOURCE_SQL_STREAM_001 
            GROUP BY
               TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
               DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
               DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
               TICKER_SYMBOL 
         )
;

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      CALC_COUNT_SQL_STREAM;
      
Query 4 - % flink.ssql(type = 
update
) 
   INSERT INTO
      DESTINATION_SQL_STREAM 
      SELECT
         TICKER,
         TRADETIME,
         SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT 
      FROM
         CALC_COUNT_SQL_STREAM WINDOW W1 AS 
         (
            PARTITION BY TICKER 
         ORDER BY
            TRADETIME RANGE INTERVAL '10' MINUTE PRECEDING
         )
;

Query 5 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      DESTINATION_SQL_STREAM;
```

------

#### 문자열 값 변환
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME() ) PARTITIONED BY (referrer) 
WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         substring(referrer, 12, 6) as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Regex를 사용하여 하위 문자열 대체
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM for cleaned up referrerCREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME()) 
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ingest_time,
         REGEXP_REPLACE(referrer, 'http', 'https') as referrer 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### Regex 로그 파싱
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE, PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16)) 
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
SELECT
   * 
FROM
   (
      SELECT
         SECTOR,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 1) AS MATCH1,
         REGEXP_EXTRACT(SECTOR, '.([E].).([R].)', 2) AS MATCH2 
      FROM
         DESTINATION_SQL_STREAM 
   )
WHERE
   MATCH1 IS NOT NULL 
   AND MATCH2 IS NOT NULL;
```

------

#### DateTime 값 변환
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( 
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT,
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND 
   FROM
      EVENT_TIME) 
   FROM
      "SOURCE_SQL_STREAM_001"
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP INT,    
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT) PARTITIONED BY (TICKER)
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601')

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         TICKER,
         EVENT_TIME,
         EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
         UNIX_TIMESTAMP() AS EVENT_UNIX_TIMESTAMP,
         DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
         EXTRACT(SECOND 
      FROM
         EVENT_TIME) AS EVENT_SECOND 
      FROM
         DESTINATION_SQL_STREAM;
```

------

#### 창 및 집계
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,    
   TICKER VARCHAR(4),
   TICKER_COUNT INT) PARTITIONED BY (TICKER) 
WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json'

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         EVENT_TIME,
         TICKER, COUNT(TICKER) AS ticker_count 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(EVENT_TIME,
         INTERVAL '60' second),
         EVENT_TIME, TICKER;
```

------

#### ROWTIME을 사용하는 텀블링 창
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".
            ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME()) 
PARTITIONED BY (ticker) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601') 

Query 2 - % flink.ssql(type = 
   update
) 
      SELECT
         ticker,
         min(price) AS MIN_PRICE,
         max(price) AS MAX_PRICE 
      FROM
         DESTINATION_SQL_STREAM 
      GROUP BY
         TUMBLE(processing_time, INTERVAL '60' second),
         ticker;
```

------

#### 가장 자주 발생하는 값 검색(TOP\_K\_ITEMS\_TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM"TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY STEP("SOURCE_SQL_STREAM_001".
   ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001".
      "APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM ( 
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECONDS ) 
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis', 'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.ssql(type = 
update
) 
   SELECT
      * 
   FROM
      (
         SELECT
            TICKER,
            COUNT(*) as MOST_FREQUENT_VALUES,
            ROW_NUMBER() OVER (PARTITION BY TICKER 
         ORDER BY
            TICKER DESC) AS row_num 
         FROM
            DESTINATION_SQL_STREAM 
         GROUP BY
            TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
            TICKER
      )
   WHERE
      row_num <= 5;
```

------

#### 대략적인 Top-K 항목
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"), 'column1', -- name of column in single quotes10, -- number of top items60 -- tumbling window size in seconds));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flinkssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 
CREATE TABLE SOURCE_SQL_STREAM_001 ( TS TIMESTAMP(3), WATERMARK FOR TS as TS - INTERVAL '5' SECOND, ITEM VARCHAR(1024), 
PRICE DOUBLE) 
   WITH ( 'connector' = 'kinesis', 'stream' = 'SOURCE_SQL_STREAM_001',
'aws.region' = 'us-east-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json',
'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT
   * 
FROM
   (
      SELECT
         *,
         ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW 
      ORDER BY
         ITEM_COUNT DESC) as rownum 
      FROM
         (
            select
               AGG_WINDOW,
               ITEM,
               ITEM_COUNT 
            from
               (
                  select
                     TUMBLE_ROWTIME(TS, INTERVAL '60' SECONDS) as AGG_WINDOW,
                     ITEM,
                     count(*) as ITEM_COUNT 
                  FROM
                     SOURCE_SQL_STREAM_001 
                  GROUP BY
                     TUMBLE(TS, INTERVAL '60' SECONDS),
                     ITEM
               )
         )
   )
where
   rownum <= 3
```

------

#### 웹 로그 구문 파싱(W3C\_LOG\_PARSE 함수)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" ASINSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis', 
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5),
      SPLIT_INDEX(log, ' ', 6) 
   from
      SOURCE_SQL_STREAM_001;
```

------

#### 문자열을 복수의 필드로 분할(VARIABLE\_COLUMN\_LOG\_PARSE 함수)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001 CREATE TABLE SOURCE_SQL_STREAM_001 ( log VARCHAR(1024)) 
   WITH ( 'connector' = 'kinesis',
          'stream' = 'SOURCE_SQL_STREAM_001',
          'aws.region' = 'us-east-1',
          'scan.stream.initpos' = 'LATEST',
          'format' = 'json',
          'json.timestamp-format.standard' = 'ISO-8601');
          
% flink.ssql(type=update) 
   select
      SPLIT_INDEX(log, ' ', 0),
      SPLIT_INDEX(log, ' ', 1),
      SPLIT_INDEX(log, ' ', 2),
      SPLIT_INDEX(log, ' ', 3),
      SPLIT_INDEX(log, ' ', 4),
      SPLIT_INDEX(log, ' ', 5)
)
from
   SOURCE_SQL_STREAM_001;
```

------

#### 조인
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      priceFROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE INT,
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH ( 
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',   
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(4)) WITH ( 
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',       
      'format' = 'csv' );

Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      TICKER_SYMBOL,
      c.Company,
      SECTOR,
      CHANGE,
      PRICE 
   FROM
      DESTINATION_SQL_STREAM 
      LEFT JOIN
         CompanyName as c 
         ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```

------

#### 오류
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumnFROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
Query 1 - % flink.ssql(type = 
update
) DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE, 
   PRICE DOUBLE ) 
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',   
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

Query 2 - % flink.pyflink @udf(input_types = [DataTypes.BIGINT()],
   result_type = DataTypes.BIGINT()) def DivideByZero(price): try: price / 0 
except
: return - 1 st_env.register_function("DivideByZero",
   DivideByZero) 
   
   Query 3 - % flink.ssql(type = 
update
) 
   SELECT
      CURRENT_TIMESTAMP AS ERROR_TIME,
      * 
   FROM
      (
         SELECT
            TICKER_SYMBOL,
            SECTOR,
            CHANGE,
            DivideByZero(PRICE) as ErrorColumn 
         FROM
            DESTINATION_SQL_STREAM 
         WHERE
            SECTOR SIMILAR TO '%TECH%' 
      )
      AS ERROR_STREAM;
```

------

## Random Cut Forest 워크로드 마이그레이션
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

Random Cut Forest를 사용하는 워크로드를 Kinesis Analytics for SQL에서 Managed Service for Apache Flink로 이동하려는 경우, 이 [AWS 블로그 게시물](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/)에서는 Managed Service for Apache Flink를 사용하여 이상 항목 탐지를 위한 온라인 RCF 알고리즘을 실행하는 방법을 보여줍니다.

## 소스인 Kinesis Data Firehose를 Kinesis Data Streams로 대체
<a name="examples-firehose"></a>

전체 튜토리얼은 [Converting-KDASQL-KDAStudio/](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio)를 참조하십시오.

다음 연습에서는 Amazon Managed Service for Apache Flink Studio를 사용하도록 데이터 흐름을 변경해 보겠습니다. Amazon Kinesis Data Firehose에서 Amazon Kinesis Data Streams로 전환하는 것도 의미합니다.

먼저 일반적인 KDA-SQL 아키텍처를 공유하고, Amazon Managed Service for Apache Flink Studio 및 Amazon Kinesis Data Streams를 사용하여 이를 대체할 수 있는 방법을 보여줍니다. 또는 다음에서 CloudFormation 템플릿을 시작할 수 [있습니다](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml).

### Amazon Kinesis Data Analytics-SQL 및 Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

Amazon Kinesis Data Analytics SQL 아키텍처 흐름은 다음과 같습니다: 

![생산자부터 Kinesis Data Streams Firehose, 분석 SQL 및 S3까지의 데이터 흐름.](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/legacy-sql.png)


먼저 기존 Amazon Kinesis Data Analytics-SQL과 Amazon Kinesis Data Firehose의 설정을 살펴봅니다. 사용 사례는 주식 시세 및 가격을 포함한 거래 데이터가 외부 소스에서 Amazon Kinesis 시스템으로 스트리밍되는 거래 시장입니다. Amazon Kinesis Data Analytics for SQL은 입력 스트림을 사용하여 텀블링 윈도우와 같은 윈도우 쿼리를 실행하여 각 주식 시세 표시기에 대해 1분 기간의 거래량 및 `min`, `max`, `average` 거래 가격을 결정합니다.  

Amazon Kinesis Data Analytics-SQL은 Amazon Kinesis Data Firehose API에서 데이터를 수집하도록 설정되어 있습니다. 처리가 끝나면 Amazon Kinesis Data Analytics-SQL은 처리된 데이터를 다른 Amazon Kinesis Data Firehose로 보내고, 이 서비스는 출력을 Amazon S3 버킷에 저장합니다.

이 경우에는 Amazon Kinesis 데이터 생성기를 사용합니다. Amazon Kinesis 데이터 생성기를 사용하면 Amazon Kinesis Data Streams 또는 Amazon Kinesis Data Firehose 전송 스트림으로 테스트 데이터를 전송할 수 있습니다. 시작하려면 [여기](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)의 지침을 따르세요. [지침에](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) 제공된 템플릿 대신 [여기에](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) 있는 CloudFormation 템플릿을 사용합니다.

 CloudFormation 템플릿을 실행하면 출력 섹션에 Amazon Kinesis Data Generator URL이 제공됩니다. [여기](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html)에서 설정한 Cognito 사용자 ID와 비밀번호를 사용하여 포털에 로그인합니다. 리전 및 대상 스트림 명칭을 선택합니다. 현재 상태에서는 Amazon Kinesis Data Firehose 전송 스트림을 선택합니다. 새 상태에서는 Amazon Kinesis Data Firehose 스트림 명칭을 선택합니다. 요건에 따라 여러 템플릿을 생성하고 대상 스트림으로 보내기 전에 **템플릿 테스트** 버튼을 사용하여 템플릿을 테스트할 수 있습니다.

다음은 Amazon Kinesis 데이터 생성기를 사용하는 샘플 페이로드입니다. 데이터 생성기는 입력 Amazon Kinesis Firehose 스트림을 대상으로 하여 데이터를 지속적으로 스트리밍합니다. Amazon Kinesis SDK 클라이언트는 다른 생산자로부터도 데이터를 전송할 수 있습니다. 

```
2023-02-17 09:28:07.763,"AAPL",5032023-02-17 09:28:07.763,
"AMZN",3352023-02-17 09:28:07.763,
"GOOGL",1852023-02-17 09:28:07.763,
"AAPL",11162023-02-17 09:28:07.763,
"GOOGL",1582
```

다음 JSON은 일련의 거래 시간 및 날짜, 주식 시세 표시기, 주가를 무작위로 생성하는 데 사용됩니다.

```
date.now(YYYY-MM-DD HH:mm:ss.SSS),
"random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])",
random.number(2000)
```

**데이터 보내기**를 선택하면 생성기가 모의 데이터 전송을 시작합니다.

외부 시스템은 Amazon Kinesis Data Firehose로 데이터를 스트리밍합니다. Amazon Kinesis Data Analytics for SQL 애플리케이션을 사용하면 표준 SQL을 사용하여 스트리밍 데이터를 분석할 수 있습니다. 이 서비스를 사용하면 스트리밍 소스에 대해 SQL 코드를 작성하고 실행하여 시계열 분석을 수행하고, 실시간 대시보드를 공급하고, 실시간 지표를 생성할 수 있습니다. Amazon Kinesis Data Analytics for SQL 애플리케이션은 입력 스트림의 SQL 쿼리에서 대상 스트림을 생성하고 대상 스트림을 다른 Amazon Kinesis Data Firehose로 보낼 수 있습니다. 대상 Amazon Kinesis Data Firehose는 분석 데이터를 Amazon S3에 최종 상태로 보낼 수 있습니다.

Amazon Kinesis Data Analytics-SQL 레거시 코드는 SQL 표준의 확장을 기반으로 합니다.

Amazon Kinesis Data Analytics-SQL에서 다음 쿼리를 사용합니다. 먼저 쿼리 출력을 위한 대상 스트림을 생성합니다. 그런 다음 연속적 `INSERT INTO stream SELECT ... FROM` 쿼리 기능 실행을 제공하는 Amazon Kinesis Data Analytics 리포지토리 객체(SQL 표준의 확장)인 `PUMP`를 사용하여 쿼리 결과를 명명된 스트림에 지속적으로 입력할 수 있습니다. 

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICEFROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

앞의 SQL은 두 개의 시간 윈도우를 사용합니다. 하나는 수신 스트림 페이로드에서 오는 `tradeTimestamp`이고, 다른 하나는 `Event Time` 또는 `client-side time`이라고도 하는 `ROWTIME.tradeTimestamp`입니다. 이벤트가 발생한 시간이기 때문에 분석에서 이 시간을 사용하는 것이 바람직한 경우가 많습니다. 그러나 스마트폰 및 웹 클라이언트와 같은 많은 이벤트 소스가 신뢰할 만한 시계를 갖추고 있지 않아 부정확한 시간을 야기할 수 있습니다. 또한 연결 문제로 인해 레코드가 이벤트가 발생한 순서대로 스트림에 나타나지 않을 수 있습니다. 

애플리케이션 내 스트림에는 `ROWTIME`이라고 하는 특수 열이 포함됩니다. Amazon Kinesis Data Analytics가 첫 번째 애플리케이션 내 스트림에 행을 삽입하면 타임스탬프를 저장합니다. `ROWTIME`은 Amazon Kinesis Data Analytics가 스트리밍 소스에서 읽은 후 첫 번째 애플리케이션 내 스트림을 레코드에 삽입한 타임스탬프를 나타냅니다. 그런 다음 이 `ROWTIME` 값은 애플리케이션 전체에 걸쳐 유지됩니다. 

SQL은 티커 수를 60초 간격 내의 `volume`, `min`, `max`, `average` 가격으로 결정합니다. 

시간 기반 창 모드 쿼리에서 이들 시간 각각을 사용하는 것은 장점과 단점이 있습니다. 이들 시간 중 하나 이상을 선택하고 사용 시나리오를 바탕으로 관련 단점을 처리할 전략을 선택하십시오. 

두 개의 시간 기반 창, 즉 `ROWTIME`과 이벤트 시간 같은 다른 시간인 두 가지의 시간 기반을 사용하는 전략
+ 다음 예에서와 같이 쿼리가 결과를 방출하는 빈도를 제어하는 첫 번째 창으로 `ROWTIME`을 사용합니다. 논리 시간으로는 사용되지 않습니다.
+ 분석과 연관시키고자 하는 논리 시간인 다른 시간 중 하나를 사용하십시오. 이 시간은 이벤트가 발생한 시간을 나타냅니다. 다음 예에서 분석 목적은 레코드를 그룹화하고 티커별 카운트를 반환하는 것입니다.

### Amazon Managed Service for Apache Flink Studio 
<a name="examples-studio"></a>

업데이트된 아키텍처에서는 Amazon Kinesis Data Firehose를 Amazon Kinesis Data Streams로 교체합니다. Amazon Kinesis Data Analytics for SQL 애플리케이션은 Amazon Managed Service for Apache Flink Studio로 대체되었습니다. Apache Flink 코드는 Apache Zeppelin Notebook 내에서 대화형 방식으로 실행됩니다. Amazon Managed Service for Apache Flink Studio에서 집계된 거래 데이터를 Amazon S3 버킷으로 전송하여 저장합니다. 단계는 다음과 같습니다:

Amazon Managed Service for Apache Flink Studio 아키텍처 흐름은 다음과 같습니다:

![생산자는 Kinesis Data Streams 를 통해 Kinesis Data Analytics Studio로 데이터를 전송한 다음 S3로 데이터를 전송합니다.](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Kinesis Data Stream 생성
<a name="examples-studio-create-data-stream"></a>

**콘솔을 사용하여 데이터 스트림을 만들려면**

1. 에 로그인 AWS Management Console 하고 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis) Kinesis 콘솔을 엽니다.

1. 탐색 모음에서 리전 선택기를 확장하고 리전을 선택합니다.

1. **데이터 스트림 생성**을 선택합니다.

1. **Kinesis 스트림 생성** 페이지에서 데이터 스트림의 명칭을 입력하고 기본 **온디맨드** 용량 모드를 수락합니다.

   **온디맨드** 모드를 사용하면 **Kinesis 스트림 생성**을 선택하여 데이터 스트림을 생성할 수 있습니다.

   스트림이 생성 중인 동안 **Kinesis streams(Kinesis 스트림)** 페이지에서 스트림의 **Status(상태)**는 **Creating(생성 중)**입니다. 스트림을 사용할 준비가 되면 **Status(상태)**가 **Active(활성)**로 변경됩니다.

1. 스트림 명칭을 선택합니다. **스트림 세부 정보** 페이지에 모니터링 정보와 함께 스트림 구성 요약이 표시됩니다.

1. Amazon Kinesis 데이터 생성기에서 스트림/전송 스트림을 새 Amazon Kinesis Data Streams 으로 변경합니다: **TRADE\_SOURCE\_STREAM**.

   JSON과 페이로드는 Amazon Kinesis Data Analytics-SQL에서 사용한 것과 동일합니다. Amazon Kinesis 데이터 생성기를 사용하여 몇 가지 샘플 거래 페이로드 데이터를 생성하고 **TRADE\_SOURCE\_STREAM** 데이터 스트림을 이 연습의 대상으로 삼으십시오.

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

1. 에서 Managed Service for Apache Flink로 AWS Management Console 이동한 다음 **애플리케이션 생성을** 선택합니다.

1. 왼쪽 탐색 창에서 **Studio 노트북**을 선택한 다음 **Studio 노트북 생성**을 선택합니다.

1. 스튜디오 노트북의 명칭을 입력합니다.

1. **AWS Glue 데이터베이스**에서 소스 및 대상의 메타데이터를 정의할 기존 AWS Glue 데이터베이스를 입력합니다. AWS Glue 데이터베이스가 없는 경우 **생성을** 선택하고 다음을 수행합니다.

   1.  AWS Glue 콘솔의 왼쪽 메뉴에 있는 **데이터 카탈로그**에서 **데이터베이스를** 선택합니다.

   1. **데이터베이스 생성**을 선택합니다.

   1. **데이터베이스 생성** 페이지에서 데이터베이스의 명칭을 입력합니다. **위치 - 옵션** 섹션에서 **Browse Amazon S3** 검색을 선택하고 Amazon S3 버킷을 선택합니다. Amazon S3 버킷이 아직 설정되지 않은 경우 이 단계를 건너뛰고 나중에 다시 돌아올 수 있습니다.

   1. (선택 사항). 데이터베이스에 대한 설명을 입력합니다.

   1. **데이터베이스 생성**을 선택합니다.

1. **노트북 생성**을 선택합니다.

1. 노트북이 생성되면 **실행**을 선택합니다.

1. 노트북이 성공적으로 시작되면 **Apache Zeppelin에서 열기**를 선택하여 Zeppelin 노트북을 실행합니다.

1. Zeppelin 노트북 페이지에서 **새 노트 만들기**를 선택하고 명칭을 *MarketDataFeed*로 지정합니다.

Flink SQL 코드에 대한 설명은 다음과 같지만, 먼저 [Zeppelin 노트북 화면의 모양은 다음과 같습니다](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). 노트북 내의 각 창은 별도의 코드 블록이며 한 번에 하나씩 실행할 수 있습니다.

##### Amazon Managed Service for Apache Flink Studio 코드
<a name="examples-studio-code"></a>

Amazon Managed Service for Apache Flink Studio는 Zeppelin 노트북을 사용하여 코드를 실행합니다. 이 예에서는 Apache Flink 1.13에 근거한 ssql 코드에 매핑했습니다. Zeppelin 노트북의 코드는 한 번에 한 블록씩 아래에 나와 있습니다.  

Zeppelin 노트북에서 코드를 실행하기 전에 Flink 구성 명령을 실행해야 합니다. 코드(ssql, Python 또는 Scala)를 실행한 후 구성 설정을 변경해야 하는 경우 노트북을 중지하고 다시 시작해야 합니다. 이 예에서는 체크포인트를 설정해야 합니다. Amazon S3의 파일로 데이터를 스트리밍하려면 체크포인트가 요구됩니다. 이렇게 하면 Amazon S3로 스트리밍되는 데이터를 파일로 플러싱할 수 있습니다. 다음 명령문은 간격을 5,000밀리초로 설정합니다.  

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf`은 이 블록이 구성 명령문임을 나타냅니다. 체크포인트를 포함한 Flink 구성에 대한 자세한 내용은 [Apache Flink 체크포인트](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/)를 참조하세요.  

소스 Amazon Kinesis Data Streams의 입력 테이블은 다음 Flink ssql 코드를 사용하여 생성됩니다. 참고로 `TRADE_TIME` 필드에는 데이터 생성기가 생성한 날짜/시간이 저장됩니다.

```
%flink.ssql
     
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (--`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
TRADE_TIME TIMESTAMP(3),
WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,TICKER STRING,PRICE DOUBLE,
STATUS STRING)WITH ('connector' = 'kinesis','stream' = 'TRADE_SOURCE_STREAM',
'aws.region' = 'us-east-1','scan.stream.initpos' = 'LATEST','format' = 'csv');
```

다음 명령문을 사용하여 입력 스트림을 볼 수 있습니다:

```
%flink.ssql(type=update)-- testing the source stream
   
select * from TRADE_SOURCE_STREAM;
```

Amazon S3로 집계 데이터를 전송하기 전에 Amazon Managed Service for Apache Flink Studio에서 텀블링 창 선택 쿼리를 사용하여 직접 데이터를 볼 수 있습니다. 이렇게 하면 거래 데이터가 1분 단위로 집계됩니다. 참고로 %flink.ssql 문에는 (유형=업데이트) 라는 대상이 있어야 합니다:

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE) as TRADE_WINDOW,
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE, 
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAMGROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

그런 다음 Amazon S3에서 대상 주소에 대한 표를 생성할 수 있습니다. 워터마크를 사용해야 합니다. 워터마크는 지연된 이벤트가 더 이상 발생하지 않을 것으로 확신하는 시점을 나타내는 진행률 지표입니다. 워터마크를 사용하는 이유는 늦게 도착하는 사람들을 고려하기 위함입니다. 간격 `‘5’ Second`을 통해 거래가 5초 늦게 Amazon Kinesis Data Stream에 입력될 수 있으며, 기간 내에 타임스탬프가 있는 거래는 여전히 포함됩니다. 자세한 내용은 [워터마크 생성](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/)을 참조하세요.   

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```

이 명령문은 데이터를 `TRADE_DESTINATION_S3`에 삽입합니다. `TUMPLE_ROWTIME`는 텀블링 창의 포함 상한선의 타임스탬프입니다.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

명령문을 10\~20분 동안 실행하여 Amazon S3에 일부 데이터를 누적하십시오. 그런 다음 명령문을 중단하십시오.

그러면 Amazon S3의 파일이 닫히고 해당 파일을 볼 수 있게 됩니다.

내용은 다음과 같습니다: 

![2023년 3월 1일 기술 회사의 주가 및 볼륨을 보여주는 금융 데이터 표.](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


[CloudFormation 템플릿](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml)을 사용하여 인프라를 생성할 수 있습니다.

CloudFormation 는 AWS 계정에 다음 리소스를 생성합니다.
+  Amazon Kinesis Data Streams
+  Amazon Managed Service for Apache Flink Studio
+  AWS Glue 데이터베이스
+  Amazon S3 버킷
+  Amazon Managed Service for Apache Flink Studio에서 적절한 리소스에 액세스하기 위한 IAM 역할 및 정책

노트북을 가져오고에서 생성한 새 Amazon S3 버킷으로 Amazon S3 버킷 이름을 변경합니다 CloudFormation.

![거래 기간 시작 타임스탬프 및 티커 필드가 있는 TRADE_DESTINATION_S3 테이블을 생성하는 SQL 코드를 Flink합니다.](http://docs.aws.amazon.com/ko_kr/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### 자세한 내용을 참조하십시오.
<a name="more"></a>

다음은 Managed Service for Apache Flink Studio를 사용하는 방법에 대해 자세히 알아볼 수 있는 몇 가지 추가 리소스입니다.
+ [Managed Service for Apache Flink Studio Notebooks 개발자 가이드](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html) 
+ [아파치 플링크 1.13 설명서](https://nightlies.apache.org/flink/flink-docs-release-1.13/) 
+ [Managed Service for Apache Flink Studio 워크샵 ](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio) 
+ [Apache Flink 창](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/) 
+ [Amazon Kinesis Data Analytics 개발자 가이드 - Kinesis Data Analytics 스트림에서 S3 버킷으로 작성](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html) 