

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 示例：在流中检测数据异常情况 (RANDOM\$1CUT\$1FOREST 函数)
<a name="app-anomaly-detection"></a>

Amazon Kinesis Data Analytics 提供了一个函数 (`RANDOM_CUT_FOREST`)，它可以根据数值列中的值将异常分数分配给每个记录。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的[`RANDOM_CUT_FOREST` 函数](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/analytics-sql-reference.html)。

在本练习中，您将编写应用程序代码以将异常分数分配给应用程序的流式传输源中的记录。要设置应用程序，请执行以下操作：

1. **设置流式源** - 您设置 Kinesis 数据流并编写示例 `heartRate` 数据，如下所示：

   ```
   {"heartRate": 60, "rateType":"NORMAL"}
   ...
   {"heartRate": 180, "rateType":"HIGH"}
   ```

   此过程提供用于填充流的 Python 脚本。`heartRate` 值将随机生成，99% 的记录具有的 `heartRate` 值介于 60 和 100 之间，仅 1% 的记录具有的 `heartRate` 值介于 150 和 200 之间。因此，`heartRate` 值介于 150 和 200 之间的记录是异常情况。

1. **配置输入** - 通过使用控制台，您创建 Kinesis Data Analytics 应用程序，并通过将流式源映射到应用程序内部流 (`SOURCE_SQL_STREAM_001`) 来配置应用程序输入。在应用程序启动时，Kinesis Data Analytics 持续读取流式传输源，并将记录插入到应用程序内部流中。

1. **指定应用程序代码** - 示例使用以下应用程序代码：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   此代码读取 `SOURCE_SQL_STREAM_001` 中的行，分配异常分数，并将结果行写入另一个应用程序内部流 (`TEMP_STREAM`)。随后，应用程序代码将对 `TEMP_STREAM` 中的记录进行排序，并将结果保存到另一个应用程序内部流 (`DESTINATION_SQL_STREAM`)。您使用数据泵将流插入到应用程序内部流。有关更多信息，请参阅 [应用程序内部流和数据泵](streams-pumps.md)。

1. **配置输出** - 您配置应用程序输出以将 `DESTINATION_SQL_STREAM` 中的数据保存到外部目标 (另一个 Kinesis 数据流)。查看分配给每条记录的异常分数并确定哪个分数指示应用程序外部的异常情况 (您需要收到这些异常情况的警报)。您可以使用 AWS Lambda 函数来处理这些异常分数并配置警报。

此练习使用美国东部 (弗吉尼亚州北部) (`us-east-1`) 来创建这些流和您的应用程序。如果您使用任何其他区域，则必须相应地更新代码。

**Topics**
+ [步骤 1：准备](app-anomaly-prepare.md)
+ [步骤 2：创建应用程序](app-anom-score-create-app.md)
+ [步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)
+ [步骤 4：验证输出](app-anomaly-verify-output.md)

**下一个步骤**  
[步骤 1：准备](app-anomaly-prepare.md)

# 步骤 1：准备
<a name="app-anomaly-prepare"></a>

在为本练习创建 Amazon Kinesis Data Analytics 应用程序之前，您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源，并将另一个流配置为目标（Kinesis Data Analytics 在其中永久保存应用程序输出）。

