

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 Lambda 中將結構描述登錄檔與 Kafka 事件來源搭配使用
<a name="services-consume-kafka-events"></a>

 結構描述登錄檔可協助您定義並管理資料串流結構描述。結構描述定義資料記錄的結構和格式。在 Kafka 事件來源映射的情境中，您可以設定結構描述登錄檔，在 Kafka 訊息送達 Lambda 函式之前，依據預先定義的結構描述驗證其結構與格式。這會將資料治理層新增至應用程式，讓您能夠有效管理資料格式、確保結構描述合規，並透過事件篩選來最佳化成本。

 此功能適用於所有程式設計語言，但請考慮以下要點：
+ Powertools for Lambda 提供 Java、Python 及 TypeScript 的專屬支援，不僅能維護與現有 Kafka 開發模式的一致性，更可讓您直接存取業務物件，無需自訂反序列化程式碼
+ 此功能僅適用於採用佈建模式的事件來源映射。結構描述登錄檔不支援隨需模式下的事件來源映射。如果您使用佈建模式且已設定結構描述登錄檔，將無法變更為隨需模式，除非您先移除結構描述登錄檔組態。如需詳細資訊，請參閱[佈建模式](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)
+ 每個事件來源映射 (ESM) 僅能設定一個結構描述登錄檔。將結構描述登錄檔與 Kafka 事件來源搭配使用，可能會增加 Lambda 事件輪詢單元 (EPU) 用量，此單元為佈建模式的一個定價維度。

