

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 窗口式查询
<a name="windowed-sql"></a>

对应用程序内部流连续执行应用程序代码中的 SQL 查询。应用程序内部流表示通过您的应用程序持续流动的无边界数据。因此，要从该连续更新输入获取结果集，通常可以使用根据时间或行定义的窗口来限制查询。这些查询也称为*窗口式 SQL*。

对于基于时间的窗口式查询，需要以时间为单位指定窗口大小（例如，一分钟窗口）。这需要在应用程序内部流中有一个单调递增的时间戳列。(新行的时间戳大于或等于前一行。) Amazon Kinesis Data Analytics 为每个应用程序内部流提供名为 `ROWTIME` 的时间戳列。在指定基于时间的查询时，可以使用该列。对于您的应用程序，可以选择其他某个时间戳选项。有关更多信息，请参阅 [时间戳和 ROWTIME 列](timestamps-rowtime-concepts.md)。

对于基于行的窗口式查询，可以使用行数为单位指定窗口大小。

您可以根据应用程序需求指定查询以滚动窗口方式、滑动窗口方式还是交错窗口方式处理记录。Kinesis Data Analytics 支持以下窗口类型：
+ [交错窗口](stagger-window-concepts.md)：一个使用基于时间的键控窗口聚合数据的查询，这种窗口在数据到达时打开。这些键允许多个重叠的窗口。这是使用基于时间的窗口聚合数据的推荐方法，因为与 Tumbling 窗口相比，Stagger Windows 可以减少延迟或 out-of-order数据流量。
+ [滚动窗口](tumbling-window-concepts.md)：一个使用基于时间的不同窗口聚合数据的查询，这些窗口以固定时间间隔打开和关闭。
+ [滑动窗口](sliding-window-concepts.md)：一个使用固定时间或行计数间隔连续聚合数据的查询。

# 交错窗口
<a name="stagger-window-concepts"></a>

使用*交错窗口* 是一种窗口化方法，适用于分析到达时间不一致的数据组。这种方法非常适合任何时间序列分析使用案例，例如一组相关的销售或日志记录。