**Topics**
+ [步骤 1.1：创建输入和输出数据流](#app-anomaly-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-anomaly-write-sample-records-inputstream)

## 步骤 1.1：创建输入和输出数据流
<a name="app-anomaly-create-two-streams"></a>

在此部分中，您创建两个 Kinesis 流：`ExampleInputStream` 和 `ExampleOutputStream`。您可以使用 AWS 管理控制台 或 AWS CLI创建这些流。
+ 

**要使用 控制台**

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 选择**创建数据流**。创建带有一个名为 `ExampleInputStream` 的分片的流。有关更多信息，请参阅 *Amazon Kinesis Data Streams 开发人员指南*中的[创建流](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)。

  1. 重复上一步骤以创建带有一个名为 `ExampleOutputStream` 的分片的流。
+ 

**要使用 AWS CLI**

  1. 使用以下 Kinesis `create-stream` AWS CLI 命令创建第一个直播 () `ExampleInputStream`。

     ```
     $ aws kinesis create-stream \
     --stream-name ExampleInputStream \
     --shard-count 1 \
     --region us-east-1 \
     --profile adminuser
     ```

  1. 运行同一命令，同时将流名称更改为 `ExampleOutputStream`。此命令创建应用程序用来写入输出的第二个流。

## 步骤 1.2：将示例记录写入输入流
<a name="app-anomaly-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以持续生成示例记录，并将这些记录写入 `ExampleInputStream` 流。

```
{"heartRate": 60, "rateType":"NORMAL"} 
...
{"heartRate": 180, "rateType":"HIGH"}
```

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。代码中的 `put-record` 命令将 JSON 记录写入到流。

   ```
    
   from enum import Enum
   import json
   import random
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   class RateType(Enum):
       normal = "NORMAL"
       high = "HIGH"
   
   
   def get_heart_rate(rate_type):
       if rate_type == RateType.normal:
           rate = random.randint(60, 100)
       elif rate_type == RateType.high:
           rate = random.randint(150, 200)
       else:
           raise TypeError
       return {"heartRate": rate, "rateType": rate_type.value}
   
   
   def generate(stream_name, kinesis_client, output=True):
       while True:
           rnd = random.random()
           rate_type = RateType.high if rnd < 0.01 else RateType.normal
           heart_rate = get_heart_rate(rate_type)
           if output:
               print(heart_rate)
           kinesis_client.put_record(
               StreamName=stream_name,
               Data=json.dumps(heart_rate),
               PartitionKey="partitionkey",
           )
   
   
   if __name__ == "__main__":
       generate(STREAM_NAME, boto3.client("kinesis"))
   ```



**下一个步骤**  
[步骤 2：创建应用程序](app-anom-score-create-app.md)

# 步骤 2：创建应用程序
<a name="app-anom-score-create-app"></a>

在此部分中，您创建一个 Amazon Kinesis Data Analytics 应用程序，如下所示：
+ 配置应用程序输入以将在 [步骤 1：准备](app-anomaly-prepare.md) 中创建的 Kinesis 数据流作为流式传输源。
+ 在控制台上使用 **Anomaly Detection (异常检测)** 模板。

**创建应用程序**

1. 按照 Kinesis Data Analytics **入门**练习中的步骤 1、2 和 3（请参阅 [步骤 3.1：创建应用程序](get-started-create-app.md)）。
   + 在源配置中，执行以下操作：
     + 指定您在上一部分中创建的流式传输源。
     + 在控制台推断架构后，编辑架构并将 `heartRate` 列类型设置为 `INTEGER`。

       大多数心率值是正常的，发现过程最有可能将 `TINYINT` 类型分配给此列。但有小部分值显示了高心率。如果这些较高的值不适合 `TINYINT` 类型，则 Kinesis Data Analytics 会将这些行发送到错误流。将数据类型更新为 `INTEGER`，以便能适合所有生成的心率数据。
   + 在控制台上使用 **Anomaly Detection (异常检测)** 模板。随后，您更新模板代码以提供适当的列名称。

1. 通过提供列名称来更新应用程序代码。下面显示了生成的应用程序代码 (将此代码粘贴到 SQL 编辑器中)：

   ```
   --Creates a temporary stream.
   CREATE OR REPLACE STREAM "TEMP_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   --Creates another stream for application output.	        
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
   	        "heartRate"        INTEGER,
   	        "rateType"         varchar(20),
   	        "ANOMALY_SCORE"    DOUBLE);
   
   -- Compute an anomaly score for each record in the input stream
   -- using Random Cut Forest
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "TEMP_STREAM"
         SELECT STREAM "heartRate", "rateType", ANOMALY_SCORE 
         FROM TABLE(RANDOM_CUT_FOREST(
                 CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
   
   -- Sort records by descending anomaly score, insert into output stream
   CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM * FROM "TEMP_STREAM"
         ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;
   ```

   

1. 运行 SQL 代码并在 Kinesis Data Analytics 控制台中检查结果：  
![\[控制台屏幕截图，显示 Real-time analytics (实时分析) 选项卡以及应用程序内部流中的结果数据。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/anom-v2-40.png)





**下一个步骤**  
[步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)

# 步骤 3：配置应用程序输出
<a name="app-anomaly-create-ka-app-config-destination"></a>

完成[步骤 2：创建应用程序](app-anom-score-create-app.md)后，您已拥有用于从流式传输源读取心率数据并为每条记录分配一个异常分数的应用程序代码。

您现在可以将应用程序内部流中的应用程序结果发送到外部目标，这是另一个 Kinesis 数据流 (`OutputStreamTestingAnomalyScores`)。您可以分析异常分数并确定哪种心率是异常的。然后，您可以进一步扩展此应用程序以生成警报。

执行以下步骤可配置应用程序输出：



1. 打开 Amazon Kinesis Data Analytics 控制台。在 SQL 编辑器中，在应用程序控制面板中选择 **Destination** 或 **Add a destination**。

1. 在 **Connect to destination (连接到目标)** 页面中，选择您在上一部分中创建的 `OutputStreamTestingAnomalyScores` 流。

   现在，您具有一个外部目标，Amazon Kinesis Data Analytics 将应用程序写入到应用程序内部流 `DESTINATION_SQL_STREAM` 的任何记录永久保存到该目标中。

1. 您可以选择配置 AWS Lambda 为监控`OutputStreamTestingAnomalyScores`直播并向您发送警报。有关说明，请参阅[使用 Lambda 函数预处理数据](lambda-preprocessing.md)。如果未设置警报，您可以查看 Kinesis Data Analytics 写入到外部目标（Kinesis 数据流 `OutputStreamTestingAnomalyScores`）的记录，如 [步骤 4：验证输出](app-anomaly-verify-output.md) 中所述。

**下一个步骤**  
[步骤 4：验证输出](app-anomaly-verify-output.md)

# 步骤 4：验证输出
<a name="app-anomaly-verify-output"></a>

在[步骤 3：配置应用程序输出](app-anomaly-create-ka-app-config-destination.md)中配置应用程序输出后，使用以下 AWS CLI 命令读取应用程序在目标流中写入的记录。

1. 运行 `get-shard-iterator` 命令以获取指向输出流中的数据的指针。

   ```
   aws kinesis get-shard-iterator \
   --shard-id shardId-000000000000 \
   --shard-iterator-type TRIM_HORIZON \
   --stream-name OutputStreamTestingAnomalyScores \
   --region us-east-1 \
   --profile adminuser
   ```

   您将获得带分片迭代器值的响应，如以下示例响应中所示：

   ```
     {      
         "ShardIterator":
         "shard-iterator-value"   }
   ```

   复制该分片迭代器值。

1. 运行 AWS CLI `get-records` 命令。

   ```
   aws kinesis get-records \
   --shard-iterator shared-iterator-value \
   --region us-east-1 \
   --profile adminuser
   ```

   此命令将返回一页记录和另一个分片迭代器，您可在后续 `get-records` 命令中使用该迭代器来提取下一组记录。