在 Lambda 中使用带有 Kafka 事件源的架构注册表 - AWS Lambda

在 Lambda 中使用带有 Kafka 事件源的架构注册表

架构注册表可帮助您定义和管理数据流架构。架构定义了数据记录的结构和格式。在 Kafka 事件源映射的背景下,您可以配置架构注册表,以便在 Kafka 消息到达您的 Lambda 函数之前,根据预定义的架构对其结构和格式进行验证。这为您的应用程序增加了一层数据治理,使您能够通过事件筛选高效管理数据格式、确保架构合规性并优化成本。

此功能适用于所有编程语言,但应考虑以下要点:

  • Powertools for Lambda 提供对 Java、Python 和 TypeScript 的特定支持,保持了与现有 Kafka 开发模式的一致性,并且无需自定义反序列化代码即可让您直接访问业务对象

  • 此功能仅适用于使用预置模式的事件源映射。架构注册表不支持按需模式下的事件源映射。如果您使用的是预配模式并且配置了架构注册表,则除非先移除架构注册表配置,否则无法更改为按需模式。有关更多信息,请参阅 预调配模式

  • 每个事件源映射 (ESM) 只能配置一个架构注册表。将架构注册表与 Kafka 事件源一起使用可能会增加 Lambda Event Poller Unit (EPU) 的使用量,这是预置模式的定价维度。

架构注册表选项

Lambda 支持以下架构注册表选项:

您的架构注册表支持验证以下数据格式的消息:

  • Apache Avro

  • 协议缓冲区 (Protobuf)

  • JSON 架构 (JSON-SE)

要使用架构注册表,首先应确保您的事件源映射处于预置模式。当您使用架构注册表时,Lambda 会将有关架构的元数据添加到有效载荷中。有关更多信息,请参阅有效载荷格式和反序列化行为

Lambda 如何对 Kafka 消息执行架构验证

配置架构注册表时,Lambda 会对每条 Kafka 消息执行以下步骤:

  1. Lambda 会轮询您集群中的 Kafka 记录。

  2. Lambda 会根据您架构注册表中的特定架构来验证记录中的选定消息属性。

    • 如果在注册表中找不到与消息关联的架构,Lambda 会将消息发送到带有原因代码 SCHEMA_NOT_FOUND 的 DLQ。

  3. Lambda 会根据架构注册表配置对消息进行反序列化以验证消息。如果配置了事件筛选,则 Lambda 会根据配置的筛选条件执行筛选。

    • 如果反序列化失败,Lambda 会将带有原因代码 DESERIALIZATION_ERROR 的消息发送到 DLQ。如果未配置 DLQ,则 Lambda 会丢弃此消息。

  4. 如果消息已通过架构注册表验证,且未按您的筛选条件进行筛选,则 Lambda 会通过该消息来调用您的函数。

此功能旨在验证已使用与架构注册表集成的 Kafka 客户端生成的消息。我们建议您将 Kafka 生产者配置为使用架构注册表来创建格式正确的消息。

配置 Kafka 架构注册表

通过以下控制台步骤,您可以将 Kafka 架构注册表配置添加到您的事件源映射中。

要将 Kafka 架构注册表配置添加到您的事件源映射中
  1. 打开 Lambda 控制台的函数页面

  2. 选择配置

  3. 选择触发器

  4. 选择要为其配置架构注册表的 Kafka 事件源映射,然后选择编辑

  5. 事件轮询器配置下,选择配置架构注册表。您的事件源映射必须处于预置模式才能看到此选项。

  6. 对于架构注册表 URI,请输入您 AWS Glue 架构注册表的 ARN,或者您 Confluent Cloud 架构注册表或自托管式 Confluent 架构注册表的 HTTPS URL。

  7. 以下配置步骤将告诉 Lambda 如何访问您的架构注册表。有关更多信息,请参阅 架构注册表的身份验证方法

    • 对于访问配置类型,请选择 Lambda 用于访问您架构注册表的身份验证类型。

    • 对于访问配置 URI,请输入 Secrets Manager 密钥的 ARN,以便使用您的架构注册表进行身份验证(如果适用)。确保您函数的执行角色包含正确的权限。

  8. 只有当您的架构注册表由私有证书颁发机构 (CA) 或不在 Lambda 信任存储中的证书颁发机构 (CA) 签名时,加密字段才适用。如果适用,请提供包含您的注册表用于 TLS 加密的私有 CA 证书的私有密钥。

  9. 对于事件记录格式,请选择您希望 Lambda 在架构验证后向函数传送记录的方式。有关更多信息,请参阅有效载荷格式示例

    • 如果您选择 JSON,Lambda 将以标准 JSON 格式提供您在下面的架构验证属性中选择的属性。对于您未选择的属性,Lambda 会按原样提供这些属性。

    • 如果您选择 SOURCE,Lambda 将以原始 SOURCE 格式提供您在下面的架构验证属性中选择的属性。

  10. 对于架构验证属性,请选择您希望 Lambda 使用您的架构注册表进行验证和反序列化的消息属性。您必须至少选择一个。如果您选择了 JSON 作为事件记录格式,Lambda 还会在将所选消息属性发送到您的函数之前对其进行反序列化。有关更多信息,请参阅有效载荷格式和反序列化行为

  11. 选择保存

