

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 配置应用程序输入
<a name="how-it-works-input"></a>

您的 Amazon Kinesis Data Analytics 应用程序可以从单个流式传输源中接收输入，并且可以选择使用一个引用数据源。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序：工作原理](how-it-works.md)。本主题的此部分介绍了应用程序输入源。

**Topics**
+ [配置流式传输源](#source-streaming)
+ [配置引用源](#source-reference)
+ [与 JSONPath](about-json-path.md)
+ [将流式传输源元素映射到 SQL 输入列](sch-mapping.md)
+ [针对流数据使用架构发现功能](sch-dis.md)
+ [针对静态数据使用架构发现功能](sch-dis-ref.md)
+ [使用 Lambda 函数预处理数据](lambda-preprocessing.md)
+ [并行处理输入流以增加吞吐量](input-parallelism.md)

## 配置流式传输源
<a name="source-streaming"></a>

在创建应用程序时，您需要指定流媒体源。您还可以在创建应用程序后修改输入。Amazon Kinesis Data Analytics 支持应用程序的以下流式源：
+ Kinesis 数据流 
+ Firehose 传输流

**注意**  
2023 年 9 月 12 日之后，如果您尚未使用 Kinesis Data Analytics for SQL，则将无法使用 Kinesis Data Firehose 作为来源创建新应用程序。使用 `KinesisFirehoseInput` 对 Kinesis Data Analytics for SQL 应用程序进行操作的现有客户可以继续使用 `KinesisFirehoseInput` 在使用 Kinesis Data Analytics 的现有账户内添加应用程序。如果您是现有客户，并希望使用 `KinesisFirehoseInput` 在 Kinesis Data Analytics for SQL 应用程序中创建一个新的账户，则可以通过服务限制增加表创建案例。有关更多信息，请参阅 [AWS 支持 中心](https://console.aws.amazon.com/support/home#/)。在将所有新应用程序推广到生产环境之前，务必对其进行测试。

**注意**  
如果 Kinesis 数据流是加密的，Kinesis Data Analytics 会无缝地访问加密流中的数据，无需进一步配置。Kinesis Data Analytics 不存储从 Kinesis 数据流读取的未加密数据。有关更多信息，请参阅[什么是 Kinesis 数据流的服务器端加密？](https://docs.aws.amazon.com/streams/latest/dev/what-is-sse.html)。

Kinesis Data Analytics 持续轮询流式传输源以查找新数据，并根据输入配置在应用程序内部流中提取该数据。

**注意**  
添加 Kinesis 流作为应用程序的输入不会影响流中的数据。如果另一资源（如 Firehose 传输流）也访问了同一 Kinesis 流，则 Firehose 传输流和 Kinesis Data Analytics 应用程序将收到相同的数据。但是，吞吐量和限制可能会受到影响。

您的应用程序代码可以查询应用程序内部流。作为输入配置的一部分，您需要提供以下内容：
+ **流式传输源** – 您提供流的 Amazon 资源名称 (ARN) 以及 Kinesis Data Analytics 可担任的 IAM 角色以代表您访问流。
+ **应用程序内部流名称前缀** – 在启动应用程序时，Kinesis Data Analytics 创建指定的应用程序内部流。在您的应用程序代码中，可以使用此名称访问应用程序内部流。

  您可以选择将一个流式传输源映射到多个应用程序内部流。有关更多信息，请参阅 [限制](limits.md)。在这种情况下，Amazon Kinesis Data Analytics 会创建指定数量的应用程序内流，其名称如下 *prefix*`_001`：*prefix*`_002`、和。*prefix* `_003`默认情况下，Kinesis Data Analytics 会将流媒体源映射到一个名为的应用程序内流。*prefix* `_001`

  在应用程序内部流中插入行时有速度限制。因此，Kinesis Data Analytics 支持多个此类应用程序内部流，以便以快得多的速度将记录添加到应用程序中。如果发现应用程序无法及时处理流式传输源中的数据，您可以添加并行度单元以提高性能。
+ **映射架构** – 您描述流式传输源上的记录格式 (JSON、CSV)。您还描述流上的每个记录如何映射到创建的应用程序内部流中的列。您可以在此处提供列名和数据类型。

**注意**  
在创建输入应用程序内部流时，Kinesis Data Analytics 使用引号将标识符 (流名称和列名称) 引起来。在查询该流和列时，您必须在引号内使用相同的大小写指定它们 (小写和大写字母完全匹配)。有关标识符的更多信息，请参阅 *Amazon Managed Service for Apache Flink SQL* 参考中的[标识符](https://docs.aws.amazon.com/kinesisanalytics/latest/sqlref/sql-reference-identifiers.html)。

您可以通过 Amazon Kinesis Data Analytics 控制台创建一个应用程序并配置输入。然后，控制台执行必需的 API 调用。创建新应用程序 API 或者将输入配置添加到现有应用程序时，可以配置应用程序输入。有关更多信息，请参阅[CreateApplication](API_CreateApplication.md)和[AddApplicationInput](API_AddApplicationInput.md)。下面是 `Createapplication` API 请求正文的输入配置部分：

```
 "Inputs": [
        {
            "InputSchema": {
                "RecordColumns": [
                    {
                        "Mapping": "string",
                        "Name": "string",
                        "SqlType": "string"
                    }
                ],
                "RecordEncoding": "string",
                "RecordFormat": {
                    "MappingParameters": {
                        "CSVMappingParameters": {
                            "RecordColumnDelimiter": "string",
                            "RecordRowDelimiter": "string"
                        },
                        "JSONMappingParameters": {
                            "RecordRowPath": "string"
                        }
                    },
                    "RecordFormatType": "string"
                }
            },
            "KinesisFirehoseInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "KinesisStreamsInput": {
                "ResourceARN": "string",
                "RoleARN": "string"
            },
            "Name": "string"
        }
    ]
```

## 配置引用源
<a name="source-reference"></a>

您也可以选择向现有应用程序添加参考数据源，以丰富来自流媒体源的数据。您必须将引用数据存储为 Amazon S3 存储桶中的对象。在应用程序启动时，Amazon Kinesis Data Analytics 读取 Amazon S3 对象并创建应用程序内部引用表。然后，您的应用程序代码可以将其与应用程序内部流联接。

您可以使用支持的格式 (CSV、JSON) 在 Amazon S3 对象中存储引用数据。例如，假设您的应用程序对股票订单执行分析。对流式传输源采用以下记录格式：

```
Ticker, SalePrice, OrderId

AMZN     $700        1003
XYZ      $250        1004
...
```

在这种情况下，您可能考虑维护引用数据源，以提供有关每个股票行情机的详细信息，如公司名称。

```
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
```

您可以使用 API 或控制台添加应用程序引用数据源。Amazon Kinesis Data Analytics 提供以下 API 操作来管理引用数据源：
+  [AddApplicationReferenceDataSource](API_AddApplicationReferenceDataSource.md)
+ [UpdateApplication](API_UpdateApplication.md)

有关使用控制台添加引用数据的信息，请参阅[示例：在 应用程序中添加引用数据](app-add-reference-data.md)。

注意以下几点：
+ 如果应用程序正在运行，则 Kinesis Data Analytics 创建一个应用程序内部引用表，然后立即加载引用数据。
+ 如果应用程序未运行 (例如，处于就绪状态)，则 Kinesis Data Analytics 仅保存更新的输入配置。在应用程序开始运行时，Kinesis Data Analytics 在应用程序中将引用数据作为表进行加载。

假设您希望在 Kinesis Data Analytics 创建应用程序内部引用表后刷新数据。您可能更新了 Amazon S3 对象，或者要使用不同的 Amazon S3 对象。在这种情况下，您可以显式调用 [UpdateApplication](API_UpdateApplication.md)，或在控制台中依次选择**操作**、**Synchronize reference data table (同步引用数据表)**。Kinesis Data Analytics 不会自动刷新应用程序内部引用表。

可作为引用数据源创建的 Amazon S3 对象具有大小限制。有关更多信息，请参阅 [限制](limits.md)。如果对象大小超出该限制，则 Kinesis Data Analytics 无法加载数据。应用程序状态显示为正在运行，但是不读取数据。

当添加引用数据源时，您需要提供以下信息：
+ **S3 存储桶和对象键名称** – 除了存储桶名称和对象键以外，您还需要提供 Kinesis Data Analytics 可担任的 IAM 角色以代表您读取对象。
+ **应用程序内部引用表名称** – Kinesis Data Analytics 创建该应用程序内部表并读取 Amazon S3 对象以填充该表。这是您在应用程序代码中指定的表名称。
+ **映射架构** - 您描述记录格式 (JSON、CSV)，即 Amazon S3 对象中存储的数据的编码。您还可以描述每个对象元素如何映射到应用程序内部引用表中的列。

下面显示 `AddApplicationReferenceDataSource` API 请求中的请求正文。

```
{
    "applicationName": "string",
    "CurrentapplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
```