

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 并行处理输入流以增加吞吐量
<a name="input-parallelism"></a>

**注意**  
2023 年 9 月 12 日之后，如果您尚未使用 Kinesis Data Analytics for SQL，则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。有关更多信息，请参阅[限制](https://docs.aws.amazon.com//kinesisanalytics/latest/dev/limits.html)。

Amazon Kinesis Data Analytics 应用程序可以支持多个应用程序内部输入流，以将应用程序扩展到超出单个应用程序内部输入流的吞吐量。有关应用程序内部输入流的更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序：工作原理](how-it-works.md)。

几乎在所有情况下，Amazon Kinesis Data Analytics 都可扩展您的应用程序以处理传输到您的应用程序的 Kinesis 流或 Firehose 源流的容量。但是，如果您的源流的吞吐量超出单个应用程序内部输入流的吞吐量，您可显式增加您的应用程序使用的应用程序内部输入流的数量。您需使用 `InputParallelism` 参数执行此操作。

如果 `InputParallelism` 参数大于 1，则 Amazon Kinesis Data Analytics 在应用程序内部流之间平均拆分源流的分区。例如，如果您的源流具有 50 个分片，并且您已将 `InputParallelism` 设置为 `2`，则每个应用程序内部输入流都将收到来自 25 个源流分片的输入。