您还可以使用 Lambda API 通过架构注册表配置创建或更新您的事件源映射。以下示例演示了如何使用 AWS CLI 来配置 AWS Glue 或 Confluent 架构注册表,这与 AWS Lambda API 参考中的 UpdateEventSourceMappingCreateEventSourceMapping API 操作相对应:

重要

如果您要使用 AWS CLI 或 update-event-source-mapping API 更新任何架构注册表配置字段,则必须更新架构注册表配置的所有字段。

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

要从事件源映射中删除架构注册表配置,您可以使用 AWS Lambda API 参考中的 CLI 命令 UpdateEventSourceMapping

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Avro 和 Protobuf 的筛选

在架构注册表中使用 Avro 或 Protobuf 格式时,您可以对您的 Lambda 函数应用“事件筛选”。架构验证后,筛选模式将应用于您数据的反序列化经典 JSON 表示形式。例如,通过定义包括价格在内的产品详细信息的 Avro 架构,您可以根据价格值筛选消息:

注意

反序列化时,Avro 会转换为标准 JSON,这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象,请改用 SOURCE 格式。

对于 Protobuf 的反序列化,生成的 JSON 中的字段名称与模式中定义的字段名称一致,而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时,请记住这一点。

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

在此示例中,筛选模式将分析 value 对象,匹配 field_1 中带 "value1" 的消息和 field_2 中带 "value2" 的消息。在 Lambda 将消息从 Avro 格式转换为 JSON 之后,会根据反序列化的数据对筛选条件进行评估。

有关事件筛选的更多信息,请参阅 Lambda 事件筛选

有效载荷格式和反序列化行为

使用架构注册表时,Lambda 会将最终有效载荷以类似于常规事件有效载荷的格式传递给您的函数,并附加一些字段。这些附加字段取决于 SchemaValidationConfigs 参数。对于您选择进行验证的每个属性(键或值),Lambda 都会向有效载荷添加相应的架构元数据。

注意

您必须将您的 aws-lambda-java-events 更新到 3.16.0 或更高版本,才能使用架构元数据字段。

例如,如果您验证了 value 字段,Lambda 就会在您的有效载荷中添加一个名为 valueSchemaMetadata 的字段。同样,对于 key 字段,Lambda 会添加一个名为 keySchemaMetadata 的字段。此元数据包含有关用于验证的数据格式和架构 ID 的信息:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

EventRecordFormat 参数可以设置为 JSONSOURCE,这决定了 Lambda 在将经架构验证的数据传送到您的函数之前如何处理这些数据。每个选项有着不同的处理能力:

  • JSON - Lambda 将经过验证的属性反序列化为标准 JSON 格式,使数据可以直接用于支持原生 JSON 的语言。当您不需要保留原始的二进制格式或使用生成的类时,这种格式是比较理想的选择。

  • SOURCE - Lambda 将数据的原始二进制格式保留为 Base64 编码的字符串,让您可以直接转换为 Avro 或 Protobuf 对象。当使用强类型语言或需要维护 Avro 或 Protobuf 架构的全部功能时,这种格式是必不可少的。

基于这些格式特征和特定语言的考虑因素,我们建议使用以下格式:

基于编程语言的推荐格式
语言 Avro Protobuf JSON
Java SOURCE SOURCE SOURCE
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SOURCE SOURCE SOURCE
其他 JSON JSON JSON

以下各节详细描述了这些格式,并提供了每种格式的有效载荷示例。

JSON 格式

