本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Lambda 中使用結構描述登錄檔搭配 Kafka 事件來源
結構描述登錄檔可協助您定義和管理資料串流結構描述。結構描述定義資料記錄的結構和格式。在 Kafka 事件來源映射的內容中,您可以設定結構描述登錄檔,在 Kafka 訊息到達 Lambda 函數之前,針對預先定義的結構描述驗證其結構和格式。這會為您的應用程式新增資料控管層,並可讓您有效率地管理資料格式、確保結構描述合規,以及透過事件篩選來最佳化成本。
此功能適用於所有程式設計語言,但請考慮以下要點:
Powertools for Lambda 提供 Java、Python 和 TypeScript 的特定支援,維持與現有 Kafka 開發模式的一致性,並允許在沒有自訂還原序列化程式碼的情況下直接存取商業物件
此功能僅適用於使用佈建模式的事件來源映射。結構描述登錄檔不支援隨需模式中的事件來源映射。如果您使用佈建模式,且已設定結構描述登錄檔,則無法變更為隨需模式,除非您先移除結構描述登錄檔組態。如需詳細資訊,請參閱佈建模式
每個事件來源映射 (ESM) 只能設定一個結構描述登錄檔。搭配 Kafka 事件來源使用結構描述登錄檔可能會增加 Lambda 事件輪詢單元 (EPU) 用量,這是佈建模式的定價維度。
主題
結構描述登錄檔選項
Lambda 支援下列結構描述登錄選項:
您的結構描述登錄檔支援以下列資料格式驗證訊息:
-
Apache Avro
-
通訊協定緩衝區 (Protobuf)
-
JSON 結構描述 (JSON-SE)
若要使用結構描述登錄檔,請先確認您的事件來源映射處於佈建模式。當您使用結構描述登錄檔時,Lambda 會將結構描述的相關中繼資料新增至承載。如需詳細資訊,請參閱承載格式和還原序列化行為。
Lambda 如何對 Kafka 訊息執行結構描述驗證
當您設定結構描述登錄檔時,Lambda 會為每個 Kafka 訊息執行下列步驟:
-
Lambda 會從叢集輪詢 Kafka 記錄。
-
Lambda 會根據結構描述登錄檔中的特定結構描述,驗證記錄中選取的訊息屬性。
-
如果在登錄檔中找不到與訊息相關聯的結構描述,Lambda 會將訊息傳送至原因碼為 的 DLQ
SCHEMA_NOT_FOUND
。
-
-
Lambda 會根據結構描述登錄檔組態還原序列化訊息,以驗證訊息。如果已設定事件篩選,則 Lambda 會根據設定的篩選條件執行篩選。
-
如果還原序列化失敗,Lambda 會將訊息傳送至原因碼為 的 DLQ
DESERIALIZATION_ERROR
。如果未設定 DLQ,Lambda 會捨棄訊息。
-
-
如果訊息是由結構描述登錄檔驗證,而且未依篩選條件篩選出,Lambda 會使用 訊息叫用您的函數。
此功能旨在驗證已使用與結構描述登錄整合的 Kafka 用戶端產生的訊息。我們建議您將 Kafka 生產者設定為與您的結構描述登錄檔搭配使用,以建立格式正確的訊息。
設定 Kafka 結構描述登錄檔
下列主控台步驟會將 Kafka 結構描述登錄檔組態新增至您的事件來源映射。
將 Kafka 結構描述登錄檔組態新增至事件來源映射 (主控台)
-
開啟 Lambda 主控台中的函數頁面
。 -
選擇 Configuration (組態)。
-
選擇觸發。
-
選取您要為其設定結構描述登錄檔的 Kafka 事件來源映射,然後選擇編輯。
-
在事件輪詢器組態下,選擇設定結構描述登錄檔。您的事件來源映射必須處於佈建模式,才能查看此選項。
-
針對結構描述登錄 URI,輸入 AWS Glue 結構描述登錄檔的 ARN,或 Confluent Cloud 結構描述登錄檔的 HTTPS URL 或自我管理的 Confluent 結構描述登錄檔。
-
下列組態步驟會告知 Lambda 如何存取您的結構描述登錄檔。如需詳細資訊,請參閱結構描述登錄檔的身分驗證方法。
-
針對存取組態類型,選擇 Lambda 用來存取結構描述登錄檔的身分驗證類型。
-
針對存取組態 URI,輸入 Secrets Manager 秘密的 ARN,以驗證結構描述登錄檔,如適用。請確定函數的執行角色包含正確的許可。
-
-
只有在您的結構描述登錄檔由私有憑證授權單位 (CA) 或不在 Lambda 信任存放區中的憑證授權單位 (CA) 簽署時,加密欄位才適用。如果適用,請提供私密金鑰,其中包含結構描述登錄檔用於 TLS 加密的私有 CA 憑證。
-
針對事件記錄格式,選擇您希望 Lambda 在結構描述驗證後交付記錄的方式。如需詳細資訊,請參閱承載格式範例。
-
如果您選擇 JSON,Lambda 會以標準 JSON 格式提供您在下列結構描述驗證屬性中選取的屬性。對於您未選取的屬性,Lambda 會依原狀交付這些屬性。
-
如果您選擇 SOURCE,Lambda 會以原始來源格式提供您在下面的結構描述驗證屬性中選取的屬性。
-
-
針對結構描述驗證屬性,選取您希望 Lambda 使用您的結構描述登錄檔驗證和還原序列化的訊息屬性。您必須至少選取其中一個 KEY 或 VALUE。如果您為事件記錄格式選擇 JSON,Lambda 也會先還原序列化選取的訊息屬性,再將它們傳送到您的函數。如需詳細資訊,請參閱承載格式和還原序列化行為。
-
選擇儲存。
您也可以使用 Lambda API 建立或更新具有結構描述登錄檔組態的事件來源映射。下列範例示範如何使用 設定 AWS Glue 或 Confluent 結構描述登錄檔 AWS CLI,其對應至 API 參考中的 UpdateEventSourceMapping 和 CreateEventSourceMapping API 操作: AWS Lambda
重要
如果您使用 AWS CLI 或 update-event-source-mapping
API 更新任何結構描述登錄檔組態欄位,則必須更新結構描述登錄檔組態的所有欄位。
Avro 和 Protobuf 的篩選
搭配結構描述登錄檔使用 Avro 或 Protobuf 格式時,您可以將事件篩選套用至 Lambda 函數。篩選條件模式會在結構描述驗證後套用至資料的還原序列化傳統 JSON 表示法。例如,使用定義產品詳細資訊的 Avro 結構描述,包括價格,您可以根據價格值來篩選訊息:
注意
還原序列化時,Avro 會轉換為標準 JSON,這表示它無法直接轉換回 Avro 物件。如果您需要轉換為 Avro 物件,請改用 SOURCE 格式。
對於 Protobuf 還原序列化,產生的 JSON 中的欄位名稱符合您結構描述中定義的名稱,而不是像 Protobuf 一般轉換為駱駝案例。建立篩選模式時,請記住這一點。
aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'
在此範例中,篩選條件模式會分析 value
物件,將訊息field_1
與 "value1"
和 field_2
比對"value2"
。在 Lambda 將訊息從 Avro 格式轉換為 JSON 之後,篩選條件會根據還原序列化資料進行評估。
如需事件篩選的詳細資訊,請參閱 Lambda 事件篩選。
承載格式和還原序列化行為
使用結構描述登錄檔時,Lambda 會以類似於一般事件承載的格式,將最終承載交付給您的函數,其中包含一些額外的欄位。其他欄位取決於 SchemaValidationConfigs
參數。對於您為驗證選取的每個屬性 (索引鍵或值),Lambda 會將對應的結構描述中繼資料新增至承載。
注意
您必須將 aws-lambda-java-events
例如,如果您驗證 value
欄位,Lambda valueSchemaMetadata
會將稱為 的欄位新增至您的承載。同樣地,對於 key
欄位,Lambda 新增名為 的欄位keySchemaMetadata
。此中繼資料包含資料格式和用於驗證的結構描述 ID 的相關資訊:
"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }
EventRecordFormat
參數可以設定為 JSON
或 SOURCE
,這會決定 Lambda 如何處理結構描述驗證的資料,然後再將其交付至您的函數。每個選項都提供不同的處理功能:
-
JSON
- Lambda 將已驗證的屬性還原序列化為標準 JSON 格式,使資料準備好在原生 JSON 支援的語言中直接使用。當您不需要保留原始二進位格式或使用產生的類別時,此格式是理想的。 -
SOURCE
- Lambda 會將資料的原始二進位格式保留為 Base64-encoded字串,允許直接轉換為 Avro 或 Protobuf 物件。使用強類型語言時,或當您需要維護 Avro 或 Protobuf 結構描述的完整功能時,此格式至關重要。
根據這些格式特性和特定語言考量,我們建議使用下列格式:
語言 | Avro | Protobuf | JSON |
---|---|---|---|
Java | SOURCE | SOURCE | SOURCE |
Python | JSON | JSON | JSON |
NodeJS | JSON | JSON | JSON |
.NET | SOURCE | SOURCE | SOURCE |
其他 | JSON | JSON | JSON |
下列各節詳細說明這些格式,並提供每個格式的範例承載。
JSON format (JSON 格式)
如果您選擇 JSON
做為 EventRecordFormat
,Lambda 會驗證和還原序列化您在 SchemaValidationConfigs
欄位中選取的訊息屬性 ( key
和/或 value
屬性)。Lambda 會將這些選取的屬性做為其在函數中標準 JSON 表示法的 base64 編碼字串提供。
注意
還原序列化時,Avro 會轉換為標準 JSON,這表示它無法直接轉換回 Avro 物件。如果您需要轉換為 Avro 物件,請改用 SOURCE 格式。
對於 Protobuf 還原序列化,產生的 JSON 中的欄位名稱符合您結構描述中定義的名稱,而不是像 Protobuf 一般轉換為駱駝案例。建立篩選模式時,請記住這一點。
以下顯示範例承載,假設您選擇 JSON
作為 EventRecordFormat
,且 key
和 value
屬性都作為 SchemaValidationConfigs
:
{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
在此範例中:
-
key
和value
都是還原序列化後其 JSON 表示法的 base64 編碼字串。 -
Lambda 包含
keySchemaMetadata
和 中兩個屬性的結構描述中繼資料valueSchemaMetadata
。 -
您的函數可以解碼
key
和value
字串,以存取還原序列化的 JSON 資料。
對於非強式輸入的語言,例如 Python 或 Node.js,建議使用 JSON 格式。這些語言原生支援將 JSON 轉換為 物件。
來源格式
如果您選擇 SOURCE
做為 EventRecordFormat
,Lambda 仍會根據結構描述登錄檔驗證記錄,但 會將原始二進位資料交付給您的函數,而不會還原序列化。此二進位資料會以原始位元組資料的 Base64 編碼字串傳遞,並移除生產者附加的中繼資料。因此,您可以直接將原始二進位資料轉換為函數程式碼中的 Avro 和 Protobuf 物件。我們建議針對 使用 Powertools AWS Lambda,這會還原序列化原始二進位資料,並直接為您提供 Avro 和 Protobuf 物件。
例如,如果您設定 Lambda 來驗證 key
和 value
屬性,但使用 SOURCE
格式,您的函數會收到如下的承載:
{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
在此範例中:
-
key
和 都value
包含原始二進位資料做為 Base64 編碼字串。 -
您的函數需要使用適當的程式庫來處理還原序列化。
如果您使用的是 Avro 產生的或 Protobuf 產生的物件,特別是 Java 函數,EventRecordFormat
則建議選擇 SOURCE
。這是因為 Java 是強式類型,並且需要 Avro 和 Protobuf 格式的特定還原序列化程式。在函數程式碼中,您可以使用偏好的 Avro 或 Protobuf 程式庫來還原序列化資料。
在 Lambda 函數中使用還原序列化資料
Powertools for AWS Lambda 可協助您根據您使用的格式,將函數程式碼中的 Kafka 記錄還原序列化。此公用程式可處理資料轉換並提供ready-to-use簡化 Kafka 記錄的使用。
若要在函數 AWS Lambda 中使用 Powertools for,您需要在建置 Lambda 函數時新增 Powertools AWS Lambda 做為 layer 或包含它做為相依性。如需設定說明和詳細資訊,請參閱 Powertools for AWS Lambda documentation for your preferred language:
注意
使用結構描述登錄整合時,您可以選擇 SOURCE
或 JSON
格式。每個選項都支援不同的序列化格式,如下所示:
格式 | 支援 |
---|---|
SOURCE |
Avro 和 Protobuf (使用 Lambda 結構描述登錄檔整合) |
JSON |
JSON 資料 |
使用 SOURCE
或 JSON
格式時,您可以使用 Powertools for AWS 協助還原序列化函數程式碼中的資料。以下是如何處理不同資料格式的範例:
結構描述登錄檔的身分驗證方法
若要使用結構描述登錄檔,Lambda 需要能夠安全地存取它。如果您使用 AWS Glue 結構描述登錄檔,Lambda 依賴 IAM 身分驗證。這表示函數的執行角色必須具有下列許可,才能存取 AWS Glue 登錄檔:
-
AWS Glue Web API 參考中的 GetRegistry
-
AWS Glue Web API 參考中的 GetSchemaVersion
必要 IAM 政策的範例:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
注意
對於 AWS Glue 結構描述登錄檔,如果您AccessConfigs
提供 AWS Glue 登錄檔,Lambda 將傳回驗證例外狀況。
如果您使用的是 Confluent 結構描述登錄檔,您可以為 KafkaSchemaRegistryAccessConfig 物件的 Type
參數選擇三種支援的身分驗證方法之一:
-
BASIC_AUTH — Lambda 使用使用者名稱和密碼或 API 金鑰和 API 秘密身分驗證來存取您的登錄檔。如果您選擇此選項,請在 URI 欄位中提供包含憑證的 Secrets Manager ARN。
-
CLIENT_CERTIFICATE_TLS_AUTH — Lambda 對用戶端憑證使用交互 TLS 身分驗證。若要使用此選項,Lambda 需要同時存取憑證和私有金鑰。在 URI 欄位中提供包含這些登入資料的 Secrets Manager ARN。
-
NO_AUTH — 公有 CA 憑證必須由 Lambda 信任存放區中的憑證授權機構 (CA) 簽署。對於私有 CA/自我簽署憑證,您可以設定伺服器根 CA 憑證。若要使用此選項,請省略
AccessConfigs
參數。
此外,如果 Lambda 需要存取私有 CA 憑證以驗證結構描述登錄檔的 TLS 憑證,請選擇 SERVER_ROOT_CA_CERT
做為 Type
,並在 URI 欄位中將 Secrets Manager ARN 提供給憑證。
注意
若要在主控台中設定 SERVER_ROOT_CA_CERT
選項,請在加密欄位中提供包含憑證的秘密 ARN。
結構描述登錄檔的身分驗證組態與您為 Kafka 叢集設定的任何身分驗證不同。您必須分別設定兩者,即使它們使用類似的身分驗證方法。
結構描述登錄檔問題的錯誤處理和疑難排解
搭配 Amazon MSK 事件來源使用結構描述登錄檔時,您可能會遇到各種錯誤。本節提供有關常見問題以及如何解決這些問題的指導。
組態錯誤
設定結構描述登錄檔組態時,會發生這些錯誤。
- 需要佈建模式
-
錯誤訊息:
SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.
解決方案:透過在 中設定
MinimumPollers
參數,為您的事件來源映射啟用佈建模式ProvisionedPollerConfig
。 - 無效的結構描述登錄 URL
-
錯誤訊息:
Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.
解決方案:為 Confluent Schema Registry 提供有效的 HTTPS URL,或為 AWS Glue Schema Registry 提供有效的 ARN。
- 無效或缺少事件記錄格式
-
錯誤訊息:
EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.
解決方案:在您的結構描述登錄檔組態中,將 SOURCE 或 JSON 指定為 EventRecordFormat。
- 重複的驗證屬性
-
錯誤訊息:
Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.
解決方案:從您的 SchemaValidationConfigs 移除重複的 KEY 或 VALUE 屬性。每個屬性類型只能顯示一次。
- 缺少驗證組態
-
錯誤訊息:
SchemaValidationConfigs is a required field for SchemaRegistryConfig.
解決方案:將 SchemaValidationConfigs 新增至您的組態,指定至少一個驗證屬性 (KEY 或 VALUE)。
存取和許可錯誤
當 Lambda 由於許可或身分驗證問題而無法存取結構描述登錄檔時,就會發生這些錯誤。
- AWS Glue 結構描述登錄檔存取遭拒
-
錯誤訊息:
Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.
解決方案:將所需的許可 (
glue:GetRegistry
和glue:GetSchemaVersion
) 新增至函數的執行角色。 - Confluent 結構描述登錄檔存取遭拒
-
錯誤訊息:
Cannot access Confluent Schema with the provided access configuration.
解決方案:確認您的身分驗證登入資料 (存放在 Secrets Manager 中) 是否正確,並具有存取結構描述登錄檔的必要許可。
- 跨帳戶 AWS Glue 結構描述登錄檔
-
錯誤訊息:
Cross-account Glue Schema Registry ARN not supported.
解決方案:使用與 Lambda 函數位於相同 AWS 帳戶的 AWS Glue 結構描述登錄檔。
- 跨區域 AWS Glue 結構描述登錄檔
-
錯誤訊息:
Cross-region Glue Schema Registry ARN not supported.
解決方案:使用與 Lambda 函數位於相同區域的 AWS Glue 結構描述登錄檔。
- 私密存取問題
-
錯誤訊息:
Lambda received InvalidRequestException from Secrets Manager.
解決方法:確認函數的執行角色具有存取秘密的許可,而且如果從其他帳戶存取 ,則不會使用預設 AWS KMS 金鑰加密秘密。
連線錯誤
當 Lambda 無法建立與結構描述登錄檔的連線時,就會發生這些錯誤。
- VPC 連線問題
-
錯誤訊息:
Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.
解決方案:設定您的 VPC 聯網,以允許使用 AWS PrivateLink NAT Gateway 或 VPC 對等互連連線至結構描述登錄檔。
- TLS 交握失敗
-
錯誤訊息:
Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.
解決方案:確認您的 CA 憑證和用戶端憑證 (適用於 mTLS) 在 Secrets Manager 中正確設定。
- 限流
-
錯誤訊息:
Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.
解決方案:提高結構描述登錄檔的 API 速率限制,或降低應用程式的請求速率。
- 自我管理結構描述登錄檔錯誤
-
錯誤訊息:
Lambda received an internal server an unexpected error from the provided self-managed schema registry.
解決方案:檢查自我管理結構描述登錄伺服器的運作狀態和組態。