在 Lambda 中將結構描述登錄檔與 Kafka 事件來源搭配使用 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Lambda 中將結構描述登錄檔與 Kafka 事件來源搭配使用

結構描述登錄檔可協助您定義並管理資料串流結構描述。結構描述定義資料記錄的結構和格式。在 Kafka 事件來源映射的情境中,您可以設定結構描述登錄檔,在 Kafka 訊息送達 Lambda 函式之前,依據預先定義的結構描述驗證其結構與格式。這會將資料治理層新增至應用程式,讓您能夠有效管理資料格式、確保結構描述合規,並透過事件篩選來最佳化成本。

此功能適用於所有程式設計語言,但請考慮以下要點:

  • Powertools for Lambda 提供 Java、Python 及 TypeScript 的專屬支援,不僅能維護與現有 Kafka 開發模式的一致性,更可讓您直接存取業務物件,無需自訂反序列化程式碼

  • 此功能僅適用於採用佈建模式的事件來源映射。結構描述登錄檔不支援隨需模式下的事件來源映射。如果您使用佈建模式且已設定結構描述登錄檔,將無法變更為隨需模式,除非您先移除結構描述登錄檔組態。如需詳細資訊,請參閱佈建模式

  • 每個事件來源映射 (ESM) 僅能設定一個結構描述登錄檔。將結構描述登錄檔與 Kafka 事件來源搭配使用,可能會增加 Lambda 事件輪詢單元 (EPU) 用量,此單元為佈建模式的一個定價維度。

結構描述登錄檔選項

Lambda 支援下列結構描述登錄檔選項:

結構描述登錄檔支援驗證採用下列資料格式的訊息:

  • Apache Avro

  • 通訊協定緩衝區 (Protobuf)

  • JSON 結構描述 (JSON-SE)

若要使用結構描述登錄檔,請先確保事件來源映射處於佈建模式。當您使用結構描述登錄檔時,Lambda 會將結構描述的相關中繼資料新增至承載。如需詳細資訊,請參閱承載格式與反序列化行為

Lambda 如何對 Kafka 訊息執行結構描述驗證

當您設定結構描述登錄檔時,Lambda 會為每個 Kafka 訊息執行下列步驟:

  1. Lambda 會從叢集輪詢 Kafka 記錄。

  2. Lambda 會根據結構描述登錄檔中的特定結構描述,驗證記錄中選定的訊息屬性。

    • 如果在登錄檔中找不到與訊息相關聯的結構描述,Lambda 會將訊息傳送至 DLQ,並標示原因代碼 SCHEMA_NOT_FOUND

  3. 為了驗證訊息,Lambda 會依據結構描述登錄檔組態對訊息進行反序列化。如果設定了事件篩選,Lambda 會接著根據設定的篩選條件執行篩選。

    • 如果反序列化失敗,Lambda 會將訊息傳送至 DLQ,並標示原因代碼 DESERIALIZATION_ERROR。如果未設定 DLQ,Lambda 會捨棄訊息。

  4. 如果訊息透過結構描述登錄檔進行驗證,且未依篩選條件篩選出,Lambda 將會使用該訊息調用函式。

此功能旨在驗證已使用與結構描述登錄檔整合的 Kafka 用戶端所產生的訊息。建議將 Kafka 生產者設定為與結構描述登錄檔搭配使用,以便建立格式正確的訊息。

設定 Kafka 結構描述登錄檔

下列主控台步驟會將 Kafka 結構描述登錄檔組態新增至事件來源映射。

將 Kafka 結構描述登錄檔組態新增至事件來源映射 (主控台)
  1. 開啟 Lambda 主控台中的函數頁面

  2. 選擇 Configuration (組態)

  3. 選擇觸發程序

  4. 選取要為其設定結構描述登錄檔的 Kafka 事件來源映射,然後選擇編輯

  5. 事件輪詢器組態欄位中,選擇設定結構描述登錄檔。事件來源映射必須處於佈建模式,方可查看此選項。

  6. 對於結構描述登錄 URI,輸入 AWS Glue 結構描述登錄檔的 ARN,或 Confluent Cloud 結構描述登錄檔的 HTTPS URL 或自我管理的 Confluent 結構描述登錄檔。

  7. 下列組態步驟會告知 Lambda 如何存取結構描述登錄檔。如需詳細資訊,請參閱結構描述登錄檔的身分驗證方法

    • 存取組態類型欄位中,選擇 Lambda 用於存取結構描述登錄檔的身分驗證類型。

    • 存取組態 URI 欄位中,輸入 Secrets Manager 秘密的 ARN,向結構描述登錄檔進行身分驗證 (如適用)。確保函式的執行角色包含正確的許可。

  8. 僅當結構描述登錄檔由私有憑證認證機構 (CA) 或未納入 Lambda 信任存放區的憑證認證機構 (CA) 簽署時,加密欄位才適用。如適用,請提供秘密金鑰,其中應包含結構描述登錄檔用於 TLS 加密的私有 CA 憑證。

  9. 事件記錄格式欄位中,選擇希望 Lambda 在完成結構描述驗證後將記錄交付給函式的方式。如需詳細資訊,請參閱承載格式範例

    • 如果選擇 JSON,Lambda 會以標準 JSON 格式傳送您在下方「結構描述驗證屬性」中選取的屬性。對於您未選取的屬性,Lambda 會依原狀傳送這些屬性。

    • 如果選擇 SOURCE,Lambda 會以原始來源格式傳送您在下方「結構描述驗證屬性」中選取的屬性。

  10. 結構描述驗證屬性欄位中,選取您希望 Lambda 使用結構描述登錄檔進行驗證與反序列化的訊息屬性。必須至少選取 KEYVALUE 其中一項。若您選擇 JSON 作為事件記錄格式,Lambda 還會在將選定的訊息屬性傳送至函式之前,對其進行反序列化。如需詳細資訊,請參閱承載格式與反序列化行為

  11. 選擇儲存

