在 Lambda 中使用結構描述登錄檔搭配 Kafka 事件來源 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中使用結構描述登錄檔搭配 Kafka 事件來源

結構描述登錄檔可協助您定義和管理資料串流結構描述。結構描述定義資料記錄的結構和格式。在 Kafka 事件來源映射的內容中,您可以設定結構描述登錄檔,在 Kafka 訊息到達 Lambda 函數之前,針對預先定義的結構描述驗證其結構和格式。這會為您的應用程式新增資料控管層,並可讓您有效率地管理資料格式、確保結構描述合規,以及透過事件篩選來最佳化成本。

此功能適用於所有程式設計語言,但請考慮以下要點:

  • Powertools for Lambda 提供 Java、Python 和 TypeScript 的特定支援,維持與現有 Kafka 開發模式的一致性,並允許在沒有自訂還原序列化程式碼的情況下直接存取商業物件

  • 此功能僅適用於使用佈建模式的事件來源映射。結構描述登錄檔不支援隨需模式中的事件來源映射。如果您使用佈建模式,且已設定結構描述登錄檔,則無法變更為隨需模式,除非您先移除結構描述登錄檔組態。如需詳細資訊,請參閱佈建模式

  • 每個事件來源映射 (ESM) 只能設定一個結構描述登錄檔。搭配 Kafka 事件來源使用結構描述登錄檔可能會增加 Lambda 事件輪詢單元 (EPU) 用量,這是佈建模式的定價維度。

結構描述登錄檔選項

Lambda 支援下列結構描述登錄選項:

您的結構描述登錄檔支援以下列資料格式驗證訊息:

  • Apache Avro

  • 通訊協定緩衝區 (Protobuf)

  • JSON 結構描述 (JSON-SE)

若要使用結構描述登錄檔,請先確認您的事件來源映射處於佈建模式。當您使用結構描述登錄檔時,Lambda 會將結構描述的相關中繼資料新增至承載。如需詳細資訊,請參閱承載格式和還原序列化行為

Lambda 如何對 Kafka 訊息執行結構描述驗證

當您設定結構描述登錄檔時,Lambda 會為每個 Kafka 訊息執行下列步驟:

  1. Lambda 會從叢集輪詢 Kafka 記錄。

  2. Lambda 會根據結構描述登錄檔中的特定結構描述,驗證記錄中選取的訊息屬性。

    • 如果在登錄檔中找不到與訊息相關聯的結構描述,Lambda 會將訊息傳送至原因碼為 的 DLQSCHEMA_NOT_FOUND

  3. Lambda 會根據結構描述登錄檔組態還原序列化訊息,以驗證訊息。如果已設定事件篩選,則 Lambda 會根據設定的篩選條件執行篩選。

    • 如果還原序列化失敗,Lambda 會將訊息傳送至原因碼為 的 DLQDESERIALIZATION_ERROR。如果未設定 DLQ,Lambda 會捨棄訊息。

  4. 如果訊息是由結構描述登錄檔驗證,而且未依篩選條件篩選出,Lambda 會使用 訊息叫用您的函數。

此功能旨在驗證已使用與結構描述登錄整合的 Kafka 用戶端產生的訊息。我們建議您將 Kafka 生產者設定為與您的結構描述登錄檔搭配使用,以建立格式正確的訊息。

設定 Kafka 結構描述登錄檔

下列主控台步驟會將 Kafka 結構描述登錄檔組態新增至您的事件來源映射。

將 Kafka 結構描述登錄檔組態新增至事件來源映射 (主控台)
  1. 開啟 Lambda 主控台中的函數頁面

  2. 選擇 Configuration (組態)

  3. 選擇觸發。

  4. 選取您要為其設定結構描述登錄檔的 Kafka 事件來源映射,然後選擇編輯

  5. 事件輪詢器組態下,選擇設定結構描述登錄檔。您的事件來源映射必須處於佈建模式,才能查看此選項。

  6. 針對結構描述登錄 URI,輸入 AWS Glue 結構描述登錄檔的 ARN,或 Confluent Cloud 結構描述登錄檔的 HTTPS URL 或自我管理的 Confluent 結構描述登錄檔。

  7. 下列組態步驟會告知 Lambda 如何存取您的結構描述登錄檔。如需詳細資訊,請參閱結構描述登錄檔的身分驗證方法

    • 針對存取組態類型,選擇 Lambda 用來存取結構描述登錄檔的身分驗證類型。

    • 針對存取組態 URI,輸入 Secrets Manager 秘密的 ARN,以驗證結構描述登錄檔,如適用。請確定函數的執行角色包含正確的許可。

  8. 只有在您的結構描述登錄檔由私有憑證授權單位 (CA) 或不在 Lambda 信任存放區中的憑證授權單位 (CA) 簽署時,加密欄位才適用。如果適用,請提供私密金鑰,其中包含結構描述登錄檔用於 TLS 加密的私有 CA 憑證。

  9. 針對事件記錄格式,選擇您希望 Lambda 在結構描述驗證後交付記錄的方式。如需詳細資訊,請參閱承載格式範例

    • 如果您選擇 JSON,Lambda 會以標準 JSON 格式提供您在下列結構描述驗證屬性中選取的屬性。對於您未選取的屬性,Lambda 會依原狀交付這些屬性。

    • 如果您選擇 SOURCE,Lambda 會以原始來源格式提供您在下面的結構描述驗證屬性中選取的屬性。

  10. 針對結構描述驗證屬性,選取您希望 Lambda 使用您的結構描述登錄檔驗證和還原序列化的訊息屬性。您必須至少選取其中一個 KEYVALUE。如果您為事件記錄格式選擇 JSON,Lambda 也會先還原序列化選取的訊息屬性,再將它們傳送到您的函數。如需詳細資訊,請參閱承載格式和還原序列化行為

  11. 選擇儲存

