

# 在 Lambda 中使用带有 Kafka 事件源的架构注册表
<a name="services-consume-kafka-events"></a>

 架构注册表可帮助您定义和管理数据流架构。架构定义了数据记录的结构和格式。在 Kafka 事件源映射的背景下，您可以配置架构注册表，以便在 Kafka 消息到达您的 Lambda 函数之前，根据预定义的架构对其结构和格式进行验证。这为您的应用程序增加了一层数据治理，使您能够通过事件筛选高效管理数据格式、确保架构合规性并优化成本。

 此功能适用于所有编程语言，但应考虑以下要点：
+ Powertools for Lambda 提供对 Java、Python 和 TypeScript 的特定支持，保持了与现有 Kafka 开发模式的一致性，并且无需自定义反序列化代码即可让您直接访问业务对象
+ 此功能仅适用于使用预置模式的事件源映射。架构注册表不支持按需模式下的事件源映射。如果您使用的是预配模式并且配置了架构注册表，则除非先移除架构注册表配置，否则无法更改为按需模式。有关更多信息，请参阅 [预调配模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)。
+ 每个事件源映射 (ESM) 只能配置一个架构注册表。将架构注册表与 Kafka 事件源一起使用可能会增加 Lambda Event Poller Unit (EPU) 的使用量，这是预置模式的定价维度。

