

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 滑动窗口
滑动窗口

您可以不使用 `GROUP BY` 对记录分组，而是定义基于时间或基于行的窗口。您应通过添加显式 `WINDOW` 子句执行此操作。

在这种情况下，当窗口随着时间滑动时，只要流中出现新记录，Amazon Kinesis Data Analytics 就会发送输出。Kinesis Data Analytics 将会通过在此窗口中处理行来发送此输出。窗口在这种类型的处理中可以重叠，一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。

考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中，新记录分别于时间 t1、t2、t6 和 t7 到达，有三个记录于时间 t8 秒时到达。

![\[Timeline showing record arrivals at t1, t2, t6, t7, and multiple at t8 within a 5-second window.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-10.png)


记住以下内容：
+ 此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
+ 对于进入窗口的每一行，滑动窗口会发送输出行。应用程序启动后不久，您会看到查询针对出现在流中的每个新记录发送输出，即使尚未经过 5 秒窗口。例如，当记录出现在第一秒和第二秒时，查询会发送输出。稍后，查询会处理 5 秒窗口中的记录。
+ 该窗口随着时间滑动。如果流中的旧记录落后于窗口，查询将不会发送任何输出，除非流中也有一个新记录落在该 5 秒窗口中。

  

假设查询在 t0 开始执行。那么，将出现以下情况：

1. 在时间 t0，查询开始执行。查询不发送输出 (计数值)，因为此时没有记录。  
![\[Timeline showing a stream starting at t0 with no output initially indicated.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t0.png)

1. 在时间 t1 处，新记录出现在流中，并且查询发送计数值 1。  
![\[Timeline showing a stream with a record appearing at time t1, and an arrow pointing to t0.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t1.png)

1. 在时间 t2，出现另一个记录，并且查询发送计数 2。  
![\[Timeline showing stream events at different time points, with two vertical bars at the end.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t2.png)

1. 此 5 秒窗口随着时间滑动：
   + 在 t3 处，滑动窗口为 t3 到 t0
   + 在 t4 处 (滑动窗口为 t4 到 t0)
   + 在 t5 处，滑动窗口为 t5 到 t0

   在所有这些时间，5 秒窗口具有相同的记录——没有新记录。因此，查询不会发送任何输出。  
![\[Timeline showing stream with multiple time points and colored rectangles representing data windows.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t3-4-5.png)

1. 在时间 t6 处，此 5 秒窗口为 (t6 到 t1)。查询在 t6 处检测到一个新记录，因此它发送了输出 2。t1 处的记录不再位于窗口中，计数时不考虑。  
![\[Timeline showing stream events at different time points with a sliding 5-second window.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t6.png)

1. 在时间 t7 处，此 5 秒窗口为 (t7 到 t2)。查询在 t7 处检测到一个新记录，因此它发送了输出 2。t2 处的记录不再位于 5 秒窗口中，因此计数时不考虑。  
![\[Timeline showing stream events and time points from t0 to t7, with a 5-second window highlighted.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t7.png)

1. 在时间 t8 处，此 5 秒窗口为 (t8 到 t3)。查询检测到三个新记录，因此发送了记录计数 5。  
![\[Timeline showing stream events with orange bars representing record counts at different time intervals.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t8.png)

总之，窗口是固定大小，并且随时间滑动。当出现新记录时，查询会发送输出。

**注意**  
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口，应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。

以下示例查询使用 `WINDOW` 子句定义窗口和执行聚合。由于查询不指定 `GROUP BY`，因此查询使用滑动窗口方法处理流中的记录。



## 示例 1：使用一个 1 分钟滑动窗口处理流
示例 1

在填充应用程序内部流 `SOURCE_SQL_STREAM_001` 时，请考虑“入门”练习中的演示流。下面是架构。

```
(TICKER_SYMBOL VARCHAR(4), 
 SECTOR varchar(16),
 CHANGE REAL,
 PRICE REAL)
```

假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说，对于出现在流中的每个新记录，您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。

您可以使用以下基于时间的窗口式查询。查询使用 `WINDOW` 子句定义 1 分钟范围间隔。`WINDOW` 子句中的 `PARTITION BY` 按照滑动窗口中的股票行情机值对记录进行分组。

```
SELECT STREAM ticker_symbol,
              MIN(Price) OVER W1 AS Min_Price,
              MAX(Price) OVER W1 AS Max_Price,
              AVG(Price) OVER W1 AS Avg_Price
FROM   "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
   PARTITION BY ticker_symbol 
   RANGE INTERVAL '1' MINUTE PRECEDING);
```

**测试查询**

1. 按照[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)设置应用程序。

1. 使用先前的 `SELECT` 查询替换应用程序代码中的 `SELECT` 语句。生成的应用程序代码如下。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                            ticker_symbol VARCHAR(10), 
                            Min_Price     double, 
                            Max_Price     double, 
                            Avg_Price     double);
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM ticker_symbol,
                      MIN(Price) OVER W1 AS Min_Price,
                      MAX(Price) OVER W1 AS Max_Price,
                      AVG(Price) OVER W1 AS Avg_Price
        FROM   "SOURCE_SQL_STREAM_001"
        WINDOW W1 AS (
           PARTITION BY ticker_symbol 
           RANGE INTERVAL '1' MINUTE PRECEDING);
   ```

## 示例 2：对滑动窗口应用聚合的查询


针对演示流的以下查询将返回一个 10 秒窗口中，每个股票行情机的价格的平均百分比变化。

```
SELECT STREAM Ticker_Symbol,
              AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
        PARTITION BY ticker_symbol 
        RANGE INTERVAL '10' SECOND PRECEDING);
```



**测试查询**

1. 按照[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)设置应用程序。

1. 使用先前的 `SELECT` 查询替换应用程序代码中的 `SELECT` 语句。生成的应用程序代码如下。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                               ticker_symbol VARCHAR(10), 
                               Avg_Percent_Change double);
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM Ticker_Symbol,
                       AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
         FROM "SOURCE_SQL_STREAM_001"
         WINDOW W1 AS (
                 PARTITION BY ticker_symbol 
                 RANGE INTERVAL '10' SECOND PRECEDING);
   ```

## 示例 3：从同一流的多个滑动窗口查询数据


您可以编写查询以发送输出，其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。

在以下示例中，查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出，这些代码的两行移动平均值超过了 10 行移动平均值。`a2` 和 `a10` 列值派生自 2 行和 10 行滑动窗口。

```
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                           ticker_symbol      VARCHAR(12), 
                           price              double, 
                           average_last2rows  double, 
                           average_last10rows double);

CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, 
              price, 
              avg(price) over last2rows, 
              avg(price) over last10rows
FROM SOURCE_SQL_STREAM_001
WINDOW
    last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
    last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
```

要针对演示流测试此查询，请按照[示例 1](#sliding-ex1)中介绍的测试过程操作。