

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 示例：检测流上的热点 (HOTSPOTS 函数)
<a name="app-hotspots-detection"></a>

Amazon Kinesis Data Analytics 提供了 `HOTSPOTS` 函数，它可以查找并返回有关数据中的相对密集的区域的信息。有关更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL 参考*中的 [HOTSPOTS](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sqlrf-hotspots.html)。

在本练习中，您将编写应用程序代码以查找应用程序的流式传输源上的热点。要设置应用程序，请执行以下步骤：

1. **设置流式传输源** - 您设置 Kinesis 流并编写示例坐标数据，如下所示：

   ```
   {"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
   {"x": 0.722248626528026, "y": 4.648868803193405, "is_hot": "Y"}
   ```

   本示例提供了用于填充流的 Python 脚本。`x` 和 `y` 值是随机生成的，一些记录集中在特定位置周围。

   如果脚本有意生成值作为热点的一部分，`is_hot` 字段将作为指示器提供。这可以帮助您评估热点检测函数是否正常运行。

1. **创建应用程序** – 使用 AWS 管理控制台，您随后创建一个 Kinesis Data Analytics 应用程序。通过将流式传输源映射到应用程序内部流 (`SOURCE_SQL_STREAM_001`) 来配置应用程序输入。在应用程序启动时，Kinesis Data Analytics 持续读取流式传输源，并将记录插入到应用程序内部流中。

   在本练习中，您将为应用程序使用以下代码：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   此代码读取 `SOURCE_SQL_STREAM_001` 中的行，分析它是否有大量热点，并将生成的数据写入到另一个应用程序内部流 (`DESTINATION_SQL_STREAM`)。您使用数据泵将流插入到应用程序内部流。有关更多信息，请参阅 [应用程序内部流和数据泵](streams-pumps.md)。

1. **配置输出** - 您配置应用程序输出以将应用程序中的数据发送到外部目标 (另一个 Kinesis 数据流)。查看热点分数并确定哪些分数表明出现了热点 (并且您需要收到警报)。您可以使用 AWS Lambda 函数进一步处理热点信息并配置警报。

1. **验证输出**-该示例包括一个 JavaScript 应用程序，该应用程序从输出流中读取数据并将其以图形方式显示，因此您可以实时查看该应用程序生成的热点。



本练习使用美国西部（俄勒冈州）(`us-west-2`) 区域创建这些流和您的应用程序。如果您使用任何其他区域，请相应地更新代码。

**Topics**
+ [步骤 1：创建输入和输出流](app-hotspots-prepare.md)
+ [步骤 2：创建 Kinesis Data Analytics 应用程序](app-hotspot-create-app.md)
+ [步骤 3：配置应用程序输出](app-hotspots-create-ka-app-config-destination.md)
+ [步骤 4：验证应用程序输出](app-hotspots-verify-output.md)

# 步骤 1：创建输入和输出流
<a name="app-hotspots-prepare"></a>

在为[热点示例](app-hotspots-detection.md)创建 Amazon Kinesis Data Analytics 应用程序之前，您必须创建两个 Kinesis 数据流。将一个流配置为应用程序的流式传输源，并将另一个流配置为目标（Kinesis Data Analytics 在其中永久保存应用程序输出）。