您也可以使用 Lambda API 來建立或更新帶有結構描述登錄檔組態的事件來源映射。下列範例示範如何使用 設定 AWS Glue 或 Confluent 結構描述登錄檔 AWS CLI,其對應至 API 參考中的 UpdateEventSourceMappingCreateEventSourceMapping API 操作: AWS Lambda

重要

如果您使用 AWS CLI 或 update-event-source-mapping API 更新任何結構描述登錄檔組態欄位,則必須更新結構描述登錄檔組態的所有欄位。

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

若要從事件來源映射中移除結構描述登錄檔組態,可以使用 AWS Lambda API Reference 中的 CLI 命令 UpdateEventSourceMapping

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Avro 與 Protobuf 的篩選

將 Avro 或 Protobuf 格式與結構描述登錄檔搭配使用時,即可將事件篩選功能套用至 Lambda 函式。篩選模式會在結構描述驗證後,套用至資料反序列化後的傳統 JSON 表示法。例如,使用定義產品詳細資訊 (包含價格) 的 Avro 結構描述,您可以根據價格值篩選訊息:

注意

當 Avro 進行反序列化時,會轉換為標準 JSON 格式,也意味著無法直接轉換回 Avro 物件。如需轉換為 Avro 物件,請改用 SOURCE 格式。

在 Protobuf 反序列化過程中,產生 JSON 的欄位名稱與結構描述中定義的欄位名稱相符,而非如 Protobuf 常規作法般為駝峰式大小寫。在建立篩選模式時,請記住這一點。

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

在此範例中,篩選模式會分析 value 物件,將 field_1 中符合 "value1"field_2 中符合 "value2" 的訊息進行比對。在 Lambda 將訊息從 Avro 格式轉換為 JSON 格式之後,篩選條件會針對反序列化後的資料進行評估。

如需有關事件篩選的詳細資訊,請參閱 Lambda 事件篩選

承載格式與反序列化行為

使用結構描述登錄檔時,Lambda 會以類似於常規事件承載的格式將最終承載傳送至函式,但會包含部分額外欄位。這些額外欄位取決於 SchemaValidationConfigs 參數。對於您選取進行驗證的每個屬性 (鍵或值),Lambda 會將相應的結構描述中繼資料新增至承載。

注意

必須將 aws-lambda-java-events 更新至 3.16.0 版或更新版本,才能使用結構描述中繼資料欄位。

例如,若驗證 value 欄位,Lambda 會將名為 valueSchemaMetadata 的欄位新增至承載。同理,如果驗證 key 欄位,Lambda 會新增名為 keySchemaMetadata 的欄位。此中繼資料包含資料格式與驗證所用結構描述 ID 的相關資訊:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

EventRecordFormat 參數可以設定為 JSONSOURCE,這決定了 Lambda 在將經過結構描述驗證的資料傳送至函式之前的處理方式。每個選項都提供不同的處理功能:

  • JSON:Lambda 將經過驗證的屬性反序列化為標準 JSON 格式,便於資料在原生 JSON 支援的語言中直接使用。若無需保留原始二進位格式或無需使用產生的類別,此格式尤其適用。

  • SOURCE:Lambda 會將資料的原始二進位格式保留為 Base64 編碼字串,允許直接轉換為 Avro 或 Protobuf 物件。當使用強型別語言或需要保持 Avro 或 Protobuf 結構描述的完整功能時,此格式至關重要。

基於上述格式特性與語言專屬考量,建議採用下列格式:

依據程式設計語言建議的格式
Language Avro Protobuf JSON
Java SOURCE SOURCE SOURCE
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SOURCE SOURCE SOURCE
其他 JSON JSON JSON

