

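After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications:

1. Starting **September 1, 2025**, we won't provide any bug fixes for Amazon Kinesis Data Analytics for SQL applications, because our support for it will be limited given the upcoming discontinuation.

2. Starting **October 15, 2025**, you will not be able to create new Kinesis Data Analytics for SQL applications.

3. Starting **January 27, 2026**, we will delete your applications. You will not be able to start or operate Amazon Kinesis Data Analytics for SQL applications. Support for Amazon Kinesis Data Analytics for SQL will no longer be available from that time. For more information, see [Amazon Kinesis Data Analytics for SQL Applications discontinuation](discontinuation.md).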

This page was machine translated. If the translation differs from the original English content, the English version prevails.

# Migrating to Managed Service for Apache Flink Studio examples
<a name="migrating-to-kda-studio-overview"></a>

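After careful consideration, we have decided to discontinue Amazon Kinesis Data Analytics for SQL applications. To help you plan and migrate away from Amazon Kinesis Data Analytics for SQL applications, we will discontinue the offering gradually over 15 months. There are three important dates to note: **September 1, 2025**, **October 15, 2025**, and **January 27, 2026**.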

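We recommend that you use [Amazon Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/what-is.html). It is easy to use and offers advanced analytical capabilities, so you can build stream processing applications in minutes.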

This section provides code and architecture examples to help you move your Amazon Kinesis Data Analytics for SQL applications workloads to Managed Service for Apache Flink.

For more information, see also this AWS blog post: [Migrate from Amazon Kinesis Data Analytics for SQL Applications to Managed Service for Apache Flink Studio](https://aws.amazon.com/blogs/big-data/migrate-from-amazon-kinesis-data-analytics-for-sql-applications-to-amazon-managed-service-for-apache-flink-studio/).

## Replicating Kinesis Data Analytics for SQL queries in Managed Service for Apache Flink Studio
<a name="examples-migrating-to-kda-studio"></a>

This section provides query translations for common use cases to help you migrate your workloads to Managed Service for Apache Flink Studio or Managed Service for Apache Flink.

Before you explore these examples, we recommend that you first review [Using a Studio notebook with a Managed Service for Apache Flink](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html).

### Re-creating Kinesis Data Analytics for SQL queries in Managed Service for Apache Flink Studio
<a name="examples-recreating-queries"></a>

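The following options translate common SQL-based Kinesis Data Analytics application queries to Managed Service for Apache Flink Studio. In the Studio examples, each query that begins with an interpreter directive such as `%flink.ssql` or `%flink.pyflink` runs as its own notebook paragraph.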

#### Multi-step application
<a name="Multi-Step-application"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_001" (
   ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16), price REAL, change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_001" AS 
INSERT INTO
   "IN_APP_STREAM_001"
   SELECT
      STREAM APPROXIMATE_ARRIVAL_TIME,
      ticker_symbol,
      sector,
      price,
      change FROM "SOURCE_SQL_STREAM_001";
-- Second in-app stream and pump
CREATE 
OR REPLACE STREAM "IN_APP_STREAM_02" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_02" AS 
INSERT INTO
   "IN_APP_STREAM_02"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_001";
-- Destination in-app stream and third pump
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ingest_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   sector VARCHAR(16),
   price REAL,
   change REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP_03" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ingest_time,
      ticker_symbol,
      sector,
      price,
      change FROM "IN_APP_STREAM_02";
```

------
#### [ Managed Service for Apache Flink Studio ]

```
-- Query 1
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;

CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE,
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Each intermediate table below is backed by its own Kinesis data stream,
-- which must already exist before you run the INSERT queries.
DROP TABLE IF EXISTS IN_APP_STREAM_001;

CREATE TABLE IN_APP_STREAM_001 (
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_001',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS IN_APP_STREAM_02;

CREATE TABLE IN_APP_STREAM_02 (
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'IN_APP_STREAM_02',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   INGEST_TIME TIMESTAMP,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   PRICE DOUBLE,
   CHANGE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
INSERT INTO IN_APP_STREAM_001
SELECT
   APPROXIMATE_ARRIVAL_TIME AS INGEST_TIME,
   TICKER_SYMBOL,
   SECTOR,
   PRICE,
   CHANGE
FROM SOURCE_SQL_STREAM_001;

-- Query 3
%flink.ssql(type=update)
INSERT INTO IN_APP_STREAM_02
SELECT
   INGEST_TIME,
   TICKER_SYMBOL,
   SECTOR,
   PRICE,
   CHANGE
FROM IN_APP_STREAM_001;

-- Query 4
%flink.ssql(type=update)
INSERT INTO DESTINATION_SQL_STREAM
SELECT
   INGEST_TIME,
   TICKER_SYMBOL,
   SECTOR,
   PRICE,
   CHANGE
FROM IN_APP_STREAM_02;
```

------
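
These queries chain the tables together. Each `INSERT INTO ... SELECT` paragraph reads one table and writes the next, in the same way that each KDA pump feeds the next in-app stream. As a rough mental model only (this is not how Flink executes the jobs), you can sketch the data flow with Python generators. The `Trade` type and the `stage_*` and `run_pipeline` functions are hypothetical names used for illustration.

```python
from typing import Iterable, Iterator, NamedTuple


class Trade(NamedTuple):
    # Field names mirror the columns of the IN_APP_STREAM_* tables.
    ingest_time: str
    ticker_symbol: str
    sector: str
    price: float
    change: float


def stage_source_to_in_app_001(rows: Iterable[dict]) -> Iterator[Trade]:
    """Like Query 2: rename APPROXIMATE_ARRIVAL_TIME to INGEST_TIME."""
    for r in rows:
        yield Trade(
            r["APPROXIMATE_ARRIVAL_TIME"],
            r["TICKER_SYMBOL"],
            r["SECTOR"],
            r["PRICE"],
            r["CHANGE"],
        )


def stage_passthrough(rows: Iterable[Trade]) -> Iterator[Trade]:
    """Like Queries 3 and 4: copy every column unchanged to the next table."""
    for r in rows:
        yield r


def run_pipeline(source: Iterable[dict]) -> list:
    # Each stage consumes the previous stage's output, like chained pumps.
    in_app_001 = stage_source_to_in_app_001(source)
    in_app_02 = stage_passthrough(in_app_001)
    destination = stage_passthrough(in_app_02)
    return list(destination)
```

In the Studio version, each intermediate table is a separate Kinesis data stream. This decouples the stages, at the cost of one extra stream per step.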

#### Transform DateTime values
<a name="transform-date-time-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   TICKER VARCHAR(4),
   event_time TIMESTAMP,
   five_minutes_before TIMESTAMP,
   event_unix_timestamp BIGINT, 
   event_timestamp_as_char VARCHAR(50),
   event_second INTEGER);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      EVENT_TIME,
      EVENT_TIME - INTERVAL '5' MINUTE,
      UNIX_TIMESTAMP(EVENT_TIME),
      TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
      EXTRACT(SECOND FROM EVENT_TIME)
   FROM
      "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   FIVE_MINUTES_BEFORE TIMESTAMP(3),
   EVENT_UNIX_TIMESTAMP BIGINT,
   EVENT_TIMESTAMP_AS_CHAR VARCHAR(50),
   EVENT_SECOND INT)
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT
   TICKER,
   EVENT_TIME,
   EVENT_TIME - INTERVAL '5' MINUTE AS FIVE_MINUTES_BEFORE,
   -- UNIX_TIMESTAMP() without an argument returns the current time,
   -- so convert EVENT_TIME explicitly.
   UNIX_TIMESTAMP(DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd HH:mm:ss')) AS EVENT_UNIX_TIMESTAMP,
   DATE_FORMAT(EVENT_TIME, 'yyyy-MM-dd hh:mm:ss') AS EVENT_TIMESTAMP_AS_CHAR,
   EXTRACT(SECOND FROM EVENT_TIME) AS EVENT_SECOND
FROM DESTINATION_SQL_STREAM;
```

------
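
To sanity-check the converted columns, the following plain-Python sketch computes the same four derived values for one timestamp. It assumes that the session time zone is UTC, because Flink's `UNIX_TIMESTAMP(string)` interprets the string in the configured session time zone. It also keeps the `hh` pattern that both queries use. That pattern is a 12-hour clock without an AM/PM marker (`%I` in Python); use `HH` if you need a 24-hour clock. The `derive_columns` name is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def derive_columns(event_time: datetime) -> dict:
    """Mirror the derived columns of the DateTime example (assumes UTC)."""
    return {
        # EVENT_TIME - INTERVAL '5' MINUTE
        "five_minutes_before": event_time - timedelta(minutes=5),
        # Seconds since the epoch, like Flink's UNIX_TIMESTAMP(string)
        "event_unix_timestamp": int(event_time.timestamp()),
        # 'yyyy-MM-dd hh:mm:ss' is a 12-hour clock; %I is the Python equivalent
        "event_timestamp_as_char": event_time.strftime("%Y-%m-%d %I:%M:%S"),
        # EXTRACT(SECOND FROM EVENT_TIME)
        "event_second": event_time.second,
    }


sample = derive_columns(datetime(2024, 1, 1, 13, 5, 42, tzinfo=timezone.utc))
```

For the sample above, `event_timestamp_as_char` is `2024-01-01 01:05:42`. The afternoon hour shows as `01`, which is easy to miss.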

#### Simple alerts
<a name="simple-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price 
FROM
   "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE DOUBLE,
   PRICE DOUBLE)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT
   TICKER_SYMBOL,
   SECTOR,
   CHANGE,
   PRICE
FROM DESTINATION_SQL_STREAM
WHERE ABS(CHANGE / (PRICE - CHANGE)) * 100 > 1;
```

------
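
The `WHERE` clause keeps records whose price moved by more than 1% relative to the previous price, `PRICE - CHANGE`. The following minimal Python sketch implements the same predicate. `is_alert` and its `threshold_pct` parameter are hypothetical, and the guard for a zero previous price is an addition that the SQL queries do not have.

```python
def is_alert(price: float, change: float, threshold_pct: float = 1.0) -> bool:
    """Return True when |change| exceeds threshold_pct percent of the previous price."""
    previous_price = price - change
    if previous_price == 0:
        # Not in the SQL queries: skip records without a meaningful previous price.
        return False
    return abs(change / previous_price) * 100 > threshold_pct
```

For example, a price of 101.5 with a change of 1.5 implies a previous price of 100, which is a 1.5% move, so the record is kept.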

#### Throttled alerts
<a name="throttled-alerts"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CHANGE_STREAM"(
   ticker_symbol VARCHAR(4),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
   
CREATE 
OR REPLACE PUMP "change_pump" AS INSERT INTO "CHANGE_STREAM"
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   price
FROM "SOURCE_SQL_STREAM_001"
WHERE
   (
      ABS(Change / (Price - Change)) * 100
   )
   > 1;
-- ** Trigger Count and Limit **
-- Counts "triggers", the rows that evaluated true against the previous WHERE clause,
-- then limits the number of triggers per minute per ticker symbol to what is specified in the WHERE clause

CREATE 
OR REPLACE STREAM TRIGGER_COUNT_STREAM (
   ticker_symbol VARCHAR(4),
   change REAL,
   trigger_count INTEGER);
   
CREATE 
OR REPLACE PUMP trigger_count_pump AS 
INSERT INTO
   TRIGGER_COUNT_STREAM
   SELECT
      STREAM ticker_symbol,
      change,
      trigger_count 
   FROM
      (
         SELECT
            STREAM ticker_symbol,
            change,
            COUNT(*) OVER W1 AS trigger_count
         FROM "CHANGE_STREAM"
         -- window to perform aggregations over the last minute to keep track of triggers
         WINDOW W1 AS 
         (
            PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING
         )
      )
   WHERE
      trigger_count >= 1;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE DOUBLE,
   PRICE DOUBLE,
   EVENT_TIME AS PROCTIME())
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS TRIGGER_COUNT_STREAM;

-- A Flink table needs a connector. This assumes that a Kinesis data stream
-- named TRIGGER_COUNT_STREAM (hypothetical) already exists.
CREATE TABLE TRIGGER_COUNT_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   CHANGE DOUBLE,
   TRIGGER_COUNT INT)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'TRIGGER_COUNT_STREAM',
   'aws.region' = 'us-east-1',
   'format' = 'json');

-- Query 2
%flink.ssql(type=update)
SELECT
   TICKER_SYMBOL,
   SECTOR,
   CHANGE,
   PRICE
FROM DESTINATION_SQL_STREAM
WHERE ABS(CHANGE / (PRICE - CHANGE)) * 100 > 1;

-- Query 3
%flink.ssql(type=update)
SELECT *
FROM (
   SELECT
      TICKER_SYMBOL,
      CHANGE,
      COUNT(*) AS TRIGGER_COUNT
   FROM DESTINATION_SQL_STREAM
   -- Count only rows that pass the alert condition, like CHANGE_STREAM in the KDA version
   WHERE ABS(CHANGE / (PRICE - CHANGE)) * 100 > 1
   GROUP BY
      TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
      TICKER_SYMBOL,
      CHANGE
)
WHERE TRIGGER_COUNT > 1;
```

------
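
The Studio version replaces the KDA sliding `OVER` window with a one-minute `TUMBLE` window. As a result, counts reset at each minute boundary instead of sliding. The following plain-Python sketch shows that tumbling count. It makes three assumptions: records arrive as hypothetical `(epoch_seconds, ticker, change, price)` tuples, only rows that pass the 1% alert test are counted, and rows are grouped by ticker only (the Flink query also groups by `CHANGE`).

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

# Hypothetical record layout: (epoch_seconds, ticker, change, price)
Record = Tuple[int, str, float, float]


def _passes_alert(change: float, price: float) -> bool:
    previous_price = price - change
    return previous_price != 0 and abs(change / previous_price) * 100 > 1


def trigger_counts(records: Iterable[Record], window_seconds: int = 60) -> Counter:
    """Count alert rows per (window_start, ticker) in tumbling windows."""
    counts: Counter = Counter()
    for ts, ticker, change, price in records:
        if _passes_alert(change, price):
            window_start = ts - ts % window_seconds  # round down to the window start
            counts[(window_start, ticker)] += 1
    return counts


def throttled(records: Iterable[Record], min_count: int = 1) -> Dict[Tuple[int, str], int]:
    """Keep only windows whose trigger count exceeds min_count, like TRIGGER_COUNT > 1."""
    return {key: n for key, n in trigger_counts(records).items() if n > min_count}
```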

#### Aggregate partial results from a query
<a name="aggregate-partial-results"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
   
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
INSERT INTO
   "CALC_COUNT_SQL_STREAM"(
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER_SYMBOL",
      STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) AS "TradeTime",
      COUNT(*) AS "TickerCount"
   FROM
      "SOURCE_SQL_STREAM_001" 
   GROUP BY
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
      STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
      TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT") 
   SELECT
      STREAM "TICKER",
      "TRADETIME",
      SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT" 
   FROM
      "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
      (
         PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
      )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;

CREATE TABLE SOURCE_SQL_STREAM_001 (
   TICKER_SYMBOL VARCHAR(4),
   TRADETIME AS PROCTIME(),
   APPROXIMATE_ARRIVAL_TIME TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
   WATERMARK FOR APPROXIMATE_ARRIVAL_TIME AS APPROXIMATE_ARRIVAL_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

DROP TABLE IF EXISTS CALC_COUNT_SQL_STREAM;

CREATE TABLE CALC_COUNT_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   TICKERCOUNT BIGINT NOT NULL,
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis',
   'stream' = 'CALC_COUNT_SQL_STREAM',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'csv');

DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP(3),
   TICKERCOUNT BIGINT NOT NULL,
   WATERMARK FOR TRADETIME AS TRADETIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis',
   'stream' = 'DESTINATION_SQL_STREAM',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'csv');

-- Query 2
%flink.ssql(type=update)
INSERT INTO CALC_COUNT_SQL_STREAM
SELECT
   TICKER,
   TO_TIMESTAMP(TRADETIME, 'yyyy-MM-dd HH:mm:ss') AS TRADETIME,
   TICKERCOUNT
FROM (
   SELECT
      TICKER_SYMBOL AS TICKER,
      DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00') AS TRADETIME,
      COUNT(*) AS TICKERCOUNT
   FROM SOURCE_SQL_STREAM_001
   GROUP BY
      TUMBLE(TRADETIME, INTERVAL '1' MINUTE),
      DATE_FORMAT(TRADETIME, 'yyyy-MM-dd HH:mm:00'),
      DATE_FORMAT(APPROXIMATE_ARRIVAL_TIME, 'yyyy-MM-dd HH:mm:00'),
      TICKER_SYMBOL
);

-- Query 3
%flink.ssql(type=update)
SELECT * FROM CALC_COUNT_SQL_STREAM;

-- Query 4
%flink.ssql(type=update)
INSERT INTO DESTINATION_SQL_STREAM
SELECT
   TICKER,
   TRADETIME,
   SUM(TICKERCOUNT) OVER W1 AS TICKERCOUNT
FROM CALC_COUNT_SQL_STREAM
WINDOW W1 AS (
   PARTITION BY TICKER
   ORDER BY TRADETIME
   RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW);

-- Query 5
%flink.ssql(type=update)
SELECT * FROM DESTINATION_SQL_STREAM;
```

------
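
The example works in two steps. The first step is a one-minute count per ticker (`CALC_COUNT_SQL_STREAM`). The second step is a sum over the preceding 10 minutes, which uses an `OVER` window. The Studio query partitions that window by ticker, whereas the KDA pump partitions by `TRADETIME`. The following plain-Python sketch reproduces both steps, partitioned by ticker, for a small batch of hypothetical `(epoch_seconds, ticker)` events. The function names are made up, and the sketch treats both ends of the 10-minute range as inclusive.

```python
from collections import Counter
from typing import Iterable, List, Tuple

# Output rows: (ticker, minute_start_epoch_seconds, count)
Row = Tuple[str, int, int]


def per_minute_counts(events: Iterable[Tuple[int, str]]) -> List[Row]:
    """Step 1: count events per ticker and minute, like CALC_COUNT_SQL_STREAM."""
    counts = Counter((ticker, ts - ts % 60) for ts, ticker in events)
    # Sort by minute (then ticker) so step 2 sees rows in time order.
    return sorted(((t, m, c) for (t, m), c in counts.items()), key=lambda r: (r[1], r[0]))


def rolling_ten_minute_sums(rows: List[Row]) -> List[Row]:
    """Step 2: per ticker, sum the counts whose minute lies in the last 600 seconds."""
    out = []
    for ticker, minute, _ in rows:
        total = sum(c for t, m, c in rows if t == ticker and minute - 600 <= m <= minute)
        out.append((ticker, minute, total))
    return out
```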

#### Transform string values
<a name="transform-string-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   SUBSTRING("referrer", 12, 
   (
      POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4
   )
) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME())
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT
   ingest_time,
   -- Start at position 12 (just after 'http://www.') and compute the length
   -- with POSITION, as the KDA query does, instead of hard-coding it.
   SUBSTRING(referrer FROM 12 FOR POSITION('.com' IN referrer) - POSITION('www.' IN referrer) - 4) AS referrer
FROM DESTINATION_SQL_STREAM;
```

------
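
SQL string positions are 1-based, and `POSITION` returns 0 when the substring is absent. The following Python sketch mirrors the KDA expression with two hypothetical helpers, `sql_position` and `sql_substring`, so you can check the arithmetic. For `http://www.amazon.com`, `'.com'` starts at position 18 and `'www.'` at position 8. The length is therefore 18 - 8 - 4 = 6, and the result is `amazon`. The fixed start position 12 assumes that every referrer begins with `http://www.` (11 characters).

```python
def sql_position(needle: str, haystack: str) -> int:
    """Mimic SQL POSITION(needle IN haystack): 1-based index, 0 if absent."""
    return haystack.find(needle) + 1


def sql_substring(s: str, start: int, length: int) -> str:
    """Mimic SUBSTRING(s, start, length) with a 1-based start position."""
    return s[start - 1:start - 1 + length]


def domain_name(referrer: str) -> str:
    # Position 12 assumes the referrer starts with 'http://www.' (11 characters).
    length = sql_position(".com", referrer) - sql_position("www.", referrer) - 4
    return sql_substring(referrer, 12, length)
```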

#### Replace a substring using regex
<a name="substring-regex"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM "APPROXIMATE_ARRIVAL_TIME",
   REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0) 
FROM
   "SOURCE_SQL_STREAM_001";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   referrer VARCHAR(32),
   ingest_time AS PROCTIME())
PARTITIONED BY (referrer) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT
   ingest_time,
   -- Match the full 'http://' scheme so existing 'https://' URLs are left unchanged
   REGEXP_REPLACE(referrer, 'http://', 'https://') AS referrer
FROM DESTINATION_SQL_STREAM;
```

------
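
Matching the full scheme `http://`, as the KDA query does, rewrites only URLs that are not already secure. Replacing the bare string `http` would also match the start of `https` and produce `httpss://`. Python's `re.sub` replaces every match, as Flink's `REGEXP_REPLACE` does, so it shows the difference. The function names below are hypothetical.

```python
import re


def upgrade_scheme(url: str) -> str:
    # Replace only the full 'http://' scheme.
    return re.sub(r"http://", "https://", url)


def naive_upgrade(url: str) -> str:
    # Replacing the bare 'http' also matches the start of 'https'.
    return re.sub(r"http", "https", url)
```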

#### Regex log parse
<a name="regex-log-parse"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   sector VARCHAR(24),
   match1 VARCHAR(24),
   match2 VARCHAR(24));
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM T.SECTOR,
      T.REC.COLUMN1,
      T.REC.COLUMN2 
   FROM
      (
         SELECT
            STREAM SECTOR,
            REGEX_LOG_PARSE(SECTOR, '.*([E].).*([R].*)') AS REC 
         FROM
            SOURCE_SQL_STREAM_001
      )
      AS T;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   CHANGE DOUBLE,
   PRICE DOUBLE,
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16))
PARTITIONED BY (SECTOR) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT *
FROM (
   SELECT
      SECTOR,
      REGEXP_EXTRACT(SECTOR, '.*([E].).*([R].*)', 1) AS MATCH1,
      REGEXP_EXTRACT(SECTOR, '.*([E].).*([R].*)', 2) AS MATCH2
   FROM DESTINATION_SQL_STREAM
)
WHERE MATCH1 IS NOT NULL
   AND MATCH2 IS NOT NULL;
```

------
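
`REGEXP_EXTRACT` returns the requested capture group, or `NULL` when the pattern doesn't match. That is why the Studio query filters on `IS NOT NULL`. Flink uses Java regular expressions. For the KDA pattern `.*([E].).*([R].*)`, Python's `re.search` returns the same groups, so you can quickly check locally what `MATCH1` and `MATCH2` will contain. The `extract_matches` helper is hypothetical.

```python
import re
from typing import Optional, Tuple

PATTERN = re.compile(r".*([E].).*([R].*)")


def extract_matches(sector: str) -> Optional[Tuple[str, str]]:
    """Return (MATCH1, MATCH2), or None when the pattern does not match."""
    m = PATTERN.search(sector)
    if m is None:
        return None  # these rows are dropped by the IS NOT NULL filter
    return m.group(1), m.group(2)
```

Because the leading `.*` is greedy but must leave room for a later `R`, `ENERGY` yields `("EN", "RGY")`. `TECHNOLOGY` contains no `R`, so it is filtered out.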

#### Windows and aggregation
<a name="windows-aggregation"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   event_time TIMESTAMP,
   ticker_symbol VARCHAR(4),
   ticker_count INTEGER);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM EVENT_TIME,
      TICKER,
      COUNT(TICKER) AS ticker_count 
   FROM
      "SOURCE_SQL_STREAM_001" WINDOWED BY STAGGER ( PARTITION BY 
         TICKER,
         EVENT_TIME RANGE INTERVAL '1' MINUTE);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '60' SECOND,
   TICKER VARCHAR(4),
   TICKER_COUNT INT)
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json');

-- Query 2
%flink.ssql(type=update)
SELECT
   EVENT_TIME,
   TICKER,
   COUNT(TICKER) AS ticker_count
FROM DESTINATION_SQL_STREAM
GROUP BY
   TUMBLE(EVENT_TIME, INTERVAL '60' SECOND),
   EVENT_TIME,
   TICKER;
```

------

#### Tumbling window using ROWTIME
<a name="tumbling-windows-rowtime"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   MIN_PRICE REAL,
   MAX_PRICE REAL);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM TICKER,
      MIN(PRICE),
      MAX(PRICE)
   FROM
      "SOURCE_SQL_STREAM_001"
   GROUP BY
      TICKER,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
CREATE TABLE DESTINATION_SQL_STREAM (
   ticker VARCHAR(4),
   price DOUBLE,
   event_time VARCHAR(32),
   processing_time AS PROCTIME())
PARTITIONED BY (ticker) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT
   ticker,
   MIN(price) AS MIN_PRICE,
   MAX(price) AS MAX_PRICE
FROM DESTINATION_SQL_STREAM
GROUP BY
   TUMBLE(processing_time, INTERVAL '60' SECOND),
   ticker;
```

------
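
KDA's `STEP` and Flink's `TUMBLE` both assign each row to exactly one fixed-size, non-overlapping interval. The start of that interval is the timestamp rounded down to a multiple of the interval size. The following plain-Python sketch shows the per-ticker `MIN`/`MAX` aggregation over 60-second windows, assuming hypothetical `(epoch_seconds, ticker, price)` input.

```python
from typing import Dict, Iterable, Tuple


def tumbling_min_max(
    rows: Iterable[Tuple[int, str, float]], window_seconds: int = 60
) -> Dict[Tuple[int, str], Tuple[float, float]]:
    """Return {(window_start, ticker): (min_price, max_price)}."""
    result: Dict[Tuple[int, str], Tuple[float, float]] = {}
    for ts, ticker, price in rows:
        key = (ts - ts % window_seconds, ticker)  # the window this row falls into
        if key in result:
            lo, hi = result[key]
            result[key] = (min(lo, price), max(hi, price))
        else:
            result[key] = (price, price)
    return result
```

The Studio query windows on processing time (`PROCTIME()`), whereas the KDA query uses `ROWTIME`. Studio windows therefore depend on when Flink processes the records.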

#### Retrieve most frequently occurring values (TOP\_K\_ITEMS\_TUMBLING)
<a name="retrieve-values"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "CALC_COUNT_SQL_STREAM"(TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
   TICKER VARCHAR(4),
   TRADETIME TIMESTAMP,
   TICKERCOUNT DOUBLE);
CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS INSERT INTO "CALC_COUNT_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER_SYMBOL",
   STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) AS "TradeTime",
   COUNT(*) AS "TickerCount"
FROM
   "SOURCE_SQL_STREAM_001"
GROUP BY
   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
   STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
   TICKER_SYMBOL;
CREATE PUMP "AGGREGATED_SQL_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" (
   "TICKER",
   "TRADETIME",
   "TICKERCOUNT")
SELECT
   STREAM "TICKER",
   "TRADETIME",
   SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
FROM
   "CALC_COUNT_SQL_STREAM" WINDOW W1 AS 
   (
      PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING
   )
;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
-- Query 1
%flink.ssql
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;

CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER VARCHAR(4),
   EVENT_TIME TIMESTAMP(3),
   WATERMARK FOR EVENT_TIME AS EVENT_TIME - INTERVAL '1' SECOND)
PARTITIONED BY (TICKER) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

-- Query 2
%flink.ssql(type=update)
SELECT *
FROM (
   SELECT
      *,
      -- Rank tickers by their count within each one-minute window (Top-N pattern)
      ROW_NUMBER() OVER (
         PARTITION BY WINDOW_START
         ORDER BY MOST_FREQUENT_VALUES DESC) AS row_num
   FROM (
      SELECT
         TUMBLE_START(EVENT_TIME, INTERVAL '1' MINUTE) AS WINDOW_START,
         TICKER,
         COUNT(*) AS MOST_FREQUENT_VALUES
      FROM DESTINATION_SQL_STREAM
      GROUP BY
         TUMBLE(EVENT_TIME, INTERVAL '1' MINUTE),
         TICKER
   )
)
WHERE row_num <= 5;
```

------
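
The Studio top-K queries follow a common pattern. They count each value within a tumbling window, rank the counts in descending order within the window, and keep the first N ranks (`row_num <= N`). The following Python sketch shows that ranking for hypothetical `(epoch_seconds, item)` events. It counts exactly, whereas KDA's `TOP_K_ITEMS_TUMBLING` returns approximate results. It also breaks ties by item name so that the output is deterministic; `ROW_NUMBER` gives tied rows distinct ranks in no guaranteed order. The `top_n_per_window` name is hypothetical.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple


def top_n_per_window(
    events: Iterable[Tuple[int, str]], n: int, window_seconds: int = 60
) -> Dict[int, List[Tuple[str, int]]]:
    """Return {window_start: [(item, count), ...]} with at most n items per window."""
    counts: Dict[int, Counter] = defaultdict(Counter)
    for ts, item in events:
        counts[ts - ts % window_seconds][item] += 1
    result: Dict[int, List[Tuple[str, int]]] = {}
    for window_start, counter in counts.items():
        # Rank by count descending; ties broken by item name for determinism.
        ranked = sorted(counter.items(), key=lambda kv: (-kv[1], kv[0]))
        result[window_start] = ranked[:n]  # like WHERE row_num <= n
    return result
```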

#### Approximate top-K items
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ITEM VARCHAR(1024), ITEM_COUNT DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM" 
   SELECT
      STREAM ITEM,
      ITEM_COUNT 
   FROM
      TABLE(TOP_K_ITEMS_TUMBLING(CURSOR(
      SELECT
         STREAM * 
      FROM
         "SOURCE_SQL_STREAM_001"),
         'column1', -- name of column in single quotes
         10,        -- number of top items
         60         -- tumbling window size in seconds
      ));
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;

CREATE TABLE SOURCE_SQL_STREAM_001 (
   TS TIMESTAMP(3),
   WATERMARK FOR TS AS TS - INTERVAL '5' SECOND,
   ITEM VARCHAR(1024),
   PRICE DOUBLE)
WITH (
   'connector' = 'kinesis',
   'stream' = 'SOURCE_SQL_STREAM_001',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');


%flink.ssql(type=update)
SELECT *
FROM (
   SELECT
      *,
      ROW_NUMBER() OVER (PARTITION BY AGG_WINDOW ORDER BY ITEM_COUNT DESC) AS rownum
   FROM (
      SELECT
         TUMBLE_ROWTIME(TS, INTERVAL '60' SECOND) AS AGG_WINDOW,
         ITEM,
         COUNT(*) AS ITEM_COUNT
      FROM SOURCE_SQL_STREAM_001
      GROUP BY
         TUMBLE(TS, INTERVAL '60' SECOND),
         ITEM
   )
)
WHERE rownum <= 3;
```

------

#### Parse web logs (W3C\_LOG\_PARSE function)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( column1 VARCHAR(16), 
   column2 VARCHAR(16), 
   column3 VARCHAR(16), 
   column4 VARCHAR(16), 
   column5 VARCHAR(16), 
   column6 VARCHAR(16), 
   column7 VARCHAR(16));
CREATE 
OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM l.r.COLUMN1,
   l.r.COLUMN2,
   l.r.COLUMN3,
   l.r.COLUMN4,
   l.r.COLUMN5,
   l.r.COLUMN6,
   l.r.COLUMN7 
FROM
   (
      SELECT
         STREAM W3C_LOG_PARSE("log", 'COMMON') 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   AS l(r);
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;

CREATE TABLE SOURCE_SQL_STREAM_001 (log VARCHAR(1024))
WITH (
   'connector' = 'kinesis',
   'stream' = 'SOURCE_SQL_STREAM_001',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql(type=update)
SELECT
   SPLIT_INDEX(log, ' ', 0),
   SPLIT_INDEX(log, ' ', 1),
   SPLIT_INDEX(log, ' ', 2),
   SPLIT_INDEX(log, ' ', 3),
   SPLIT_INDEX(log, ' ', 4),
   SPLIT_INDEX(log, ' ', 5),
   SPLIT_INDEX(log, ' ', 6)
FROM SOURCE_SQL_STREAM_001;
```

------
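
`SPLIT_INDEX(log, ' ', i)` returns the zero-based `i`-th field. Splitting on spaces therefore only approximates `W3C_LOG_PARSE` with the `'COMMON'` format. In Common Log Format, the bracketed timestamp and the quoted request line contain spaces themselves, so each one ends up spread over several fields. The following Python sketch uses a hypothetical `split_index` helper. As an assumption of this sketch, the helper returns `None` for a missing or negative index.

```python
from typing import List, Optional


def split_index(s: str, delimiter: str, index: int) -> Optional[str]:
    """Return the zero-based index-th field, or None if it does not exist."""
    if index < 0:
        return None
    parts = s.split(delimiter)
    return parts[index] if index < len(parts) else None


def first_fields(line: str, count: int = 7) -> List[Optional[str]]:
    # Mirrors the seven SPLIT_INDEX(log, ' ', i) calls in the Studio query.
    return [split_index(line, " ", i) for i in range(count)]


sample = '127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "GET /apache_pb.gif HTTP/1.0" 200 2326'
fields = first_fields(sample)
```

For the sample line, the timestamp splits into `[10/Oct/2000:13:55:36` and `-0700]`, and the request line starts at `"GET`.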

#### Split strings into multiple fields (VARIABLE\_COLUMN\_LOG\_PARSE function)
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16),
   "column_B" VARCHAR(16),
   "column_C" VARCHAR(16),
   "COL_1" VARCHAR(16),
   "COL_2" VARCHAR(16),
   "COL_3" VARCHAR(16));
CREATE 
OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT
   STREAM t."Col_A",
   t."Col_B",
   t."Col_C",
   t.r."COL_1",
   t.r."COL_2",
   t.r."COL_3"
FROM
   (
      SELECT
         STREAM "Col_A",
         "Col_B",
         "Col_C",
         VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
         'COL_1 TYPE VARCHAR(16),
         COL_2 TYPE VARCHAR(16),
         COL_3 TYPE VARCHAR(16)', ',') AS r 
      FROM
         "SOURCE_SQL_STREAM_001"
   )
   as t;
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql
DROP TABLE IF EXISTS SOURCE_SQL_STREAM_001;

CREATE TABLE SOURCE_SQL_STREAM_001 (log VARCHAR(1024))
WITH (
   'connector' = 'kinesis',
   'stream' = 'SOURCE_SQL_STREAM_001',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql(type=update)
SELECT
   SPLIT_INDEX(log, ' ', 0),
   SPLIT_INDEX(log, ' ', 1),
   SPLIT_INDEX(log, ' ', 2),
   SPLIT_INDEX(log, ' ', 3),
   SPLIT_INDEX(log, ' ', 4),
   SPLIT_INDEX(log, ' ', 5)
FROM SOURCE_SQL_STREAM_001;
```

------

#### Joins
<a name="joins"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   ticker_symbol VARCHAR(4),
   "Company" varchar(20),
   sector VARCHAR(12),
   change DOUBLE,
   price DOUBLE);
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM ticker_symbol,
      "c"."Company",
      sector,
      change,
      price
   FROM "SOURCE_SQL_STREAM_001" 
      LEFT JOIN
         "CompanyName" as "c"
         ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
-- Query 1: source table over the Kinesis data stream
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(12),
   CHANGE DOUBLE,
   PRICE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

%flink.ssql
-- Query 2: reference table read from the CSV file in Amazon S3
CREATE TABLE CompanyName (
   Ticker VARCHAR(4),
   Company VARCHAR(20)) WITH (
      'connector' = 'filesystem',
      'path' = 's3://kda-demo-sample/TickerReference.csv',
      'format' = 'csv' );

%flink.ssql(type=update)
-- Query 3: enrich each trade with the company name
SELECT
   TICKER_SYMBOL,
   c.Company,
   SECTOR,
   CHANGE,
   PRICE
FROM
   DESTINATION_SQL_STREAM
   LEFT JOIN
      CompanyName as c
      ON DESTINATION_SQL_STREAM.TICKER_SYMBOL = c.Ticker;
```
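A `LEFT JOIN` keeps every trade and sets `Company` to `NULL` when no reference row matches. The following Python sketch models that behavior with a dictionary lookup. The function names are hypothetical. The sketch assumes that `TickerReference.csv` has two columns (`Ticker`, `Company`) with no header row, and that each ticker appears only once. If a ticker appears more than once, a SQL join emits one row per match, whereas the dictionary keeps only the last entry.

```python
import csv
from typing import Dict, Iterable, Iterator, Optional, Tuple

Trade = Tuple[str, str, float, float]                    # (TICKER_SYMBOL, SECTOR, CHANGE, PRICE)
Enriched = Tuple[str, Optional[str], str, float, float]  # adds Company after the ticker


def load_company_reference(lines: Iterable[str]) -> Dict[str, str]:
    """Read Ticker,Company rows (no header assumed), like the CompanyName table."""
    return {row[0]: row[1] for row in csv.reader(lines) if len(row) >= 2}


def left_join_company(trades: Iterable[Trade], company_by_ticker: Dict[str, str]) -> Iterator[Enriched]:
    """Emit every trade; Company is None when no reference row matches (LEFT JOIN)."""
    for ticker, sector, change, price in trades:
        # dict.get returns None for a missing key, like the NULL a LEFT JOIN produces.
        yield (ticker, company_by_ticker.get(ticker), sector, change, price)
```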

------

#### Errors
<a name="errors"></a>

------
#### [ SQL-based Kinesis Data Analytics application  ]

```
SELECT
   STREAM ticker_symbol,
   sector,
   change,
   (
      price / 0
   )
   as ProblemColumn
FROM "SOURCE_SQL_STREAM_001"
WHERE
   sector SIMILAR TO '%TECH%';
```

------
#### [ Managed Service for Apache Flink Studio  ]

```
%flink.ssql(type=update)
-- Query 1: source table over the Kinesis data stream
DROP TABLE IF EXISTS DESTINATION_SQL_STREAM;
CREATE TABLE DESTINATION_SQL_STREAM (
   TICKER_SYMBOL VARCHAR(4),
   SECTOR VARCHAR(16),
   CHANGE DOUBLE,
   PRICE DOUBLE )
PARTITIONED BY (TICKER_SYMBOL) WITH (
   'connector' = 'kinesis',
   'stream' = 'kinesis-analytics-demo-stream',
   'aws.region' = 'us-east-1',
   'scan.stream.initpos' = 'LATEST',
   'format' = 'json',
   'json.timestamp-format.standard' = 'ISO-8601');

%flink.pyflink
# Query 2: Python UDF that returns -1 instead of failing the job
from pyflink.table import DataTypes
from pyflink.table.udf import udf

@udf(input_types=[DataTypes.DOUBLE()], result_type=DataTypes.DOUBLE())
def DivideByZero(price):
    try:
        return price / 0
    except ZeroDivisionError:
        return -1.0

st_env.register_function("DivideByZero", DivideByZero)

%flink.ssql(type=update)
-- Query 3: tag the affected rows with the time the error was observed
SELECT
   CURRENT_TIMESTAMP AS ERROR_TIME,
   *
FROM
   (
      SELECT
         TICKER_SYMBOL,
         SECTOR,
         CHANGE,
         DivideByZero(PRICE) as ErrorColumn
      FROM
         DESTINATION_SQL_STREAM
      WHERE
         SECTOR SIMILAR TO '%TECH%'
   )
   AS ERROR_STREAM;
```
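The UDF replaces an exception with a sentinel value, so a single bad record does not fail the job. Query 3 then stamps the affected rows with `ERROR_TIME`. The following Python sketch reproduces that pattern outside Flink to make its behavior easy to reason about. `safe_divide`, `split_errors`, and `ERROR_SENTINEL` are hypothetical names, and the `'TECH' in SECTOR` check stands in for `SIMILAR TO '%TECH%'`.

```python
from datetime import datetime, timezone
from typing import Dict, Iterable, List, Tuple

ERROR_SENTINEL = -1.0  # hypothetical marker, mirrors the UDF's "return -1"


def safe_divide(numerator: float, denominator: float) -> float:
    """Return numerator / denominator, or ERROR_SENTINEL instead of raising."""
    try:
        return numerator / denominator
    except ZeroDivisionError:
        return ERROR_SENTINEL


def split_errors(rows: Iterable[Dict], denominator: float) -> Tuple[List[Dict], List[Dict]]:
    """Route TECH rows whose calculation failed to an error list stamped with ERROR_TIME."""
    ok: List[Dict] = []
    errors: List[Dict] = []
    for row in rows:
        if "TECH" not in row["SECTOR"]:  # same effect as SIMILAR TO '%TECH%'
            continue
        result = safe_divide(row["PRICE"], denominator)
        enriched = dict(row, ErrorColumn=result)
        if result == ERROR_SENTINEL:
            enriched["ERROR_TIME"] = datetime.now(timezone.utc)
            errors.append(enriched)
        else:
            ok.append(enriched)
    return ok, errors
```

Because `-1` could also be a legitimate result, returning `None` (which becomes SQL `NULL`) is a less ambiguous error marker, provided your data allows it.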

------

## Migrating Random Cut Forest workloads
<a name="examples-migrating-to-kda-studio-random-cut-forests"></a>

If you want to move workloads that use Random Cut Forest (RCF) from Kinesis Analytics for SQL to Managed Service for Apache Flink, see this [AWS blog post](https://aws.amazon.com/blogs/big-data/real-time-anomaly-detection-via-random-cut-forest-in-amazon-kinesis-data-analytics/). It demonstrates how to use Managed Service for Apache Flink to run an online RCF algorithm for anomaly detection.

## Replacing a Kinesis Data Firehose source with Kinesis Data Streams
<a name="examples-firehose"></a>

For the complete tutorial, see [Converting-KDASQL-KDAStudio/](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/tree/master/Converting-KDASQL-KDAStudio).

In the following exercise, you change your data flow to use Amazon Managed Service for Apache Flink Studio. This means switching from Amazon Kinesis Data Firehose to Amazon Kinesis Data Streams.

First, we present a typical KDA-SQL architecture. Then we show how to replace it with Amazon Managed Service for Apache Flink Studio and Amazon Kinesis Data Streams. Alternatively, you can launch the CloudFormation template [here](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml).

### Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose
<a name="examples-firehose-legacy-setup"></a>

The following diagram shows the Amazon Kinesis Data Analytics SQL architecture flow:

![](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/legacy-sql.png)


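We first examine the legacy Amazon Kinesis Data Analytics-SQL and Amazon Kinesis Data Firehose setup. The use case is a trading market in which trading data, including the stock ticker and price, streams from external sources into Amazon Kinesis systems. Amazon Kinesis Data Analytics for SQL runs windowed queries, such as a tumbling window, over the input stream. These queries determine each stock's trade volume and its `min`, `max`, and `average` trade price over a one-minute window.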

Amazon Kinesis Data Analytics-SQL is set up to ingest data from the Amazon Kinesis Data Firehose API. After processing, Amazon Kinesis Data Analytics-SQL sends the processed data to another Amazon Kinesis Data Firehose, which then saves the output in an Amazon S3 bucket.

In this example, you use the Amazon Kinesis Data Generator, which lets you send test data to Amazon Kinesis Data Streams or to an Amazon Kinesis Data Firehose delivery stream. To get started, follow the instructions [here](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html). However, use the CloudFormation template [here](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) instead of the template provided in the [instructions](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html).

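After you run the CloudFormation template, the output section provides the Amazon Kinesis Data Generator URL. Log in to the portal with the Cognito user ID and password that you set up [here](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html), then select the Region and the target stream name. For the current state, choose the Amazon Kinesis Data Firehose delivery stream. For the new state, choose the Amazon Kinesis Data Streams stream name. You can create as many templates as you need and test each one with the **Test template** button before sending data to the target stream.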

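The following is a sample payload produced with the Amazon Kinesis Data Generator. The data generator targets the input Amazon Kinesis Data Firehose stream and streams data to it continuously. Amazon Kinesis SDK clients can also send data from other producers.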

```
2023-02-17 09:28:07.763,"AAPL",503
2023-02-17 09:28:07.763,"AMZN",335
2023-02-17 09:28:07.763,"GOOGL",185
2023-02-17 09:28:07.763,"AAPL",1116
2023-02-17 09:28:07.763,"GOOGL",1582
```
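If you want to check the payload format before pointing a stream at it, the following Python sketch parses records shaped like the sample above with the standard `csv` module. It assumes three fields per line: the trade time with millisecond precision, a quoted ticker, and a numeric price. The `Trade` type and the `parse_trades` function are hypothetical.

```python
import csv
from datetime import datetime
from io import StringIO
from typing import List, NamedTuple


class Trade(NamedTuple):
    trade_time: datetime
    ticker: str
    price: float


def parse_trades(payload: str) -> List[Trade]:
    """Parse lines like: 2023-02-17 09:28:07.763,"AAPL",503"""
    trades: List[Trade] = []
    for row in csv.reader(StringIO(payload)):
        if not row:
            continue  # csv.reader yields [] for blank lines
        ts, ticker, price = row  # csv.reader already strips the quotes around the ticker
        trades.append(Trade(datetime.strptime(ts, "%Y-%m-%d %H:%M:%S.%f"), ticker, float(price)))
    return trades
```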

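The following Amazon Kinesis Data Generator template generates a series of random trade dates and times, stock tickers, and stock prices: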

```
{{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
"{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
{{random.number(2000)}}
```

After you choose **Send data**, the generator starts sending mock data.
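Amazon Kinesis SDK clients can also produce this data. The following Python sketch builds records in the same shape as the generator template and can optionally write them to a Kinesis data stream with the boto3 `put_records` call. It does not reproduce the Kinesis Data Generator itself. `make_trade_record` and `send_trades` are hypothetical helpers, and the price range assumes that `random.number(2000)` returns 0 to 2000. `send_trades` needs boto3 and AWS credentials.

```python
import random
from datetime import datetime
from typing import List, Optional

# Same ticker list as the generator template.
TICKERS = ["AAPL", "AMZN", "MSFT", "META", "GOOGL"]


def make_trade_record(rng: random.Random, now: Optional[datetime] = None) -> str:
    """Build one CSV record shaped like the template output: time,"TICKER",price."""
    if now is None:
        now = datetime.now()
    # %f prints microseconds; trimming three digits leaves milliseconds (.SSS).
    timestamp = now.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
    ticker = rng.choice(TICKERS)
    price = rng.randint(0, 2000)  # assumption: random.number(2000) yields 0..2000
    return f'{timestamp},"{ticker}",{price}'


def send_trades(stream_name: str, records: List[str], region: str = "us-east-1") -> dict:
    """Hypothetical producer: write records to a Kinesis data stream with boto3."""
    import boto3  # imported lazily so make_trade_record works without boto3 installed

    client = boto3.client("kinesis", region_name=region)
    # PutRecords accepts at most 500 records per call; batch larger lists yourself.
    return client.put_records(
        StreamName=stream_name,
        Records=[
            # Using the ticker as partition key sends all trades for a ticker to one shard.
            {"Data": record.encode("utf-8"), "PartitionKey": record.split(",")[1].strip('"')}
            for record in records
        ],
    )
```

A production producer should also check `FailedRecordCount` in the `put_records` response and retry the failed entries.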

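External systems stream the data into Amazon Kinesis Data Firehose. An Amazon Kinesis Data Analytics for SQL application lets you analyze the streaming data with standard SQL: you author and run SQL code against streaming sources to perform time-series analytics, feed real-time dashboards, and create real-time metrics. The application creates destination streams from SQL queries on the input stream and sends them to another Amazon Kinesis Data Firehose. The destination Amazon Kinesis Data Firehose then delivers the analytical data to Amazon S3 as the final state.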

Legacy Amazon Kinesis Data Analytics-SQL code is based on an extension of the SQL standard.

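You can use the following query in Amazon Kinesis Data Analytics-SQL. You first create a destination stream for the query output. Then you use a `PUMP`, an Amazon Kinesis Data Analytics repository object that extends the SQL standard. A pump provides a continuously running `INSERT INTO stream SELECT ... FROM` query, so the query results are continuously written into the named stream.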

```
CREATE 
OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME TIMESTAMP,
INGEST_TIME TIMESTAMP,
TICKER VARCHAR(16),
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE);
 
CREATE 
OR REPLACE PUMP "STREAM_PUMP" AS 
INSERT INTO
   "DESTINATION_SQL_STREAM"
   SELECT
      STREAM STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND) AS EVENT_TIME,
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "STREAM_INGEST_TIME",
      "ticker",
       COUNT(*) AS VOLUME,
      AVG("tradePrice") AS AVG_PRICE,
      MIN("tradePrice") AS MIN_PRICE,
      MAX("tradePrice") AS MAX_PRICE
   FROM "SOURCE_SQL_STREAM_001"
   GROUP BY
      "ticker",
      STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
      STEP("SOURCE_SQL_STREAM_001"."tradeTimestamp" BY INTERVAL '60' SECOND);
```

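The preceding SQL uses two time windows: `tradeTimestamp`, which comes from the incoming stream payload, and `ROWTIME`. `tradeTimestamp` is also called `Event Time` or `client-side time`. It is often desirable to use this time in analytics because it is the time at which the event occurred. However, many event sources, such as mobile phones and web clients, do not have reliable clocks, which can lead to inaccurate times. In addition, connectivity issues can cause records to appear on a stream in a different order than the events occurred.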

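In-application streams also include a special column called `ROWTIME`. It stores the timestamp at which Amazon Kinesis Data Analytics inserted the record into the first in-application stream after reading it from the streaming source. This `ROWTIME` value is then maintained throughout your application.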

The SQL determines each ticker's `volume` and its `min`, `max`, and `average` price over 60-second intervals.

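Each of these times has advantages and disadvantages when used in time-based windowed queries. Choose one or more of them, and choose a strategy for handling the relevant disadvantages based on your use case.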

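A two-window strategy uses two time-based values: `ROWTIME` and one of the other times (ingestion time or event time).
+ Use `ROWTIME` as the first window, which controls how frequently the query emits results, as shown in the preceding example. It is not used as the logical time.
+ Use one of the other times, whichever you want to associate with your analytics, as the logical time. This time represents when the event occurred. In the preceding example, the analytic goal is to group the records by ticker and return the count.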
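The following Python sketch makes the two-window grouping concrete by computing the same groups as the pump's `GROUP BY` on a batch of rows. `STEP` rounds a timestamp down to the start of its 60-second interval, and each group is keyed by ticker, `STEP(ROWTIME)`, and `STEP(tradeTimestamp)`. The sketch reproduces only the grouping and the aggregates, not the continuous emission of the real application. `step`, `aggregate`, and the row layout are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Tuple

INTERVAL = timedelta(seconds=60)

# (ROWTIME, tradeTimestamp, ticker, tradePrice) for one input row
Row = Tuple[datetime, datetime, str, float]


def step(ts: datetime, interval: timedelta = INTERVAL) -> datetime:
    """Round ts down to the start of its interval, like STEP(ts BY INTERVAL '60' SECOND)."""
    epoch = datetime(1970, 1, 1, tzinfo=ts.tzinfo)
    return epoch + ((ts - epoch) // interval) * interval


def aggregate(rows: Iterable[Row]) -> List[Dict]:
    """Group by ticker, STEP(ROWTIME) and STEP(tradeTimestamp), as the pump's GROUP BY does."""
    groups: Dict[Tuple[str, datetime, datetime], List[float]] = defaultdict(list)
    for rowtime, trade_ts, ticker, price in rows:
        groups[(ticker, step(rowtime), step(trade_ts))].append(price)

    results = []
    for (ticker, ingest_step, event_step), prices in sorted(groups.items()):
        results.append({
            "EVENT_TIME": event_step,
            "INGEST_TIME": ingest_step,
            "TICKER": ticker,
            "VOLUME": len(prices),
            "AVG_PRICE": sum(prices) / len(prices),
            "MIN_PRICE": min(prices),
            "MAX_PRICE": max(prices),
        })
    return results
```

For example, take a trade with a `tradeTimestamp` of 09:27:59 that is ingested at 09:29:01. It produces its own row, keyed by the 09:29 ingest step and the 09:27 event step, instead of being merged into the 09:28 group.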

### Amazon Managed Service for Apache Flink Studio
<a name="examples-studio"></a>

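In the updated architecture, Amazon Kinesis Data Streams replaces Amazon Kinesis Data Firehose, and Amazon Managed Service for Apache Flink Studio replaces the Amazon Kinesis Data Analytics for SQL application. Apache Flink code runs interactively in an Apache Zeppelin notebook. Amazon Managed Service for Apache Flink Studio sends the aggregated trade data to an Amazon S3 bucket for storage.

The following diagram shows the Amazon Managed Service for Apache Flink Studio architecture flow: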

![](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/kda-studio.png)


#### Create a Kinesis data stream
<a name="examples-studio-create-data-stream"></a>

**To create a data stream using the console**

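1. Sign in to the AWS Management Console and open the Kinesis console at [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. In the navigation bar, expand the Region selector and choose a Region.

1. Choose **Create data stream**.

1. On the **Create Kinesis stream** page, enter a name for your data stream (this exercise uses `TRADE_SOURCE_STREAM`), and then accept the default **On-demand** capacity mode.

   With **On-demand** mode, you can then choose **Create Kinesis stream** to create your data stream.

   On the **Kinesis streams** page, the stream's **Status** is **Creating** while the stream is being created. When the stream is ready to use, the **Status** changes to **Active**.

1. Choose the name of your stream. The **Stream details** page displays a summary of your stream configuration, along with monitoring information.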

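1. **In the Amazon Kinesis Data Generator, change the target from the delivery stream to the new Amazon Kinesis data stream: TRADE\_SOURCE\_STREAM.**

   The JSON and payload are unchanged from the ones you used with Amazon Kinesis Data Analytics-SQL. Use the Amazon Kinesis Data Generator to produce some sample trading payload data, with the **TRADE\_SOURCE\_STREAM** data stream as the target for this exercise: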

   ```
   {{date.now(YYYY-MM-DD HH:mm:ss.SSS)}},
   "{{random.arrayElement(["AAPL","AMZN","MSFT","META","GOOGL"])}}",
   {{random.number(2000)}}
   ```

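1. In the AWS Management Console, go to Managed Service for Apache Flink and choose **Create application**.

1. In the left navigation pane, choose **Studio notebooks**, and then choose **Create studio notebook**.

1. Enter a name for the Studio notebook.

1. Under **AWS Glue database**, provide an existing AWS Glue database that defines the metadata for your sources and destinations. If you don't have an AWS Glue database, choose **Create** and do the following:

   1. In the AWS Glue console, choose **Databases** under **Data Catalog** in the left-hand menu.

   1. Choose **Create database**.

   1. On the **Create database** page, enter a name for the database. In the **Location - optional** section, choose **Browse Amazon S3** and select the Amazon S3 bucket. If you haven't set up an Amazon S3 bucket yet, you can skip this step and come back to it later.

   1. (Optional) Enter a description for the database.

   1. Choose **Create database**.

1. Choose **Create notebook**.

1. After the notebook is created, choose **Run**.

1. After the notebook has started, choose **Open in Apache Zeppelin** to launch the Zeppelin notebook.

1. On the Zeppelin notebook page, choose **Create new note** and name it *MarketDataFeed*.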
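If you prefer to script the stream creation from the first steps instead of using the console, the following Python sketch uses the boto3 Kinesis client. It creates an on-demand stream and waits for the stream to become active. The function name is hypothetical. The client is passed in as a parameter so that you can test the logic with a stub. To run the sketch against AWS, you need credentials with permission to create and describe Kinesis streams.

```python
from typing import Any, Dict


def create_on_demand_stream(client: Any, stream_name: str) -> Dict[str, Any]:
    """Create an on-demand Kinesis data stream and wait until it is ACTIVE.

    `client` is expected to be a boto3 Kinesis client, e.g. boto3.client("kinesis").
    """
    client.create_stream(
        StreamName=stream_name,
        StreamModeDetails={"StreamMode": "ON_DEMAND"},  # same as the console default
    )
    # The stream_exists waiter polls the stream until its status is ACTIVE.
    client.get_waiter("stream_exists").wait(StreamName=stream_name)
    summary = client.describe_stream_summary(StreamName=stream_name)
    return summary["StreamDescriptionSummary"]
```

For example: `create_on_demand_stream(boto3.client("kinesis", region_name="us-east-1"), "TRADE_SOURCE_STREAM")`.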

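The Flink SQL code is explained below. First, however, see [what the Zeppelin notebook screen looks like](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/open-Zeppelin-notebook.jpg). Each window in the notebook is a separate code block, and you run the blocks one at a time.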

##### Amazon Managed Service for Apache Flink code
<a name="examples-studio-code"></a>

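Amazon Managed Service for Apache Flink uses Zeppelin notebooks to run code. In this example, the ssql code targets Apache Flink 1.13. The code in the Zeppelin notebook is shown below; run one block at a time.

Before you run any code in the Zeppelin notebook, you must run the Flink configuration commands. If you need to change a configuration setting after running code (ssql, Python, or Scala), you must stop and restart the notebook. In this example, you must set checkpointing, because checkpoints are what flush the data streamed to Amazon S3 into files. The following statement sets the checkpoint interval to 5000 milliseconds.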

```
%flink.conf
execution.checkpointing.interval 5000
```

`%flink.conf` indicates that this block contains configuration statements. For more information about Flink configuration, including checkpointing, see [Apache Flink Checkpointing](https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/ops/state/checkpoints/).

The following Flink ssql code creates the input table for the source Amazon Kinesis data stream. Note that the `TRADE_TIME` field stores the date/time created by the data generator.

```
%flink.ssql
DROP TABLE IF EXISTS TRADE_SOURCE_STREAM;
CREATE TABLE TRADE_SOURCE_STREAM (
  --`arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  TRADE_TIME TIMESTAMP(3),
  WATERMARK FOR TRADE_TIME as TRADE_TIME - INTERVAL '5' SECOND,
  TICKER STRING,
  PRICE DOUBLE,
  STATUS STRING)
WITH (
  'connector' = 'kinesis',
  'stream' = 'TRADE_SOURCE_STREAM',
  'aws.region' = 'us-east-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv');
```

You can view the input stream with the following statement:

```
%flink.ssql(type=update)
-- testing the source stream
select * from TRADE_SOURCE_STREAM;
```

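Before you send the aggregated data to Amazon S3, you can view it directly in Amazon Managed Service for Apache Flink with a tumbling-window select query. This code aggregates the trading data over a one-minute time window. Note that the `%flink.ssql` statement must include the `(type=update)` designation: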

```
%flink.ssql(type=update)
   
select TUMBLE_ROWTIME(TRADE_TIME, INTERVAL '1' MINUTE) as TRADE_WINDOW,
       TICKER,
       COUNT(*) as VOLUME,
       AVG(PRICE) as AVG_PRICE,
       MIN(PRICE) as MIN_PRICE,
       MAX(PRICE) as MAX_PRICE
FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```

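Next, create the destination table in Amazon S3. You must use watermarks. A watermark is a progress metric that marks the point in time up to which you are confident no more delayed events will arrive. Watermarks exist to account for late arrivals. `INTERVAL '5' SECOND` allows trades to arrive in the Amazon Kinesis data stream up to 5 seconds late and still be included, as long as their timestamp falls within the window. For more information, see [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/event-time/generating_watermarks/).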

```
%flink.ssql(type=update)

DROP TABLE IF EXISTS TRADE_DESTINATION_S3;
CREATE TABLE TRADE_DESTINATION_S3 (
TRADE_WINDOW_START TIMESTAMP(3),
WATERMARK FOR TRADE_WINDOW_START as TRADE_WINDOW_START - INTERVAL '5' SECOND,
TICKER STRING, 
VOLUME BIGINT,
AVG_PRICE DOUBLE,
MIN_PRICE DOUBLE,
MAX_PRICE DOUBLE)
WITH ('connector' = 'filesystem','path' = 's3://trade-destination/','format' = 'csv');
```
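The following Python sketch simulates how the 5-second watermark on `TRADE_TIME` decides when a one-minute tumbling window closes and which trades are dropped as late. It is a simplification: Flink emits watermarks periodically rather than after every record, and the names `simulate` and `window_start` are hypothetical. In the sketch, a window fires once the watermark (the largest event time seen so far minus 5 seconds) reaches the window's last millisecond.

```python
from datetime import datetime, timedelta
from typing import Dict, Iterable, List, Optional, Tuple

WINDOW = timedelta(minutes=1)            # TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE)
OUT_OF_ORDERNESS = timedelta(seconds=5)  # WATERMARK ... TRADE_TIME - INTERVAL '5' SECOND
ONE_MS = timedelta(milliseconds=1)
EPOCH = datetime(1970, 1, 1)

Event = Tuple[datetime, str]  # (TRADE_TIME, TICKER) in arrival order


def window_start(ts: datetime) -> datetime:
    """Start of the one-minute tumbling window that contains ts."""
    return EPOCH + ((ts - EPOCH) // WINDOW) * WINDOW


def simulate(events: Iterable[Event]) -> Tuple[Dict[datetime, int], List[Event]]:
    """Count events per window; return (counts of fired windows, dropped late events)."""
    watermark: Optional[datetime] = None
    open_windows: Dict[datetime, int] = {}
    fired: Dict[datetime, int] = {}
    late: List[Event] = []
    for ts, ticker in events:
        start = window_start(ts)
        # A window's last instant is end - 1 ms; once the watermark reaches it, the window is closed.
        if watermark is not None and start + WINDOW - ONE_MS <= watermark:
            late.append((ts, ticker))
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        # The watermark only moves forward: max event time seen so far minus 5 seconds.
        candidate = ts - OUT_OF_ORDERNESS
        if watermark is None or candidate > watermark:
            watermark = candidate
        for s in sorted(open_windows):  # sorted() copies the keys, so popping is safe
            if s + WINDOW - ONE_MS <= watermark:
                fired[s] = open_windows.pop(s)
    return fired, late
```

For example, suppose trades arrive with times 09:28:10, 09:29:02, 09:28:58 (4 seconds out of order), and 09:29:06. The 09:28 window closes with a count of 2 once the 09:29:06 trade moves the watermark to 09:29:01. A trade stamped 09:28:50 that arrives after that point is dropped.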

This statement inserts the data into `TRADE_DESTINATION_S3`. `TUMBLE_ROWTIME` returns the timestamp of the inclusive upper bound of the tumbling window.

```
%flink.ssql(type=update)

insert into TRADE_DESTINATION_S3
select TUMBLE_ROWTIME(TRADE_TIME,
INTERVAL '1' MINUTE),
TICKER, COUNT(*) as VOLUME,
AVG(PRICE) as AVG_PRICE,
MIN(PRICE) as MIN_PRICE,
MAX(PRICE) as MAX_PRICE FROM TRADE_SOURCE_STREAM
GROUP BY TUMBLE(TRADE_TIME, INTERVAL '1' MINUTE), TICKER;
```
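`TUMBLE_ROWTIME` is one of several window auxiliary functions. Assuming one-minute windows aligned to the epoch, the following Python sketch computes the bounds that `TUMBLE_START`, `TUMBLE_END`, and `TUMBLE_ROWTIME` would report for a given trade time. `TumbleBounds` and `tumble_bounds` are hypothetical names.

```python
from datetime import datetime, timedelta
from typing import NamedTuple

EPOCH = datetime(1970, 1, 1)


class TumbleBounds(NamedTuple):
    start: datetime    # TUMBLE_START: inclusive lower bound
    end: datetime      # TUMBLE_END: exclusive upper bound
    rowtime: datetime  # TUMBLE_ROWTIME: inclusive upper bound (end minus 1 ms)


def tumble_bounds(ts: datetime, size: timedelta = timedelta(minutes=1)) -> TumbleBounds:
    """Bounds of the tumbling window of the given size that contains ts."""
    start = EPOCH + ((ts - EPOCH) // size) * size
    end = start + size
    return TumbleBounds(start, end, end - timedelta(milliseconds=1))
```

Note that the insert statement writes `TUMBLE_ROWTIME`, the window's inclusive upper bound (for example, 09:28:59.999), into the column named `TRADE_WINDOW_START`. If you want that column to hold the window start, `TUMBLE_START(TRADE_TIME, INTERVAL '1' MINUTE)` may be a better fit.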

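Let the statement run for 10 to 20 minutes to accumulate some data in Amazon S3, and then cancel it. Canceling the statement closes the files in Amazon S3 so that you can view them.

The contents look like this: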

![](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/kda-studio-contents.png)


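You can use the [CloudFormation template](https://github.com/aws-samples/amazon-kinesis-data-analytics-examples/blob/master/Converting-KDASQL-KDAStudio/environmentStackCfn/KdaStudioStack.template.yaml) to create the infrastructure.

CloudFormation creates the following resources in your AWS account:
+  Amazon Kinesis Data Streams
+  Amazon Managed Service for Apache Flink Studio
+  AWS Glue database
+  Amazon S3 bucket
+  IAM roles and policies that allow Amazon Managed Service for Apache Flink Studio to access the appropriate resources

Import the notebook and change the Amazon S3 bucket name to the new Amazon S3 bucket that CloudFormation created.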

![](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/kda-studio-cfn.png)


##### Learn more
<a name="more"></a>

The following additional resources can help you learn more about using Managed Service for Apache Flink Studio:
+ [Managed Service for Apache Flink Studio Developer Guide](https://docs.aws.amazon.com/managed-flink/latest/java/how-notebook.html)
+ [Apache Flink 1.13 documentation](https://nightlies.apache.org/flink/flink-docs-release-1.13/)
+ [Managed Service for Apache Flink Studio Workshop](https://catalog.us-east-1.prod.workshops.aws/workshops/c342c6d1-2baf-4827-ba42-52ef9eb173f6/en-US/flink-on-kda-studio)
+ [Apache Flink windowing](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/queries/window-tvf/)
+ [Amazon Kinesis Data Analytics Developer Guide: Writing to an Amazon S3 bucket from a Kinesis Data Analytics stream](https://docs.aws.amazon.com/managed-flink/latest/java/examples-s3.html)