

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Kinesis Data Analytics for SQL 示例
<a name="examples"></a>

此部分提供在 Amazon Kinesis Data Analytics 中创建和使用应用程序的示例。它们包括示例代码和 step-by-step说明，可帮助您创建 Kinesis Data Analytics 应用程序并测试结果。

 在开始探索这些示例之前，建议您先查看[Amazon Kinesis Data Analytics for SQL 应用程序：工作原理](how-it-works.md)和[Amazon Kinesis Data Analytics·for·SQL 应用程序入门](getting-started.md)。

**Topics**
+ [示例：转换数据](examples-transforming.md)
+ [示例：窗口和聚合](examples-window.md)
+ [示例：联接](examples-joins.md)
+ [示例：机器学习](examples-machine.md)
+ [示例：警报和错误](examples-alerts.md)
+ [示例：解决方案加速器](examples_solution.md)

# 示例：转换数据
<a name="examples-transforming"></a>

有时，在 Amazon Kinesis Data Analytics 中执行任何分析之前，应用程序代码必须先预处理传入记录。这种情况是由多种原因导致的，例如，不符合支持的记录格式的记录会导致应用程序内部输入流中出现非规范化的列。

此部分提供了有关如何使用可用的字符串函数来规范化数据、如何从字符串列中提取所需信息等操作的示例，还指明了您可能发现很有用的日期时间函数。

## 使用 Lambda 预处理流
<a name="examples-transforming-lambda"></a>

有关使用预处理流的信息 AWS Lambda，请参见[使用 Lambda 函数预处理数据](lambda-preprocessing.md)。

