

经过仔细考虑，我们决定停用适用于 SQL 应用程序的 Amazon Kinesis Data Analytics：

1. 从 **2025年9月1日起，**我们将不再为适用于SQL应用程序的Amazon Kinesis Data Analytics Data Analytics提供任何错误修复，因为鉴于即将停产，我们对其的支持将有限。

2. 从 **2025 年 10 月 15 日**起，您将无法为 SQL 应用程序创建新的 Kinesis Data Analytics。

3. 从 **2026 年 1 月 27 日**起，我们将删除您的应用程序。您将无法启动或操作 Amazon Kinesis Data Analytics for SQL 应用程序。从那时起，将不再提供对 Amazon Kinesis Data Analytics for SQL 的支持。有关更多信息，请参阅 [Amazon Kinesis Data Analytics for SQL 应用程序停用](discontinuation.md)。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 创建 Lambda 函数以进行预处理
<a name="lambda-preprocessing-functions"></a>

在将记录提取到应用程序时，您的 Amazon Kinesis Data Analytics 应用程序可以使用 Lambda 函数预处理记录。Kinesis Data Analytics 在控制台上提供以下模板以作为数据预处理起点。

**Topics**
+ [使用 Node.js 创建预处理 Lambda 函数](#lambda-preprocessing-functions-nodejs)
+ [使用 Python 创建预处理 Lambda 函数](#lambda-preprocessing-functions-python)
+ [使用 Java 创建预处理 Lambda 函数](#lambda-preprocessing-functions-java)
+ [使用 .NET 创建预处理 Lambda 函数](#lambda-preprocessing-functions-net)

## 使用 Node.js 创建预处理 Lambda 函数
<a name="lambda-preprocessing-functions-nodejs"></a>

在 Kinesis Data Analytics 控制台上提供了以下模板以使用 Node.js 创建预处理 Lambda 函数：


| Lambda 蓝图 | 语言和版本 | 说明 | 
| --- | --- | --- | 
| 通用 Kinesis Data Analytics 输入处理  | Node.js 6.10 |  Kinesis Data Analytics 记录预处理器，它将 JSON 或 CSV 记录作为输入接收，然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。  | 
| 压缩输入处理 | Node.js 6.10 | Kinesis Data Analytics 记录处理器，它接收压缩（GZIP 或 Deflate 压缩）JSON 或 CSV 记录以作为输入，并返回解压缩的记录以及处理状态。 | 

## 使用 Python 创建预处理 Lambda 函数
<a name="lambda-preprocessing-functions-python"></a>

在控制台上提供了以下模板以使用 Python 创建预处理 Lambda 函数：


| Lambda 蓝图 | 语言和版本 | 说明 | 
| --- | --- | --- | 
| 通用 Kinesis Analytics 输入处理  | Python 2.7 |  Kinesis Data Analytics 记录预处理器，它将 JSON 或 CSV 记录作为输入接收，然后返回这些记录以及处理状态。使用此处理器作为自定义转换逻辑的起点。  | 
| KPL 输入处理 | Python 2.7 | Kinesis Data Analytics 记录处理器，它接收 JSON 或 CSV 记录的 Kinesis 创建器库 (KPL) 聚合以作为输入，并返回取消聚合的记录以及处理状态。 | 

## 使用 Java 创建预处理 Lambda 函数
<a name="lambda-preprocessing-functions-java"></a>

要使用 Java 创建 Lambda 函数以预处理记录，请使用 [Java 事件](https://github.com/aws/aws-lambda-java-libs/tree/master/aws-lambda-java-events/src/main/java/com/amazonaws/services/lambda/runtime/events)类。

以下代码说明了一个使用 Java 的示例 Lambda 函数（用于预处理记录）：

```
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocatonId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response = new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
             // Add your record.data pre-processing logic here.                               
            // response.records.add(new Record(record.recordId, KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }

}
```

## 使用 .NET 创建预处理 Lambda 函数
<a name="lambda-preprocessing-functions-net"></a>

要使用 .NET 创建 Lambda 函数以预处理记录，请使用 [.NET 事件](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)类。

以下代码说明了一个使用 C\$1 的示例 Lambda 函数（用于预处理记录）：

```
public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessig logic here.

                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }
```

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息，请参阅 [https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents](https://github.com/aws/aws-lambda-dotnet/tree/master/Libraries/src/Amazon.Lambda.KinesisAnalyticsEvents)。