**Topics**
+ [

## 架构注册表选项
](#services-consume-kafka-events-options)
+ [

## Lambda 如何对 Kafka 消息执行架构验证
](#services-consume-kafka-events-how)
+ [

## 配置 Kafka 架构注册表
](#services-consume-kafka-events-config)
+ [

## Avro 和 Protobuf 的筛选
](#services-consume-kafka-events-filtering)
+ [

## 有效载荷格式和反序列化行为
](#services-consume-kafka-events-payload)
+ [

## 在 Lambda 函数中使用反序列化数据
](#services-consume-kafka-events-payload-examples)
+ [

## 架构注册表的身份验证方法
](#services-consume-kafka-events-auth)
+ [

## 架构注册表问题的错误处理和故障排除
](#services-consume-kafka-events-troubleshooting)

## 架构注册表选项
<a name="services-consume-kafka-events-options"></a>

 Lambda 支持以下架构注册表选项：
+ [AWS Glue 架构注册表](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+ [Confluent Cloud 架构注册表](https://docs.confluent.io/platform/current/schema-registry/index.html)
+ [自托管式 Confluent 架构注册表](https://docs.confluent.io/platform/current/schema-registry/index.html)

 您的架构注册表支持验证以下数据格式的消息：
+ Apache Avro
+ 协议缓冲区 (Protobuf)
+ JSON 架构 (JSON-SE)

 要使用架构注册表，首先应确保您的事件源映射处于预置模式。当您使用架构注册表时，Lambda 会将有关架构的元数据添加到有效载荷中。有关更多信息，请参阅[有效载荷格式和反序列化行为](#services-consume-kafka-events-payload)。

## Lambda 如何对 Kafka 消息执行架构验证
<a name="services-consume-kafka-events-how"></a>

 配置架构注册表时，Lambda 会对每条 Kafka 消息执行以下步骤：

1. Lambda 会轮询您集群中的 Kafka 记录。

1. Lambda 会根据您架构注册表中的特定架构来验证记录中的选定消息属性。
   + 如果在注册表中找不到与消息关联的架构，Lambda 会将消息发送到带有原因代码 `SCHEMA_NOT_FOUND` 的 DLQ。

1. Lambda 会根据架构注册表配置对消息进行反序列化以验证消息。如果配置了事件筛选，则 Lambda 会根据配置的筛选条件执行筛选。
   + 如果反序列化失败，Lambda 会将带有原因代码 `DESERIALIZATION_ERROR` 的消息发送到 DLQ。如果未配置 DLQ，则 Lambda 会丢弃此消息。

1. 如果消息已通过架构注册表验证，且未按您的筛选条件进行筛选，则 Lambda 会通过该消息来调用您的函数。

 此功能旨在验证已使用与架构注册表集成的 Kafka 客户端生成的消息。我们建议您将 Kafka 生产者配置为使用架构注册表来创建格式正确的消息。

## 配置 Kafka 架构注册表
<a name="services-consume-kafka-events-config"></a>

 通过以下控制台步骤，您可以将 Kafka 架构注册表配置添加到您的事件源映射中。

**要将 Kafka 架构注册表配置添加到您的事件源映射中**

1. 打开 Lambda 控制台的[函数页面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 选择**配置**。

1. 选择**触发器**。

1. 选择要为其配置架构注册表的 Kafka 事件源映射，然后选择**编辑**。

1. 在**事件轮询器配置**下，选择**配置架构注册表**。您的事件源映射必须处于预置模式才能看到此选项。

1. 对于**架构注册表 URI**，请输入您 AWS Glue 架构注册表的 ARN，或者您 Confluent Cloud 架构注册表或自托管式 Confluent 架构注册表的 HTTPS URL。

1. 以下配置步骤将告诉 Lambda 如何访问您的架构注册表。有关更多信息，请参阅 [架构注册表的身份验证方法](#services-consume-kafka-events-auth)。
   + 对于**访问配置类型**，请选择 Lambda 用于访问您架构注册表的身份验证类型。
   + 对于**访问配置 URI**，请输入 Secrets Manager 密钥的 ARN，以便使用您的架构注册表进行身份验证（如果适用）。确保您函数的[执行角色](with-msk-permissions.md)包含正确的权限。

1. 只有当您的架构注册表由私有证书颁发机构 (CA) 或不在 Lambda 信任存储中的证书颁发机构 (CA) 签名时，**加密**字段才适用。如果适用，请提供包含您的注册表用于 TLS 加密的私有 CA 证书的私有密钥。

1. 对于**事件记录格式**，请选择您希望 Lambda 在架构验证后向函数传送记录的方式。有关更多信息，请参阅[有效载荷格式示例](#services-consume-kafka-events-payload)。
   + 如果您选择 **JSON**，Lambda 将以标准 JSON 格式提供您在下面的架构验证属性中选择的属性。对于您未选择的属性，Lambda 会按原样提供这些属性。
   + 如果您选择 **SOURCE**，Lambda 将以原始 SOURCE 格式提供您在下面的架构验证属性中选择的属性。

1. 对于**架构验证属性**，请选择您希望 Lambda 使用您的架构注册表进行验证和反序列化的消息属性。您必须至少选择一个**键**或**值**。如果您选择了 JSON 作为事件记录格式，Lambda 还会在将所选消息属性发送到您的函数之前对其进行反序列化。有关更多信息，请参阅[有效载荷格式和反序列化行为](#services-consume-kafka-events-payload)。

1. 选择**保存**。

 您还可以使用 Lambda API 通过架构注册表配置创建或更新您的事件源映射。以下示例演示了如何使用 AWS CLI 来配置 AWS Glue 或 Confluent 架构注册表，这与 *AWS Lambda API 参考*中的 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) 和 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) API 操作相对应：

**重要**  
如果您要使用 AWS CLI 或 `update-event-source-mapping` API 更新任何架构注册表配置字段，则必须更新架构注册表配置的所有字段。

------
#### [ Create Event Source Mapping ]

```
aws lambda create-event-source-mapping \
  --function-name my-schema-validator-function \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \
  --topics my-kafka-topic \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --amazon-managed-kafka-event-source-mapping '{
      "SchemaRegistryConfig" : {
          "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
          "AccessConfigs": [{
              "Type": "BASIC_AUTH", 
              "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
          }],
          "EventRecordFormat": "JSON",
          "SchemaValidationConfigs": [
          { 
              "Attribute": "KEY" 
          },
          { 
              "Attribute": "VALUE" 
          }]
      }
  }'
```

------
#### [ Update AWS Glue Schema Registry ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry with Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "AccessConfigs": [{
                "Type": "BASIC_AUTH", 
                "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
            }],
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry without Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Remove Schema Registry Configuration ]

要从事件源映射中删除架构注册表配置，您可以使用 *AWS Lambda API 参考*中的 CLI 命令 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)。

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {}
    }'
```

------

## Avro 和 Protobuf 的筛选
<a name="services-consume-kafka-events-filtering"></a>

 在架构注册表中使用 Avro 或 Protobuf 格式时，您可以对您的 Lambda 函数应用“事件筛选”。架构验证后，筛选模式将应用于您数据的反序列化经典 JSON 表示形式。例如，通过定义包括价格在内的产品详细信息的 Avro 架构，您可以根据价格值筛选消息：

**注意**  
 反序列化时，Avro 会转换为标准 JSON，这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象，请改用 SOURCE 格式。  
 对于 Protobuf 的反序列化，生成的 JSON 中的字段名称与模式中定义的字段名称一致，而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时，请记住这一点。

```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [
            { 
                "Attribute": "VALUE" 
            }
        ]
    }' \
    --filter-criteria '{
        "Filters": [
            {
                "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }"
            }
        ]
    }'
```

 在此示例中，筛选模式将分析 `value` 对象，匹配 `field_1` 中带 `"value1"` 的消息和 `field_2` 中带 `"value2"` 的消息。在 Lambda 将消息从 Avro 格式转换为 JSON 之后，会根据反序列化的数据对筛选条件进行评估。

 有关事件筛选的更多信息，请参阅 [Lambda 事件筛选](invocation-eventfiltering.md)。

## 有效载荷格式和反序列化行为
<a name="services-consume-kafka-events-payload"></a>

 使用架构注册表时，Lambda 会将最终有效载荷以类似于[常规事件有效载荷](with-msk.md#msk-sample-event)的格式传递给您的函数，并附加一些字段。这些附加字段取决于 `SchemaValidationConfigs` 参数。对于您选择进行验证的每个属性（键或值），Lambda 都会向有效载荷添加相应的架构元数据。

**注意**  
您必须将您的 [aws-lambda-java-events](https://github.com/aws/aws-lambda-java-libs/tree/main/aws-lambda-java-events) 更新到 3.16.0 或更高版本，才能使用架构元数据字段。

 例如，如果您验证了 `value` 字段，Lambda 就会在您的有效载荷中添加一个名为 `valueSchemaMetadata` 的字段。同样，对于 `key` 字段，Lambda 会添加一个名为 `keySchemaMetadata` 的字段。此元数据包含有关用于验证的数据格式和架构 ID 的信息：

```
"valueSchemaMetadata": {
    "dataFormat": "AVRO",
    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
}
```

 该 `EventRecordFormat` 参数可以设置为 `JSON` 或 `SOURCE`，这决定了 Lambda 在将经架构验证的数据传送到您的函数之前如何处理这些数据。每个选项有着不同的处理能力：
+ `JSON` - Lambda 将经过验证的属性反序列化为标准 JSON 格式，使数据可以直接用于支持原生 JSON 的语言。当您不需要保留原始的二进制格式或使用生成的类时，这种格式是比较理想的选择。
+ `SOURCE` - Lambda 将数据的原始二进制格式保留为 Base64 编码的字符串，让您可以直接转换为 Avro 或 Protobuf 对象。当使用强类型语言或需要维护 Avro 或 Protobuf 架构的全部功能时，这种格式是必不可少的。

基于这些格式特征和特定语言的考虑因素，我们建议使用以下格式：


**基于编程语言的推荐格式**  

| 语言 | Avro | Protobuf | JSON | 
| --- | --- | --- | --- | 
| Java | SOURCE | SOURCE | SOURCE | 
| Python | JSON | JSON | JSON | 
| NodeJS | JSON | JSON | JSON | 
| .NET | SOURCE | SOURCE | SOURCE | 
| 其他 | JSON | JSON | JSON | 

以下各节详细描述了这些格式，并提供了每种格式的有效载荷示例。

### JSON 格式
<a name="services-consume-kafka-events-payload-json"></a>

 如果您选择将 `JSON` 作为 `EventRecordFormat`，Lambda 会验证并反序列化您在 `SchemaValidationConfigs` 字段中选择的消息属性（`key` 和/或 `value` 属性）。Lambda 在您的函数中将这些选定的属性作为其标准 JSON 表示形式的 base64 编码字符串提供。

**注意**  
 反序列化时，Avro 会转换为标准 JSON，这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象，请改用 SOURCE 格式。  
 对于 Protobuf 的反序列化，生成的 JSON 中的字段名称与模式中定义的字段名称一致，而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时，请记住这一点。

 下面是一个有效载荷示例，假设您选择 `JSON` 作为 `EventRecordFormat`，`key` 和 `value` 属性作为 `SchemaValidationConfigs`：

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON
            "keySchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON
            "valueSchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

 在本示例中：
+ `key` 和 `value` 都是反序列化后的 JSON 表示形式的 base64 编码字符串。
+ Lambda 包含 `keySchemaMetadata` 和 `valueSchemaMetadata` 中两个属性的架构元数据。
+ 您的函数可以解码 `key` 和 `value` 字符串，以便访问反序列化的 JSON 数据。

 对于 Python 或 Node.js 等非强类型语言，建议使用 JSON 格式。这些语言原生支持将 JSON 转换为对象。

### SOURCE 格式
<a name="services-consume-kafka-events-payload-source"></a>

 如果您选择 `SOURCE` 作为 `EventRecordFormat`，Lambda 仍会根据架构注册表验证记录，但无需反序列化即可将原始二进制数据传送到您的函数。此二进制数据以原始字节数据的 Base64 编码字符串形式提供，并删除了生产者附加的元数据。因此，您可以直接将原始二进制数据转换为您函数代码中的 Avro 和 Protobuf 对象。我们建议使用 Powertools for AWS Lambda，它会反序列化原始二进制数据，并直接为您提供 Avro 和 Protobuf 对象。

 例如，如果您将 Lambda 配置为同时验证 `key` 和 `value` 属性，但使用了 `SOURCE` 格式，则您的函数将接收到如下所示的有效载荷：

```
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "keySchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "valueSchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "headers": [
                    {
                        "headerKey": [
                            104,
                            101,
                            97,
                            100,
                            101,
                            114,
                            86,
                            97,
                            108,
                            117,
                            101
                        ]
                    }
                ]
            }
        ]
    }
}
```

 在本示例中：
+ `key` 和 `value` 均包含 Base64 编码字符串形式的原始二进制数据。
+ 您的函数需要使用相应的库来处理反序列化。

 如果您使用的是 AVRO 生成的对象或 Protobuf 生成的对象，尤其是使用 Java 函数时，建议选择 `SOURCE` for `EventRecordFormat`。这是因为 Java 是强类型的，需要针对 Avro 和 Protobuf 格式使用特定的反序列化器。在您的函数代码中，您可以使用自己喜欢的 Avro 或 Protobuf 库来反序列化数据。

## 在 Lambda 函数中使用反序列化数据
<a name="services-consume-kafka-events-payload-examples"></a>

Powertools for AWS Lambda 可根据您使用的格式，在函数代码中帮助您反序列化 Kafka 记录。该实用程序通过处理数据转换和提供即用型对象，简化了 Kafka 记录的使用。

 要在您的函数中使用 Powertools for AWS Lambda，您需要在构建 Lambda 函数时将 Powertools for AWS Lambda 添加为一个层或将其作为依赖项。有关设置说明和更多信息，请参阅您首选语言的 Powertools for AWS Lambda 文档：
+ [Powertools for AWS Lambda (Java)](https://docs.powertools.aws.dev/lambda/java/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (Python)](https://docs.powertools.aws.dev/lambda/python/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (TypeScript)](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka/)
+ [Powertools for AWS Lambda (.NET)](https://docs.powertools.aws.dev/lambda/dotnet/utilities/kafka/)

**注意**  
在使用架构注册表集成时，您可以选择 `SOURCE` 或 `JSON` 格式。如下所示，每个选项支持不同的序列化格式：  


| Format | 支持 | 
| --- | --- | 
|  SOURCE  |  Avro 和 Protobuf（使用 Lambda 架构注册表集成）  | 
|  JSON  |  JSON 数据  | 

 在使用 `SOURCE` 或 `JSON` 格式时，您可以使用 Powertools for AWS 来帮助您反序列化您函数代码中的数据。以下是如何处理不同数据格式的示例：

------
#### [ AVRO ]

Java 示例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
        for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            AvroProduct product = consumerRecord.value();
            LOGGER.info("AvroProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python 示例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

value_schema_str = open("customer_profile.avsc", "r").read()

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=value_schema_str)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 示例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

const schema = '{   
    "type": "record",   
    "name": "Product",   
    "fields": [     
        { "name": "id", "type": "int" },     
        { "name": "name", "type": "string" },     
        { "name": "price", "type": "double" }   
    ] 
}';

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'avro',
            schema: schema,
        },
    }
);
```

.NET 示例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ PROTOBUF ]

Java 示例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            ProtobufProduct product = consumerRecord.value();
            LOGGER.info("ProtobufProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python 示例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

from user_pb2 import User # protobuf generated class

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=User)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 示例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';
import { Product } from './product.generated.js';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
        }
    },
    {
        value: {
            type: 'protobuf',
            schema: Product,
        },
    }
);
```

.NET 示例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Protobuf;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ JSON ]

Java 示例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            Product product = consumerRecord.value();
            LOGGER.info("Product: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }
}
```