如果您选择将 JSON 作为 EventRecordFormat,Lambda 会验证并反序列化您在 SchemaValidationConfigs 字段中选择的消息属性(key 和/或 value 属性)。Lambda 在您的函数中将这些选定的属性作为其标准 JSON 表示形式的 base64 编码字符串提供。

注意

反序列化时,Avro 会转换为标准 JSON,这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象,请改用 SOURCE 格式。

对于 Protobuf 的反序列化,生成的 JSON 中的字段名称与模式中定义的字段名称一致,而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时,请记住这一点。

下面是一个有效载荷示例,假设您选择 JSON 作为 EventRecordFormatkeyvalue 属性作为 SchemaValidationConfigs

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在本示例中:

  • keyvalue 都是反序列化后的 JSON 表示形式的 base64 编码字符串。

  • Lambda 包含 keySchemaMetadatavalueSchemaMetadata 中两个属性的架构元数据。

  • 您的函数可以解码 keyvalue 字符串,以便访问反序列化的 JSON 数据。

对于 Python 或 Node.js 等非强类型语言,建议使用 JSON 格式。这些语言原生支持将 JSON 转换为对象。

SOURCE 格式

如果您选择 SOURCE 作为 EventRecordFormat,Lambda 仍会根据架构注册表验证记录,但无需反序列化即可将原始二进制数据传送到您的函数。此二进制数据以原始字节数据的 Base64 编码字符串形式提供,并删除了生产者附加的元数据。因此,您可以直接将原始二进制数据转换为您函数代码中的 Avro 和 Protobuf 对象。我们建议使用 Powertools for AWS Lambda,它会反序列化原始二进制数据,并直接为您提供 Avro 和 Protobuf 对象。

例如,如果您将 Lambda 配置为同时验证 keyvalue 属性,但使用了 SOURCE 格式,则您的函数将接收到如下所示的有效载荷:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在本示例中:

  • keyvalue 均包含 Base64 编码字符串形式的原始二进制数据。

  • 您的函数需要使用相应的库来处理反序列化。

如果您使用的是 AVRO 生成的对象或 Protobuf 生成的对象,尤其是使用 Java 函数时,建议选择 SOURCE for EventRecordFormat。这是因为 Java 是强类型的,需要针对 Avro 和 Protobuf 格式使用特定的反序列化器。在您的函数代码中,您可以使用自己喜欢的 Avro 或 Protobuf 库来反序列化数据。

在 Lambda 函数中使用反序列化数据

Powertools for AWS Lambda 可根据您使用的格式,在函数代码中帮助您反序列化 Kafka 记录。该实用程序通过处理数据转换和提供即用型对象,简化了 Kafka 记录的使用。

要在您的函数中使用 Powertools for AWS Lambda,您需要在构建 Lambda 函数时将 Powertools for AWS Lambda 添加为一个层或将其作为依赖项。有关设置说明和更多信息,请参阅您首选语言的 Powertools for AWS Lambda 文档:

注意

在使用架构注册表集成时,您可以选择 SOURCEJSON 格式。如下所示,每个选项支持不同的序列化格式:

格式 支持

SOURCE

Avro 和 Protobuf(使用 Lambda 架构注册表集成)

JSON

JSON 数据

在使用 SOURCEJSON 格式时,您可以使用 Powertools for AWS 来帮助您反序列化您函数代码中的数据。以下是如何处理不同数据格式的示例:

AVRO

Java 示例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 示例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 示例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

.NET 示例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Java 示例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 示例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 示例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

.NET 示例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Java 示例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 示例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 示例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

.NET 示例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

架构注册表的身份验证方法

要使用架构注册表,Lambda 需要能够安全地访问它。如果您使用的是 AWS Glue 架构注册表,则 Lambda 将依赖于 IAM 身份验证。这意味着,您函数的执行角色必须具有以下权限才能访问 AWS Glue 注册表:

所需 IAM 策略的示例:

JSON
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
注意

对于 AWS Glue 架构注册表,如果您为 AWS Glue 注册表提供 AccessConfigs,Lambda 将返回一个验证异常。