下列各節詳細介紹了這些格式,也為每種格式提供了承載範例。

JSON format (JSON 格式)

若選擇 JSON 作為 EventRecordFormat,Lambda 會驗證並反序列化您在 SchemaValidationConfigs 欄位中選取的訊息屬性 (key 及/或 value 屬性)。Lambda 會將這些選定的屬性,以其標準 JSON 表述的 Base64 編碼字串形式,傳遞至您的函式中。

注意

當 Avro 進行反序列化時,會轉換為標準 JSON 格式,也意味著無法直接轉換回 Avro 物件。如需轉換為 Avro 物件,請改用 SOURCE 格式。

在 Protobuf 反序列化過程中,產生 JSON 的欄位名稱與結構描述中定義的欄位名稱相符,而非如 Protobuf 常規作法般為駝峰式大小寫。在建立篩選模式時,請記住這一點。

下列承載範例假設您選擇 JSON 作為 EventRecordFormat,同時選擇 keyvalue 屬性作為 SchemaValidationConfigs

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在此範例中:

  • keyvalue 經反序列化後,皆以其 JSON 表述的 Base64 編碼字串形式呈現。

  • Lambda 會包含 keySchemaMetadatavalueSchemaMetadata 中兩種屬性的結構描述中繼資料。

  • 函式可以解碼 keyvalue 字串,以存取反序列化後的 JSON 資料。

對於非強型別語言,例如 Python 或 Node.js,建議採用 JSON 格式。這些語言原生支援將 JSON 轉換為物件。

來源格式

如果您選擇 SOURCE 作為 EventRecordFormat,Lambda 仍然會根據結構描述登錄檔驗證記錄,但會將原始二進位資料傳送至函式,而不進行反序列化。此二進位資料會以原始位元組資料的 Base64 編碼字串形式傳送,並移除生產者附加的中繼資料。因此,您可在函式程式碼中,將原始二進位資料直接轉換為 Avro 與 Protobuf 物件。我們建議將 Powertools 用於 AWS Lambda,這會還原序列化原始二進位資料,並直接為您提供 Avro 和 Protobuf 物件。

例如,若設定 Lambda 來驗證 keyvalue 屬性,但採用 SOURCE 格式,函式會收到如下承載:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

在此範例中:

  • keyvalue 都包含原始二進位資料作為 Base64 編碼字串。

  • 函式需要使用適當的程式庫來處理反序列化。

若您使用 Avro 產生的物件或 Protobuf 產生的物件 (特別是在 Java 函式中),則建議將 EventRecordFormat 設定為 SOURCE。這是因為 Java 屬於強型別語言,需針對 Avro 與 Protobuf 格式使用特定的反序列化。在函式程式碼中,您可以使用偏好的 Avro 或 Protobuf 程式庫將資料反序列化。

在 Lambda 函式中處理反序列化資料

的 Powertools AWS Lambda 可協助您根據您使用的格式,還原序列化函數程式碼中的 Kafka 記錄。此公用程式透過處理資料轉換並提供即用型物件,簡化了 Kafka 記錄的處理流程。

若要在函數 AWS Lambda 中使用 Powertools for ,您需要在建置 Lambda 函數時新增 Powertools AWS Lambda 做為 layer 或包含它做為相依性。如需設定說明和詳細資訊,請參閱 Powertools for AWS Lambda documentation for your preferred language:

注意

使用結構描述登錄檔整合時,可選擇 SOURCEJSON 格式。每個選項都支援不同的序列化格式,如下所示:

格式 支援

SOURCE

Avro 與 Protobuf (透過 Lambda 結構描述登錄檔整合)

JSON

JSON 資料

使用 SOURCEJSON 格式時,您可以使用 的 Powertools, AWS 協助還原序列化函數程式碼中的資料。以下是如何處理不同資料格式的範例:

AVRO

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Java 範例:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Python 範例:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript 範例:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

.NET 範例:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

結構描述登錄檔的身分驗證方法

若要使用結構描述登錄檔,Lambda 需要能夠對其進行安全存取。如果您使用 AWS Glue 結構描述登錄檔,Lambda 依賴 IAM 身分驗證。這表示函數的執行角色必須具有下列許可,才能存取 AWS Glue 登錄檔:

必要 IAM 政策範例:

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
注意

對於 AWS Glue 結構描述登錄檔,如果您AccessConfigs提供 AWS Glue 登錄檔,Lambda 將傳回驗證例外狀況。