Python 示例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 示例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'json',
        },
    }
);
```

.NET 示例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Json;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))]

namespace JsonClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------

## 架构注册表的身份验证方法
<a name="services-consume-kafka-events-auth"></a>

 要使用架构注册表，Lambda 需要能够安全地访问它。如果您使用的是 AWS Glue 架构注册表，则 Lambda 将依赖于 IAM 身份验证。这意味着，您函数的[执行角色](lambda-intro-execution-role.md)必须具有以下权限才能访问 AWS Glue 注册表：
+ *AWS Glue Web API* 参考中的 [GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) 
+ *AWS Glue Web API 参考*中的 [GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) 

所需 IAM 策略的示例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**注意**  
 对于 AWS Glue 架构注册表，如果您为 AWS Glue 注册表提供 `AccessConfigs`，Lambda 将返回一个验证异常。

如果您使用的是 Confluent 架构注册表，您可以为 [KafkaSchemaRegistryAccessConfig](https://docs.aws.amazon.com/lambda/latest/api/API_KafkaSchemaRegistryAccessConfig) 对象的 `Type` 参数选择三种支持的身份验证方法之一：
+ **BASIC\$1AUTH** — Lambda 通过用户名和密码或 API 密钥和 API 密钥身份验证来访问您的注册表。如果您选择此选项，请在 URI 字段中提供包含您凭证的 Secrets Manager ARN。
+ **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH** — Lambda 使用基于客户端证书的双向 TLS 身份验证。要使用此选项，Lambda 需要同时访问证书和私钥。请在 URI 字段中提供包含这些凭证的 Secrets Manager ARN。
+ **NO\$1AUTH** — 公有 CA 证书必须由 Lambda 信任存储中的证书颁发机构 (CA) 签名。对于私有 CA /自签名证书，您可以配置服务器根 CA 证书。要使用此选项，请省略 `AccessConfigs` 参数。

 此外，如果 Lambda 需要访问私有 CA 证书来验证您架构注册表的 TLS 证书，请选择 `SERVER_ROOT_CA_CERT` 作为 `Type`，并在 URI 字段中为证书提供 Secrets Manager ARN。

**注意**  
 要在控制台中配置 `SERVER_ROOT_CA_CERT` 选项，请在**加密**字段中提供包含证书的密钥 ARN。

 架构注册表的身份验证配置与您为 Kafka 集群配置的任何身份验证是分开的。即使它们使用相似的身份验证方法，也必须分别进行配置。

## 架构注册表问题的错误处理和故障排除
<a name="services-consume-kafka-events-troubleshooting"></a>

在 Amazon MSK 事件源中使用架构注册表时，您可能会遇到各种错误。本节提供有关常见问题以及如何解决这些问题的指导。

### 配置错误
<a name="consume-kafka-events-troubleshooting-configuration-errors"></a>

当您进行架构注册表配置时，会发生这些错误。

需要启用预置模式  
**错误消息**：`SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.`  
**解决方案：**通过在 `ProvisionedPollerConfig` 中配置 `MinimumPollers` 参数，为事件源映射启用预置模式。

架构注册表 URL 无效  
**错误消息**：`Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.`  
**解决方案：**为 Confluent 架构注册表提供有效的 HTTPS URL，或者为 AWS Glue 架构注册表提供有效的 ARN。

事件记录格式无效或缺失  
**错误消息**：`EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.`  
**解决方案：**在架构注册表配置中将 SOURCE 或 JSON 指定为 eventRecordFormat。

重复验证属性  
**错误消息**：`Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.`  
**解决方案：**从您的 SchemaValidationConfigs 中删除重复的 KEY 或 VALUE 属性。每种属性类型只能出现一次。

缺少验证配置  
**错误消息**：`SchemaValidationConfigs is a required field for SchemaRegistryConfig.`  
**解决方案：**将 SchemaValidationConfigs 添加到您的配置中，并且至少指定一个验证属性（KEY 或 VALUE）。

### 访问权和权限错误
<a name="consume-kafka-events-troubleshooting-access-errors"></a>

当 Lambda 由于权限或身份验证问题而无法访问架构注册表时，就会发生这些错误。

AWS Glue 架构注册表访问被拒绝  
**错误消息**：`Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.`  
**解决方案：**将所需的权限（`glue:GetRegistry` 和 `glue:GetSchemaVersion`）添加到您函数的执行角色中。

Confluent 架构注册表访问被拒绝  
**错误消息**：`Cannot access Confluent Schema with the provided access configuration.`  
**解决方案：**验证您的身份验证凭证（存储在 Secrets Manager 中）是否正确并具有访问架构注册表所需的权限。

跨账户 AWS Glue 架构注册表  
**错误消息**：`Cross-account Glue Schema Registry ARN not supported.`  
**解决方案：**使用与您的 Lambda 函数位于同一 AWS 账户中的 AWS Glue 架构注册表。

跨区域 AWS Glue 架构注册表  
**错误消息**：`Cross-region Glue Schema Registry ARN not supported.`  
**解决方案：**使用与您的 Lambda 函数位于同一区域中的 AWS Glue 架构注册表。

密钥访问问题  
**错误消息**：`Lambda received InvalidRequestException from Secrets Manager.`  
**解决方案：**验证您的函数执行角色是否具有访问该密钥的权限，并且如果从其他账户访问，该密钥未使用默认的 AWS KMS 密钥进行加密。

### 连接错误
<a name="consume-kafka-events-troubleshooting-connection-errors"></a>

当 Lambda 无法与架构注册表建立连接时，就会发生这些错误。

VPC 连接问题  
**错误消息**：`Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.`  
**解决方案：**配置您的 VPC 网络，以允许通过 AWS PrivateLink、NAT 网关或 VPC 对等来连接架构注册表。

TLS 握手失败  
**错误消息**：`Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.`  
**解决方案：**在 Secrets Manager 中验证您的 CA 证书和客户端证书（适用于 mTLS）是否正确且配置正确。

节流  
**错误消息**：`Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.`  
**解决方案：**提高架构注册表的 API 速率限制或降低来自应用程序的请求速率。

自托管式架构注册表错误  
**错误消息**：`Lambda received an internal server an unexpected error from the provided self-managed schema registry.`  
**解决方案：**检查自托管式架构注册表服务器的运行状况和配置。