如果您使用的是 Confluent 架构注册表,您可以为 KafkaSchemaRegistryAccessConfig 对象的 Type 参数选择三种支持的身份验证方法之一:

  • BASIC_AUTH — Lambda 通过用户名和密码或 API 密钥和 API 密钥身份验证来访问您的注册表。如果您选择此选项,请在 URI 字段中提供包含您凭证的 Secrets Manager ARN。

  • CLIENT_CERTIFICATE_TLS_AUTH — Lambda 使用基于客户端证书的双向 TLS 身份验证。要使用此选项,Lambda 需要同时访问证书和私钥。请在 URI 字段中提供包含这些凭证的 Secrets Manager ARN。

  • NO_AUTH — 公有 CA 证书必须由 Lambda 信任存储中的证书颁发机构 (CA) 签名。对于私有 CA /自签名证书,您可以配置服务器根 CA 证书。要使用此选项,请省略 AccessConfigs 参数。

此外,如果 Lambda 需要访问私有 CA 证书来验证您架构注册表的 TLS 证书,请选择 SERVER_ROOT_CA_CERT 作为 Type,并在 URI 字段中为证书提供 Secrets Manager ARN。

注意

要在控制台中配置 SERVER_ROOT_CA_CERT 选项,请在加密字段中提供包含证书的密钥 ARN。

架构注册表的身份验证配置与您为 Kafka 集群配置的任何身份验证是分开的。即使它们使用相似的身份验证方法,也必须分别进行配置。

架构注册表问题的错误处理和故障排除

在 Amazon MSK 事件源中使用架构注册表时,您可能会遇到各种错误。本节提供有关常见问题以及如何解决这些问题的指导。

配置错误

当您进行架构注册表配置时,会发生这些错误。

需要启用预置模式

错误消息SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

解决方案:通过在 ProvisionedPollerConfig 中配置 MinimumPollers 参数,为事件源映射启用预置模式。

架构注册表 URL 无效

错误消息Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

解决方案:为 Confluent 架构注册表提供有效的 HTTPS URL,或者为 AWS Glue 架构注册表提供有效的 ARN。

事件记录格式无效或缺失

错误消息EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

解决方案:在架构注册表配置中将 SOURCE 或 JSON 指定为 eventRecordFormat。

重复验证属性

错误消息Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

解决方案:从您的 SchemaValidationConfigs 中删除重复的 KEY 或 VALUE 属性。每种属性类型只能出现一次。

缺少验证配置

错误消息SchemaValidationConfigs is a required field for SchemaRegistryConfig.

解决方案:将 SchemaValidationConfigs 添加到您的配置中,并且至少指定一个验证属性(KEY 或 VALUE)。

访问权和权限错误

当 Lambda 由于权限或身份验证问题而无法访问架构注册表时,就会发生这些错误。

AWS Glue 架构注册表访问被拒绝

错误消息Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

解决方案:将所需的权限(glue:GetRegistryglue:GetSchemaVersion)添加到您函数的执行角色中。

Confluent 架构注册表访问被拒绝

错误消息Cannot access Confluent Schema with the provided access configuration.

解决方案:验证您的身份验证凭证(存储在 Secrets Manager 中)是否正确并具有访问架构注册表所需的权限。

跨账户 AWS Glue 架构注册表

错误消息Cross-account Glue Schema Registry ARN not supported.

解决方案:使用与您的 Lambda 函数位于同一 AWS 账户中的 AWS Glue 架构注册表。

跨区域 AWS Glue 架构注册表

错误消息Cross-region Glue Schema Registry ARN not supported.

解决方案:使用与您的 Lambda 函数位于同一区域中的 AWS Glue 架构注册表。

密钥访问问题

错误消息Lambda received InvalidRequestException from Secrets Manager.

解决方案:验证您的函数执行角色是否具有访问该密钥的权限,并且如果从其他账户访问,该密钥未使用默认的 AWS KMS 密钥进行加密。

连接错误

当 Lambda 无法与架构注册表建立连接时,就会发生这些错误。

VPC 连接问题

错误消息Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

解决方案:配置您的 VPC 网络,以允许通过 AWS PrivateLink、NAT 网关或 VPC 对等来连接架构注册表。

TLS 握手失败

错误消息Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

解决方案:在 Secrets Manager 中验证您的 CA 证书和客户端证书(适用于 mTLS)是否正确且配置正确。

节流

错误消息Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

解决方案:提高架构注册表的 API 速率限制或降低来自应用程序的请求速率。

自托管式架构注册表错误

错误消息Lambda received an internal server an unexpected error from the provided self-managed schema registry.

解决方案:检查自托管式架构注册表服务器的运行状况和配置。