当您增加应用程序内部流的数量后，您的应用程序必须显式访问每个流中的数据。有关通过代码访问多个应用程序内部流的信息，请参阅[在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流](#input-parallelism-code-example)。

虽然以相同的方式在应用程序内部流之间划分 Kinesis 数据流和 Firehose 流分片，但会以不同的方式向您的应用程序显示这些分片：
+ Kinesis 数据流中的记录包含一个 `shard_id` 字段，可用于指定记录的源分片。
+ Firehose 传输流中的记录不包括标识记录的源分片或分区的字段。这是因为 Firehose 将此信息从您的应用程序中去除了。

## 评估是否增加您的应用程序内部输入流的数量
<a name="input-parallelism-evaluating"></a>

在大多数情况下，单个应用程序内部输入流可处理单个源流的吞吐量，具体取决于输入流的复杂度和数据大小。要确定是否需要增加应用程序内输入流的数量，您可以在 Amazon CloudWatch 中监控`InputBytes`和`MillisBehindLatest`指标。

如果该`InputBytes`指标大于 100 MB/sec （或者您预计它将大于此比率），则可能会导致应用程序问题的增加`MillisBehindLatest`并增加其影响。为解决此问题，我们建议您为应用程序选择以下语言：
+ 如果您的应用程序的扩展需求超过 100 MB/秒，请使用多个流和 Kinesis Data Analytics for SQL 应用程序。
+ 如果您希望使用单个流和应用程序，请使用[适用于 Java 应用程序的 Kinesis Data Analytics](/managed-flink/latest/java/what-is.html)。

如果 `MillisBehindLatest` 指标具有以下任一特征，则应调高您的应用程序的 `InputParallelism` 设置：
+ `MillisBehindLatest` 指标逐渐增大，指示您的应用程序落后于流中的最新数据。
+ `MillisBehindLatest` 指标始终高于 1000 (1 秒)。

如果满足以下条件，您无需调高您的应用程序的 `InputParallelism` 设置：
+ `MillisBehindLatest` 指标逐渐减小，指示您的应用程序跟上了流中的最新数据。
+ `MillisBehindLatest` 指标小于 1000 (1 秒)。

有关使用的更多信息 CloudWatch，请参阅《[CloudWatch 用户指南》](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/)。

## 实施多个应用程序内部输入流
<a name="input-parallelism-implementing"></a>

如果应用程序是使用 [CreateApplication](API_CreateApplication.md) 创建的，您可以设置应用程序内部输入流的数量。您应在使用[UpdateApplication](API_UpdateApplication.md)创建应用程序之后设置此数量。

**注意**  
您只能使用 Amazon Kinesis Data Analytics API 或 AWS CLI设置 `InputParallelism`。您不能使用来设置此设置 AWS 管理控制台。有关设置的信息 AWS CLI，请参阅[步骤 2：设置 AWS Command Line Interface (AWS CLI)](setup-awscli.md)。

### 设置新应用程序的输入流计数
<a name="input-parallelism-implementing-create"></a>

以下示例说明了如何使用 `CreateApplication` API 操作将新应用程序的输入流计数设置为 2。

有关 `CreateApplication`的更多信息，请参阅[CreateApplication](API_CreateApplication.md)。

```
{
   "ApplicationCode": "<The SQL code the new application will run on the input stream>",
   "ApplicationDescription": "<A friendly description for the new application>",
   "ApplicationName": "<The name for the new application>",
   "Inputs": [ 
    { 
      "InputId": "ID for the new input stream",
      "InputParallelism": { 
        "Count": 2
    }],
   "Outputs": [ ... ],
	}]
}
```

### 设置现有应用程序的输入流计数
<a name="input-parallelism-implementing-update"></a>

以下示例说明了如何使用 `UpdateApplication` API 操作将现有应用程序的输入流计数设置为 2。

有关 `Update_Application`的更多信息，请参阅[UpdateApplication](API_UpdateApplication.md)。

```
{
   "InputUpdates": [ 
      { 
         "InputId": "yourInputId",
         "InputParallelismUpdate": { 
            "CountUpdate": 2
         }
      }
   ],
}
```

## 在 Amazon Kinesis Data Analytics 应用程序中访问单独的应用程序内部流
<a name="input-parallelism-code-example"></a>

要使用您的应用程序中的多个应用程序内部输入流，您必须从不同的流中显式选择。以下代码示例说明了如何在创建于“入门”教程中的应用程序中查询多个输入流。

在以下示例中，每个源流在组合到名为 [ 的单个应用程序内部流之前先通过 ](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-count.html)COUNT`in_application_stream001` 进行了聚合。预先聚合源流有助于确保组合的应用程序内部流可处理来自多个流的流量而不会过载。

**注意**  
要运行此示例并获得来自应用程序内部输入流的结果，请更新源流中的分片数和应用程序中的 `InputParallelism` 参数。

```
CREATE OR REPLACE STREAM in_application_stream_001 (
    ticker VARCHAR(64),
    ticker_count INTEGER
);

CREATE OR REPLACE PUMP pump001 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol; 
        
CREATE OR REPLACE PUMP pump002 AS 
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
    ticker_symbol;
```

前面的代码示例将在 `in_application_stream001` 中生成类似于下面的输出：

![\[Table showing ROWTIME, TICKER, and TICKER_COUNT columns with sample data entries.\]](http://docs.aws.amazon.com/zh_cn/kinesisanalytics/latest/dev/images/input-parallelism-results.png)


## 其他注意事项
<a name="input-parallelism-considerations"></a>

在使用多个输入流时，请注意以下事项：
+ 应用程序内部输入流的最大数量为 64。
+ 应用程序内部输入流在应用程序的输入流的分片之间均匀分配。
+ 通过添加应用程序内部流获得的性能改进无法线性扩展。也就是说，使应用程序内部流的数量加倍不会使吞吐量加倍。对于典型行大小，每个应用程序内部流可实现每秒大约 5,000 到 15,000 行的吞吐量。通过将应用程序内部流计数增加到 10，您可以实现每秒 20,000 到 30,000 行的吞吐量。吞吐量流速取决于输入流中的字段的计数、数据类型和数据大小。
+ 某些聚合函数（如 [AVG](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-avg.html)）在应用于分区到不同分片中的输入流时可能生成意外结果。由于您需要在将各个分片组合到聚合流中之前对它们运行聚合操作，因此结果可能向包含更多记录的流加权。
+ 如果您的应用程序在增加输入流数量后继续出现性能不佳的情况（反映在较高的`MillisBehindLatest`指标上），则可能已达到 Kinesis 处理单元的限制 () KPUs。有关更多信息，请参阅 [自动扩展应用程序以提高吞吐量](how-it-works-autoscaling.md)。