您也可以使用 Lambda API 建立或更新具有結構描述登錄檔組態的事件來源映射。下列範例示範如何使用 設定 AWS Glue 或 Confluent 結構描述登錄檔 AWS CLI,其對應至 API 參考中的 UpdateEventSourceMappingCreateEventSourceMapping API 操作: AWS Lambda

重要

如果您使用 AWS CLI 或 update-event-source-mapping API 更新任何結構描述登錄檔組態欄位,則必須更新結構描述登錄檔組態的所有欄位。

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

若要從事件來源映射中移除結構描述登錄檔組態,您可以使用 AWS Lambda API 參考中的 CLI 命令 UpdateEventSourceMapping

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Avro 和 Protobuf 的篩選

搭配結構描述登錄檔使用 Avro 或 Protobuf 格式時,您可以將事件篩選套用至 Lambda 函數。篩選條件模式會在結構描述驗證後套用至資料的還原序列化傳統 JSON 表示法。例如,使用定義產品詳細資訊的 Avro 結構描述,包括價格,您可以根據價格值來篩選訊息:

注意

還原序列化時,Avro 會轉換為標準 JSON,這表示它無法直接轉換回 Avro 物件。如果您需要轉換為 Avro 物件,請改用 SOURCE 格式。

對於 Protobuf 還原序列化,產生的 JSON 中的欄位名稱符合您結構描述中定義的名稱,而不是像 Protobuf 一般轉換為駱駝案例。建立篩選模式時,請記住這一點。

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

在此範例中,篩選條件模式會分析 value 物件,將訊息field_1"value1"field_2 比對"value2"。在 Lambda 將訊息從 Avro 格式轉換為 JSON 之後,篩選條件會根據還原序列化資料進行評估。

如需事件篩選的詳細資訊,請參閱 Lambda 事件篩選

承載格式和還原序列化行為

使用結構描述登錄檔時,Lambda 會以類似於一般事件承載的格式,將最終承載交付給您的函數,其中包含一些額外的欄位。其他欄位取決於 SchemaValidationConfigs 參數。對於您為驗證選取的每個屬性 (索引鍵或值),Lambda 會將對應的結構描述中繼資料新增至承載。

注意

您必須將 aws-lambda-java-events 更新至 3.16.0 版或更新版本,才能使用結構描述中繼資料欄位。

例如,如果您驗證 value 欄位,Lambda valueSchemaMetadata會將稱為 的欄位新增至您的承載。同樣地,對於 key 欄位,Lambda 新增名為 的欄位keySchemaMetadata。此中繼資料包含資料格式和用於驗證的結構描述 ID 的相關資訊:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

EventRecordFormat 參數可以設定為 JSONSOURCE,這會決定 Lambda 如何處理結構描述驗證的資料,然後再將其交付至您的函數。每個選項都提供不同的處理功能:

  • JSON - Lambda 將已驗證的屬性還原序列化為標準 JSON 格式,使資料準備好在原生 JSON 支援的語言中直接使用。當您不需要保留原始二進位格式或使用產生的類別時,此格式是理想的。

  • SOURCE - Lambda 會將資料的原始二進位格式保留為 Base64-encoded字串,允許直接轉換為 Avro 或 Protobuf 物件。使用強類型語言時,或當您需要維護 Avro 或 Protobuf 結構描述的完整功能時,此格式至關重要。

根據這些格式特性和特定語言考量,我們建議使用下列格式:

根據程式設計語言的建議格式
語言 Avro Protobuf JSON
Java SOURCE SOURCE SOURCE
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SOURCE SOURCE SOURCE
其他 JSON JSON JSON