**Topics**
+ [結構描述登錄檔選項](#services-consume-kafka-events-options)
+ [Lambda 如何對 Kafka 訊息執行結構描述驗證](#services-consume-kafka-events-how)
+ [設定 Kafka 結構描述登錄檔](#services-consume-kafka-events-config)
+ [Avro 與 Protobuf 的篩選](#services-consume-kafka-events-filtering)
+ [承載格式與反序列化行為](#services-consume-kafka-events-payload)
+ [在 Lambda 函式中處理反序列化資料](#services-consume-kafka-events-payload-examples)
+ [結構描述登錄檔的身分驗證方法](#services-consume-kafka-events-auth)
+ [結構描述登錄檔問題的錯誤處理與疑難排解](#services-consume-kafka-events-troubleshooting)

## 結構描述登錄檔選項
<a name="services-consume-kafka-events-options"></a>

 Lambda 支援下列結構描述登錄檔選項：
+ [AWS Glue 結構描述登錄檔](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+ [Confluent Cloud 結構描述登錄檔](https://docs.confluent.io/platform/current/schema-registry/index.html)
+ [自我管理的 Confluent 結構描述登錄檔](https://docs.confluent.io/platform/current/schema-registry/index.html)

 結構描述登錄檔支援驗證採用下列資料格式的訊息：
+ Apache Avro
+ 通訊協定緩衝區 (Protobuf)
+ JSON 結構描述 (JSON-SE)

 若要使用結構描述登錄檔，請先確保事件來源映射處於佈建模式。當您使用結構描述登錄檔時，Lambda 會將結構描述的相關中繼資料新增至承載。如需詳細資訊，請參閱[承載格式與反序列化行為](#services-consume-kafka-events-payload)。

## Lambda 如何對 Kafka 訊息執行結構描述驗證
<a name="services-consume-kafka-events-how"></a>

 當您設定結構描述登錄檔時，Lambda 會為每個 Kafka 訊息執行下列步驟：

1. Lambda 會從叢集輪詢 Kafka 記錄。

1. Lambda 會根據結構描述登錄檔中的特定結構描述，驗證記錄中選定的訊息屬性。
   + 如果在登錄檔中找不到與訊息相關聯的結構描述，Lambda 會將訊息傳送至 DLQ，並標示原因代碼 `SCHEMA_NOT_FOUND`。

1. 為了驗證訊息，Lambda 會依據結構描述登錄檔組態對訊息進行反序列化。如果設定了事件篩選，Lambda 會接著根據設定的篩選條件執行篩選。
   + 如果反序列化失敗，Lambda 會將訊息傳送至 DLQ，並標示原因代碼 `DESERIALIZATION_ERROR`。如果未設定 DLQ，Lambda 會捨棄訊息。

1. 如果訊息透過結構描述登錄檔進行驗證，且未依篩選條件篩選出，Lambda 將會使用該訊息調用函式。

 此功能旨在驗證已使用與結構描述登錄檔整合的 Kafka 用戶端所產生的訊息。建議將 Kafka 生產者設定為與結構描述登錄檔搭配使用，以便建立格式正確的訊息。

## 設定 Kafka 結構描述登錄檔
<a name="services-consume-kafka-events-config"></a>

 下列主控台步驟會將 Kafka 結構描述登錄檔組態新增至事件來源映射。

**將 Kafka 結構描述登錄檔組態新增至事件來源映射 (主控台)**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇 **Configuration (組態)**。

1. 選擇**觸發程序**。

1. 選取要為其設定結構描述登錄檔的 Kafka 事件來源映射，然後選擇**編輯**。

1. 在**事件輪詢器組態**欄位中，選擇**設定結構描述登錄檔**。事件來源映射必須處於佈建模式，方可查看此選項。

1. 對於**結構描述登錄 URI**，輸入 AWS Glue 結構描述登錄檔的 ARN，或 Confluent Cloud 結構描述登錄檔的 HTTPS URL 或自我管理的 Confluent 結構描述登錄檔。

1. 下列組態步驟會告知 Lambda 如何存取結構描述登錄檔。如需詳細資訊，請參閱[結構描述登錄檔的身分驗證方法](#services-consume-kafka-events-auth)。
   + 在**存取組態類型**欄位中，選擇 Lambda 用於存取結構描述登錄檔的身分驗證類型。
   + 在**存取組態 URI** 欄位中，輸入 Secrets Manager 秘密的 ARN，向結構描述登錄檔進行身分驗證 (如適用)。確保函式的[執行角色](with-msk-permissions.md)包含正確的許可。

1. 僅當結構描述登錄檔由私有憑證認證機構 (CA) 或未納入 Lambda 信任存放區的憑證認證機構 (CA) 簽署時，**加密**欄位才適用。如適用，請提供私密金鑰，其中應包含結構描述登錄檔用於 TLS 加密的私有 CA 憑證。

1. 在**事件記錄格式**欄位中，選擇希望 Lambda 在完成結構描述驗證後將記錄交付給函式的方式。如需詳細資訊，請參閱[承載格式範例](#services-consume-kafka-events-payload)。
   + 如果選擇 **JSON**，Lambda 會以標準 JSON 格式傳送您在下方「結構描述驗證屬性」中選取的屬性。對於您未選取的屬性，Lambda 會依原狀傳送這些屬性。
   + 如果選擇 **SOURCE**，Lambda 會以原始來源格式傳送您在下方「結構描述驗證屬性」中選取的屬性。

1. 在**結構描述驗證屬性**欄位中，選取您希望 Lambda 使用結構描述登錄檔進行驗證與反序列化的訊息屬性。必須至少選取 **KEY** 或 **VALUE** 其中一項。若您選擇 JSON 作為事件記錄格式，Lambda 還會在將選定的訊息屬性傳送至函式之前，對其進行反序列化。如需詳細資訊，請參閱[承載格式與反序列化行為](#services-consume-kafka-events-payload)。

1. 選擇**儲存**。

 您也可以使用 Lambda API 來建立或更新帶有結構描述登錄檔組態的事件來源映射。下列範例示範如何使用 設定 AWS Glue 或 Confluent 結構描述登錄檔 AWS CLI，其對應至 API 參考中的 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) 和 [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) API 操作： *AWS Lambda *

**重要**  
如果您使用 AWS CLI 或 `update-event-source-mapping` API 更新任何結構描述登錄檔組態欄位，則必須更新結構描述登錄檔組態的所有欄位。

------
#### [ Create Event Source Mapping ]

```
aws lambda create-event-source-mapping \
  --function-name my-schema-validator-function \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \
  --topics my-kafka-topic \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --amazon-managed-kafka-event-source-mapping '{
      "SchemaRegistryConfig" : {
          "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
          "AccessConfigs": [{
              "Type": "BASIC_AUTH", 
              "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
          }],
          "EventRecordFormat": "JSON",
          "SchemaValidationConfigs": [
          { 
              "Attribute": "KEY" 
          },
          { 
              "Attribute": "VALUE" 
          }]
      }
  }'
```

------
#### [ Update AWS Glue Schema Registry ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry with Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "AccessConfigs": [{
                "Type": "BASIC_AUTH", 
                "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
            }],
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry without Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Remove Schema Registry Configuration ]

若要從事件來源映射中移除結構描述登錄檔組態，可以使用 *AWS Lambda API Reference* 中的 CLI 命令 [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)。

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {}
    }'
```

------

## Avro 與 Protobuf 的篩選
<a name="services-consume-kafka-events-filtering"></a>

 將 Avro 或 Protobuf 格式與結構描述登錄檔搭配使用時，即可將事件篩選功能套用至 Lambda 函式。篩選模式會在結構描述驗證後，套用至資料反序列化後的傳統 JSON 表示法。例如，使用定義產品詳細資訊 (包含價格) 的 Avro 結構描述，您可以根據價格值篩選訊息：

**注意**  
 當 Avro 進行反序列化時，會轉換為標準 JSON 格式，也意味著無法直接轉換回 Avro 物件。如需轉換為 Avro 物件，請改用 SOURCE 格式。  
 在 Protobuf 反序列化過程中，產生 JSON 的欄位名稱與結構描述中定義的欄位名稱相符，而非如 Protobuf 常規作法般為駝峰式大小寫。在建立篩選模式時，請記住這一點。

```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [
            { 
                "Attribute": "VALUE" 
            }
        ]
    }' \
    --filter-criteria '{
        "Filters": [
            {
                "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }"
            }
        ]
    }'
```

 在此範例中，篩選模式會分析 `value` 物件，將 `field_1` 中符合 `"value1"`、`field_2` 中符合 `"value2"` 的訊息進行比對。在 Lambda 將訊息從 Avro 格式轉換為 JSON 格式之後，篩選條件會針對反序列化後的資料進行評估。

 如需有關事件篩選的詳細資訊，請參閱 [Lambda 事件篩選](invocation-eventfiltering.md)。

## 承載格式與反序列化行為
<a name="services-consume-kafka-events-payload"></a>

 使用結構描述登錄檔時，Lambda 會以類似於[常規事件承載](with-msk.md#msk-sample-event)的格式將最終承載傳送至函式，但會包含部分額外欄位。這些額外欄位取決於 `SchemaValidationConfigs` 參數。對於您選取進行驗證的每個屬性 (鍵或值)，Lambda 會將相應的結構描述中繼資料新增至承載。

**注意**  
必須將 [aws-lambda-java-events](https://github.com/aws/aws-lambda-java-libs/tree/main/aws-lambda-java-events) 更新至 3.16.0 版或更新版本，才能使用結構描述中繼資料欄位。

 例如，若驗證 `value` 欄位，Lambda 會將名為 `valueSchemaMetadata` 的欄位新增至承載。同理，如果驗證 `key` 欄位，Lambda 會新增名為 `keySchemaMetadata` 的欄位。此中繼資料包含資料格式與驗證所用結構描述 ID 的相關資訊：

```
"valueSchemaMetadata": {
    "dataFormat": "AVRO",
    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
}
```

 `EventRecordFormat` 參數可以設定為 `JSON` 或 `SOURCE`，這決定了 Lambda 在將經過結構描述驗證的資料傳送至函式之前的處理方式。每個選項都提供不同的處理功能：
+ `JSON`：Lambda 將經過驗證的屬性反序列化為標準 JSON 格式，便於資料在原生 JSON 支援的語言中直接使用。若無需保留原始二進位格式或無需使用產生的類別，此格式尤其適用。
+ `SOURCE`：Lambda 會將資料的原始二進位格式保留為 Base64 編碼字串，允許直接轉換為 Avro 或 Protobuf 物件。當使用強型別語言或需要保持 Avro 或 Protobuf 結構描述的完整功能時，此格式至關重要。

基於上述格式特性與語言專屬考量，建議採用下列格式：


**依據程式設計語言建議的格式**  

| Language | Avro | Protobuf | JSON | 
| --- | --- | --- | --- | 
| Java | SOURCE | SOURCE | SOURCE | 
| Python | JSON | JSON | JSON | 
| NodeJS | JSON | JSON | JSON | 
| .NET | SOURCE | SOURCE | SOURCE | 
| 其他 | JSON | JSON | JSON | 

下列各節詳細介紹了這些格式，也為每種格式提供了承載範例。

### JSON format (JSON 格式)
<a name="services-consume-kafka-events-payload-json"></a>

 若選擇 `JSON` 作為 `EventRecordFormat`，Lambda 會驗證並反序列化您在 `SchemaValidationConfigs` 欄位中選取的訊息屬性 (`key` 及/或 `value` 屬性)。Lambda 會將這些選定的屬性，以其標準 JSON 表述的 Base64 編碼字串形式，傳遞至您的函式中。

**注意**  
 當 Avro 進行反序列化時，會轉換為標準 JSON 格式，也意味著無法直接轉換回 Avro 物件。如需轉換為 Avro 物件，請改用 SOURCE 格式。  
 在 Protobuf 反序列化過程中，產生 JSON 的欄位名稱與結構描述中定義的欄位名稱相符，而非如 Protobuf 常規作法般為駝峰式大小寫。在建立篩選模式時，請記住這一點。

 下列承載範例假設您選擇 `JSON` 作為 `EventRecordFormat`，同時選擇 `key` 與 `value` 屬性作為 `SchemaValidationConfigs`：

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON
            "keySchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON
            "valueSchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

 在此範例中：
+ `key` 與 `value` 經反序列化後，皆以其 JSON 表述的 Base64 編碼字串形式呈現。
+ Lambda 會包含 `keySchemaMetadata` 與 `valueSchemaMetadata` 中兩種屬性的結構描述中繼資料。
+ 函式可以解碼 `key` 和 `value` 字串，以存取反序列化後的 JSON 資料。

 對於非強型別語言，例如 Python 或 Node.js，建議採用 JSON 格式。這些語言原生支援將 JSON 轉換為物件。

### 來源格式
<a name="services-consume-kafka-events-payload-source"></a>

 如果您選擇 `SOURCE` 作為 `EventRecordFormat`，Lambda 仍然會根據結構描述登錄檔驗證記錄，但會將原始二進位資料傳送至函式，而不進行反序列化。此二進位資料會以原始位元組資料的 Base64 編碼字串形式傳送，並移除生產者附加的中繼資料。因此，您可在函式程式碼中，將原始二進位資料直接轉換為 Avro 與 Protobuf 物件。我們建議將 Powertools 用於 AWS Lambda，這會還原序列化原始二進位資料，並直接為您提供 Avro 和 Protobuf 物件。

 例如，若設定 Lambda 來驗證 `key` 與 `value` 屬性，但採用 `SOURCE` 格式，函式會收到如下承載：

```
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "keySchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "valueSchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "headers": [
                    {
                        "headerKey": [
                            104,
                            101,
                            97,
                            100,
                            101,
                            114,
                            86,
                            97,
                            108,
                            117,
                            101
                        ]
                    }
                ]
            }
        ]
    }
}
```

 在此範例中：
+ `key` 和 `value` 都包含原始二進位資料作為 Base64 編碼字串。
+ 函式需要使用適當的程式庫來處理反序列化。

 若您使用 Avro 產生的物件或 Protobuf 產生的物件 (特別是在 Java 函式中)，則建議將 `EventRecordFormat` 設定為 `SOURCE`。這是因為 Java 屬於強型別語言，需針對 Avro 與 Protobuf 格式使用特定的反序列化。在函式程式碼中，您可以使用偏好的 Avro 或 Protobuf 程式庫將資料反序列化。

## 在 Lambda 函式中處理反序列化資料
<a name="services-consume-kafka-events-payload-examples"></a>

Powertools for AWS Lambda 可協助您根據您使用的格式，將函數程式碼中的 Kafka 記錄還原序列化。此公用程式透過處理資料轉換並提供即用型物件，簡化了 Kafka 記錄的處理流程。

 若要在函數 AWS Lambda 中使用 Powertools for ，您需要在建置 Lambda 函數時新增 Powertools AWS Lambda 做為 layer 或包含它做為相依性。如需設定說明和詳細資訊，請參閱 Powertools for AWS Lambda documentation for your preferred language：
+ [Powertools for AWS Lambda (Java)](https://docs.powertools.aws.dev/lambda/java/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (Python)](https://docs.powertools.aws.dev/lambda/python/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (TypeScript)](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka/)
+ [Powertools for AWS Lambda (.NET)](https://docs.powertools.aws.dev/lambda/dotnet/utilities/kafka/)

**注意**  
使用結構描述登錄檔整合時，可選擇 `SOURCE` 或 `JSON` 格式。每個選項都支援不同的序列化格式，如下所示：  


| 格式 | 支援 | 
| --- | --- | 
| SOURCE | Avro 與 Protobuf (透過 Lambda 結構描述登錄檔整合) | 
| JSON | JSON 資料 | 

 使用 `SOURCE`或 `JSON` 格式時，您可以使用 Powertools for AWS 協助還原序列化函數程式碼中的資料。以下是如何處理不同資料格式的範例：

------
#### [ AVRO ]

Java 範例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
        for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            AvroProduct product = consumerRecord.value();
            LOGGER.info("AvroProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python 範例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

value_schema_str = open("customer_profile.avsc", "r").read()

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=value_schema_str)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 範例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

const schema = '{   
    "type": "record",   
    "name": "Product",   
    "fields": [     
        { "name": "id", "type": "int" },     
        { "name": "name", "type": "string" },     
        { "name": "price", "type": "double" }   
    ] 
}';

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'avro',
            schema: schema,
        },
    }
);
```

.NET 範例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ PROTOBUF ]

Java 範例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            ProtobufProduct product = consumerRecord.value();
            LOGGER.info("ProtobufProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python 範例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

from user_pb2 import User # protobuf generated class

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=User)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 範例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';
import { Product } from './product.generated.js';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
        }
    },
    {
        value: {
            type: 'protobuf',
            schema: Product,
        },
    }
);
```

.NET 範例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Protobuf;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ JSON ]

Java 範例：

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            Product product = consumerRecord.value();
            LOGGER.info("Product: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }
}
```

Python 範例：

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript 範例：

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'json',
        },
    }
);
```

.NET 範例：

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Json;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))]

namespace JsonClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------

## 結構描述登錄檔的身分驗證方法
<a name="services-consume-kafka-events-auth"></a>

 若要使用結構描述登錄檔，Lambda 需要能夠對其進行安全存取。如果您使用 AWS Glue 結構描述登錄檔，Lambda 依賴 IAM 身分驗證。這表示函數的[執行角色](lambda-intro-execution-role.md)必須具有下列許可，才能存取 AWS Glue 登錄檔：
+ *AWS Glue Web API Reference* 中的 [GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html)
+ *AWS Glue Web API Reference* 中的 [GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html)

必要 IAM 政策範例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**注意**  
 對於 AWS Glue 結構描述登錄檔，如果您`AccessConfigs`提供 AWS Glue 登錄檔，Lambda 將傳回驗證例外狀況。

若您使用 Confluent 結構描述登錄檔，可為 [KafkaSchemaRegistryAccessConfig](https://docs.aws.amazon.com/lambda/latest/api/API_KafkaSchemaRegistryAccessConfig) 物件的 `Type` 參數選擇以下三種身分驗證方法中一種：
+ **BASIC\_AUTH**：Lambda 會透過使用者名稱與密碼，或 API 金鑰與 API 秘密身分驗證方式存取登錄庫。如果選擇此選項，請在 URI 欄位中提供包含憑證的 Secrets Manager ARN。
+ **CLIENT\_CERTIFICATE\_TLS\_AUTH**：Lambda 透過用戶端憑證進行雙向 TLS 身分驗證。若要使用此選項，Lambda 需要同時存取憑證和私有金鑰。在 URI 欄位中提供包含這些憑證的 Secrets Manager ARN。
+ **NO\_AUTH**：公有 CA 憑證必須由位於 Lambda 信任存放區中的憑證認證機構 (CA) 簽署。對於私有 CA/自我簽署憑證，您需要設定伺服器根 CA 憑證。若要使用此選項，請省略 `AccessConfigs` 參數。

 此外，如果 Lambda 需要存取私有 CA 憑證來驗證結構描述登錄檔的 TLS 憑證，請選擇 `SERVER_ROOT_CA_CERT` 作為 `Type`，並在 URI 欄位中提供憑證對應的 Secrets Manager ARN。

**注意**  
 若要在主控台中設定 `SERVER_ROOT_CA_CERT` 選項，需要在**加密**欄位中提供包含該憑證的秘密 ARN。

 結構描述登錄檔的驗證組態，需獨立於 Kafka 叢集的任何驗證設定。即使兩者採用相似的驗證方法，仍須分別進行設定。

## 結構描述登錄檔問題的錯誤處理與疑難排解
<a name="services-consume-kafka-events-troubleshooting"></a>

在將結構描述登錄檔與 Amazon MSK 事件來源搭配使用時，可能會遭遇各種錯誤。本節提供了常見問題及其解決方法的指導。

### 組態錯誤
<a name="consume-kafka-events-troubleshooting-configuration-errors"></a>

這些錯誤通常發生在設定結構描述登錄檔組態時。

需要佈建模式  
**錯誤訊息**：`SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.`  
**解決方案：**透過在 `ProvisionedPollerConfig` 中設定 `MinimumPollers` 參數，為事件來源映射啟用佈建模式。

無效的結構描述登錄檔 URL  
**錯誤訊息**：`Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.`  
**解決方案：**為 Confluent 結構描述登錄檔提供有效的 HTTPS URL，或為 AWS Glue 結構描述登錄檔提供有效的 ARN。

無效或缺少事件記錄格式  
**錯誤訊息**：`EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.`  
**解決方案：**在結構描述登錄檔組態中，將 SOURCE 或 JSON 指定為 EventRecordFormat。

重複驗證屬性  
**錯誤訊息**：`Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.`  
**解決方案：**從 SchemaValidationConfigs 中移除重複的 KEY 或 VALUE 屬性。每種屬性類型只能出現一次。

缺少驗證組態  
**錯誤訊息**：`SchemaValidationConfigs is a required field for SchemaRegistryConfig.`  
**解決方案：**將 SchemaValidationConfigs 新增至組態，指定至少一個驗證屬性 (KEY 或 VALUE)。

### 存取與許可錯誤
<a name="consume-kafka-events-troubleshooting-access-errors"></a>

當 Lambda 陰許可或身分驗證問題而無法存取結構描述登錄檔時，就會發生這些錯誤。

AWS Glue 結構描述登錄檔存取遭拒  
**錯誤訊息**：`Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.`  
**解決方案：**將必要許可 (`glue:GetRegistry` 與 `glue:GetSchemaVersion`) 新增至函式的執行角色。

Confluent 結構描述登錄檔存取遭拒  
**錯誤訊息**：`Cannot access Confluent Schema with the provided access configuration.`  
**解決方案：**確認身分驗證憑證 (儲存於 Secrets Manager 中) 是否正確，以及是否具有存取結構描述登錄檔的必要許可。

跨帳戶 AWS Glue 結構描述登錄檔  
**錯誤訊息**：`Cross-account Glue Schema Registry ARN not supported.`  
**解決方案：**使用與 Lambda 函數位於相同 AWS 帳戶的 AWS Glue 結構描述登錄檔。

跨區域 AWS Glue 結構描述登錄檔  
**錯誤訊息**：`Cross-region Glue Schema Registry ARN not supported.`  
**解決方案：**使用與 Lambda 函數位於相同區域的 AWS Glue 結構描述登錄檔。

秘密存取問題  
**錯誤訊息**：`Lambda received InvalidRequestException from Secrets Manager.`  
**解決方法：**確認函數的執行角色具有存取秘密的許可，而且如果從其他帳戶存取 ，則不會使用預設 AWS KMS 金鑰加密秘密。

### 連線錯誤
<a name="consume-kafka-events-troubleshooting-connection-errors"></a>

當 Lambda 無法與結構描述登錄檔建立連線時，就會發生這些錯誤。

VPC 連線問題  
**錯誤訊息**：`Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.`  
**解決方案：**設定您的 VPC 聯網，以允許使用 AWS PrivateLink NAT Gateway 或 VPC 對等互連連線至結構描述登錄檔。

TLS 交握失敗  
**錯誤訊息**：`Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.`  
**解決方案：**確認 CA 憑證與用戶端憑證 (適用於 mTLS) 是否正確，是否在 Secrets Manager 中正確設定。

限流  
**錯誤訊息**：`Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.`  
**解決方案：**提高結構描述登錄檔的 API 速率限制，或降低應用程式的請求速率。

自我管理結構描述登錄檔錯誤  
**錯誤訊息**：`Lambda received an internal server an unexpected error from the provided self-managed schema registry.`  
**解決方案：**檢查自我管理結構描述登錄檔伺服器的運作狀態與組態。