**Topics**
+ [步骤 1.1：创建 Kinesis 数据流](#app-hotspots-create-two-streams)
+ [步骤 1.2：将示例记录写入输入流](#app-hotspots-write-sample-records-inputstream)

## 步骤 1.1：创建 Kinesis 数据流
<a name="app-hotspots-create-two-streams"></a>

在此部分中，您创建两个 Kinesis 数据流：`ExampleInputStream` 和 `ExampleOutputStream`。

使用控制台或 AWS CLI创建这些数据流。
+ 使用控制台创建数据流：

  1. [登录 AWS 管理控制台 并在 /kinesis 上打开 Kinesis 控制台。https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

  1. 在导航窗格中，选择 **数据流**。

  1. 选择**创建 Kinesis 流**，然后创建带有一个名为 `ExampleInputStream` 的分片的流。

  1. 重复上一步骤以创建带有一个名为 `ExampleOutputStream` 的分片的流。
+ 要使用 AWS CLI创建数据流，请执行以下操作：
  + 使用以下 Kinesis `create-stream` AWS CLI 命令创建直播（`ExampleInputStream`和`ExampleOutputStream`）。要创建另一个流 (应用程序将用于写入输出)，请运行同一命令以将流名称更改为 `ExampleOutputStream`。

    ```
    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
                             
    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
    ```

## 步骤 1.2：将示例记录写入输入流
<a name="app-hotspots-write-sample-records-inputstream"></a>

在此步骤中，您运行 Python 代码以持续生成示例记录并将其写入 `ExampleInputStream` 流。

```
{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"}
{"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
```

1. 安装 Python 和 `pip`。

   有关安装 Python 的信息，请访问 [Python](https://www.python.org/) 网站。

   您可以使用 pip 安装依赖项。有关安装 pip 的信息，请参阅 pip 网站上的[安装](https://pip.pypa.io/en/stable/installing/)。

1. 运行以下 Python 代码。此代码将执行以下操作：
   + 在 (X, Y) 平面上的某个位置生成潜在热点。
   + 为每个热点生成一系列点 (1000 个)。这些点中有 20% 集中在热点周围。其余的点在整个空间内随机生成。
   + `put-record` 命令将 JSON 记录写入到流。
**重要**  
请勿将此文件上传到 Web 服务器，因为它包含您的 AWS 凭证。

   ```
    
   import json
   from pprint import pprint
   import random
   import time
   import boto3
   
   STREAM_NAME = "ExampleInputStream"
   
   
   def get_hotspot(field, spot_size):
       hotspot = {
           "left": field["left"] + random.random() * (field["width"] - spot_size),
           "width": spot_size,
           "top": field["top"] + random.random() * (field["height"] - spot_size),
           "height": spot_size,
       }
       return hotspot
   
   
   def get_record(field, hotspot, hotspot_weight):
       rectangle = hotspot if random.random() < hotspot_weight else field
       point = {
           "x": rectangle["left"] + random.random() * rectangle["width"],
           "y": rectangle["top"] + random.random() * rectangle["height"],
           "is_hot": "Y" if rectangle is hotspot else "N",
       }
       return {"Data": json.dumps(point), "PartitionKey": "partition_key"}
   
   
   def generate(
       stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client
   ):
       """
       Generates points used as input to a hotspot detection algorithm.
       With probability hotspot_weight (20%), a point is drawn from the hotspot;
       otherwise, it is drawn from the base field. The location of the hotspot
       changes for every 1000 points generated.
       """
       points_generated = 0
       hotspot = None
       while True:
           if points_generated % 1000 == 0:
               hotspot = get_hotspot(field, hotspot_size)
           records = [
               get_record(field, hotspot, hotspot_weight) for _ in range(batch_size)
           ]
           points_generated += len(records)
           pprint(records)
           kinesis_client.put_records(StreamName=stream_name, Records=records)
   
           time.sleep(0.1)
   
   
   if __name__ == "__main__":
       generate(
           stream_name=STREAM_NAME,
           field={"left": 0, "width": 10, "top": 0, "height": 10},
           hotspot_size=1,
           hotspot_weight=0.2,
           batch_size=10,
           kinesis_client=boto3.client("kinesis"),
       )
   ```



**下一个步骤**  
[步骤 2：创建 Kinesis Data Analytics 应用程序](app-hotspot-create-app.md)

# 步骤 2：创建 Kinesis Data Analytics 应用程序
<a name="app-hotspot-create-app"></a>

在该[热点示例](app-hotspots-detection.md)的此部分中，您创建一个 Kinesis Data Analytics 应用程序，如下所示：
+ 配置应用程序输入以将您在[步骤 1 ](app-hotspots-prepare.md)中创建的 Kinesis 数据流用作流式传输源。
+ 在 AWS 管理控制台中使用提供的应用程序代码。

**创建应用程序**

1. 按照[入门](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)练习中的步骤 1、2 和 3（请参阅 [步骤 3.1：创建应用程序](get-started-create-app.md)）创建 Kinesis Data Analytics 应用程序。

   在源配置中，执行以下操作：
   + 指定您在[步骤 1：创建输入和输出流](app-hotspots-prepare.md)中创建的流式传输源。
   + 在控制台推断架构后编辑架构。确保 `x` 和 `y``DOUBLE` 列类型设置为 ，并确保 `IS_HOT` 列类型设置为 `VARCHAR`。

1. 使用以下应用程序代码 (您可以将此代码粘贴到 SQL 编辑器中)：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
       "x" DOUBLE, 
       "y" DOUBLE, 
       "is_hot" VARCHAR(4),
       HOTSPOTS_RESULT VARCHAR(10000)
   ); 
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
       INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT "x", "y", "is_hot", "HOTSPOTS_RESULT" 
       FROM TABLE (
           HOTSPOTS(   
               CURSOR(SELECT STREAM "x", "y", "is_hot" FROM "SOURCE_SQL_STREAM_001"), 
               1000, 
               0.2, 
               17)
       );
   ```

   

1. 运行 SQL 代码并审查结果。  
![\[显示行时间、热点和 hotspot_heat 的 SQL 代码结果。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/hotspot-v2-40.png)





**下一个步骤**  
[步骤 3：配置应用程序输出](app-hotspots-create-ka-app-config-destination.md)

# 步骤 3：配置应用程序输出
<a name="app-hotspots-create-ka-app-config-destination"></a>

在[热点示例](app-hotspots-detection.md)的此部分中，您使用 Amazon Kinesis Data Analytics 应用程序代码查找流式传输源中的大量热点并将热分数分配给每个热点。

您现在可以将应用程序内部流中的应用程序结果发送到外部目标，这是另一个 Kinesis 数据流 (`ExampleOutputStream`)。然后，您可以分析热点分数并为热点热确定合适的阈值。您可以进一步扩展此应用程序以生成警报。

**配置应用程序输出**

1. 在 /kinesisanalytics 上打开 Kinesis Data Analytics 控制台[ https://console.aws.amazon.com。](https://console.aws.amazon.com/kinesisanalytics)

1. 在 SQL 编辑器中，在应用程序控制面板中选择 **Destination** 或 **Add a destination**。

1. 在 **Add a destination (添加目标)** 页面上，选择 **Select from your streams (从流中选择)**。然后选择在上一部分中创建的 `ExampleOutputStream` 流。

   现在，您具有一个外部目标，Amazon Kinesis Data Analytics 将应用程序写入到应用程序内部流 `DESTINATION_SQL_STREAM` 的任何记录永久保存到该目标中。

1. 您可以选择配置 AWS Lambda 为监控`ExampleOutputStream`直播并向您发送警报。有关更多信息，请参阅 [使用 Lambda 函数作为输出](how-it-works-output-lambda.md)。您还可以查看 Kinesis Data Analytics 写入到外部目标（Kinesis 流 `ExampleOutputStream`）的记录，如 [步骤 4：验证应用程序输出](app-hotspots-verify-output.md) 中所述。

**下一个步骤**  
[步骤 4：验证应用程序输出](app-hotspots-verify-output.md)

# 步骤 4：验证应用程序输出
<a name="app-hotspots-verify-output"></a>

在[热点示例](app-hotspots-detection.md)的此部分中，您将设置一个 Web 应用程序，该应用程序在可扩展矢量图形 (SVG) 控件中显示热点信息。

1. 使用以下内容创建名为 `index.html` 的文件：

   ```
   <!doctype html>
   <html lang=en>
   <head>
       <meta charset=utf-8>
       <title>hotspots viewer</title>
   
       <style>
       #visualization {
         display: block;
         margin: auto;
       }
   
       .point {
         opacity: 0.2;
       }
   
       .hot {
         fill: red;
       }
   
       .cold {
         fill: blue;
       }
   
       .hotspot {
         stroke: black;
         stroke-opacity: 0.8;
         stroke-width: 1;
         fill: none;
       }
       </style>
       <script src="https://sdk.amazonaws.com/js/aws-sdk-2.202.0.min.js"></script>
       <script src="https://d3js.org/d3.v4.min.js"></script>
   </head>
   <body>
   <svg id="visualization" width="600" height="600"></svg>
   <script src="hotspots_viewer.js"></script>
   </body>
   </html>
   ```

1. 在同一目录中创建一个名为 `hotspots_viewer.js` 的文件，该文件包含以下内容。在提供的变量中包含您的 、凭证和输出流名称。

   ```
   // Visualize example output from the Kinesis Analytics hotspot detection algorithm.
   // This script assumes that the output stream has a single shard.
   
   // Modify this section to reflect your AWS configuration
   var awsRegion = "",        // The  where your Kinesis Analytics application is configured.
       accessKeyId = "",      // Your Access Key ID
       secretAccessKey = "",  // Your Secret Access Key
       outputStream = "";     // The name of the Kinesis Stream where the output from the HOTSPOTS function is being written
   
   // The variables in this section should reflect way input data was generated and the parameters that the HOTSPOTS
   // function was called with.
   var windowSize = 1000, // The window size used for hotspot detection
       minimumDensity = 40,  // A filter applied to returned hotspots before visualization
       xRange = [0, 10],  // The range of values to display on the x-axis
       yRange = [0, 10];  // The range of values to display on the y-axis
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // D3 setup
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var svg = d3.select("svg"),
       margin = {"top": 20, "right": 20, "bottom": 20, "left": 20},
       graphWidth = +svg.attr("width") - margin.left - margin.right,
       graphHeight = +svg.attr("height") - margin.top - margin.bottom;
   
   // Return the linear function that maps the segment [a, b] to the segment [c, d].
   function linearScale(a, b, c, d) {
       var m = (d - c) / (b - a);
       return function(x) {
           return c + m * (x - a);
       };
   }
   
   // helper functions to extract the x-value from a stream record and scale it for output
   var xValue = function(r) { return r.x; },
       xScale = linearScale(xRange[0], xRange[1], 0, graphWidth),
       xMap = function(r) { return xScale(xValue(r)); };
   
   // helper functions to extract the y-value from a stream record and scale it for output
   var yValue = function(r) { return r.y; },
       yScale = linearScale(yRange[0], yRange[1], 0, graphHeight),
       yMap = function(r) { return yScale(yValue(r)); };
   
   // a helper function that assigns a CSS class to a point based on whether it was generated as part of a hotspot
   var classMap = function(r) { return r.is_hot == "Y" ? "point hot" : "point cold"; };
   
   var g = svg.append("g")
       .attr("transform", "translate(" + margin.left + "," + margin.top + ")");
   
   function update(records, hotspots) {
   
       var points = g.selectAll("circle")
           .data(records, function(r) { return r.dataIndex; });
   
       points.enter().append("circle")
           .attr("class", classMap)
           .attr("r", 3)
           .attr("cx", xMap)
           .attr("cy", yMap);
   
       points.exit().remove();
   
       if (hotspots) {
           var boxes = g.selectAll("rect").data(hotspots);
   
           boxes.enter().append("rect")
               .merge(boxes)
               .attr("class", "hotspot")
               .attr("x", function(h) { return xScale(h.minValues[0]); })
               .attr("y", function(h) { return yScale(h.minValues[1]); })
               .attr("width", function(h) { return xScale(h.maxValues[0]) - xScale(h.minValues[0]); })
               .attr("height", function(h) { return yScale(h.maxValues[1]) - yScale(h.minValues[1]); });
   
           boxes.exit().remove();
       }
   }
   
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   // Use the AWS SDK to pull output records from Kinesis and update the visualization
   ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
   
   var kinesis = new AWS.Kinesis({
       "region": awsRegion,
       "accessKeyId": accessKeyId,
       "secretAccessKey": secretAccessKey
   });
   
   var textDecoder = new TextDecoder("utf-8");
   
   // Decode an output record into an object and assign it an index value
   function decodeRecord(record, recordIndex) {
       var record = JSON.parse(textDecoder.decode(record.Data));
       var hotspots_result = JSON.parse(record.HOTSPOTS_RESULT);
       record.hotspots = hotspots_result.hotspots
           .filter(function(hotspot) { return hotspot.density >= minimumDensity});
       record.index = recordIndex
       return record;
   }
   
   // Fetch a new records from the shard iterator, append them to records, and update the visualization
   function getRecordsAndUpdateVisualization(shardIterator, records, lastRecordIndex) {
       kinesis.getRecords({
           "ShardIterator": shardIterator
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var newRecords = data.Records.map(function(raw) { return decodeRecord(raw, ++lastRecordIndex); });
           newRecords.forEach(function(record) { records.push(record); });
   
           var hotspots = null;
           if (newRecords.length > 0) {
               hotspots = newRecords[newRecords.length - 1].hotspots;
           }
   
           while (records.length > windowSize) {
               records.shift();
           }
   
           update(records, hotspots);
   
           getRecordsAndUpdateVisualization(data.NextShardIterator, records, lastRecordIndex);
       });
   }
   
   // Get a shard iterator for the output stream and begin updating the visualization. Note that this script will only
   // read records from the first shard in the stream.
   function init() {
       kinesis.describeStream({
           "StreamName": outputStream
       }, function(err, data) {
           if (err) {
               console.log(err, err.stack);
               return;
           }
   
           var shardId = data.StreamDescription.Shards[0].ShardId;
   
           kinesis.getShardIterator({
               "StreamName": outputStream,
               "ShardId": shardId,
               "ShardIteratorType": "LATEST"
           }, function(err, data) {
               if (err) {
                   console.log(err, err.stack);
                   return;
               }
               getRecordsAndUpdateVisualization(data.ShardIterator, [], 0);
           })
       });
   }
   
   // Start the visualization
   init();
   ```

1. 在第一个部分中 Python 代码运行时，在 Web 浏览器中打开 `index.html`。热点信息显示在页面上，如下所示。

     
![\[显示热点信息的可扩展矢量图形图表。\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/hotspots_visualizer.png)