下列各節詳細說明這些格式,並提供每個格式的範例承載。

JSON format (JSON 格式)

如果您選擇 JSON做為 EventRecordFormat,Lambda 會驗證和還原序列化您在 SchemaValidationConfigs 欄位中選取的訊息屬性 ( key 和/或 value 屬性)。Lambda 會將這些選取的屬性做為其在函數中標準 JSON 表示法的 base64 編碼字串提供。

注意

還原序列化時,Avro 會轉換為標準 JSON,這表示它無法直接轉換回 Avro 物件。如果您需要轉換為 Avro 物件,請改用 SOURCE 格式。

對於 Protobuf 還原序列化,產生的 JSON 中的欄位名稱符合您結構描述中定義的名稱,而不是像 Protobuf 一般轉換為駱駝案例。建立篩選模式時,請記住這一點。

以下顯示範例承載,假設您選擇 JSON作為 EventRecordFormat,且 keyvalue 屬性都作為 SchemaValidationConfigs

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在此範例中:

  • keyvalue都是還原序列化後其 JSON 表示法的 base64 編碼字串。

  • Lambda 包含 keySchemaMetadata和 中兩個屬性的結構描述中繼資料valueSchemaMetadata

  • 您的函數可以解碼 keyvalue字串,以存取還原序列化的 JSON 資料。

對於非強式輸入的語言,例如 Python 或 Node.js,建議使用 JSON 格式。這些語言原生支援將 JSON 轉換為 物件。

來源格式

如果您選擇 SOURCE做為 EventRecordFormat,Lambda 仍會根據結構描述登錄檔驗證記錄,但 會將原始二進位資料交付給您的函數,而不會還原序列化。此二進位資料會以原始位元組資料的 Base64 編碼字串傳遞,並移除生產者附加的中繼資料。因此,您可以直接將原始二進位資料轉換為函數程式碼中的 Avro 和 Protobuf 物件。我們建議針對 使用 Powertools AWS Lambda,這會還原序列化原始二進位資料,並直接為您提供 Avro 和 Protobuf 物件。

例如,如果您設定 Lambda 來驗證 keyvalue 屬性,但使用 SOURCE 格式,您的函數會收到如下的承載:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在此範例中:

  • key 和 都value包含原始二進位資料做為 Base64 編碼字串。

  • 您的函數需要使用適當的程式庫來處理還原序列化。

如果您使用的是 Avro 產生的或 Protobuf 產生的物件,特別是 Java 函數,EventRecordFormat則建議選擇 SOURCE 。這是因為 Java 是強式類型,並且需要 Avro 和 Protobuf 格式的特定還原序列化程式。在函數程式碼中,您可以使用偏好的 Avro 或 Protobuf 程式庫來還原序列化資料。

在 Lambda 函數中使用還原序列化資料

Powertools for AWS Lambda 可協助您根據您使用的格式,將函數程式碼中的 Kafka 記錄還原序列化。此公用程式可處理資料轉換並提供ready-to-use簡化 Kafka 記錄的使用。

若要在函數 AWS Lambda 中使用 Powertools for,您需要在建置 Lambda 函數時新增 Powertools AWS Lambda 做為 layer 或包含它做為相依性。如需設定說明和詳細資訊,請參閱 Powertools for AWS Lambda documentation for your preferred language:

注意

使用結構描述登錄整合時,您可以選擇 SOURCEJSON 格式。每個選項都支援不同的序列化格式,如下所示:

格式 支援

SOURCE

Avro 和 Protobuf (使用 Lambda 結構描述登錄檔整合)

JSON

JSON 資料

使用 SOURCEJSON 格式時,您可以使用 Powertools for AWS 協助還原序列化函數程式碼中的資料。以下是如何處理不同資料格式的範例:

AVRO

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

結構描述登錄檔的身分驗證方法

若要使用結構描述登錄檔,Lambda 需要能夠安全地存取它。如果您使用 AWS Glue 結構描述登錄檔,Lambda 依賴 IAM 身分驗證。這表示函數的執行角色必須具有下列許可,才能存取 AWS Glue 登錄檔:

必要 IAM 政策的範例:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
注意

對於 AWS Glue 結構描述登錄檔,如果您AccessConfigs提供 AWS Glue 登錄檔,Lambda 將傳回驗證例外狀況。

