在 Lambda 中使用带有 Kafka 事件源的架构注册表
架构注册表可帮助您定义和管理数据流架构。架构定义了数据记录的结构和格式。在 Kafka 事件源映射的背景下,您可以配置架构注册表,以便在 Kafka 消息到达您的 Lambda 函数之前,根据预定义的架构对其结构和格式进行验证。这为您的应用程序增加了一层数据治理,使您能够通过事件筛选高效管理数据格式、确保架构合规性并优化成本。
此功能适用于所有编程语言,但应考虑以下要点:
Powertools for Lambda 提供对 Java、Python 和 TypeScript 的特定支持,保持了与现有 Kafka 开发模式的一致性,并且无需自定义反序列化代码即可让您直接访问业务对象
此功能仅适用于使用预置模式的事件源映射。架构注册表不支持按需模式下的事件源映射。如果您使用的是预配模式并且配置了架构注册表,则除非先移除架构注册表配置,否则无法更改为按需模式。有关更多信息,请参阅 预调配模式。
每个事件源映射 (ESM) 只能配置一个架构注册表。将架构注册表与 Kafka 事件源一起使用可能会增加 Lambda Event Poller Unit (EPU) 的使用量,这是预置模式的定价维度。
主题
架构注册表选项
Lambda 支持以下架构注册表选项:
您的架构注册表支持验证以下数据格式的消息:
-
Apache Avro
-
协议缓冲区 (Protobuf)
-
JSON 架构 (JSON-SE)
要使用架构注册表,首先应确保您的事件源映射处于预置模式。当您使用架构注册表时,Lambda 会将有关架构的元数据添加到有效载荷中。有关更多信息,请参阅有效载荷格式和反序列化行为。
Lambda 如何对 Kafka 消息执行架构验证
配置架构注册表时,Lambda 会对每条 Kafka 消息执行以下步骤:
-
Lambda 会轮询您集群中的 Kafka 记录。
-
Lambda 会根据您架构注册表中的特定架构来验证记录中的选定消息属性。
-
如果在注册表中找不到与消息关联的架构,Lambda 会将消息发送到带有原因代码
SCHEMA_NOT_FOUND
的 DLQ。
-
-
Lambda 会根据架构注册表配置对消息进行反序列化以验证消息。如果配置了事件筛选,则 Lambda 会根据配置的筛选条件执行筛选。
-
如果反序列化失败,Lambda 会将带有原因代码
DESERIALIZATION_ERROR
的消息发送到 DLQ。如果未配置 DLQ,则 Lambda 会丢弃此消息。
-
-
如果消息已通过架构注册表验证,且未按您的筛选条件进行筛选,则 Lambda 会通过该消息来调用您的函数。
此功能旨在验证已使用与架构注册表集成的 Kafka 客户端生成的消息。我们建议您将 Kafka 生产者配置为使用架构注册表来创建格式正确的消息。
配置 Kafka 架构注册表
通过以下控制台步骤,您可以将 Kafka 架构注册表配置添加到您的事件源映射中。
要将 Kafka 架构注册表配置添加到您的事件源映射中
-
打开 Lambda 控制台的函数页面
。 -
选择配置。
-
选择触发器。
-
选择要为其配置架构注册表的 Kafka 事件源映射,然后选择编辑。
-
在事件轮询器配置下,选择配置架构注册表。您的事件源映射必须处于预置模式才能看到此选项。
-
对于架构注册表 URI,请输入您 AWS Glue 架构注册表的 ARN,或者您 Confluent Cloud 架构注册表或自托管式 Confluent 架构注册表的 HTTPS URL。
-
以下配置步骤将告诉 Lambda 如何访问您的架构注册表。有关更多信息,请参阅 架构注册表的身份验证方法。
-
对于访问配置类型,请选择 Lambda 用于访问您架构注册表的身份验证类型。
-
对于访问配置 URI,请输入 Secrets Manager 密钥的 ARN,以便使用您的架构注册表进行身份验证(如果适用)。确保您函数的执行角色包含正确的权限。
-
-
只有当您的架构注册表由私有证书颁发机构 (CA) 或不在 Lambda 信任存储中的证书颁发机构 (CA) 签名时,加密字段才适用。如果适用,请提供包含您的注册表用于 TLS 加密的私有 CA 证书的私有密钥。
-
对于事件记录格式,请选择您希望 Lambda 在架构验证后向函数传送记录的方式。有关更多信息,请参阅有效载荷格式示例。
-
如果您选择 JSON,Lambda 将以标准 JSON 格式提供您在下面的架构验证属性中选择的属性。对于您未选择的属性,Lambda 会按原样提供这些属性。
-
如果您选择 SOURCE,Lambda 将以原始 SOURCE 格式提供您在下面的架构验证属性中选择的属性。
-
-
对于架构验证属性,请选择您希望 Lambda 使用您的架构注册表进行验证和反序列化的消息属性。您必须至少选择一个键或值。如果您选择了 JSON 作为事件记录格式,Lambda 还会在将所选消息属性发送到您的函数之前对其进行反序列化。有关更多信息,请参阅有效载荷格式和反序列化行为。
-
选择保存。
您还可以使用 Lambda API 通过架构注册表配置创建或更新您的事件源映射。以下示例演示了如何使用 AWS CLI 来配置 AWS Glue 或 Confluent 架构注册表,这与 AWS Lambda API 参考中的 UpdateEventSourceMapping 和 CreateEventSourceMapping API 操作相对应:
重要
如果您要使用 AWS CLI 或 update-event-source-mapping
API 更新任何架构注册表配置字段,则必须更新架构注册表配置的所有字段。
Avro 和 Protobuf 的筛选
在架构注册表中使用 Avro 或 Protobuf 格式时,您可以对您的 Lambda 函数应用“事件筛选”。架构验证后,筛选模式将应用于您数据的反序列化经典 JSON 表示形式。例如,通过定义包括价格在内的产品详细信息的 Avro 架构,您可以根据价格值筛选消息:
注意
反序列化时,Avro 会转换为标准 JSON,这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象,请改用 SOURCE 格式。
对于 Protobuf 的反序列化,生成的 JSON 中的字段名称与模式中定义的字段名称一致,而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时,请记住这一点。
aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'
在此示例中,筛选模式将分析 value
对象,匹配 field_1
中带 "value1"
的消息和 field_2
中带 "value2"
的消息。在 Lambda 将消息从 Avro 格式转换为 JSON 之后,会根据反序列化的数据对筛选条件进行评估。
有关事件筛选的更多信息,请参阅 Lambda 事件筛选。
有效载荷格式和反序列化行为
使用架构注册表时,Lambda 会将最终有效载荷以类似于常规事件有效载荷的格式传递给您的函数,并附加一些字段。这些附加字段取决于 SchemaValidationConfigs
参数。对于您选择进行验证的每个属性(键或值),Lambda 都会向有效载荷添加相应的架构元数据。
注意
您必须将您的 aws-lambda-java-events
例如,如果您验证了 value
字段,Lambda 就会在您的有效载荷中添加一个名为 valueSchemaMetadata
的字段。同样,对于 key
字段,Lambda 会添加一个名为 keySchemaMetadata
的字段。此元数据包含有关用于验证的数据格式和架构 ID 的信息:
"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }
该 EventRecordFormat
参数可以设置为 JSON
或 SOURCE
,这决定了 Lambda 在将经架构验证的数据传送到您的函数之前如何处理这些数据。每个选项有着不同的处理能力:
-
JSON
- Lambda 将经过验证的属性反序列化为标准 JSON 格式,使数据可以直接用于支持原生 JSON 的语言。当您不需要保留原始的二进制格式或使用生成的类时,这种格式是比较理想的选择。 -
SOURCE
- Lambda 将数据的原始二进制格式保留为 Base64 编码的字符串,让您可以直接转换为 Avro 或 Protobuf 对象。当使用强类型语言或需要维护 Avro 或 Protobuf 架构的全部功能时,这种格式是必不可少的。
基于这些格式特征和特定语言的考虑因素,我们建议使用以下格式:
语言 | Avro | Protobuf | JSON |
---|---|---|---|
Java | SOURCE | SOURCE | SOURCE |
Python | JSON | JSON | JSON |
NodeJS | JSON | JSON | JSON |
.NET | SOURCE | SOURCE | SOURCE |
其他 | JSON | JSON | JSON |
以下各节详细描述了这些格式,并提供了每种格式的有效载荷示例。
JSON 格式
如果您选择将 JSON
作为 EventRecordFormat
,Lambda 会验证并反序列化您在 SchemaValidationConfigs
字段中选择的消息属性(key
和/或 value
属性)。Lambda 在您的函数中将这些选定的属性作为其标准 JSON 表示形式的 base64 编码字符串提供。
注意
反序列化时,Avro 会转换为标准 JSON,这意味着它无法直接转换回 Avro 对象。如果您需要转换为 Avro 对象,请改用 SOURCE 格式。
对于 Protobuf 的反序列化,生成的 JSON 中的字段名称与模式中定义的字段名称一致,而不是像 Protobuf 通常那样被转换为驼峰式拼写法。创建筛选模式时,请记住这一点。
下面是一个有效载荷示例,假设您选择 JSON
作为 EventRecordFormat
,key
和 value
属性作为 SchemaValidationConfigs
:
{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
在本示例中:
-
key
和value
都是反序列化后的 JSON 表示形式的 base64 编码字符串。 -
Lambda 包含
keySchemaMetadata
和valueSchemaMetadata
中两个属性的架构元数据。 -
您的函数可以解码
key
和value
字符串,以便访问反序列化的 JSON 数据。
对于 Python 或 Node.js 等非强类型语言,建议使用 JSON 格式。这些语言原生支持将 JSON 转换为对象。
SOURCE 格式
如果您选择 SOURCE
作为 EventRecordFormat
,Lambda 仍会根据架构注册表验证记录,但无需反序列化即可将原始二进制数据传送到您的函数。此二进制数据以原始字节数据的 Base64 编码字符串形式提供,并删除了生产者附加的元数据。因此,您可以直接将原始二进制数据转换为您函数代码中的 Avro 和 Protobuf 对象。我们建议使用 Powertools for AWS Lambda,它会反序列化原始二进制数据,并直接为您提供 Avro 和 Protobuf 对象。
例如,如果您将 Lambda 配置为同时验证 key
和 value
属性,但使用了 SOURCE
格式,则您的函数将接收到如下所示的有效载荷:
{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
在本示例中:
-
key
和value
均包含 Base64 编码字符串形式的原始二进制数据。 -
您的函数需要使用相应的库来处理反序列化。
如果您使用的是 AVRO 生成的对象或 Protobuf 生成的对象,尤其是使用 Java 函数时,建议选择 SOURCE
for EventRecordFormat
。这是因为 Java 是强类型的,需要针对 Avro 和 Protobuf 格式使用特定的反序列化器。在您的函数代码中,您可以使用自己喜欢的 Avro 或 Protobuf 库来反序列化数据。
在 Lambda 函数中使用反序列化数据
Powertools for AWS Lambda 可根据您使用的格式,在函数代码中帮助您反序列化 Kafka 记录。该实用程序通过处理数据转换和提供即用型对象,简化了 Kafka 记录的使用。
要在您的函数中使用 Powertools for AWS Lambda,您需要在构建 Lambda 函数时将 Powertools for AWS Lambda 添加为一个层或将其作为依赖项。有关设置说明和更多信息,请参阅您首选语言的 Powertools for AWS Lambda 文档:
注意
在使用架构注册表集成时,您可以选择 SOURCE
或 JSON
格式。如下所示,每个选项支持不同的序列化格式:
格式 | 支持 |
---|---|
SOURCE |
Avro 和 Protobuf(使用 Lambda 架构注册表集成) |
JSON |
JSON 数据 |
在使用 SOURCE
或 JSON
格式时,您可以使用 Powertools for AWS 来帮助您反序列化您函数代码中的数据。以下是如何处理不同数据格式的示例:
架构注册表的身份验证方法
要使用架构注册表,Lambda 需要能够安全地访问它。如果您使用的是 AWS Glue 架构注册表,则 Lambda 将依赖于 IAM 身份验证。这意味着,您函数的执行角色必须具有以下权限才能访问 AWS Glue 注册表:
-
AWS Glue Web API 参考中的 GetRegistry
-
AWS Glue Web API 参考中的 GetSchemaVersion
所需 IAM 策略的示例:
注意
对于 AWS Glue 架构注册表,如果您为 AWS Glue 注册表提供 AccessConfigs
,Lambda 将返回一个验证异常。
如果您使用的是 Confluent 架构注册表,您可以为 KafkaSchemaRegistryAccessConfig 对象的 Type
参数选择三种支持的身份验证方法之一:
-
BASIC_AUTH — Lambda 通过用户名和密码或 API 密钥和 API 密钥身份验证来访问您的注册表。如果您选择此选项,请在 URI 字段中提供包含您凭证的 Secrets Manager ARN。
-
CLIENT_CERTIFICATE_TLS_AUTH — Lambda 使用基于客户端证书的双向 TLS 身份验证。要使用此选项,Lambda 需要同时访问证书和私钥。请在 URI 字段中提供包含这些凭证的 Secrets Manager ARN。
-
NO_AUTH — 公有 CA 证书必须由 Lambda 信任存储中的证书颁发机构 (CA) 签名。对于私有 CA /自签名证书,您可以配置服务器根 CA 证书。要使用此选项,请省略
AccessConfigs
参数。
此外,如果 Lambda 需要访问私有 CA 证书来验证您架构注册表的 TLS 证书,请选择 SERVER_ROOT_CA_CERT
作为 Type
,并在 URI 字段中为证书提供 Secrets Manager ARN。
注意
要在控制台中配置 SERVER_ROOT_CA_CERT
选项,请在加密字段中提供包含证书的密钥 ARN。
架构注册表的身份验证配置与您为 Kafka 集群配置的任何身份验证是分开的。即使它们使用相似的身份验证方法,也必须分别进行配置。
架构注册表问题的错误处理和故障排除
在 Amazon MSK 事件源中使用架构注册表时,您可能会遇到各种错误。本节提供有关常见问题以及如何解决这些问题的指导。
配置错误
当您进行架构注册表配置时,会发生这些错误。
- 需要启用预置模式
-
错误消息:
SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.
解决方案:通过在
ProvisionedPollerConfig
中配置MinimumPollers
参数,为事件源映射启用预置模式。 - 架构注册表 URL 无效
-
错误消息:
Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.
解决方案:为 Confluent 架构注册表提供有效的 HTTPS URL,或者为 AWS Glue 架构注册表提供有效的 ARN。
- 事件记录格式无效或缺失
-
错误消息:
EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.
解决方案:在架构注册表配置中将 SOURCE 或 JSON 指定为 eventRecordFormat。
- 重复验证属性
-
错误消息:
Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.
解决方案:从您的 SchemaValidationConfigs 中删除重复的 KEY 或 VALUE 属性。每种属性类型只能出现一次。
- 缺少验证配置
-
错误消息:
SchemaValidationConfigs is a required field for SchemaRegistryConfig.
解决方案:将 SchemaValidationConfigs 添加到您的配置中,并且至少指定一个验证属性(KEY 或 VALUE)。
访问权和权限错误
当 Lambda 由于权限或身份验证问题而无法访问架构注册表时,就会发生这些错误。
- AWS Glue 架构注册表访问被拒绝
-
错误消息:
Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.
解决方案:将所需的权限(
glue:GetRegistry
和glue:GetSchemaVersion
)添加到您函数的执行角色中。 - Confluent 架构注册表访问被拒绝
-
错误消息:
Cannot access Confluent Schema with the provided access configuration.
解决方案:验证您的身份验证凭证(存储在 Secrets Manager 中)是否正确并具有访问架构注册表所需的权限。
- 跨账户 AWS Glue 架构注册表
-
错误消息:
Cross-account Glue Schema Registry ARN not supported.
解决方案:使用与您的 Lambda 函数位于同一 AWS 账户中的 AWS Glue 架构注册表。
- 跨区域 AWS Glue 架构注册表
-
错误消息:
Cross-region Glue Schema Registry ARN not supported.
解决方案:使用与您的 Lambda 函数位于同一区域中的 AWS Glue 架构注册表。
- 密钥访问问题
-
错误消息:
Lambda received InvalidRequestException from Secrets Manager.
解决方案:验证您的函数执行角色是否具有访问该密钥的权限,并且如果从其他账户访问,该密钥未使用默认的 AWS KMS 密钥进行加密。
连接错误
当 Lambda 无法与架构注册表建立连接时,就会发生这些错误。
- VPC 连接问题
-
错误消息:
Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.
解决方案:配置您的 VPC 网络,以允许通过 AWS PrivateLink、NAT 网关或 VPC 对等来连接架构注册表。
- TLS 握手失败
-
错误消息:
Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.
解决方案:在 Secrets Manager 中验证您的 CA 证书和客户端证书(适用于 mTLS)是否正确且配置正确。
- 节流
-
错误消息:
Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.
解决方案:提高架构注册表的 API 速率限制或降低来自应用程序的请求速率。
- 自托管式架构注册表错误
-
错误消息:
Lambda received an internal server an unexpected error from the provided self-managed schema registry.
解决方案:检查自托管式架构注册表服务器的运行状况和配置。