**Topics**
+ [使用 Lambda 预处理流](#examples-transforming-lambda)
+ [示例：转换字符串值](examples-transforming-strings.md)
+ [示例：转换 DateTime 值](app-string-datetime-manipulation.md)
+ [示例：转换多个数据类型](app-tworecordtypes.md)

# 示例：转换字符串值
<a name="examples-transforming-strings"></a>

对于流式传输源中的记录，Amazon Kinesis Data Analytics 支持 JSON 和 CSV 等格式。有关更多信息，请参阅 [RecordFormat](API_RecordFormat.md)。然后，这些记录根据输入配置映射到应用程序内部流中的行。有关更多信息，请参阅 [配置应用程序输入](how-it-works-input.md)。输入配置指定流式传输源中的记录字段如何映射到应用程序内部流中的列。

当流式传输源中的记录遵循支持的格式时，此映射有效，这会导致应用程序内部流具有规范化数据。但是，如果流式传输源中的数据不符合支持的标准怎么办？ 例如，如果流式传输源包含点击流数据、IoT 传感器和应用程序日志等数据时该怎么办呢？ 

请考虑以下示例：
+ 流式源包含应用程序日志 - 应用程序日志遵循标准 Apache 日志格式，并使用 JSON 格式写入到流。

  ```
  {
     "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0"
  }
  ```

  有关标准 Apache 日志格式的更多信息，请参阅 Apache 网站上的[日志文件](https://httpd.apache.org/docs/2.4/logs.html)。

   
+ 流式源包含半结构化数据 - 以下示例显示了两种记录。`Col_E_Unstructured` 字段值是一系列逗号分隔的值。这里总共有五列：前四列包含字符串类型的值，最后一列包含逗号分隔的值。

  ```
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  
  { "Col_A" : "string",
    "Col_B" : "string",
    "Col_C" : "string",
    "Col_D" : "string",
    "Col_E_Unstructured" : "value,value,value,value"}
  ```
+ 您的直播源上的记录包含 URLs，您需要网址域名的一部分进行分析。

  ```
  { "referrer" : "http://www.amazon.com"}
  { "referrer" : "http://www.stackoverflow.com" }
  ```

此种情况下，以下包含两个步骤的过程通常适用于创建包含规范化数据的应用程序内部流：

1. 配置应用程序输入以便将非结构化字段映射到创建的应用程序内部输入流中的 `VARCHAR(N)` 类型的列。

1. 在应用程序代码中，使用字符串功能将这一个列拆分为多个列，并将行保存到其他应用程序内部流。应用程序代码创建的该应用程序内部流将包含规范化数据。然后，您可以对该应用程序内部流进行分析。

Amazon Kinesis Data Analytics 提供以下字符串操作、标准 SQL 函数和 SQL 标准扩展以处理字符串列：
+ **字符串运算符** - 在比较字符串时，`LIKE` 和 `SIMILAR` 等运算符是非常有用的。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[字符串运算符](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-operators.html)。
+ **SQL 函数** - 在处理各种字符串时，以下函数是非常有用的。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[字符串和搜索函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-string-and-search-functions.html)。
  + `CHAR_LENGTH` – 提供字符串的长度。
  + `INITCAP` – 返回输入字符串的转换后版本，在这类字符串中，每个以空格分隔的单词的第一个字符都是大写的，所有其他字符都是小写的。
  + `LOWER/UPPER` – 将字符串转换为小写或大写。
  + `OVERLAY` – 使用第二个字符串参数 (替代字符串) 替换第一个字符串参数的一部分 (原始字符串)。
  + `POSITION` - 在某个字符串中搜索其他字符串。
  + `REGEX_REPLACE` – 将子字符串替换为备用子字符串。
  + `SUBSTRING` - 从特定位置开始提取部分源字符串。
  + `TRIM` – 从源字符串的开头或结尾删除指定字符的实例。
+ **SQL 扩展** — 这些扩展对于处理非结构化字符串（例如日志和 URIs）非常有用。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[日志解析函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-pattern-matching-functions.html)。
  + `FAST_REGEX_LOG_PARSER` – 工作原理类似于正则表达式分析程序，但方法更简便，生成结果的速度更快。例如，较快的正则表达式解析程序会在找到第一个匹配项时停止（称为*延迟语义*）。
  + `FIXED_COLUMN_LOG_PARSE` – 分析固定宽度的字段，自动将其转换为指定的 SQL 类型。
  + `REGEX_LOG_PARSE` – 根据默认的 Java 正则表达式模式分析字符串。
  + `SYS_LOG_PARSE`— 解析 UNIX/Linux 系统日志中常见的条目。
  + `VARIABLE_COLUMN_LOG_PARSE` – 将输入字符串拆分为多个由分隔符或分隔符字符串分隔的字段。
  + `W3C_LOG_PARSE` – 可用于快速格式化 Apache 日志。

有关使用这些函数的示例，请参阅以下主题：

**Topics**
+ [示例：提取部分字符串 (SUBSTRING 函数)](examples-transforming-strings-substring.md)
+ [示例：使用正则表达式替换子字符串 (REGEX\$1REPLACE 函数)](examples-transforming-strings-regexreplace.md)
+ [示例：根据正则表达式分析日志字符串 (REGEX\$1LOG\$1PARSE 函数)](examples-transforming-strings-regexlogparse.md)
+ [示例：分析 Web 日志 (W3C\$1LOG\$1PARSE 函数)](examples-transforming-strings-w3clogparse.md)
+ [示例：将字符串拆分到多个字段 (VARIABLE\$1COLUMN\$1LOG\$1PARSE 函数)](examples-transforming-strings-variablecolumnlogparse.md)

# 示例：提取部分字符串 (SUBSTRING 函数)
<a name="examples-transforming-strings-substring"></a>

此示例使用 `SUBSTRING` 函数在 Amazon Kinesis Data Analytics 中转换字符串。`SUBSTRING` 函数从特定位置开始提取部分源字符串。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[ 子字符串](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-substring.html)。

在本示例中，您将以下记录写入到 Amazon Kinesis 数据流中。

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 数据流作为流式传输源。发现过程读取有关流式传输源的示例记录，并推断出具有一列 (`REFERRER`) 的应用程序内部架构，如下所示。

![\[控制台屏幕截图显示了应用程序内架构，反向链接列 URLs 中有一个列表。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/referrer-10.png)


然后，您可以将应用程序代码与 `SUBSTRING` 函数结合使用，来解析 URL 字符串以检索公司名称。随后将结果数据插入另一个应用程序内部流，如下所示：



![\[控制台屏幕截图，显示具有应用程序内部流中的结果数据的 Real-time analytics (实时分析) 选项卡。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/referrer-20.png)


**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-transforming-strings-substring-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-transforming-strings-substring-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-strings-substring-1"></a>

创建一个 Amazon Kinesis 数据流并填充日志记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-transforming-strings-substring-2"></a>

接下来，创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择创建 IAM 角色的选项。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) 
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：使用正则表达式替换子字符串 (REGEX\$1REPLACE 函数)
<a name="examples-transforming-strings-regexreplace"></a>

此示例使用 `REGEX_REPLACE` 函数在 Amazon Kinesis Data Analytics 中转换字符串。`REGEX_REPLACE` 将子字符串替换为备用子字符串。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[ REGEX\$1REPLACE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-replace.html)。

在本示例中，您将以下记录写入到 Amazon Kinesis 数据流中：

```
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 数据流作为流式传输源。发现过程读取有关流式传输源的示例记录，并推断出具有一列 (REFERRER) 的应用程序内部架构，如下所示。

![\[控制台屏幕截图显示了应用程序内架构，其列表 URLs 在 referrer 列中。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/referrer-10.png)


然后，通过 `REGEX_REPLACE` 函数使用应用程序代码将 URL 转换为使用 `https://` 而不是 `http://`。随后将结果数据插入另一个应用程序内部流，如下所示：



![\[控制台屏幕截图，显示具有 ROWTIME、ingest_time 和引用站点列的结果数据表。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_regex_replace.png)


**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-transforming-strings-regexreplace-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-transforming-strings-regexreplace-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-strings-regexreplace-1"></a>

创建一个 Amazon Kinesis 数据流并填充日志记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"REFERRER": "http://www.amazon.com"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-transforming-strings-regexreplace-2"></a>

接下来，创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择创建 IAM 角色的选项。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果，如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中：

      ```
      -- CREATE OR REPLACE STREAM for cleaned up referrer
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          "ingest_time" TIMESTAMP,
          "referrer" VARCHAR(32));
          
      CREATE OR REPLACE PUMP "myPUMP" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM 
               "APPROXIMATE_ARRIVAL_TIME", 
               REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
            FROM "SOURCE_SQL_STREAM_001";
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：根据正则表达式分析日志字符串 (REGEX\$1LOG\$1PARSE 函数)
<a name="examples-transforming-strings-regexlogparse"></a>

此示例使用 `REGEX_LOG_PARSE` 函数在 Amazon Kinesis Data Analytics 中转换字符串。`REGEX_LOG_PARSE` 根据默认 Java 正则表达式模式分析字符串。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[ REGEX\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-regex-log-parse.html)。

在本示例中，您将以下记录写入到 Amazon Kinesis 流中。

```
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200 125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 数据流作为流式传输源。发现过程读取有关流式传输源的示例记录，并推断出具有一列 (LOGENTRY) 的应用程序内部架构，如下所示。

![\[控制台屏幕截图，显示具有 LOGENTRY 列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_regex_log_parse_0.png)


然后，在 `REGEX_LOG_PARSE` 函数中使用应用程序代码分析日志字符串从而检索数据元素。随后将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图显示了生成的数据表，其中包含 ROWTIME、LOGENTRY 和列 MATCH1。 MATCH2\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_regex_log_parse_1.png)


**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-transforming-strings-regexlogparse-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-transforming-strings-regexlogparse-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-strings-regexlogparse-1"></a>

创建一个 Amazon Kinesis 数据流并填充日志记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "
           '"GET /index.php HTTP/1.1" 200 125 "-" '
           '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-transforming-strings-regexlogparse-2"></a>

接下来，创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **Create application**，并指定应用程序名称。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择创建 IAM 角色的选项。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1 VARCHAR(24), match2 VARCHAR(24));
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
          FROM 
               (SELECT STREAM LOGENTRY,
                   REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
                   FROM SOURCE_SQL_STREAM_001) AS T;
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：分析 Web 日志 (W3C\$1LOG\$1PARSE 函数)
<a name="examples-transforming-strings-w3clogparse"></a>

此示例使用 `W3C_LOG_PARSE` 函数在 Amazon Kinesis Data Analytics 中转换字符串。您可以使用 `W3C_LOG_PARSE` 快速格式化 Apache 日志。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的 [W3C\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-w3c-log-parse.html)。

在本示例中，您将日志记录写入到 Amazon Kinesis 数据流中。示例日志如下所示：

```
{"Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pba.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:03 -0700] "GET /icons/apache_pbb.gif HTTP/1.1" 304 0"}
{"Log":"192.168.254.30 - John [24/May/2004:22:01:04 -0700] "GET /icons/apache_pbc.gif HTTP/1.1" 304 0"}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 数据流作为流式传输源。发现过程读取流式传输源上的示例记录，并推断出具有一个列（日志）的应用程序内部架构，如下所示：

![\[控制台屏幕截图，显示 Formatted stream sample (格式化的流示例) 选项卡以及包含日志列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/log-10.png)


然后，您将使用应用程序代码和 `W3C_LOG_PARSE` 函数解析日志，创建另一应用程序内部流（将不同日志字段放置在单独的列中），如下所示：

![\[控制台屏幕截图，显示 实时分析 选项卡和应用程序内部流。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/log-20.png)


**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-transforming-strings-w3clogparse-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-transforming-strings-w3clogparse-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-strings-w3clogparse-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充日志记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] "
           '"GET /icons/apache_pb.gif HTTP/1.1" 304 0'
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-transforming-strings-w3clogparse-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择创建 IAM 角色的选项。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构仅包含一列。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
      column1 VARCHAR(16),
      column2 VARCHAR(16),
      column3 VARCHAR(16),
      column4 VARCHAR(16),
      column5 VARCHAR(16),
      column6 VARCHAR(16),
      column7 VARCHAR(16));
      
      CREATE OR REPLACE PUMP "myPUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
              SELECT STREAM
                  l.r.COLUMN1,
                  l.r.COLUMN2,
                  l.r.COLUMN3,
                  l.r.COLUMN4,
                  l.r.COLUMN5,
                  l.r.COLUMN6,
                  l.r.COLUMN7
              FROM (SELECT STREAM W3C_LOG_PARSE("log", 'COMMON')
                    FROM "SOURCE_SQL_STREAM_001") AS l(r);
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：将字符串拆分到多个字段 (VARIABLE\$1COLUMN\$1LOG\$1PARSE 函数)
<a name="examples-transforming-strings-variablecolumnlogparse"></a>

此示例使用 `VARIABLE_COLUMN_LOG_PARSE` 函数在 Kinesis Data Analytics 中处理字符串列。`VARIABLE_COLUMN_LOG_PARSE` 将输入字符串拆分为多个字段，它们由分隔符或分隔符字符串分隔。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的 [VARIABLE\$1COLUMN\$1LOG\$1PARSE](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-variable-column-log-parse.html)。

在本示例中，您将半结构化记录写入到 Amazon Kinesis Data Stream 中。示例记录如下所示：

```
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
  "Col_B" : "string",
  "Col_C" : "string",
  "Col_D_Unstructured" : "value,value,value,value"}
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 流作为流式传输源。发现过程读取流式传输源上的示例记录，并推断出具有四个列的应用程序内部架构，如下所示：

![\[控制台屏幕截图，显示具有 4 个列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/unstructured-10.png)


然后，您将使用应用程序代码和 `VARIABLE_COLUMN_LOG_PARSE` 函数解析逗号分隔的值，将规范化的行插入到其他应用程序内部流，如下所示：



![\[控制台屏幕截图，显示 实时分析 选项卡和应用程序内部流。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/unstructured-20.png)


**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-transforming-strings-variablecolumnlogparse-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-transforming-strings-variablecolumnlogparse-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-strings-variablecolumnlogparse-1"></a>

创建一个 Amazon Kinesis 数据流并填充日志记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 运行以下 Python 代码以便填充示例日志记录。这段简单代码不断地将同一日志记录写入到流中。

   ```
    
   import json
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"}
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

   

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-transforming-strings-variablecolumnlogparse-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择创建 IAM 角色的选项。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。请注意推断的架构仅包含一列。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中，编写应用程序代码并确认结果：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中：

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"(
                  "column_A" VARCHAR(16),
                  "column_B" VARCHAR(16),
                  "column_C" VARCHAR(16),
                  "COL_1" VARCHAR(16),             
                  "COL_2" VARCHAR(16),            
                  "COL_3" VARCHAR(16));
      
      CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM  t."Col_A", t."Col_B", t."Col_C",
                        t.r."COL_1", t.r."COL_2", t.r."COL_3"
         FROM (SELECT STREAM 
                 "Col_A", "Col_B", "Col_C",
                 VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured",
                                           'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)',
                                           ',') AS r 
               FROM "SOURCE_SQL_STREAM_001") as t;
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：转换 DateTime 值
<a name="app-string-datetime-manipulation"></a>

Amazon Kinesis Data Analytics 支持将列转换为时间戳。例如，除了 `ROWTIME` 列以外，建议您将自己的时间戳用作 `GROUP BY` 子句中另一个基于时间的窗口。Kinesis Data Analytics 提供用于处理日期和时间字段的操作和 SQL 函数。
+ **日期和时间运算符** - 您可以对日期、时间和间隔数据类型执行算术运算。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink* SQL 参考中的[日期、时间戳和间隔运算符](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-timestamp-interval.html)。

   
+ **SQL 函数** - 其中包括以下各项。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[日期和时间函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)。
  + `EXTRACT()` - 从日期、时间、时间戳或间隔表达式中提取一个字段。
  + `CURRENT_TIME` - 返回执行查询的时间 (UTC)。
  + `CURRENT_DATE` - 返回执行查询的日期 (UTC)。
  + `CURRENT_TIMESTAMP` - 返回执行查询的时间戳 (UTC)。
  + `LOCALTIME` - 返回 Kinesis Data Analytics 运行环境所定义执行查询的当前时间 (UTC)。
  + `LOCALTIMESTAMP` - 返回 Kinesis Data Analytics 运行环境定义的当前时间戳 (UTC)。

     
+ **SQL 扩展** - 其中包括以下各项。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[日期和时间函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-date-time-functions.html)以及[日期时间转换函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-datetime-conversion-functions.html)。
  + `CURRENT_ROW_TIMESTAMP` - 为流中的每一行返回一个新的时间戳。
  + `TSDIFF` - 返回两个时间戳的差异 (以毫秒为单位)。
  + `CHAR_TO_DATE` - 将字符串转换为日期。
  + `CHAR_TO_TIME` - 将字符串转换为时间。
  + `CHAR_TO_TIMESTAMP` - 将字符串转换为时间戳。
  + `DATE_TO_CHAR` - 将日期转换为字符串。
  + `TIME_TO_CHAR` - 将时间转换为字符串。
  + `TIMESTAMP_TO_CHAR` - 将时间戳转换为字符串。

上面大多数 SQL 函数都采用一种格式来转换列。格式是灵活多变的。例如，您可以指定采用格式 `yyyy-MM-dd hh:mm:ss` 将输入字符串 `2009-09-16 03:15:24` 转换为时间戳。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[字符到时间戳 (Sys)](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-char-to-timestamp.html)。

## 示例：转换日期
<a name="examples-transforming-dates"></a>

在本示例中，您将以下记录写入到 Amazon Kinesis 数据流中。

```
{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"}
{"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"}
{"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"}
{"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"}
...
```



然后，您在控制台上创建一个 Kinesis Data Analytics 应用程序，并将 Kinesis 流作为流式传输源。发现过程读取流式传输源中的示例记录，并推断出具有两个列 (`EVENT_TIME` 和 `TICKER`) 的如下应用程序内部架构。

![\[控制台屏幕截图，显示具有事件时间和股票代码列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_datetime_convert_0.png)


然后，将该应用程序代码与 SQL 函数结合使用，以多种方式转换 `EVENT_TIME` 时间戳字段。随后将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_datetime_convert_1.png)




### 步骤 1：创建 Kinesis 数据流
<a name="examples-transforming-dates-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充事件时间和股票代码记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建带有一个分片的流。

1. 运行以下 Python 代码以便用示例数据填充流。此简单代码会不断将具有随机股票代号和当前时间戳的记录写入流中。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

### 步骤 2：创建 Amazon Kinesis Data Analytics 应用程序
<a name="examples-transforming-dates-2"></a>

按如下方式创建应用程序：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在上一部分中创建的流。

   1. 选择以创建 IAM 角色。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

   1. 选择 **编辑架构**。将 **EVENT\$1TIME** 列的 **列类型** 更改为 `TIMESTAMP`。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

   

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果，如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          TICKER VARCHAR(4), 
          event_time TIMESTAMP, 
          five_minutes_before TIMESTAMP, 
          event_unix_timestamp BIGINT,
          event_timestamp_as_char VARCHAR(50),
          event_second INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      
      SELECT STREAM 
          TICKER, 
          EVENT_TIME,
          EVENT_TIME - INTERVAL '5' MINUTE,
          UNIX_TIMESTAMP(EVENT_TIME),
          TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME),
          EXTRACT(SECOND FROM EVENT_TIME) 
      FROM "SOURCE_SQL_STREAM_001"
      ```

   1. 选择 **保存并运行 SQL**。在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：转换多个数据类型
<a name="app-tworecordtypes"></a>

 提取、转换和加载 (ETL) 应用程序的共同要求是处理流式传输源中的多种记录。您可以通过创建一个 Kinesis Data Analytics 应用程序来处理此类流式传输源。流程如下：

1. 首先，您将流式传输源映射到应用程序内部输入流，与所有其他 Kinesis Data Analytics 应用程序类似。

1. 然后，在应用程序代码中编写 SQL 语句，从应用程序内部输入流中检索特定类型的行。然后，将这些行插入单独的应用程序内部流。(您可以在应用程序代码中创建其他应用程序内部流)。

在本练习中，您具有一个可接收两种类型 (`Order` 和 `Trade`) 记录的流式传输源。它们分别表示库存订单和相应交易。每批订单可以有零笔或多笔交易。下面显示了每个类型的示例记录：

**Order record**

```
{"RecordType": "Order", "Oprice": 9047, "Otype": "Sell", "Oid": 3811, "Oticker": "AAAA"}
```

**Trade record**

```
{"RecordType": "Trade", "Tid": 1, "Toid": 3812, "Tprice": 2089, "Tticker": "BBBB"}
```

使用创建应用程序时 AWS 管理控制台，控制台会显示所创建的应用程序内输入流的以下推断架构。默认情况下，控制台将该应用程序内部流命名为 `SOURCE_SQL_STREAM_001`。

![\[控制台屏幕截图，显示格式化的应用程序内部流示例。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/two-record-types-10.png)


在保存配置时，Amazon Kinesis Data Analytics 持续从流式传输源中读取数据，并在应用程序内部流中插入行。现在，您可以对应用程序内部流中的数据进行分析。

在本示例的应用程序代码中，首先创建两个额外的应用程序内部流：`Order_Stream` 和 `Trade_Stream`。然后，根据记录类型筛选 `SOURCE_SQL_STREAM_001` 流中的行，使用数据泵将这些行插入到新创建的流。有关此编码模式的信息，请参阅[应用程序代码](how-it-works-app-code.md)。

1. 将订单和交易行筛选到单独的应用程序内部流:

   1. 筛选 `SOURCE_SQL_STREAM_001` 中的订单记录，并将订单保存到 `Order_Stream`。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  order_id     integer, 
                  order_type   varchar(10),
                  ticker       varchar(4),
                  order_price  DOUBLE, 
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM oid, otype,oticker, oprice, recordtype 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Order';
      ```

   1. 筛选 `SOURCE_SQL_STREAM_001` 中的交易记录，并将订单保存到 `Trade_Stream`。

      ```
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 (trade_id     integer, 
                  order_id     integer, 
                  trade_price  DOUBLE, 
                  ticker       varchar(4),
                  record_type  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM tid, toid, tprice, tticker, recordtype
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  recordtype = 'Trade';
      ```

1. 现在，您可以对这些流进行其他分析。在本示例中，您将按股票在时长一分钟的[滚动窗口](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/tumbling-window-concepts.html)中计算交易数，并将结果保存到另一个流 `DESTINATION_SQL_STREAM`。

   ```
   --do some analytics on the Trade_Stream and Order_Stream. 
   -- To see results in console you must write to OPUT_SQL_STREAM.
   
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
               ticker  varchar(4),
               trade_count   integer
               );
   
   CREATE OR REPLACE PUMP "Output_Pump" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker, count(*) as trade_count
         FROM   "Trade_Stream"
         GROUP BY ticker,
                   FLOOR("Trade_Stream".ROWTIME TO MINUTE);
   ```

   此时将显示如下结果：  
![\[控制台屏幕截图，在 SQL 结果选项卡中显示结果。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/two-record-types-20.png)

**Topics**
+ [步骤 1：准备数据](tworecordtypes-prepare.md)
+ [步骤 2：创建应用程序](tworecordtypes-create-app.md)

**下一个步骤**  
[步骤 1：准备数据](tworecordtypes-prepare.md)

# 步骤 1：准备数据
<a name="tworecordtypes-prepare"></a>

在此部分中，您创建一个 Kinesis 数据流，然后在该流上填充订单和交易记录。此式源将用于下一步创建的应用程序。

**Topics**
+ [步骤 1.1：创建流式传输源](#tworecordtypes-prepare-create-stream)
+ [步骤 1.2：填充流式传输源](#tworecordtypes-prepare-populate-stream)

## 步骤 1.1：创建流式传输源
<a name="tworecordtypes-prepare-create-stream"></a>

可以使用控制台或 AWS CLI创建一个 Kinesis 数据流。本示例采用 `OrdersAndTradesStream` 作为流名称。
+ **使用控制台** [— 登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)选择 **Data Streams**，然后创建带有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。
+ **使用 AWS CLI— 使用以**下 Kinesis `create-stream` AWS CLI 命令创建直播：

  ```
  $ aws kinesis create-stream \
  --stream-name OrdersAndTradesStream \
  --shard-count 1 \
  --region us-east-1 \
  --profile adminuser
  ```

## 步骤 1.2：填充流式传输源
<a name="tworecordtypes-prepare-populate-stream"></a>

运行以下 Python 脚本以便在 `OrdersAndTradesStream` 中填充示例记录。如果您使用其他名称创建了流，请相应更新 Python 代码。

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。代码中的 `put-record` 命令将 JSON 记录写入到流。

   ```
    
   import json
   import random
   import boto3
   
   STREAM_NAME = "OrdersAndTradesStream"
   PARTITION_KEY = "partition_key"
   
   
   def get_order(order_id, ticker):
       return {
           "RecordType": "Order",
           "Oid": order_id,
           "Oticker": ticker,
           "Oprice": random.randint(500, 10000),
           "Otype": "Sell",
       }
   
   
   def get_trade(order_id, trade_id, ticker):
       return {
           "RecordType": "Trade",
           "Tid": trade_id,
           "Toid": order_id,
           "Tticker": ticker,
           "Tprice": random.randint(0, 3000),
       }
   
   
   def generate(stream_name, kinesis_client):
       order_id = 1
       while True:
           ticker = random.choice(["AAAA", "BBBB", "CCCC"])
           order = get_order(order_id, ticker)
           print(order)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY
           )
           for trade_id in range(1, random.randint(0, 6)):
               trade = get_trade(order_id, trade_id, ticker)
               print(trade)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(trade),
                   PartitionKey=PARTITION_KEY,
               )
           order_id += 1
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**下一个步骤**  
 [步骤 2：创建应用程序](tworecordtypes-create-app.md)

# 步骤 2：创建应用程序
<a name="tworecordtypes-create-app"></a>

在此部分中，您创建一个 Kinesis Data Analytics 应用程序。然后，通过添加将您在前一部分中创建的流式传输源映射到应用程序内部输入流的输入配置，您可以更新应用程序。

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择**创建应用程序**。此示例使用应用程序名称 **ProcessMultipleRecordTypes**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   1. 选择在[步骤 1：准备数据](tworecordtypes-prepare.md)中创建的流。

   1. 选择以创建 IAM 角色。

   1. 等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。

   1. 选择 **保存并继续**。

1. 在应用程序中心上，选择 **Go to SQL editor**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      --Create Order_Stream.
      CREATE OR REPLACE STREAM "Order_Stream" 
                 ( 
                  "order_id"     integer, 
                  "order_type"   varchar(10),
                  "ticker"       varchar(4),
                  "order_price"  DOUBLE, 
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Order_Pump" AS 
         INSERT INTO "Order_Stream"
            SELECT STREAM "Oid", "Otype","Oticker", "Oprice", "RecordType" 
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Order';
      --********************************************
      --Create Trade_Stream.      
      CREATE OR REPLACE STREAM "Trade_Stream" 
                 ("trade_id"     integer, 
                  "order_id"     integer, 
                  "trade_price"  DOUBLE, 
                  "ticker"       varchar(4),
                  "record_type"  varchar(10)
                  );
      
      CREATE OR REPLACE PUMP "Trade_Pump" AS 
         INSERT INTO "Trade_Stream"
            SELECT STREAM "Tid", "Toid", "Tprice", "Tticker", "RecordType"
            FROM   "SOURCE_SQL_STREAM_001"
            WHERE  "RecordType" = 'Trade';
      --*****************************************************************
      --do some analytics on the Trade_Stream and Order_Stream. 
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                  "ticker"  varchar(4),
                  "trade_count"   integer
                  );
      
      CREATE OR REPLACE PUMP "Output_Pump" AS 
         INSERT INTO "DESTINATION_SQL_STREAM"
            SELECT STREAM "ticker", count(*) as trade_count
            FROM   "Trade_Stream"
            GROUP BY "ticker",
                      FLOOR("Trade_Stream".ROWTIME TO MINUTE);
      ```

   1. 选择 **保存并运行 SQL**。选择 **Real-time analytics (实时分析)** 选项卡可查看应用程序已创建的所有应用程序内部流并验证数据。

   

**下一个步骤**  
您可以配置应用程序输出以将结果永久保存到外部目标中，例如，另一个 Kinesis 流或 Firehose 数据传输流。

# 示例：窗口和聚合
<a name="examples-window"></a>

本部分提供使用窗口式查询和聚合查询的 Amazon Kinesis Data Analytics 应用程序示例。（有关更多信息，请参阅 [窗口式查询](windowed-sql.md)。） 每个示例都提供了设置 Kinesis Data Analytics 应用程序的 step-by-step说明和示例代码。

**Topics**
+ [示例：交错窗口](examples-window-stagger.md)
+ [示例：使用 ROWTIME 的滚动窗口](examples-window-tumbling-rowtime.md)
+ [示例：使用事件时间戳的滚动窗口](examples-window-tumbling-event.md)
+ [示例：检索最常出现的值 (TOP\$1K\$1ITEMS\$1TUMBLING)](examples-window-topkitems.md)
+ [示例：从查询中聚合部分结果](examples-window-partialresults.md)

# 示例：交错窗口
<a name="examples-window-stagger"></a>

当窗口化查询处理每个唯一分区键的单独窗口时，从具有匹配键的数据到达时开始，该窗口被称为*交错窗口*。有关更多信息，请参阅 [交错窗口](stagger-window-concepts.md)。此 Amazon Kinesis Data Analytics 示例使用 EVENT\$1TIME 和 TICKER 列创建交错窗口。源流包含具有相同 EVENT\$1TIME 和 TICKER 值的六个记录组成的组，这些值在一分钟时间内到达，但不一定具有相同的分钟值（例如 `18:41:xx`）。

在本示例中，您在以下时间将以下记录写入到 Kinesis 数据流中。该脚本不会将时间写入流，但应用程序接收记录的时间将写入 `ROWTIME` 字段：

```
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:30
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:40
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:17:50
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:00
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:10
{"EVENT_TIME": "2018-08-01T20:17:20.797945", "TICKER": "AMZN"}   20:18:21
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:31
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:41
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:18:51
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:01
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:11
{"EVENT_TIME": "2018-08-01T20:18:21.043084", "TICKER": "INTC"}   20:19:21
...
```



然后，你可以在中 AWS 管理控制台创建一个 Kinesis Data Analytics 应用程序，将 Kinesis 数据流作为流媒体源。发现过程读取流式传输源中的示例记录，并推断出具有两个列（`EVENT_TIME` 和 `TICKER`）的如下所示的应用程序内部架构。

![\[控制台屏幕截图，显示具有价格和股票代码列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_stagger_schema.png)


您使用应用程序代码以及 `COUNT` 函数以创建数据的窗口式聚合。然后，将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_stagger.png)


在以下过程中，您创建一个 Kinesis Data Analytics 应用程序，它在基于 EVENT\$1TIME 和 TICKER 的交错窗口中聚合输入流中的值。

**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-stagger-window-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-stagger-window-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-stagger-window-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建具有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 要在生产环境中将记录写入到 Kinesis 数据流，我们建议您使用 [Kinesis 创建器库](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis 数据流 API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。为简单起见，此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码在一分钟时间中连续地将一组六个记录与相同的随机 `EVENT_TIME` 和股票代码符号一起写入流中。让脚本保持运行，以便可以在后面的步骤中生成应用程序架构。

   ```
    
   import datetime
   import json
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10)
       return {
           "EVENT_TIME": event_time.isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           # Send six records, ten seconds apart, with the same event time and ticker
           for _ in range(6):
               print(data)
               kinesis_client.put_record(
                   StreamName=stream_name,
                   Data=json.dumps(data),
                   PartitionKey="partitionkey",
               )
               time.sleep(10)
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-stagger-window-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   

   1. 选择在上一部分中创建的流。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

   1. 选择 **编辑架构**。将 **EVENT\$1TIME** 列的 **列类型** 更改为 `TIMESTAMP`。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
          event_time TIMESTAMP,
          ticker_symbol    VARCHAR(4),
          ticker_count     INTEGER);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM 
              EVENT_TIME, 
              TICKER,
              COUNT(TICKER) AS ticker_count
          FROM "SOURCE_SQL_STREAM_001"
          WINDOWED BY STAGGER (
                  PARTITION BY TICKER, EVENT_TIME RANGE INTERVAL '1' MINUTE);
      ```

   1. 选择 **保存并运行 SQL**。

      在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：使用 ROWTIME 的滚动窗口
<a name="examples-window-tumbling-rowtime"></a>

当一个窗口式查询以非重叠方式处理每个窗口时，这样的窗口称为*滚动窗口*。有关更多信息，请参阅 [滚动窗口（使用 GROUP BY 组的聚合）](tumbling-window-concepts.md)。此 Amazon Kinesis Data Analytics 示例使用 `ROWTIME` 列创建滚动窗口。`ROWTIME` 列表示应用程序读取该记录的时间。

在本示例中，您将以下记录写入到 Kinesis 数据流中。

```
{"TICKER": "TBV", "PRICE": 33.11}
{"TICKER": "INTC", "PRICE": 62.04}
{"TICKER": "MSFT", "PRICE": 40.97}
{"TICKER": "AMZN", "PRICE": 27.9}
...
```



然后，你可以在中 AWS 管理控制台创建一个 Kinesis Data Analytics 应用程序，将 Kinesis 数据流作为流媒体源。发现过程读取流式传输源中的示例记录，并推断出具有两个列（`TICKER` 和 `PRICE`）的如下所示的应用程序内部架构。

![\[控制台屏幕截图，显示具有价格和股票代码列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime_schema.png)


您使用应用程序代码以及 `MIN` 和 `MAX` 函数以创建数据的窗口式聚合。然后，将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_tumbling_rowtime.png)


在以下过程中，您创建一个 Kinesis Data Analytics 应用程序，它在基于 ROWTIME 的滚动窗口中聚合输入流中的值。

**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-tumbling-window-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-tumbling-window-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-tumbling-window-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建具有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 要在生产环境中将记录写入到 Kinesis 数据流，我们建议您使用 [Kinesis 客户端库](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis 数据流 API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。为简单起见，此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行，以便可以在后面的步骤中生成应用程序架构。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-tumbling-window-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，输入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   

   1. 选择在上一部分中创建的流。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有两列。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (TICKER VARCHAR(4), MIN_PRICE REAL, MAX_PRICE REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM TICKER, MIN(PRICE), MAX(PRICE)
              FROM "SOURCE_SQL_STREAM_001"
              GROUP BY TICKER, 
                  STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
      ```

   1. 选择 **保存并运行 SQL**。

      在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：使用事件时间戳的滚动窗口
<a name="examples-window-tumbling-event"></a>

当一个窗口式查询以非重叠方式处理每个窗口时，这样的窗口称为*滚动窗口*。有关更多信息，请参阅 [滚动窗口（使用 GROUP BY 组的聚合）](tumbling-window-concepts.md)。此 Amazon Kinesis Data Analytics 示例说明了一个使用事件时间戳的滚动窗口，这是一个用户创建的时间戳，它包含在流数据中。它使用这种方法而不是只使用 ROWTIME，这是 Kinesis Data Analytics 在应用程序收到记录时创建的时间戳。如果您想根据事件发生的时间而非应用程序收到此事件的时间创建一个聚合，则可以在流数据中使用事件时间戳。在本例中，`ROWTIME` 值每分钟触发一次此聚合，`ROWTIME` 和所包含事件时间都会聚合记录。

在本示例中，您将以下记录写入到 Amazon Kinesis 流中。`EVENT_TIME` 值在过去设置为 5 秒以模拟处理和传输滞后，而滞后可能导致在发生事件和将记录提取到 Kinesis Data Analytics 之间出现延迟。

```
{"EVENT_TIME": "2018-06-13T14:11:05.766191", "TICKER": "TBV", "PRICE": 43.65}
{"EVENT_TIME": "2018-06-13T14:11:05.848967", "TICKER": "AMZN", "PRICE": 35.61}
{"EVENT_TIME": "2018-06-13T14:11:05.931871", "TICKER": "MSFT", "PRICE": 73.48}
{"EVENT_TIME": "2018-06-13T14:11:06.014845", "TICKER": "AMZN", "PRICE": 18.64}
...
```



然后，你可以在中 AWS 管理控制台创建一个 Kinesis Data Analytics 应用程序，将 Kinesis 数据流作为流媒体源。发现过程读取流式传输源中的示例记录，并推断出具有三个列（`EVENT_TIME`、`TICKER` 和 `PRICE`）的如下所示的应用程序内部架构。

![\[控制台屏幕截图，显示具有事件时间、股票代码和价格列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_tumbling_event_schema.png)


您使用应用程序代码以及 `MIN` 和 `MAX` 函数以创建数据的窗口式聚合。然后，将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_tumbling_event.png)


在以下过程中，您创建一个 Kinesis Data Analytics 应用程序，它在基于事件时间的滚动窗口中聚合输入流中的值。

**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-window-tumbling-event-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-window-tumbling-event-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-window-tumbling-event-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建具有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 要在生产环境中将记录写入到 Kinesis 数据流，我们建议您使用 [Kinesis 客户端库](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis 数据流 API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。为简单起见，此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行，以便可以在后面的步骤中生成应用程序架构。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-window-tumbling-event-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，输入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   

   1. 选择在上一部分中创建的流。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构有三列。

   1. 选择 **编辑架构**。将 **EVENT\$1TIME** 列的 **列类型** 更改为 `TIMESTAMP`。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中。

      ```
      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (EVENT_TIME timestamp, TICKER VARCHAR(4), min_price REAL, max_price REAL);
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
        INSERT INTO "DESTINATION_SQL_STREAM" 
          SELECT STREAM STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND),
              TICKER,
               MIN(PRICE) AS MIN_PRICE,
               MAX(PRICE) AS MAX_PRICE
          FROM    "SOURCE_SQL_STREAM_001"
          GROUP BY TICKER, 
                   STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND), 
                   STEP("SOURCE_SQL_STREAM_001".EVENT_TIME BY INTERVAL '60' SECOND);
      ```

   1. 选择 **保存并运行 SQL**。

      在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：检索最常出现的值 (TOP\$1K\$1ITEMS\$1TUMBLING)
<a name="examples-window-topkitems"></a>

此 Amazon Kinesis Data Analytics 示例说明了如何使用 `TOP_K_ITEMS_TUMBLING` 函数在滚动窗口中检索最常出现的值。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[`TOP_K_ITEMS_TUMBLING` 函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/top-k.html)。

在对数万或数十万个密钥进行聚合时，如果您希望降低资源占用，则 `TOP_K_ITEMS_TUMBLING` 函数很有用。该函数生成的结果与使用 `GROUP BY` 和 `ORDER BY` 子句进行聚合一样。

在本示例中，您将以下记录写入到 Amazon Kinesis 数据流中：

```
{"TICKER": "TBV"}
{"TICKER": "INTC"}
{"TICKER": "MSFT"}
{"TICKER": "AMZN"}
...
```



然后，你可以在中 AWS 管理控制台创建一个 Kinesis Data Analytics 应用程序，将 Kinesis 数据流作为流媒体源。发现过程读取流式传输源上的示例记录，并推断出具有一个列 (`TICKER`) 的应用程序内部架构，如下所示：

![\[控制台屏幕截图，显示具有股票代码列的应用程序内部架构。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_topk_schema.png)


您使用应用程序代码以及 `TOP_K_VALUES_TUMBLING` 函数以创建数据的窗口式聚合。然后，将结果数据插入另一个应用程序内部流，如下面的屏幕截图所示：



![\[控制台屏幕截图，显示应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_topk.png)


在以下过程中，您创建一个 Kinesis Data Analytics 应用程序，它在输入流中检索最常出现的值。

**Topics**
+ [步骤 1：创建 Kinesis 数据流](#examples-window-topkitems-1)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](#examples-window-topkitems-2)

## 步骤 1：创建 Kinesis 数据流
<a name="examples-window-topkitems-1"></a>

创建一个 Amazon Kinesis Data Stream 并填充记录，如下所示：

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **数据流**。

1. 选择 **创建 Kinesis 流**，然后创建具有一个分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

1. 要在生产环境中将记录写入到 Kinesis 数据流，我们建议您使用 [Kinesis 客户端库](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html)或 [Kinesis 数据流 API](https://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-sdk.html)。为简单起见，此示例使用以下 Python 脚本以便生成记录。运行此代码以填充示例股票代码记录。这段简单代码不断地将随机的股票代码记录写入到流中。让脚本保持运行，以便可以在后面的步骤中生成应用程序架构。

   ```
    
   import datetime
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_data():
       return {
           "EVENT_TIME": datetime.datetime.now().isoformat(),
           "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]),
           "PRICE": round(random.random() * 100, 2),
       }
   
   
   def generate(stream_name, kinesis_client):
       while True:
           data = get_data()
           print(data)
           kinesis_client.put_record(
               StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey"
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

## 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="examples-window-topkitems-2"></a>

创建一个 Kinesis Data Analytics 应用程序，如下所示：

1. [在 /kinesisanalytics 上打开适用于 Apache Flink 的托管服务控制台。 https://console.aws.amazon.com](https://console.aws.amazon.com/kinesisanalytics)

1. 选择 **创建应用程序**，键入应用程序名称，然后选择 **创建应用程序**。

1. 在应用程序详细信息页面上，选择 **连接流数据**，以连接到源。

1. 在 **连接到源** 页面上，执行以下操作：

   

   1. 选择在上一部分中创建的流。

   1. 选择 **发现架构**。等待控制台显示推断的架构和为创建的应用程序内部流推断架构所使用的示例记录。推断的架构包含一列。

   1. 选择 **保存架构并更新流示例**。在控制台保存架构后，选择 **退出**。

   1. 选择 **保存并继续**。

1. 在应用程序详细信息页面上，选择 **转到 SQL编辑器**。要启动应用程序，请在显示的对话框中选择 **是，启动应用程序**。

1. 在 SQL 编辑器中编写应用程序代码并确认结果如下所示：

   1. 复制下面的应用程序代码并将其粘贴到编辑器中：

      ```
      CREATE OR REPLACE STREAM DESTINATION_SQL_STREAM (
        "TICKER" VARCHAR(4), 
        "MOST_FREQUENT_VALUES" BIGINT
      );
      
      CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
          INSERT INTO "DESTINATION_SQL_STREAM"
          SELECT STREAM * 
              FROM TABLE (TOP_K_ITEMS_TUMBLING(
                  CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"),
                  'TICKER',         -- name of column in single quotes
                  5,                       -- number of the most frequently occurring values
                  60                       -- tumbling window size in seconds
                  )
              );
      ```

   1. 选择 **保存并运行 SQL**。

      在 **实时分析** 选项卡上，可以查看应用程序已创建的所有应用程序内部流并验证数据。

# 示例：从查询中聚合部分结果
<a name="examples-window-partialresults"></a>

如果 Amazon Kinesis 数据流包含的记录具有与提取时间不完全匹配的事件时间，在滚动窗口中选择的结果将包含在窗口中到达但未必发生的记录。在这种情况下，滚动窗口只包含您需要的部分结果集。您可以通过多种方法来纠正这一问题：
+ 仅使用滚动窗口，并使用 upsert 通过数据库或数据仓库在后处理中聚合部分结果。这种方法在处理应用程序时很有效。它为聚合运算符（`sum`、`min`、`max` 等）无限期地处理后期数据。这种方法的缺点是，您必须在数据库层开发和维护额外的应用程序逻辑。
+ 使用滚动和滑动窗口，这会在早期生成部分结果，还会继续在滑动窗口期间生成完整的结果。此方法使用 overwrite 操作而非 upsert 操作来处理新近数据，这样就不需要在数据库层添加任何其他应用程序逻辑。这种方法的缺点是它使用更多的 Kinesis 处理单元 (KPUs)，但仍会产生两个结果，这可能不适用于某些用例。

有关滚动和滑动窗口的更多信息，请参阅[窗口式查询](windowed-sql.md)。

在以下过程中，滚动窗口聚合会生成两个部分结果（发送到 `CALC_COUNT_SQL_STREAM` 应用程序内部流），它们必须合并以生成最终结果。然后，应用程序生成第二个聚合（发送到 `DESTINATION_SQL_STREAM` 应用程序内部流），它合并这两个部分结果。

**创建使用事件时间聚合部分结果的应用程序**

1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中，选择 **Data Analytics (数据分析)**。按照 [Amazon Kinesis Data Analytics·for·SQL 应用程序入门](getting-started.md) 教程中所述，创建一个 Kinesis Data Analytics 应用程序。

1. 在 SQL 编辑器中，将应用程序代码替换为以下内容：

   ```
   CREATE OR REPLACE STREAM "CALC_COUNT_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);
   	            
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
       (TICKER      VARCHAR(4), 
       TRADETIME   TIMESTAMP, 
       TICKERCOUNT       DOUBLE);            
   	
   CREATE PUMP "CALC_COUNT_SQL_PUMP_001" AS 
       INSERT INTO "CALC_COUNT_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER_SYMBOL",
           STEP("SOURCE_SQL_STREAM_001"."ROWTIME" BY INTERVAL '1' MINUTE) as "TradeTime",
           COUNT(*) AS "TickerCount"
       FROM "SOURCE_SQL_STREAM_001"
       GROUP BY
           STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE),
           STEP("SOURCE_SQL_STREAM_001"."APPROXIMATE_ARRIVAL_TIME" BY INTERVAL '1' MINUTE),
           TICKER_SYMBOL;
   
   CREATE PUMP "AGGREGATED_SQL_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" ("TICKER","TRADETIME", "TICKERCOUNT")
       SELECT STREAM
           "TICKER",
           "TRADETIME",
           SUM("TICKERCOUNT") OVER W1 AS "TICKERCOUNT"
       FROM "CALC_COUNT_SQL_STREAM"
       WINDOW W1 AS (PARTITION BY "TRADETIME" RANGE INTERVAL '10' MINUTE PRECEDING);
   ```

   应用程序代码中的 `SELECT` 语句将在 `SOURCE_SQL_STREAM_001` 中筛选出显示股票价格更改大于 1% 的行，并使用数据泵将这些行插入另一个应用程序内部流 `CHANGE_STREAM`。

1. 选择 **保存并运行 SQL**。

第一个数据泵将流输出到与以下内容类似的 `CALC_COUNT_SQL_STREAM`。请注意，结果集不完整：

![\[显示部分结果的控制台屏幕截图。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_partial_0.png)


然后，第二个泵将流输出到 `DESTINATION_SQL_STREAM`，其中包含完整的结果集：

![\[显示完整结果的控制台屏幕截图。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex_partial_1.png)


# 示例：联接
<a name="examples-joins"></a>

此部分提供了使用联接查询的 数据分析应用程序示例。每个示例都提供了设置和测试 Kinesis Data Analytics 应用程序的 step-by-step说明。

**Topics**
+ [示例：在 应用程序中添加引用数据](app-add-reference-data.md)

# 示例：在 应用程序中添加引用数据
<a name="app-add-reference-data"></a>

在本练习中，您在现有 数据分析应用程序中添加引用数据。有关引用数据的信息，请参阅以下主题：
+ [Amazon Kinesis Data Analytics for SQL 应用程序：工作原理](how-it-works.md)
+ [配置应用程序输入](how-it-works-input.md)

在本练习中，您将引用数据添加到在 Kinesis Data Analytics [入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中创建的应用程序。引用数据提供每个股票代号对应的公司名称；例如：

```
Ticker, Company
AMZN,Amazon
ASD, SomeCompanyA
MMB, SomeCompanyB
WAS,  SomeCompanyC
```

首先，请完成[入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中的步骤，以创建一个启动应用程序。然后，执行以下步骤进行设置，并将引用数据添加到应用程序：

1. **准备数据**
   + 将上述参考数据作为对象存储在 Amazon Simple Storage Service (Amazon S3) 中
   + 创建 IAM 角色，Kinesis Data Analytics 可以担任该角色来代表您读取 Amazon S3 对象。

1. **将引用数据源添加到您的应用程序。**

   读取 对象，并创建应用程序内部引用表，您可以在应用程序代码中查询该表。

1. **测试代码。**

   在应用程序代码中，将编写一个联接查询来联接应用程序内部流和应用程序内部引用表，从而获取每个股票代码的对应公司名称。

**Topics**
+ [步骤 1：准备](#add-refdata-prepare)
+ [步骤 2：将引用数据源添加到应用程序配置中](#add-refdata-create-iamrole)
+ [步骤 3：测试：查询应用程序内部引用表](#add-refdata-test)

## 步骤 1：准备
<a name="add-refdata-prepare"></a>

在此部分中，您将示例引用数据作为对象存储在 存储桶中。您还将创建 IAM 角色，可担任该角色来代表您读取对象。

### 将引用数据存储为 Amazon S3 对象。
<a name="prepare-create-s3object"></a>

在这一步中，将示例引用数据存储为 Amazon S3 对象。

1. 打开文本编辑器，添加以下数据，然后将文件另存为 `TickerReference.csv`。

   ```
   Ticker, Company
   AMZN,Amazon
   ASD, SomeCompanyA
   MMB, SomeCompanyB
   WAS,  SomeCompanyC
   ```

   

1. 将 `TickerReference.csv` 文件上传到 S3 存储桶。有关说明，请参阅 *Amazon Simple Storage Service 用户指南*中的[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/UploadingObjectsintoAmazonS3.html)。

### 创建 IAM 角色
<a name="prepare-create-iamrole"></a>

接下来，创建一个 Kinesis Data Analytics 可以代入的 IAM 角色并读取 Amazon S3 对象。

1. 在 AWS Identity and Access Management (IAM) 中，创建一个名为的 IAM 角色**KinesisAnalytics-ReadS3Object**。要创建该角色，请按照 [IAM 用户指南](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create_for-service.html#roles-creatingrole-service-console)中的*为 Amazon Web Service 创建角色（AWS 管理控制台）*中的说明操作。

   在 IAM 控制台上，指定以下项：
   + 再**选择角色类型**中，选择 **AWS Lambda**。创建角色后，您将更改信任策略以允许 Kinesis Data Analytics（ AWS Lambda不是）担任该角色。
   + 不要在 **Attach Policy** 页面上附加任何策略。

1. 更新 IAM 角色策略：

   

   1. 在 IAM 控制台上，选择您创建的角色。

   1. 在**信任关系**选项卡上，更新信任策略以便为 Kinesis Data Analytics 授予权限以代入该角色。下面显示了信任策略：

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Principal": {
              "Service": "kinesisanalytics.amazonaws.com"
            },
            "Action": "sts:AssumeRole"
          }
        ]
      }
      ```

------

      

   1. **在 “**权限**” 选项卡上，附上名为 AmazonS3 的亚马逊管理的策略。ReadOnlyAccess**这会为该角色授予权限以读取 对象。下面显示了此策略：

------
#### [ JSON ]

****  

      ```
      {
        "Version":"2012-10-17",		 	 	 
        "Statement": [
          {
            "Effect": "Allow",
            "Action": [
              "s3:Get*",
              "s3:List*"
            ],
            "Resource": "*"
          }
        ]
      }
      ```

------

## 步骤 2：将引用数据源添加到应用程序配置中
<a name="add-refdata-create-iamrole"></a>

在该步骤中，将引用数据源添加到应用程序配置中。要开始操作，需要以下信息：
+ S3 存储桶名称和对象键名称
+ IAM 角色的 Amazon 资源名称 (ARN)

1. 在应用程序主页面中，选择 **Connect reference data (连接引用数据)**。

1. 在**连接引用数据来源**页面上，选择包含引用数据对象的 Amazon S3 存储桶，并输入对象的键名称。

1. 在**应用程序内部引用表名称**处输入 **CompanyName**。

1. 在**对所选资源的访问权限部分，选择****从 Kinesis Analytics 可以担任的 IAM 角色**中选择，然后选择您在上一节中创建的 **KinesisAnalytics-reads3Object** IAM 角色。

1. 选择 **发现架构**。控制台检测到引用数据中的两列。

1. 选择**保存并关闭**。

## 步骤 3：测试：查询应用程序内部引用表
<a name="add-refdata-test"></a>

现在，您可以查询应用程序内部引用表 `CompanyName`。您可以联接股票价格数据和引用表以使用引用信息扩充应用程序。结果显示公司名称。

1. 用以下内容替换您的应用程序代码。查询会联接应用程序内部输入流和应用程序内部引用表。应用程序代码将结果写入到另一应用程序内部流 `DESTINATION_SQL_STREAM`。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
      SELECT STREAM ticker_symbol, "c"."Company", sector, change, price
      FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c"
      ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
   ```

1. 验证应用程序输出是否出现在**SQLResults**选项卡中。确保某些行显示公司名称 (示例引用数据不包含所有公司名称)。

# 示例：机器学习
<a name="examples-machine"></a>

此部分提供了使用机器学习查询的 Amazon Kinesis Data Analytics 应用程序示例。机器学习查询对数据执行复杂的分析，同时依靠流中数据的历史记录以查找异常模式。这些示例提供了设置和测试 Kinesis Data Analytics 应用程序的 step-by-step说明。

**Topics**
+ [示例：在流中检测数据异常情况 (RANDOM\$1CUT\$1FOREST 函数)](app-anomaly-detection.md)
+ [示例：检测数据异常和获取说明 (RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 函数)](app-anomaly-detection-with-explanation.md)
+ [示例：检测流上的热点 (HOTSPOTS 函数)](app-hotspots-detection.md)

# 示例：在流中检测数据异常情况 (RANDOM\$1CUT\$1FOREST 函数)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics 提供了一个函数 (`RANDOM_CUT_FOREST`)，它可以根据数值列中的值将异常分数分配给每个记录。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[`RANDOM_CUT_FOREST` 函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)。

在本练习中，您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序，请执行以下操作：

1. **设置流式源** - 您设置 Kinesis 数据流并编写示例 `heartRate` 数据，如下所示：

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   此过程提供用于填充流的 Python 脚本。`heartRate` 值将随机生成，99% 的记录具有的 `heartRate` 值介于 60 和 100 之间，仅 1% 的记录具有的 `heartRate` 值介于 150 和 200 之间。因此，`heartRate` 值介于 150 和 200 之间的记录是异常情况。

1. **配置输入** - 通过使用控制台，您创建 Kinesis Data Analytics 应用程序，并通过将流式源映射到应用程序内部流 (`SOURCE_SQL_STREAM_001`) 来配置应用程序输入。在应用程序启动时，Kinesis Data Analytics 持续读取流式传输源，并将记录插入到应用程序内部流中。

1. **指定应用程序代码** - 示例使用以下应用程序代码：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   此代码读取 `SOURCE_SQL_STREAM_001` 中的行，分配异常分数，并将结果行写入另一个应用程序内部流 (`TEMP_STREAM`)。随后，应用程序代码将对 `TEMP_STREAM` 中的记录进行排序，并将结果保存到另一个应用程序内部流 (`DESTINATION_SQL_STREAM`)。您使用数据泵将流插入到应用程序内部流。有关更多信息，请参阅 [应用程序内部流和数据泵](streams-pumps.md)。

1. **配置输出** - 您配置应用程序输出以将 `DESTINATION_SQL_STREAM` 中的数据保存到外部目标 (另一个 Kinesis 数据流)。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 AWS Lambda 函数来处理这些异常分数并配置警报。

此练习使用美国东部 (弗吉尼亚州北部) (`us-east-1`) 来创建这些流和您的应用程序。如果您使用任何其他区域，则必须相应地更新代码。

**Topics**
+ [步骤 1：准备](app-anomaly-prepare.md)
+ [步骤 2：创建应用程序](app-anom-score-create-app.md)
+ [步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)
+ [步骤 4：验证输出](app-anomaly-verify-output.md)

**下一个步骤**  
[步骤 1：准备](app-anomaly-prepare.md)

# 步骤 1：准备
<a name="app-anomaly-prepare"></a>

在为本练习创建 Amazon Kinesis Data Analytics 应用程序之前，您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源，并将另一个流配置为目标（Kinesis Data Analytics 在其中永久保存应用程序输出）。

**Topics**
+ [步骤 1.1：创建输入和输出数据流](#app-anomaly-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-anomaly-write-sample-records-inputstream)

## 步骤 1.1：创建输入和输出数据流
<a name="app-anomaly-create-two-streams"></a>

在此部分中，您创建两个 Kinesis 流：`ExampleInputStream` 和 `ExampleOutputStream`。您可以使用 AWS 管理控制台 或 AWS CLI创建这些流。
+ 

**要使用 控制台**

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 选择**创建数据流**。创建带有一个名为 `ExampleInputStream` 的分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

  1. 重复上一步骤以创建带有一个名为 `ExampleOutputStream` 的分片的流。
+ 

**要使用 AWS CLI**

  1. 使用以下 Kinesis `create-stream` AWS CLI 命令创建第一个直播 () `ExampleInputStream`。

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 运行同一命令，同时将流名称更改为 `ExampleOutputStream`。此命令创建应用程序用来写入输出的第二个流。

## 步骤 1.2：将示例记录写入输入流
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以持续生成示例记录，并将这些记录写入 `ExampleInputStream` 流。

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。代码中的 `put-record` 命令将 JSON 记录写入到流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**下一个步骤**  
[步骤 2：创建应用程序](app-anom-score-create-app.md)

# 步骤 2：创建应用程序
<a name="app-anom-score-create-app"></a>

在此部分中，您创建一个 Amazon Kinesis Data Analytics 应用程序，如下所示：
+ 配置应用程序输入以将在 [步骤 1：准备](app-anomaly-prepare.md) 中创建的 Kinesis 数据流作为流式传输源。
+ 在控制台上使用 **Anomaly Detection (异常检测)** 模板。

**创建应用程序**

1. 按照 Kinesis Data Analytics **入门**练习中的步骤 1、2 和 3（请参阅 [步骤 3.1：创建应用程序](get-started-create-app.md)）。
   + 在源配置中，执行以下操作：
     + 指定您在上一部分中创建的流式传输源。
     + 在控制台推断架构后，编辑架构并将 `heartRate` 列类型设置为 `INTEGER`。

       大多数心率值是正常的，发现过程最有可能将 `TINYINT` 类型分配给此列。但有小部分值显示了高心率。如果这些较高的值不适合 `TINYINT` 类型，则 Kinesis Data Analytics 会将这些行发送到错误流。将数据类型更新为 `INTEGER`，以便能适合所有生成的心率数据。
   + 在控制台上使用 **Anomaly Detection (异常检测)** 模板。随后，您更新模板代码以提供适当的列名称。

1. 通过提供列名称来更新应用程序代码。下面显示了生成的应用程序代码 (将此代码粘贴到 SQL 编辑器中)：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. 运行 SQL 代码并在 Kinesis Data Analytics 控制台中检查结果：  
![\[控制台屏幕截图，显示 Real-time analytics (实时分析) 选项卡以及应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**下一个步骤**  
[步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)

# 步骤 3：配置应用程序输出
<a name="app-anomaly-create-ka-app-config-destination"></a>

完成[步骤 2：创建应用程序](app-anom-score-create-app.md)后，您已拥有用于从流式传输源读取心率数据并为每条记录分配一个异常分数的应用程序代码。

您现在可以将应用程序内部流中的应用程序结果发送到外部目标，这是另一个 Kinesis 数据流 (`OutputStreamTestingAnomalyScores`)。您可以分析异常分数并确定哪种心率是异常的。然后，您可以进一步扩展此应用程序以生成警报。

执行以下步骤可配置应用程序输出：



1. 打开 Amazon Kinesis Data Analytics 控制台。在 SQL 编辑器中，在应用程序控制面板中选择 **Destination** 或 **Add a destination**。

1. 在 **Connect to destination (连接到目标)** 页面中，选择您在上一部分中创建的 `OutputStreamTestingAnomalyScores` 流。

   现在，您具有一个外部目标，Amazon Kinesis Data Analytics 将应用程序写入到应用程序内部流 `DESTINATION_SQL_STREAM` 的任何记录永久保存到该目标中。

1. 您可以选择配置 AWS Lambda 为监控`OutputStreamTestingAnomalyScores`直播并向您发送警报。有关说明，请参阅[使用 Lambda 函数预处理数据](lambda-preprocessing.md)。如果未设置警报，您可以查看 Kinesis Data Analytics 写入到外部目标（Kinesis 数据流 `OutputStreamTestingAnomalyScores`）的记录，如 [步骤 4：验证输出](app-anomaly-verify-output.md) 中所述。

**下一个步骤**  
[步骤 4：验证输出](app-anomaly-verify-output.md)

# 步骤 4：验证输出
<a name="app-anomaly-verify-output"></a>

在[步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)中配置应用程序输出后，使用以下 AWS CLI 命令读取应用程序在目标流中写入的记录。

1. 运行 `get-shard-iterator` 命令以获取指向输出流中的数据的指针。

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   您将获得带分片迭代器值的响应，如以下示例响应中所示：

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   复制该分片迭代器值。

1. 运行 AWS CLI `get-records` 命令。

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   此命令将返回一页记录和另一个分片迭代器，您可在后续 `get-records` 命令中使用该迭代器来提取下一组记录。

# 示例：检测数据异常和获取说明 (RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION 函数)
<a name="app-anomaly-detection-with-explanation"></a>

Amazon Kinesis Data Analytics 提供了 `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函数，该函数根据数值列中的值为每个记录分配一个异常分数。该函数还能提供异常说明。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的 [RANDOM\$1CUT\$1FOREST\$1WITH\$1EXPLANATION](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-random-cut-forest-with-explanation.html)。

在本练习中，您将编写应用程序代码，以获取应用程序流式传输源中记录的异常分数。以及获取每个异常的说明。

**Topics**
+ [步骤 1：准备数据](app-anomaly-with-ex-prepare.md)
+ [步骤 2：创建分析应用程序](app-anom-with-exp-create-app.md)
+ [步骤 3：检查结果](examine-results-with-exp.md)

**第一步**  
[步骤 1：准备数据](app-anomaly-with-ex-prepare.md)

# 步骤 1：准备数据
<a name="app-anomaly-with-ex-prepare"></a>

在为此[示例](app-anomaly-detection-with-explanation.md)创建 Amazon Kinesis Data Analytics 应用程序前，需要先创建一个 Kinesis 数据流，以作为应用程序的流式传输源。另外，您还需运行 Python 代码来将模拟血压数据写入流中。

**Topics**
+ [步骤 1.1：创建 Kinesis 数据流](#app-anomaly-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-anomaly-write-sample-records-inputstream)

## 步骤 1.1：创建 Kinesis 数据流
<a name="app-anomaly-create-two-streams"></a>

在此部分中，您创建一个名为 `ExampleInputStream` 的 Kinesis 数据流。您可以使用 AWS 管理控制台 或创建此数据流 AWS CLI。
+ 使用控制台：

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 在导航窗格中，选择 **数据流**。然后选择**创建 Kinesis 流**。

  1. 对于名称，请键入 **ExampleInputStream**。对于分片数，请键入 **1**。
+ 或者， AWS CLI 要使用创建数据流，请运行以下命令：

  ```
  $ aws kinesis create-stream --stream-name ExampleInputStream --shard-count 1
  ```

## 步骤 1.2：将示例记录写入输入流
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以不断生成示例记录并将其写入您创建的数据流中。

1. 安装 Python 和 pip。

   有关安装 Python 的信息，请参阅 [Python](https://www.python.org/)。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 文档中的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。可以将区域改为您要用于此示例的区域。代码中的 `put-record` 命令将 JSON 记录写入到流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class PressureType(Enum):
       low = "LOW"
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_blood_pressure(pressure_type):
       pressure = {"BloodPressureLevel": pressure_type.value}
       if pressure_type == PressureType.low:
           pressure["Systolic"] = random.randint(50, 80)
           pressure["Diastolic"] = random.randint(30, 50)
       elif pressure_type == PressureType.normal:
           pressure["Systolic"] = random.randint(90, 120)
           pressure["Diastolic"] = random.randint(60, 80)
       elif pressure_type == PressureType.high:
           pressure["Systolic"] = random.randint(130, 200)
           pressure["Diastolic"] = random.randint(90, 150)
       else:
           raise TypeError
       return pressure
   
   
   def generate(stream_name, kinesis_client):
       while True:
           rnd = random.random()
           pressure_type = (
               PressureType.low
               if rnd < 0.005
               else PressureType.high
               if rnd > 0.995
               else PressureType.normal
           )
           blood_pressure = get_blood_pressure(pressure_type)
           print(blood_pressure)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(blood_pressure),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```

**下一个步骤**  
[步骤 2：创建分析应用程序](app-anom-with-exp-create-app.md)

# 步骤 2：创建分析应用程序
<a name="app-anom-with-exp-create-app"></a>

在本节中，您将创建一个 Amazon Kinesis Data Analytics 应用程序，然后将其配置为使用在 [步骤 1：准备数据](app-anomaly-with-ex-prepare.md) 中作为流式传输源创建的 Kinesis 数据流。然后，运行使用 `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函数的应用程序代码。

**创建应用程序**

1. [在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. 在导航窗格中选择 **Data Analytics (数据分析)**，然后选择**创建应用程序**。

1. 提供应用程序名称和描述 (可选)，并选择 **Create application**。

1. 选择 **Connect 流式传输数据**，然后**ExampleInputStream**从列表中进行选择。

1. 选择 **Discover schema**，并确保 `Systolic` 和 `Diastolic` 显示为 `INTEGER` 列。如果二者为另一种类型，则选择 **Edit schema**，并将 `INTEGER` 类型分配给二者。

1. 在 **Real time analytics** 下，选择 **Go to SQL editor**。出现提示时，选择运行您的应用程序。

1. 将以下代码粘贴到 SQL 编辑器中，然后选择 **Save and run SQL**。

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "Systolic"                  INTEGER,
   	        "Diastolic"                 INTEGER,
   	        "BloodPressureLevel"        varchar(20),
   	        "ANOMALY_SCORE"             DOUBLE,
   	        "ANOMALY_EXPLANATION"       varchar(512));
   
   -- Compute an anomaly score with explanation for each record in the input stream
   -- using RANDOM_CUT_FOREST_WITH_EXPLANATION
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "Systolic", "Diastolic", "BloodPressureLevel", ANOMALY_SCORE, ANOMALY_EXPLANATION 
         FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

**下一个步骤**  
[步骤 3：检查结果](examine-results-with-exp.md)

# 步骤 3：检查结果
<a name="examine-results-with-exp"></a>

当您对此[示例](app-anomaly-detection-with-explanation.md)运行 SQL 代码时，首先会看到异常分数等于零的行。这种情况发生在初始学习阶段。然后，您会得到类似如下的结果：

```
ROWTIME SYSTOLIC DIASTOLIC BLOODPRESSURELEVEL ANOMALY_SCORE ANOMALY_EXPLANATION
27:49.0	101      66        NORMAL             0.711460417   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0922","ATTRIBUTION_SCORE":"0.3792"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"0.0210","ATTRIBUTION_SCORE":"0.3323"}}
27:50.0	144      123       HIGH               3.855851061   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.8567","ATTRIBUTION_SCORE":"1.7447"},"Diastolic":{"DIRECTION":"HIGH","STRENGTH":"7.0982","ATTRIBUTION_SCORE":"2.1111"}}
27:50.0	113      69        NORMAL             0.740069409   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0549","ATTRIBUTION_SCORE":"0.3750"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0394","ATTRIBUTION_SCORE":"0.3650"}}
27:50.0	105      64        NORMAL             0.739644157   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0245","ATTRIBUTION_SCORE":"0.3667"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0524","ATTRIBUTION_SCORE":"0.3729"}}
27:50.0	100      65        NORMAL             0.736993425   {"Systolic":{"DIRECTION":"HIGH","STRENGTH":"0.0203","ATTRIBUTION_SCORE":"0.3516"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0454","ATTRIBUTION_SCORE":"0.3854"}}
27:50.0	108      69        NORMAL             0.733767202   {"Systolic":{"DIRECTION":"LOW","STRENGTH":"0.0974","ATTRIBUTION_SCORE":"0.3961"},"Diastolic":{"DIRECTION":"LOW","STRENGTH":"0.0189","ATTRIBUTION_SCORE":"0.3377"}}
```
+ `RANDOM_CUT_FOREST_WITH_EXPLANATION` 函数中的算法看到 `Systolic` (收缩压) 和 `Diastolic` (舒张压) 列为数字，于是将它们作为输入。
+ `BloodPressureLevel` 列包含文本数据，因此不会被算法所考虑。该列只是一个可视化助手，用来帮助您快速发现本例中的正常、高、低血压水平。
+ 在 `ANOMALY_SCORE` 列中，分数越高的记录越异常。此示例结果集中的第二个记录最异常，异常分数为 3.855851061。
+ 要了解算法所考虑的每个数字列在多大程度上造成异常评分，请参阅 `ATTRIBUTION_SCORE` 列中名为 `ANOMALY_SCORE` 的 JSON 字段。对于该示例结果集中的第二行，`Systolic` 和 `Diastolic` 列造成异常的比例为 1.7447:2.1111。换句话说，异常分数原因的 45% 归咎于收缩压值，55% 归咎于舒张压值。
+ 要确定此示例中第二行所代表的点的方向是否异常，请参阅名为 `DIRECTION` 的 JSON 字段。在本例中，舒张压和收缩压值均标记为 `HIGH`。要确定这些方向正确的置信度，请参阅名为 `STRENGTH` 的 JSON 字段。在此示例中，算法更加确信舒张值太高。事实上，舒张压读数的正常值通常为 60–80，而 123 远高于预期值。

# 示例：检测流上的热点 (HOTSPOTS 函数)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics 提供了 `HOTSPOTS` 函数，它可以查找并返回有关数据中的相对密集的区域的信息。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的 [HOTSPOTS](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)。

在本练习中，您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序，请执行以下步骤：

1. **设置流式传输源** - 您设置 Kinesis 流并编写示例坐标数据，如下所示：

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   本示例提供了用于填充流的 Python 脚本。`x` 和 `y` 值是随机生成的，一些记录集中在特定位置周围。

   如果脚本有意生成值作为热点的一部分，`is_hot` 字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。

1. **创建应用程序** – 使用 AWS 管理控制台，您随后创建一个 Kinesis Data Analytics 应用程序。通过将流式传输源映射到应用程序内部流 (`SOURCE_SQL_STREAM_001`) 来配置应用程序输入。在应用程序启动时，Kinesis Data Analytics 持续读取流式传输源，并将记录插入到应用程序内部流中。

   在本练习中，您将为应用程序使用以下代码：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   此代码读取 `SOURCE_SQL_STREAM_001` 中的行，分析它是否有大量热点，并将生成的数据写入到另一个应用程序内部流 (`DESTINATION_SQL_STREAM`)。您使用数据泵将流插入到应用程序内部流。有关更多信息，请参阅 [应用程序内部流和数据泵](streams-pumps.md)。

1. **配置输出** - 您配置应用程序输出以将应用程序中的数据发送到外部目标 (另一个 Kinesis 数据流)。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 AWS Lambda 函数进一步处理热点信息并配置警报。

1. **验证输出**-该示例包括一个 JavaScript 应用程序，该应用程序从输出流中读取数据并将其以图形方式显示，因此您可以实时查看该应用程序生成的热点。



本练习使用美国西部（俄勒冈州）(`us-west-2`) 区域创建这些流和您的应用程序。如果您使用任何其他区域，请相应地更新代码。

**Topics**
+ [步骤 1：创建输入和输出流](app-hotspots-prepare.md)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](app-hotspot-create-app.md)
+ [步骤 3：配置应用程序输出](app-hotspots-create-ka-app-config-destination.md)
+ [步骤 4：验证应用程序输出](app-hotspots-verify-output.md)

# 步骤 1：创建输入和输出流
<a name="app-hotspots-prepare"></a>

在为[热点示例](app-hotspots-detection.md)创建 Amazon Kinesis Data Analytics 应用程序之前，您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源，并将另一个流配置为目标（Kinesis Data Analytics 在其中永久保存应用程序输出）。

**Topics**
+ [步骤 1.1：创建 Kinesis 数据流](#app-hotspots-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-hotspots-write-sample-records-inputstream)

## 步骤 1.1：创建 Kinesis 数据流
<a name="app-hotspots-create-two-streams"></a>

在此部分中，您创建两个 Kinesis 数据流：`ExampleInputStream` 和 `ExampleOutputStream`。

使用控制台或 AWS CLI创建这些数据流。
+ 使用控制台创建数据流：

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 在导航窗格中，选择 **数据流**。

  1. 选择**创建 Kinesis 流**，然后创建带有一个名为 `ExampleInputStream` 的分片的流。

  1. 重复上一步骤以创建带有一个名为 `ExampleOutputStream` 的分片的流。
+ 要使用 AWS CLI创建数据流，请执行以下操作：
  + 使用以下 Kinesis `create-stream` AWS CLI 命令创建直播（`ExampleInputStream`和`ExampleOutputStream`）。要创建另一个流 (应用程序将用于写入输出)，请运行同一命令以将流名称更改为 `ExampleOutputStream`。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 步骤 1.2：将示例记录写入输入流
<a name="app-hotspots-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以持续生成示例记录并将其写入 `ExampleInputStream` 流。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。此代码将执行以下操作：
   + 在 (X, Y) 平面上的某个位置生成潜在热点。
   + 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。
   + `put-record` 命令将 JSON 记录写入到流。
**重要**  
请勿将此文件上传到 Web 服务器，因为它包含您的 AWS 凭证。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**下一个步骤**  
[步骤 2：创建 Kinesis Data Analytics 应用程序](app-hotspot-create-app.md)

# 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="app-hotspot-create-app"></a>

在该[热点示例](app-hotspots-detection.md)的此部分中，您创建一个 Kinesis Data Analytics 应用程序，如下所示：
+ 配置应用程序输入以将您在[步骤 1 ](app-hotspots-prepare.md)中创建的 Kinesis 数据流用作流式传输源。
+ 在 AWS 管理控制台中使用提供的应用程序代码。

**创建应用程序**

1. 按照[入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中的步骤 1、2 和 3（请参阅 [步骤 3.1：创建应用程序](get-started-create-app.md)）创建 Kinesis Data Analytics 应用程序。

   在源配置中，执行以下操作：
   + 指定您在[步骤 1：创建输入和输出流](app-hotspots-prepare.md)中创建的流式传输源。
   + 在控制台推断架构后编辑架构。确保 `x` 和 `y``DOUBLE` 列类型设置为 ，并确保 `IS_HOT` 列类型设置为 `VARCHAR`。

1. 使用以下应用程序代码 (您可以将此代码粘贴到 SQL 编辑器中)：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. 运行 SQL 代码并审查结果。  
![\[显示行时间、热点和 hotspot_heat 的 SQL 代码结果。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**下一个步骤**  
[步骤 3：配置应用程序输出](app-hotspots-create-ka-app-config-destination.md)

# 步骤 3：配置应用程序输出
<a name="app-hotspots-create-ka-app-config-destination"></a>

在[热点示例](app-hotspots-detection.md)的此部分中，您使用 Amazon Kinesis Data Analytics 应用程序代码查找流式传输源中的大量热点并将热分数分配给每个热点。

您现在可以将应用程序内部流中的应用程序结果发送到外部目标，这是另一个 Kinesis 数据流 (`ExampleOutputStream`)。然后，您可以分析热点分数并为热点热确定合适的阈值。您可以进一步扩展此应用程序以生成警报。

**配置应用程序输出**

1. 在 /kinesisanalytics 上打开 Kinesis Data Analytics 控制台[ https://console.aws.amazon.com。](https://console.aws.amazon.com/kinesisanalytics)

1. 在 SQL 编辑器中，在应用程序控制面板中选择 **Destination** 或 **Add a destination**。

1. 在 **Add a destination (添加目标)** 页面上，选择 **Select from your streams (从流中选择)**。然后选择在上一部分中创建的 `ExampleOutputStream` 流。

   现在，您具有一个外部目标，Amazon Kinesis Data Analytics 将应用程序写入到应用程序内部流 `DESTINATION_SQL_STREAM` 的任何记录永久保存到该目标中。

1. 您可以选择配置 AWS Lambda 为监控`ExampleOutputStream`直播并向您发送警报。有关更多信息，请参阅 [使用 Lambda 函数作为输出](how-it-works-output-lambda.md)。您还可以查看 Kinesis Data Analytics 写入到外部目标（Kinesis 流 `ExampleOutputStream`）的记录，如 [步骤 4：验证应用程序输出](app-hotspots-verify-output.md) 中所述。

**下一个步骤**  
[步骤 4：验证应用程序输出](app-hotspots-verify-output.md)

# 步骤 4：验证应用程序输出
<a name="app-hotspots-verify-output"></a>

在[热点示例](app-hotspots-detection.md)的此部分中，您将设置一个 Web 应用程序，该应用程序在可扩展矢量图形 (SVG) 控件中显示热点信息。

1. 使用以下内容创建名为 `index.html` 的文件：

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 在同一目录中创建一个名为 `hotspots_viewer.js` 的文件，该文件包含以下内容。在提供的变量中包含您的 、凭证和输出流名称。

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 在第一个部分中 Python 代码运行时，在 Web 浏览器中打开 `index.html`。热点信息显示在页面上，如下所示。

     
![\[显示热点信息的可扩展矢量图形图表。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)

# 示例：警报和错误
<a name="examples-alerts"></a>

此部分提供使用警报和错误的 Kinesis Data Analytics 应用程序示例。每个示例都提供了 step-by-step说明和代码，可帮助您设置和测试 Kinesis Data Analytics 应用程序。

**Topics**
+ [示例：创建简单警报](app-simple-alerts.md)
+ [示例：创建受限警报](app-throttled-alerts.md)
+ [示例：探索应用程序内部错误流](app-explore-error-stream.md)

# 示例：创建简单警报
<a name="app-simple-alerts"></a>

在此 Kinesis Data Analytics 应用程序中，将在通过演示流创建的应用程序内部流上持续运行查询。有关更多信息，请参阅 [连续查询](continuous-queries-concepts.md)。

如果任何行显示股票价格变动大于 1%，这些行将被插入另一个应用程序内部流中。在本练习中，可以将应用程序输出配置为将结果保存到外部目标。随后，可以进一步调查结果。例如，您可以使用 AWS Lambda 函数来处理记录并向您发送警报。

**创建简单的警报应用程序**

1. 创建分析应用程序，如 Kinesis Data Analytics [入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中所述。

1. 在 Kinesis Data Analytics 上的 SQL 编辑器中，将应用程序代码替换为以下内容：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
   ```

   应用程序代码中的 `SELECT` 语句可在 `SOURCE_SQL_STREAM_001` 中筛选出股票价格变化大于 1% 的行。之后，该代码将使用数据泵将这些行插入到另一个应用程序内部流 `DESTINATION_SQL_STREAM`。有关说明使用数据泵将行插入应用程序内部流中的编码模式的更多信息，请参阅[应用程序代码](how-it-works-app-code.md)。

1. 选择 **保存并运行 SQL**。

1. 添加一个目标。为此，请在 SQL 编辑器中选择 **Destination (目标)**，也可以在应用程序中心中选择 **Add a destination (添加目标)**。

   1. 在 SQL 编辑器中，选择 **Destination (目标)** 选项卡，然后选择 **Connect to a destination (连接到目标)**。

      在 **Connect to destination (连接到目标)** 页面中，选择 **Create New (新建)**。

   1. 选择 **Go to Kinesis Streams**。

   1. 在 Amazon Kinesis Data Streams 控制台中，创建具有一个分片的新 Kinesis 流（例如，`gs-destination`）。请等待，直到流状态为 **ACTIVE**。

   1. 返回 Kinesis Data Analytics 控制台。在 **Connect to destination (连接到目标)** 页面上，选择您创建的流。

      如果流未显示，请刷新页面。

   1. 选择 **保存并继续**。

   现在，您具有一个外部目标（Kinesis 数据流），Kinesis Data Analytics 将 `DESTINATION_SQL_STREAM` 应用程序内部流中的应用程序输出永久保存到该目标中。

1. 配置 AWS Lambda 为监控您创建的 Kinesis 流并调用 Lambda 函数。

   有关说明，请参阅[使用 Lambda 函数预处理数据](lambda-preprocessing.md)。

# 示例：创建受限警报
<a name="app-throttled-alerts"></a>

在此 Kinesis Data Analytics 应用程序中，将在通过演示流创建的应用程序内部流上持续运行查询。有关更多信息，请参阅 [连续查询](continuous-queries-concepts.md)。如果任何行显示股票价格变动大于 1%，这些行将被插入另一个应用程序内部流中。应用程序会限制警报，以便在股票价格变化时立即发送警报。但每个股票代号每分钟向应用程序内部流发送的警报不超过一个。

**创建受限警报应用程序**

1. 创建一个 Kinesis Data Analytics 应用程序，如 Kinesis Data Analytics [入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中所述。

1. 在 Kinesis Data Analytics 上的 SQL 编辑器中，将应用程序代码替换为以下内容：

   ```
   CREATE OR REPLACE STREAM "CHANGE_STREAM" 
              (ticker_symbol VARCHAR(4), 
               sector        VARCHAR(12), 
               change        DOUBLE, 
               price         DOUBLE);
   
   CREATE OR REPLACE PUMP "change_pump" AS 
      INSERT INTO "CHANGE_STREAM"
         SELECT STREAM ticker_symbol, sector, change, price 
         FROM   "SOURCE_SQL_STREAM_001"
         WHERE  (ABS(Change / (Price - Change)) * 100) > 1;
         
   -- ** Trigger Count and Limit **
   -- Counts "triggers" or those values that evaluated true against the previous where clause
   -- Then provides its own limit on the number of triggers per hour per ticker symbol to what
   -- is specified in the WHERE clause
   
   CREATE OR REPLACE STREAM TRIGGER_COUNT_STREAM (
      ticker_symbol VARCHAR(4), 
      change REAL,
      trigger_count INTEGER);
   
   CREATE OR REPLACE PUMP trigger_count_pump AS INSERT INTO TRIGGER_COUNT_STREAM
   SELECT STREAM ticker_symbol, change, trigger_count
   FROM (
       SELECT STREAM ticker_symbol, change, COUNT(*) OVER W1 as trigger_count
       FROM "CHANGE_STREAM"
       --window to perform aggregations over last minute to keep track of triggers
       WINDOW W1 AS (PARTITION BY ticker_symbol RANGE INTERVAL '1' MINUTE PRECEDING)
   )
   WHERE trigger_count >= 1;
   ```

   应用程序代码中的 `SELECT` 语句将在 `SOURCE_SQL_STREAM_001` 中筛选出显示股票价格更改大于 1% 的行，并使用数据泵将这些行插入另一个应用程序内部流 `CHANGE_STREAM`。

   然后，应用程序为受限警报创建第二个名为 `TRIGGER_COUNT_STREAM` 的流。第二个查询从一个窗口中选择记录，每次记录被允许进入该窗口时，该窗口都向前跳，以便每个股票报价每分钟只有一个记录被写入到流中。

1. 选择 **保存并运行 SQL**。

该示例将流输出到与以下内容类似的 `TRIGGER_COUNT_STREAM`：

![\[控制台屏幕截图，显示包含股票代号、百分比变化和触发器计数列的输出流。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/ex-throttle-alerts.png)


# 示例：探索应用程序内部错误流
<a name="app-explore-error-stream"></a>

Amazon Kinesis Data Analytics 为您创建的每个应用程序提供一个应用程序内部错误流。应用程序无法处理的所有行将发送到此错误流。您可以考虑将错误流数据保存到外部目标以便于调查。

您将在控制台中执行以下练习。在这些示例中，您将通过编辑由发现过程推断的架构将错误引入输入配置中，然后验证已发送到错误流的行。

**Topics**
+ [引入分析错误](#intro-error-parse-error)
+ [引入被零除错误](#intro-error-divide-zero)

## 引入分析错误
<a name="intro-error-parse-error"></a>

在本练习中，您引入了分析错误。

1. 创建一个 Kinesis Data Analytics 应用程序，如 Kinesis Data Analytics [入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中所述。

1. 在应用程序详细信息页面上，选择 **连接流数据**。

1. 如果已完成入门练习，则账户中会有一个演示流 (`kinesis-analytics-demo-stream`)。在 **Connect to source (连接到源)** 页面上，选择此演示流。

1. Kinesis Data Analytics 使用演示流中的示例推断它创建的应用程序内部输入流的架构。控制台在 **Formatted stream sample** 选项卡中显示推断的架构和示例数据。

1. 接下来，可以编辑架构并修改列类型以引入分析错误。选择 **Edit schema**。

1. 将 `TICKER_SYMBOL` 列类型从 `VARCHAR(4)` 更改为 `INTEGER`。

   由于创建的应用程序内部架构中的列类型无效，Kinesis Data Analytics 无法将数据添加到应用程序内部流中。而只能将行发送到错误流。

1. 选择 **Save schema**。

1. 选择 **Refresh schema samples**。

   请注意，**Formatted stream** 示例中没有行。不过，**Error stream** 选项卡将显示数据与错误消息。**Error stream** 选项卡将显示已发送到应用程序内部错误流的数据。

   由于已更改列数据类型，Kinesis Data Analytics 无法将数据添加到应用程序内部输入流中。而只能将数据发送到错误流。

## 引入被零除错误
<a name="intro-error-divide-zero"></a>

在本练习中，您将更新应用程序代码以引入运行时错误 (被零除)。请注意，Amazon Kinesis Data Analytics 将结果行发送到应用程序内部错误流，而不是发送到要将结果写入到的 `DESTINATION_SQL_STREAM` 应用程序内部错误流。



1. 创建一个 Kinesis Data Analytics 应用程序，如 Kinesis Data Analytics [入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中所述。

   验证 **Real-time analytics** 选项卡上的结果，如下所示：

   Sour

1. 在应用程序代码中更新 `SELECT` 语句以引入被零除；例如：

   ```
   SELECT STREAM ticker_symbol, sector, change, (price / 0) as ProblemColumn
   FROM "SOURCE_SQL_STREAM_001"
   WHERE sector SIMILAR TO '%TECH%';
   ```

   

1. 运行 应用程序。

   由于出现被零除运行时错误，Kinesis Data Analytics 将行发送到应用程序内部错误流，而不是将结果写入到 `DESTINATION_SQL_STREAM` 中。在 **Real-time analytics (实时分析)** 选项卡上，选择错误流，随后应用程序内部错误流中将显示行。

# 示例：解决方案加速器
<a name="examples_solution"></a>

[AWS 解决方案网站](https://aws.amazon.com/solutions/)提供了 AWS CloudFormation 模板，您可以使用这些模板来快速创建完整的流数据解决方案。

可用模板如下：

## 实时洞察 AWS 账户 活动
<a name="examples_solution_activity"></a>

该解决方案可实时记录和可视化您的 AWS 账户资源访问和使用指标。有关更多信息，请参阅[实时 AWS 账户 活动见解](https://docs.aws.amazon.com/solutions/latest/real-time-insights-account-activity/welcome.html)。

## 使用 Kinesis Data Analytics 进行实时 AWS IoT 设备监控
<a name="examples_solution_iot"></a>

此解决方案实时收集、处理、分析和直观显示 IoT 设备连接和活动数据。有关更多信息，请参阅使用 [Kinesis Data Analytics 进行实时 AWS IoT 设备监控](https://docs.aws.amazon.com/solutions/latest/real-time-iot-device-monitoring-with-kinesis/welcome.html)。

## 使用 Kinesis Data Analytics 进行实时 Web 分析
<a name="examples_solution_web"></a>

此解决方案实时收集、处理、分析和直观显示网站点击流数据。有关更多信息，请参阅[使用 Kinesis Data Analytics 进行实时 Web 分析](https://docs.aws.amazon.com/solutions/latest/real-time-web-analytics-with-kinesis/welcome.html)。

## Amazon Connect 车辆解决方案
<a name="examples_solution_vehicle"></a>

此解决方案实时收集、处理、分析和直观显示车辆中的 IoT 数据。有关更多信息，请参阅 [Amazon Connect 车辆解决方案](https://docs.aws.amazon.com//solutions/latest/connected-vehicle-solution/welcome.html)。