如果您使用的是 Confluent 結構描述登錄檔,您可以為 KafkaSchemaRegistryAccessConfig 物件的 Type 參數選擇三種支援的身分驗證方法之一:

  • BASIC_AUTH — Lambda 使用使用者名稱和密碼或 API 金鑰和 API 秘密身分驗證來存取您的登錄檔。如果您選擇此選項,請在 URI 欄位中提供包含憑證的 Secrets Manager ARN。

  • CLIENT_CERTIFICATE_TLS_AUTH — Lambda 對用戶端憑證使用交互 TLS 身分驗證。若要使用此選項,Lambda 需要同時存取憑證和私有金鑰。在 URI 欄位中提供包含這些登入資料的 Secrets Manager ARN。

  • NO_AUTH — 公有 CA 憑證必須由 Lambda 信任存放區中的憑證授權機構 (CA) 簽署。對於私有 CA/自我簽署憑證,您可以設定伺服器根 CA 憑證。若要使用此選項,請省略 AccessConfigs 參數。

此外,如果 Lambda 需要存取私有 CA 憑證以驗證結構描述登錄檔的 TLS 憑證,請選擇 SERVER_ROOT_CA_CERT做為 Type,並在 URI 欄位中將 Secrets Manager ARN 提供給憑證。

注意

若要在主控台中設定 SERVER_ROOT_CA_CERT選項,請在加密欄位中提供包含憑證的秘密 ARN。

結構描述登錄檔的身分驗證組態與您為 Kafka 叢集設定的任何身分驗證不同。您必須分別設定兩者,即使它們使用類似的身分驗證方法。

結構描述登錄檔問題的錯誤處理和疑難排解

搭配 Amazon MSK 事件來源使用結構描述登錄檔時,您可能會遇到各種錯誤。本節提供有關常見問題以及如何解決這些問題的指導。

組態錯誤

設定結構描述登錄檔組態時,會發生這些錯誤。

需要佈建模式

錯誤訊息SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

解決方案:透過在 中設定 MinimumPollers 參數,為您的事件來源映射啟用佈建模式ProvisionedPollerConfig

無效的結構描述登錄 URL

錯誤訊息Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

解決方案:為 Confluent Schema Registry 提供有效的 HTTPS URL,或為 AWS Glue Schema Registry 提供有效的 ARN。

無效或缺少事件記錄格式

錯誤訊息EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

解決方案:在您的結構描述登錄檔組態中,將 SOURCE 或 JSON 指定為 EventRecordFormat。

重複的驗證屬性

錯誤訊息Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

解決方案:從您的 SchemaValidationConfigs 移除重複的 KEY 或 VALUE 屬性。每個屬性類型只能顯示一次。

缺少驗證組態

錯誤訊息SchemaValidationConfigs is a required field for SchemaRegistryConfig.

解決方案:將 SchemaValidationConfigs 新增至您的組態,指定至少一個驗證屬性 (KEY 或 VALUE)。

存取和許可錯誤

當 Lambda 由於許可或身分驗證問題而無法存取結構描述登錄檔時,就會發生這些錯誤。

AWS Glue 結構描述登錄檔存取遭拒

錯誤訊息Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

解決方案:將所需的許可 (glue:GetRegistryglue:GetSchemaVersion) 新增至函數的執行角色。

Confluent 結構描述登錄檔存取遭拒

錯誤訊息Cannot access Confluent Schema with the provided access configuration.

解決方案:確認您的身分驗證登入資料 (存放在 Secrets Manager 中) 是否正確,並具有存取結構描述登錄檔的必要許可。

跨帳戶 AWS Glue 結構描述登錄檔

錯誤訊息Cross-account Glue Schema Registry ARN not supported.

解決方案:使用與 Lambda 函數位於相同 AWS 帳戶的 AWS Glue 結構描述登錄檔。

跨區域 AWS Glue 結構描述登錄檔

錯誤訊息Cross-region Glue Schema Registry ARN not supported.

解決方案:使用與 Lambda 函數位於相同區域的 AWS Glue 結構描述登錄檔。

私密存取問題

錯誤訊息Lambda received InvalidRequestException from Secrets Manager.

解決方法:確認函數的執行角色具有存取秘密的許可,而且如果從其他帳戶存取 ,則不會使用預設 AWS KMS 金鑰加密秘密。

連線錯誤

當 Lambda 無法建立與結構描述登錄檔的連線時,就會發生這些錯誤。

VPC 連線問題

錯誤訊息Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

解決方案:設定您的 VPC 聯網,以允許使用 AWS PrivateLink NAT Gateway 或 VPC 對等互連連線至結構描述登錄檔。

TLS 交握失敗

錯誤訊息Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

解決方案:確認您的 CA 憑證和用戶端憑證 (適用於 mTLS) 在 Secrets Manager 中正確設定。

限流

錯誤訊息Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

解決方案:提高結構描述登錄檔的 API 速率限制,或降低應用程式的請求速率。

自我管理結構描述登錄檔錯誤

錯誤訊息Lambda received an internal server an unexpected error from the provided self-managed schema registry.

解決方案:檢查自我管理結構描述登錄伺服器的運作狀態和組態。