若您使用 Confluent 結構描述登錄檔,可為 KafkaSchemaRegistryAccessConfig 物件的 Type 參數選擇以下三種身分驗證方法中一種:

  • BASIC_AUTH:Lambda 會透過使用者名稱與密碼,或 API 金鑰與 API 秘密身分驗證方式存取登錄庫。如果選擇此選項,請在 URI 欄位中提供包含憑證的 Secrets Manager ARN。

  • CLIENT_CERTIFICATE_TLS_AUTH:Lambda 透過用戶端憑證進行雙向 TLS 身分驗證。若要使用此選項,Lambda 需要同時存取憑證和私有金鑰。在 URI 欄位中提供包含這些憑證的 Secrets Manager ARN。

  • NO_AUTH:公有 CA 憑證必須由位於 Lambda 信任存放區中的憑證認證機構 (CA) 簽署。對於私有 CA/自我簽署憑證,您需要設定伺服器根 CA 憑證。若要使用此選項,請省略 AccessConfigs 參數。

此外,如果 Lambda 需要存取私有 CA 憑證來驗證結構描述登錄檔的 TLS 憑證,請選擇 SERVER_ROOT_CA_CERT 作為 Type,並在 URI 欄位中提供憑證對應的 Secrets Manager ARN。

注意

若要在主控台中設定 SERVER_ROOT_CA_CERT 選項,需要在加密欄位中提供包含該憑證的秘密 ARN。

結構描述登錄檔的驗證組態,需獨立於 Kafka 叢集的任何驗證設定。即使兩者採用相似的驗證方法,仍須分別進行設定。

結構描述登錄檔問題的錯誤處理與疑難排解

在將結構描述登錄檔與 Amazon MSK 事件來源搭配使用時,可能會遭遇各種錯誤。本節提供了常見問題及其解決方法的指導。

組態錯誤

這些錯誤通常發生在設定結構描述登錄檔組態時。

需要佈建模式

錯誤訊息SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

解決方案:透過在 ProvisionedPollerConfig 中設定 MinimumPollers 參數,為事件來源映射啟用佈建模式。

無效的結構描述登錄檔 URL

錯誤訊息Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

解決方案:為 Confluent Schema Registry 提供有效的 HTTPS URL,或為 AWS Glue Schema Registry 提供有效的 ARN。

無效或缺少事件記錄格式

錯誤訊息EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

解決方案:在結構描述登錄檔組態中,將 SOURCE 或 JSON 指定為 EventRecordFormat。

重複驗證屬性

錯誤訊息Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

解決方案:從 SchemaValidationConfigs 中移除重複的 KEY 或 VALUE 屬性。每種屬性類型只能出現一次。

缺少驗證組態

錯誤訊息SchemaValidationConfigs is a required field for SchemaRegistryConfig.

解決方案:將 SchemaValidationConfigs 新增至組態,指定至少一個驗證屬性 (KEY 或 VALUE)。

存取與許可錯誤

當 Lambda 陰許可或身分驗證問題而無法存取結構描述登錄檔時,就會發生這些錯誤。

AWS Glue 結構描述登錄檔存取遭拒

錯誤訊息Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

解決方案:將必要許可 (glue:GetRegistryglue:GetSchemaVersion) 新增至函式的執行角色。

Confluent 結構描述登錄檔存取遭拒

錯誤訊息Cannot access Confluent Schema with the provided access configuration.

解決方案:確認身分驗證憑證 (儲存於 Secrets Manager 中) 是否正確,以及是否具有存取結構描述登錄檔的必要許可。

跨帳戶 AWS Glue 結構描述登錄檔

錯誤訊息Cross-account Glue Schema Registry ARN not supported.

解決方案:使用與 Lambda 函數位於相同 AWS 帳戶的 AWS Glue 結構描述登錄檔。

跨區域 AWS Glue 結構描述登錄檔

錯誤訊息Cross-region Glue Schema Registry ARN not supported.

解決方案:使用與 Lambda 函數位於相同區域的 AWS Glue 結構描述登錄檔。

秘密存取問題

錯誤訊息Lambda received InvalidRequestException from Secrets Manager.

解決方法:確認函數的執行角色具有存取秘密的許可,而且如果從其他帳戶存取 ,則不會使用預設 AWS KMS 金鑰加密秘密。

連線錯誤

當 Lambda 無法與結構描述登錄檔建立連線時,就會發生這些錯誤。

VPC 連線問題

錯誤訊息Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

解決方案:設定您的 VPC 聯網,以允許使用 AWS PrivateLink NAT Gateway 或 VPC 對等互連連線至結構描述登錄檔。

TLS 交握失敗

錯誤訊息Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

解決方案:確認 CA 憑證與用戶端憑證 (適用於 mTLS) 是否正確,是否在 Secrets Manager 中正確設定。

限流

錯誤訊息Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

解決方案:提高結構描述登錄檔的 API 速率限制,或降低應用程式的請求速率。

自我管理結構描述登錄檔錯誤

錯誤訊息Lambda received an internal server an unexpected error from the provided self-managed schema registry.

解決方案:檢查自我管理結構描述登錄檔伺服器的運作狀態與組態。