例如，[VPC 流日志](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html#flow-logs-limitations)具有一个大约 10 分钟的捕获窗口。但是，如果您在客户端上聚合数据，则最多可以具有 15 分钟的捕获窗口。交错窗口非常适合用于聚合这些日志进行分析。

交错窗口解决了多条相关记录不属于同一时间限制窗口的问题，例如在使用了滚动窗口的情况下。

## 滚动窗口的部分结果
<a name="stagger-window-tumbling"></a>

[滚动窗口](tumbling-window-concepts.md)用于聚合延迟或 out-of-order数据存在某些限制。

如果使用滚动窗口来分析与时间相关的多组数据，则个别记录可能属于单独的窗口。因此，必须稍后组合每个窗口的部分结果，以便为每组记录生成完整的结果。

在以下滚动窗口查询中，记录按行时间、事件时间和股票代码分组为若干窗口：

```
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER_SYMBOL VARCHAR(4),
    EVENT_TIME timestamp,
    TICKER_COUNT     DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER_SYMBOL,
        FLOOR(EVENT_TIME TO MINUTE),
        COUNT(TICKER_SYMBOL) AS TICKER_COUNT
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY ticker_symbol, FLOOR(EVENT_TIME TO MINUTE), STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE);
```

在下图中，应用程序根据交易发生的时间（事件时间）以一分钟的粒度计算它收到的交易数量。应用程序可以使用滚动窗口根据行时间和事件时间对数据进行分组。该应用程序接收四条记录，所有记录都在彼此的一分钟之内到达。它按行时间、事件时间和股票代码对记录进行分组。因为一些记录在第一个滚动窗口结束后到达，所以记录并非全部位于同一个一分钟的滚动窗口内。

![\[Tumbling windows diagram showing data grouping by row time, event time, and ticker symbol over two minutes.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/stagger_0.png)


上图包含以下事件。


****  

| ROWTIME | EVENT\$1TIME | TICKER\$1SYMBOL | 
| --- | --- | --- | 
| 11:00:20 | 11:00:10 | AMZN | 
| 11:00:30 | 11:00:20 | AMZN | 
| 11:01:05 | 11:00:55 | AMZN | 
| 11:01:15 | 11:01:05 | AMZN | 

滚动窗口应用程序的结果集类似于以下内容。


****  

| ROWTIME | EVENT\$1TIME | TICKER\$1SYMBOL | COUNT | 
| --- | --- | --- | --- | 
| 11:01:00 | 11:00:00 | AMZN | 2  | 
| 11:02:00 | 11:00:00 | AMZN | 1  | 
| 11:02:00 | 11:01:00 | AMZN | 1  | 

在前面的结果集中，返回了三个结果：
+ `ROWTIME` 为 11:01:00 的记录，它聚合前两个记录。
+ 11:02:00 的记录，它仅聚合第三条记录。此记录在第二个窗口中有一个 `ROWTIME`，但在第一个窗口中有一个 `EVENT_TIME`。
+ 11:02:00 的记录，它仅聚合第四条记录。

要分析完整的结果集，必须在持久性存储中聚合记录。这给应用程序增加了复杂性和处理要求。

## 完整结果与交错窗口
<a name="stagger-window-concepts-stagger"></a>

为了提高分析与时间相关的数据记录的准确性，Kinesis Data Analytics 提供了一种名为*交错窗口*的新窗口类型。在此窗口类型中，窗口在与分区键匹配的第一个事件到达时打开，而不是在固定的时间间隔打开。窗口根据指定的期限关闭，期限是从窗口打开的时间开始计算的。

交错窗口是窗口子句中每个键分组的单独时间限制窗口。应用程序将窗口子句的每个结果聚合在其自己的时间窗口内，而不是对所有结果使用单个窗口。

在以下交错窗口查询中，记录按事件时间和股票代码分组为若干窗口：

```
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    ticker_symbol    VARCHAR(4), 
    event_time       TIMESTAMP,
    ticker_count     DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
  INSERT INTO "DESTINATION_SQL_STREAM" 
    SELECT STREAM 
        TICKER_SYMBOL,
        FLOOR(EVENT_TIME TO MINUTE),
        COUNT(TICKER_SYMBOL) AS ticker_count
    FROM "SOURCE_SQL_STREAM_001"
    WINDOWED BY STAGGER (
            PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL RANGE INTERVAL '1' MINUTE);
```

在下图中，事件按事件时间和股票代码聚合到交错窗口中。

![\[Diagram showing event aggregation into stagger windows by event time and ticker symbol.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/stagger_1.png)


上图包含以下事件，这些事件与分析的滚动窗口应用程序相同：


****  

| ROWTIME | EVENT\$1TIME | TICKER\$1SYMBOL | 
| --- | --- | --- | 
| 11:00:20 | 11:00:10 | AMZN | 
| 11:00:30 | 11:00:20 | AMZN | 
| 11:01:05 | 11:00:55 | AMZN | 
| 11:01:15 | 11:01:05 | AMZN | 

交错窗口应用程序的结果集类似于以下内容。


****  

| ROWTIME | EVENT\$1TIME | TICKER\$1SYMBOL | 计数 | 
| --- | --- | --- | --- | 
| 11:01:20 | 11:00:00 | AMZN | 3 | 
| 11:02:15 | 11:01:00 | AMZN | 1 | 

返回的记录聚合了前三条输入记录。记录按一分钟的交错窗口分组。当应用程序收到第一条 AMZN 记录（`ROWTIME` 为 11:00:20）时，交错窗口开始。当 1 分钟交错窗口到期时 (11:01:20)，对于包含位于交错窗口内的结果（基于 `ROWTIME` 和 `EVENT_TIME`）的记录，将被写入输出流。使用交错窗口，在一分钟窗口内具有 `ROWTIME` 和 `EVENT_TIME` 的所有记录都将在单个结果中发出。

最后一条记录（其 `EVENT_TIME` 位于一分钟聚合之外）是单独聚合的。这是因为 `EVENT_TIME` 是用于将记录分成多个结果集的分区键之一，而第一个窗口的 `EVENT_TIME` 的分区键是 `11:00`。

交错窗口的语法在特殊子句 `WINDOWED BY` 中定义。对于流式处理聚合，使用此子句代替 `GROUP BY` 子句。该子句紧跟在可选的 `WHERE` 子句之后和 `HAVING` 子句之前。

交错窗口在 `WINDOWED BY` 子句中定义，并带有两个参数：分区键和窗口长度。分区键对传入的数据流进行分区，并定义窗口何时打开。当流中出现具有唯一分区键的第一个事件时，将打开一个交错窗口。在经过由窗口长度定义的固定时间段之后，交错窗口关闭。以下代码示例说明了此语法：

```
...
FROM <stream-name>
WHERE <... optional statements...>
WINDOWED BY STAGGER(
	PARTITION BY <partition key(s)>
	RANGE INTERVAL <window length, interval>
);
```

# 滚动窗口（使用 GROUP BY 组的聚合）
<a name="tumbling-window-concepts"></a>

当一个窗口式查询以非重叠方式处理每个窗口时，这样的窗口称为*滚动窗口*。在这种情况下，应用程序内部流上的每个记录属于特定窗口。它只处理一次（当查询处理记录所属的窗口时）。

![\[Timeline showing non-overlapping windows processing data streams at distinct time intervals.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/window-tumbling-20.png)


例如，使用 `GROUP BY` 子句的聚合查询在一个滚动窗口中处理行。[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)中的演示流接收股票价格数据，而这些数据映射到应用程序中的应用程序内部流 `SOURCE_SQL_STREAM_001`。这个流具有以下架构。

```
(TICKER_SYMBOL VARCHAR(4), 
 SECTOR varchar(16), 
 CHANGE REAL, 
 PRICE REAL)
```

在您的应用程序代码中，假设您希望针对一分钟窗口找到每个股票行情机的聚合（最小、最大）价格。您可以使用以下查询。

```
SELECT STREAM ROWTIME,
              Ticker_Symbol,
              MIN(Price) AS Price,
              MAX(Price) AS Price
FROM     "SOURCE_SQL_STREAM_001"
GROUP BY Ticker_Symbol, 
         STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
```

上述示例是一个基于时间的窗口式查询。该查询根据 `ROWTIME` 值将记录分组。对于每分钟进行的报告，`STEP` 函数将 `ROWTIME` 值向下舍入到最接近的分钟。

**注意**  
您也可以使用 `FLOOR` 函数将记录分组为不同的窗口。但是，`FLOOR` 只能将时间值向下舍入到完整的时间单位（小时、分钟、秒等）。建议使用 `STEP` 以将记录分组为不同的滚动窗口，因为它可以将值向下舍入到任意间隔，例如，30 秒。

该查询是非重叠 (滚动) 窗口示例。`GROUP BY` 子句在一分钟窗口内对记录进行分组，每个记录属于一个特定窗口 (不重叠)。该查询每分钟发出一条输出记录，提供在特定时刻记录的 min/max 股票价格。在根据输入数据流生成周期性报告时，此类查询很有用。在本示例中，报告是每分钟生成的。

**测试查询**

1. 按照[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)设置应用程序。

1. 使用先前的 `SELECT` 查询替换应用程序代码中的 `SELECT` 语句。下面显示得到的应用程序代码：

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                                      ticker_symbol VARCHAR(4), 
                                      Min_Price     DOUBLE, 
                                      Max_Price     DOUBLE);
   -- CREATE OR REPLACE PUMP to insert into output
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
     INSERT INTO "DESTINATION_SQL_STREAM" 
       SELECT STREAM Ticker_Symbol,
                     MIN(Price) AS Min_Price,
                     MAX(Price) AS Max_Price
       FROM    "SOURCE_SQL_STREAM_001"
       GROUP BY Ticker_Symbol, 
                STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
   ```

# 滑动窗口
<a name="sliding-window-concepts"></a>

您可以不使用 `GROUP BY` 对记录分组，而是定义基于时间或基于行的窗口。您应通过添加显式 `WINDOW` 子句执行此操作。

在这种情况下，当窗口随着时间滑动时，只要流中出现新记录，Amazon Kinesis Data Analytics 就会发送输出。Kinesis Data Analytics 将会通过在此窗口中处理行来发送此输出。窗口在这种类型的处理中可以重叠，一个记录可以属于多个窗口并且可随各个窗口一起处理。以下示例说明了滑动的窗口。

考虑创建一个简单的查询对流中的记录进行计数。此示例假定有一个 5 秒的窗口。在以下示例流中，新记录分别于时间 t1、t2、t6 和 t7 到达，有三个记录于时间 t8 秒时到达。

![\[Timeline showing record arrivals at t1, t2, t6, t7, and multiple at t8 within a 5-second window.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-10.png)


记住以下内容：
+ 此示例假定有一个 5 秒的窗口。该 5 秒窗口持续随着时间滑动。
+ 对于进入窗口的每一行，滑动窗口会发送输出行。应用程序启动后不久，您会看到查询针对出现在流中的每个新记录发送输出，即使尚未经过 5 秒窗口。例如，当记录出现在第一秒和第二秒时，查询会发送输出。稍后，查询会处理 5 秒窗口中的记录。
+ 该窗口随着时间滑动。如果流中的旧记录落后于窗口，查询将不会发送任何输出，除非流中也有一个新记录落在该 5 秒窗口中。

  

假设查询在 t0 开始执行。那么，将出现以下情况：

1. 在时间 t0，查询开始执行。查询不发送输出 (计数值)，因为此时没有记录。  
![\[Timeline showing a stream starting at t0 with no output initially indicated.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t0.png)

1. 在时间 t1 处，新记录出现在流中，并且查询发送计数值 1。  
![\[Timeline showing a stream with a record appearing at time t1, and an arrow pointing to t0.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t1.png)

1. 在时间 t2，出现另一个记录，并且查询发送计数 2。  
![\[Timeline showing stream events at different time points, with two vertical bars at the end.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t2.png)

1. 此 5 秒窗口随着时间滑动：
   + 在 t3 处，滑动窗口为 t3 到 t0
   + 在 t4 处 (滑动窗口为 t4 到 t0)
   + 在 t5 处，滑动窗口为 t5 到 t0

   在所有这些时间，5 秒窗口具有相同的记录——没有新记录。因此，查询不会发送任何输出。  
![\[Timeline showing stream with multiple time points and colored rectangles representing data windows.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t3-4-5.png)

1. 在时间 t6 处，此 5 秒窗口为 (t6 到 t1)。查询在 t6 处检测到一个新记录，因此它发送了输出 2。t1 处的记录不再位于窗口中，计数时不考虑。  
![\[Timeline showing stream events at different time points with a sliding 5-second window.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t6.png)

1. 在时间 t7 处，此 5 秒窗口为 (t7 到 t2)。查询在 t7 处检测到一个新记录，因此它发送了输出 2。t2 处的记录不再位于 5 秒窗口中，因此计数时不考虑。  
![\[Timeline showing stream events and time points from t0 to t7, with a 5-second window highlighted.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t7.png)

1. 在时间 t8 处，此 5 秒窗口为 (t8 到 t3)。查询检测到三个新记录，因此发送了记录计数 5。  
![\[Timeline showing stream events with orange bars representing record counts at different time intervals.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/sliding-t8.png)

总之，窗口是固定大小，并且随时间滑动。当出现新记录时，查询会发送输出。

**注意**  
我们建议使用滑动窗口的时间不要超过 1 小时。如果您使用时间更长的窗口，应用程序在常规系统维护之后需要更长的时间才能重新启动。这是因为必须再次从流中读取源数据。

以下示例查询使用 `WINDOW` 子句定义窗口和执行聚合。由于查询不指定 `GROUP BY`，因此查询使用滑动窗口方法处理流中的记录。



## 示例 1：使用一个 1 分钟滑动窗口处理流
<a name="sliding-ex1"></a>

在填充应用程序内部流 `SOURCE_SQL_STREAM_001` 时，请考虑“入门”练习中的演示流。下面是架构。

```
(TICKER_SYMBOL VARCHAR(4), 
 SECTOR varchar(16),
 CHANGE REAL,
 PRICE REAL)
```

假设您希望应用程序使用 1 分钟滑动窗口计算聚合。也就是说，对于出现在流中的每个新记录，您希望应用程序通过对前面的 1 分钟窗口中的记录应用聚合来发送输出。

您可以使用以下基于时间的窗口式查询。查询使用 `WINDOW` 子句定义 1 分钟范围间隔。`WINDOW` 子句中的 `PARTITION BY` 按照滑动窗口中的股票行情机值对记录进行分组。

```
SELECT STREAM ticker_symbol,
              MIN(Price) OVER W1 AS Min_Price,
              MAX(Price) OVER W1 AS Max_Price,
              AVG(Price) OVER W1 AS Avg_Price
FROM   "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
   PARTITION BY ticker_symbol 
   RANGE INTERVAL '1' MINUTE PRECEDING);
```

**测试查询**

1. 按照[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)设置应用程序。

1. 使用先前的 `SELECT` 查询替换应用程序代码中的 `SELECT` 语句。生成的应用程序代码如下。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                            ticker_symbol VARCHAR(10), 
                            Min_Price     double, 
                            Max_Price     double, 
                            Avg_Price     double);
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
        SELECT STREAM ticker_symbol,
                      MIN(Price) OVER W1 AS Min_Price,
                      MAX(Price) OVER W1 AS Max_Price,
                      AVG(Price) OVER W1 AS Avg_Price
        FROM   "SOURCE_SQL_STREAM_001"
        WINDOW W1 AS (
           PARTITION BY ticker_symbol 
           RANGE INTERVAL '1' MINUTE PRECEDING);
   ```

## 示例 2：对滑动窗口应用聚合的查询
<a name="sliding-ex2"></a>

针对演示流的以下查询将返回一个 10 秒窗口中，每个股票行情机的价格的平均百分比变化。

```
SELECT STREAM Ticker_Symbol,
              AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
        PARTITION BY ticker_symbol 
        RANGE INTERVAL '10' SECOND PRECEDING);
```



**测试查询**

1. 按照[入门练习](https://docs.aws.amazon.com/kinesisanalytics/latest/dev/get-started-exercise.html)设置应用程序。

1. 使用先前的 `SELECT` 查询替换应用程序代码中的 `SELECT` 语句。生成的应用程序代码如下。

   ```
   CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                               ticker_symbol VARCHAR(10), 
                               Avg_Percent_Change double);
   CREATE OR REPLACE PUMP "STREAM_PUMP" AS 
      INSERT INTO "DESTINATION_SQL_STREAM"
         SELECT STREAM Ticker_Symbol,
                       AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
         FROM "SOURCE_SQL_STREAM_001"
         WINDOW W1 AS (
                 PARTITION BY ticker_symbol 
                 RANGE INTERVAL '10' SECOND PRECEDING);
   ```

## 示例 3：从同一流的多个滑动窗口查询数据
<a name="sliding-ex3"></a>

您可以编写查询以发送输出，其中的每个列值都是使用同一流上定义的不同滑动窗口计算的。

在以下示例中，查询将发送输出股票行情机、价格、a2 和 a10。查询将发送股票代码的输出，这些代码的两行移动平均值超过了 10 行移动平均值。`a2` 和 `a10` 列值派生自 2 行和 10 行滑动窗口。

```
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
                           ticker_symbol      VARCHAR(12), 
                           price              double, 
                           average_last2rows  double, 
                           average_last10rows double);

CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, 
              price, 
              avg(price) over last2rows, 
              avg(price) over last10rows
FROM SOURCE_SQL_STREAM_001
WINDOW
    last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
    last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
```

要针对演示流测试此查询，请按照[示例 1](#sliding-ex1)中介绍的测试过程操作。