

# Lambda での Kafka イベントソースとスキーマレジストリの使用
<a name="services-consume-kafka-events"></a>

 スキーマレジストリは、データストリームスキーマの定義と管理に役立ちます。スキーマ は、データレコードの構造と形式を定義します。Kafka イベントソースマッピングのコンテキストでは、Lambda 関数に到達する前に、事前定義されたスキーマに対して Kafka メッセージの構造と形式を検証するようにスキーマレジストリを設定できます。これにより、アプリケーションにデータガバナンスのレイヤーが追加され、イベントフィルタリングを通じてデータ形式を効率的に管理し、スキーマコンプライアンスを確保し、コストを最適化できます。

 この機能はすべてのプログラミング言語で機能しますが、以下の重要な点を考慮してください。
+ Powertools for Lambda は、Java、Python、TypeScript に固有のサポートを提供し、既存の Kafka 開発パターンとの一貫性を維持し、カスタム逆シリアル化コードなしでビジネスオブジェクトに直接アクセスできるようにします。
+ この機能は、プロビジョニングモードを使用したイベントソースマッピングでのみ使用できます。スキーマレジストリは、オンデマンドモードでのイベントソースマッピングをサポートしていません。プロビジョニングモードを使用していて、スキーマレジストリが設定されている場合、スキーマレジストリ設定を最初に削除しない限り、オンデマンドモードに変更することはできません。詳細については、[プロビジョンドモード](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)を参照してください。
+ イベントソースマッピング (ESM) ごとに設定できるスキーマレジストリは 1 つだけです。Kafka イベントソースでスキーマレジストリを使用すると、Lambda の Event Poller Unit (EPU) 使用量が増加する可能性があり、これはプロビジョンドモードの料金計算に影響します。

**Topics**
+ [スキーマレジストリオプション](#services-consume-kafka-events-options)
+ [Lambda が Kafka メッセージのスキーマ検証を実行する方法](#services-consume-kafka-events-how)
+ [Kafka スキーマレジストリの設定](#services-consume-kafka-events-config)
+ [Avro と Protobuf のフィルタリング](#services-consume-kafka-events-filtering)
+ [ペイロード形式と逆シリアル化の動作](#services-consume-kafka-events-payload)
+ [Lambda 関数での逆シリアル化されたデータの操作](#services-consume-kafka-events-payload-examples)
+ [スキーマレジストリの認証方法](#services-consume-kafka-events-auth)
+ [スキーマレジストリの問題のエラー処理とトラブルシューティング](#services-consume-kafka-events-troubleshooting)

## スキーマレジストリオプション
<a name="services-consume-kafka-events-options"></a>

 Lambda は、次のスキーマレジストリオプションをサポートしています。
+ [「AWS Glue スキーマレジストリ](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)」
+ [Confluent クラウドスキーマレジストリ](https://docs.confluent.io/platform/current/schema-registry/index.html)
+ [セルフマネージド Confluent スキーマレジストリ](https://docs.confluent.io/platform/current/schema-registry/index.html)

 スキーマレジストリは、次のデータ形式のメッセージの検証をサポートしています。
+ Apache Avro
+ プロトコルバッファ (Protobuf)
+ JSON スキーマ (JSON-SE)

 スキーマレジストリを使用するには、まずイベントソースマッピングがプロビジョニングモードであることを確認します。スキーマレジストリを使用すると、Lambda はスキーマに関するメタデータをペイロードに追加します。詳細については、「[Payload formats and deserialization behavior](#services-consume-kafka-events-payload)」を参照してください。

## Lambda が Kafka メッセージのスキーマ検証を実行する方法
<a name="services-consume-kafka-events-how"></a>

 スキーマレジストリを設定すると、Lambda は Kafka メッセージごとに次の手順を実行します。

1. Lambda は、クラスターから Kafka レコードをポーリングします。

1. Lambda は、スキーマレジストリ内の特定のスキーマに対してレコード内の選択したメッセージ属性を検証します。
   + メッセージに関連付けられたスキーマがレジストリに見つからない場合、Lambda は理由コード `SCHEMA_NOT_FOUND` を使用してメッセージを DLQ に送信します。

1. Lambda は、スキーマレジストリ設定に従ってメッセージを逆シリアル化し、メッセージを検証します。イベントフィルタリングが設定されている場合、Lambda は設定されたフィルター条件に基づいてフィルタリングを実行します。
   + 逆シリアル化が失敗した場合、Lambda は理由コード `DESERIALIZATION_ERROR` を使用して DLQ にメッセージを送信します。DLQ が設定されていない場合、Lambda はメッセージを削除します。

1. メッセージがスキーマレジストリによって検証され、フィルター条件によって除外されない場合、Lambda はメッセージを使用して関数を呼び出します。

 この機能は、スキーマレジストリと統合された Kafka クライアントを使用して既に生成されたメッセージを検証することを目的としています。スキーマレジストリと連携するように Kafka プロデューサーを設定して、適切にフォーマットされたメッセージを作成することをお勧めします。

## Kafka スキーマレジストリの設定
<a name="services-consume-kafka-events-config"></a>

 次のコンソールステップでは、Kafka スキーマレジストリ設定をイベントソースマッピングに追加します。

**Kafka スキーマレジストリ設定をイベントソースマッピングに追加するには (コンソール)**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. **[設定]** を選択します。

1. **[トリガー]** を選択します。

1. スキーマレジストリを設定する Kafka イベントソースマッピングを選択し、**[編集]** を選択します。

1. **[イベントポーラー設定]** で、**[スキーマレジストリを設定]** を選択します。このオプションを表示するには、イベントソースマッピングがプロビジョニングモードになっている必要があります。

1. **[スキーマレジストリ URI]** には、AWS Glue スキーマレジストリの ARN、または Confluent Cloud スキーマレジストリまたはセルフマネージド Confluent スキーマレジストリの HTTPS URL を入力します。

1. 以下の設定ステップでは、Lambda にスキーマレジストリへのアクセス方法を指示します。詳細については、「[スキーマレジストリの認証方法](#services-consume-kafka-events-auth)」を参照してください。
   + **[アクセス設定タイプ]** で、Lambda がスキーマレジストリにアクセスするために使用する認証のタイプを選択します。
   + **[アクセス設定 URI]** には、該当する場合は Secrets Manager シークレットの ARN を入力して、スキーマレジストリで認証します。関数の[実行ロール](with-msk-permissions.md)に正しいアクセス許可が含まれていることを確認します。

1. **[暗号化]** フィールドは、スキーマレジストリがプライベート認証局 (CA) または Lambda トラストストアにない認証局 (CA) によって署名されている場合にのみ適用されます。該当する場合は、TLS 暗号化のためにスキーマレジストリで使用されるプライベート CA 証明書を含むシークレットキーを指定します。

1. **イベントレコード形式**で、スキーマ検証後に Lambda が関数にレコードを配信する方法を選択します。詳細については、「[Payload format examples](#services-consume-kafka-events-payload)」を参照してください。
   + **[JSON]** を選択した場合、Lambda は以下のスキーマ検証属性で選択した属性を標準 JSON 形式で配信します。選択しない属性については、Lambda がそのまま配信します。
   + **[SOURCE]** を選択した場合、Lambda は以下のスキーマ検証属性で選択した属性を元のソース形式で配信します。

1. **[スキーマ検証属性]** で、Lambda がスキーマレジストリを使用して検証および逆シリアル化するメッセージ属性を選択します。**[KEY]** または **[VALUE]** のどちらかを選択する必要があります。イベントレコード形式に [JSON] を選択した場合、[Lambda] は選択したメッセージ属性を関数に送信する前に逆シリアル化します。詳細については、「[Payload formats and deserialization behavior](#services-consume-kafka-events-payload)」を参照してください。

1. **[保存]** を選択します。

 Lambda API を使用して、スキーマレジストリ設定でイベントソースマッピングを作成または更新することもできます。次の例は、API *AWS Lambdaリファレンス*の [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) および [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) API オペレーションに対応する AWS CLIを使用して、AWS Glue または Confluent スキーマレジストリを設定する方法を示しています。

**重要**  
AWS CLI または `update-event-source-mapping` API を使用してスキーマレジストリ設定フィールドを更新する場合は、スキーマレジストリ設定のすべてのフィールドを更新する必要があります。

------
#### [ Create Event Source Mapping ]

```
aws lambda create-event-source-mapping \
  --function-name my-schema-validator-function \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \
  --topics my-kafka-topic \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --amazon-managed-kafka-event-source-mapping '{
      "SchemaRegistryConfig" : {
          "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
          "AccessConfigs": [{
              "Type": "BASIC_AUTH", 
              "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
          }],
          "EventRecordFormat": "JSON",
          "SchemaValidationConfigs": [
          { 
              "Attribute": "KEY" 
          },
          { 
              "Attribute": "VALUE" 
          }]
      }
  }'
```

------
#### [ Update AWS Glue Schema Registry ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry with Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "AccessConfigs": [{
                "Type": "BASIC_AUTH", 
                "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
            }],
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry without Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Remove Schema Registry Configuration ]

イベントソースマッピングからスキーマレジストリ設定を削除するには、「*AWS Lambda API リファレンス*」にある CLI コマンド [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) を使用できます。

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {}
    }'
```

------

## Avro と Protobuf のフィルタリング
<a name="services-consume-kafka-events-filtering"></a>

 スキーマレジストリで Avro または Protobuf 形式を使用する場合、Lambda 関数にイベントフィルタリングを適用できます。フィルターパターンは、スキーマ検証後のデータの逆シリアル化された従来の JSON 表現に適用されます。例えば、価格を含む製品詳細を定義する Avro スキーマでは、価格の値に基づいてメッセージをフィルタリングできます。

**注記**  
 逆シリアル化されると、Avro は標準 JSON に変換されます。つまり、Avro オブジェクトに直接変換することはできません。Avro オブジェクトに変換する必要がある場合は、代わりに SOURCE 形式を使用します。  
 Protobuf の逆シリアル化の場合、結果の JSON のフィールド名は、Protobuf が通常行うようにキャメルケースに変換されるのではなく、スキーマで定義されたものと一致します。フィルタリングパターンを作成するときは、この点に注意してください。

```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [
            { 
                "Attribute": "VALUE" 
            }
        ]
    }' \
    --filter-criteria '{
        "Filters": [
            {
                "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }"
            }
        ]
    }'
```

 この例では、フィルターパターンは `value` オブジェクトを分析し、`field_1` のメッセージを `"value1"` と、`field_2` を `"value2"` と一致させます。Lambda がメッセージを Avro 形式から JSON に変換した後、フィルター条件は逆シリアル化されたデータに対して評価されます。

 イベント フィルタリングの詳細については、「[Lambda イベントフィルタリング](invocation-eventfiltering.md)」を参照してください。

## ペイロード形式と逆シリアル化の動作
<a name="services-consume-kafka-events-payload"></a>

 スキーマレジストリを使用する場合、Lambda は最終的なペイロードを[通常のイベントペイロード](with-msk.md#msk-sample-event)と同様の形式で関数に配信し、いくつかの追加フィールドがあります。追加のフィールドは、`SchemaValidationConfigs` パラメータによって異なります。検証用に選択した属性 (キーまたは値) ごとに、Lambda は対応するスキーマメタデータをペイロードに追加します。

**注記**  
スキーマメタデータフィールドを使用するには、[aws-lambda-java-events](https://github.com/aws/aws-lambda-java-libs/tree/main/aws-lambda-java-events) をバージョン 3.16.0 以降に更新する必要があります。

 例えば、`value` フィールドを検証すると、Lambda は `valueSchemaMetadata` というフィールドをペイロードに追加します。同様に、`key` フィールドの場合、Lambda は `keySchemaMetadata` というフィールドを追加します。このメタデータには、検証に使用されるデータ形式とスキーマ ID に関する情報が含まれています。

```
"valueSchemaMetadata": {
    "dataFormat": "AVRO",
    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
}
```

 `EventRecordFormat` パラメータは `JSON` または `SOURCE` のいずれかに設定できます。これにより、Lambda がスキーマ検証済みデータを関数に配信する前に処理する方法が決まります。各オプションには、さまざまな処理機能があります。
+ `JSON` - Lambda は検証された属性を標準 JSON 形式に逆シリアル化し、ネイティブ JSON をサポートする言語でデータを直接使用できるようにします。この形式は、元のバイナリ形式を保持したり、生成されたクラスを操作する必要がない場合に最適です。
+ `SOURCE` - Lambda はデータの元のバイナリ形式を Base64 エンコードされた文字列として保持し、Avro オブジェクトまたは Protobuf オブジェクトに直接変換できるようにします。この形式は、厳密に型指定された言語を使用する場合や、Avro スキーマまたは Protobuf スキーマの全機能を維持する必要がある場合に不可欠です。

これらの形式特性と言語固有の考慮事項に基づいて、次の形式をお勧めします。


**プログラミング言語に基づく推奨形式**  

| Language | Avro | Protobuf | JSON | 
| --- | --- | --- | --- | 
| Java | SOURCE | SOURCE | SOURCE | 
| Python | JSON | JSON | JSON | 
| NodeJS | JSON | JSON | JSON | 
| .NET | SOURCE | SOURCE | SOURCE | 
| その他 | JSON | JSON | JSON | 

以下のセクションでは、これらの形式について詳しく説明し、各形式のペイロードの例を示します。

### JSON 形式
<a name="services-consume-kafka-events-payload-json"></a>

 `JSON` を `EventRecordFormat` として選択すると、Lambda は [`SchemaValidationConfigs`] フィールドで選択したメッセージ属性 (`key` および/または `value` 属性) を検証および逆シリアル化します。Lambda は選択した属性を、関数内の標準 JSON 表現の base64 エンコードされた文字列として配信します。

**注記**  
 逆シリアル化されると、Avro は標準 JSON に変換されます。つまり、Avro オブジェクトに直接変換することはできません。Avro オブジェクトに変換する必要がある場合は、代わりに SOURCE 形式を使用します。  
 Protobuf の逆シリアル化の場合、結果の JSON のフィールド名は、Protobuf が通常行うようにキャメルケースに変換されるのではなく、スキーマで定義されたものと一致します。フィルタリングパターンを作成するときは、この点に注意してください。

 ペイロードの例を次に示します。ここでは、`JSON` を `EventRecordFormat` として指定し、`key` および `value` 属性の両方を `SchemaValidationConfigs` として指定します。

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON
            "keySchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON
            "valueSchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

 この例では、以下のようになっています：
+ `key` と `value` はどちらも、逆シリアル化後の JSON 表現の base64 エンコードされた文字列です。
+ Lambda には、`keySchemaMetadata` と `valueSchemaMetadata` の両方の属性のスキーマメタデータが含まれます。
+ 関数は、`key` および `value` 文字列をデコードして、逆シリアル化された JSON データにアクセスできます。

 JSON 形式は、Python や Node.js など、厳密に入力されていない言語に推奨されます。これらの言語は、JSON のオブジェクトへの変換をネイティブにサポートしています。

### ソース形式
<a name="services-consume-kafka-events-payload-source"></a>

 `SOURCE` を `EventRecordFormat` として指定した場合でも、Lambda はスキーマレジストリに対してレコードを検証しますが、元のバイナリデータを逆シリアル化せずに関数に配信します。このバイナリデータは、元のバイトデータの Base64 でエンコードされた文字列として配信され、プロデューサーが付加したメタデータは削除されます。その結果、未加工のバイナリデータを関数コード内の Avro オブジェクトと Protobuf オブジェクトに直接変換できます。AWS Lambda には Powertools を使用することをお勧めします。これにより、未加工のバイナリデータが逆シリアル化され、Avro オブジェクトと Protobuf オブジェクトが直接提供されます。

 例えば、`key` 属性と `value` 属性の両方を検証するように Lambda を設定し、`SOURCE` 形式を使用する場合、関数は次のようなペイロードを受け取ります。

```
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "keySchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "valueSchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "headers": [
                    {
                        "headerKey": [
                            104,
                            101,
                            97,
                            100,
                            101,
                            114,
                            86,
                            97,
                            108,
                            117,
                            101
                        ]
                    }
                ]
            }
        ]
    }
}
```

 この例では、以下のようになっています：
+ `key` と `value` の両方に、元のバイナリデータが Base64 でエンコードされた文字列として含まれます。
+ 関数は、適切なライブラリを使用して逆シリアル化を処理する必要があります。

 Avro 生成オブジェクトまたは Protobuf 生成オブジェクト、特に Java 関数を使用している場合は、`EventRecordFormat` ではなく `SOURCE` を選択することをお勧めします。これは、Java が厳密に型付けされており、Avro 形式と Protobuf 形式に特定のデシリアライザーが必要であるためです。関数コードでは、任意の Avro または Protobuf ライブラリを使用してデータを逆シリアル化できます。

## Lambda 関数での逆シリアル化されたデータの操作
<a name="services-consume-kafka-events-payload-examples"></a>

Powertools for AWS Lambda は、使用する形式に基づいて、関数コード内で Kafka レコードを逆シリアル化するのに役立ちます。このユーティリティは、データ変換を処理し、すぐに使えるオブジェクトを提供することで、Kafka レコードの操作を簡素化します。

 関数内で Powertools for AWS Lambda を使用するには、Lambda 関数の構築時に Powertools for AWS Lambda をレイヤーとして追加するか、依存関係として含める必要があります。セットアップ手順と詳細については、使用する言語の　Powertools for AWS Lambda ドキュメントを参照してください。
+ [Powertools for AWS Lambda (Java)](https://docs.powertools.aws.dev/lambda/java/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (Python)](https://docs.powertools.aws.dev/lambda/python/latest/utilities/kafka/)
+ [Powertools for AWS Lambda (TypeScript)](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka/)
+ [Powertools for AWS Lambda (.NET)](https://docs.powertools.aws.dev/lambda/dotnet/utilities/kafka/)

**注記**  
スキーマレジストリ統合を使用する場合は、`SOURCE` または `JSON` 形式を選択できます。各オプションは、以下に示すようにさまざまなシリアル化形式をサポートしています。  


| 形式 | サポート対象 | 
| --- | --- | 
|  SOURCE  |  Avro と Protobuf (Lambda Schema Registry 統合を使用)  | 
|  JSON  |  JSON データ  | 

 `SOURCE` または `JSON` 形式を使用する場合、Powertools for AWS を使用して、関数コード内のデータを逆シリアル化できます。さまざまなデータ形式を処理する方法の例を以下に示します。

------
#### [ AVRO ]

Java の例:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
        for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            AvroProduct product = consumerRecord.value();
            LOGGER.info("AvroProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python の例

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

value_schema_str = open("customer_profile.avsc", "r").read()

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=value_schema_str)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript サンプル:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

const schema = '{   
    "type": "record",   
    "name": "Product",   
    "fields": [     
        { "name": "id", "type": "int" },     
        { "name": "name", "type": "string" },     
        { "name": "price", "type": "double" }   
    ] 
}';

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'avro',
            schema: schema,
        },
    }
);
```

.NET/C\$1 の例:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ PROTOBUF ]

Java の例:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            ProtobufProduct product = consumerRecord.value();
            LOGGER.info("ProtobufProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Python の例

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

from user_pb2 import User # protobuf generated class

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=User)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript サンプル:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';
import { Product } from './product.generated.js';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
        }
    },
    {
        value: {
            type: 'protobuf',
            schema: Product,
        },
    }
);
```

.NET/C\$1 の例:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Protobuf;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ JSON ]

Java の例:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            Product product = consumerRecord.value();
            LOGGER.info("Product: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }
}
```

Python の例

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript サンプル:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'json',
        },
    }
);
```

.NET/C\$1 の例:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Json;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))]

namespace JsonClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------

## スキーマレジストリの認証方法
<a name="services-consume-kafka-events-auth"></a>

 スキーマレジストリを使用するには、Lambda がスキーマレジストリに安全にアクセスできる必要があります。AWS Glue スキーマレジストリを使用している場合、Lambda は IAM 認証に依存します。つまり、関数の [実行ロール](lambda-intro-execution-role.md)には、AWS Glue レジストリにアクセスするための次のアクセス許可が必要です。
+ *AWS Glueウェブ API リファレンス*の [GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html)
+ *AWS Glueウェブ API リファレンス*の [GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html)

必要な IAM ポリシーの例：

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**注記**  
 AWS Glue スキーマレジストリの場合、AWS Glue レジストリ に `AccessConfigs` を指定すると、Lambda は検証例外を返します。

Confluent スキーマレジストリを使用している場合は、[KafkaSchemaRegistryAccessConfig](https://docs.aws.amazon.com/lambda/latest/api/API_KafkaSchemaRegistryAccessConfig) オブジェクトの `Type` パラメータでサポートされている 3 つの認証方法のいずれかを選択できます。
+ **BASIC\$1AUTH** — Lambda はユーザー名とパスワードまたは API キーと API シークレット認証を使用してレジストリにアクセスします。このオプションを選択した場合、[URI] フィールドに認証情報を含む Secrets Manager ARN を指定します。
+ **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH** — Lambda はクライアント証明書で相互 TLS 認証を使用します。このオプションを使用するには、Lambda が証明書とプライベートキーの両方にアクセスする必要があります。これらの認証情報を含む Secrets Manager ARN を [URI] フィールドに入力します。
+ **NO\$1AUTH** — パブリック CA 証明書は、Lambda トラストストア内の認証局 (CA) によって署名される必要があります。プライベート CA/自己署名証明書の場合は、サーバルート CA 証明書を設定します。このオプションを使用するには、`AccessConfigs` パラメータを省略します。

 さらに、Lambda がスキーマレジストリの TLS 証明書を検証するためにプライベート CA 証明書にアクセスする必要がある場合は、`Type` として [`SERVER_ROOT_CA_CERT`] を選択し、[URI] フィールドの証明書に Secrets Manager ARN を指定します。

**注記**  
 コンソールで `SERVER_ROOT_CA_CERT` オプションを設定するには、**[暗号化]** フィールドに証明書を含むシークレット ARN を指定します。

 スキーマレジストリの認証設定は、Kafka クラスター用に設定した認証とは別のものです。同様の認証方法を使用している場合でも、両方を個別に設定する必要があります。

## スキーマレジストリの問題のエラー処理とトラブルシューティング
<a name="services-consume-kafka-events-troubleshooting"></a>

Amazon MSK イベントソースでスキーマレジストリを使用すると、さまざまなエラーが発生する可能性があります。このセクションでは、一般的な問題とその解決方法に関するガイダンスを提供します。

### 設定エラー
<a name="consume-kafka-events-troubleshooting-configuration-errors"></a>

これらのエラーは、スキーマレジストリの設定時に発生します。

プロビジョニングモードが必要です  
**エラーメッセージ**: `SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.`  
**解決策:** `ProvisionedPollerConfig` の `MinimumPollers` パラメータを設定して、イベントソースマッピングのプロビジョニングモードを有効にします。

無効なスキーマレジストリ URL  
**エラーメッセージ**: `Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.`  
**解決策:** Confluent スキーマレジストリの有効な HTTPS URL または AWS Glue Schema Registry の有効な ARN を指定します。

無効または欠落しているイベントレコード形式  
**エラーメッセージ**: `EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.`  
**解決策:** スキーマレジストリ設定で EventRecordFormat として SOURCE または JSON を指定します。

重複する検証属性  
**エラーメッセージ**: `Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.`  
**解決策:** SchemaValidationConfigs から重複する KEY 属性または VALUE 属性を削除します。各属性タイプは 1 回しか表示できません。

検証設定がありません  
**エラーメッセージ**: `SchemaValidationConfigs is a required field for SchemaRegistryConfig.`  
**解決策:** SchemaValidationConfigs を設定に追加し、少なくとも 1 つの検証属性 (KEY または VALUE) を指定します。

### アクセスとアクセス許可のエラー
<a name="consume-kafka-events-troubleshooting-access-errors"></a>

これらのエラーは、アクセス許可または認証の問題により Lambda がスキーマレジストリにアクセスできない場合に発生します。

AWS Glue スキーマレジストリへのアクセスが拒否されました  
**エラーメッセージ**: `Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.`  
**解決策:** 関数の実行ロールに必要なアクセス許可（`glue:GetRegistry` と `glue:GetSchemaVersion`）を追加します。

Confluent スキーマレジストリへのアクセスが拒否されました  
**エラーメッセージ**: `Cannot access Confluent Schema with the provided access configuration.`  
**解決策:** 認証情報 (Secrets Manager に保存) が正しく、スキーマレジストリにアクセスするために必要なアクセス許可があることを確認します。

クロスアカウント AWS Glue スキーマレジストリ  
**エラーメッセージ**: `Cross-account Glue Schema Registry ARN not supported.`  
**解決策:** Lambda 関数と同じ AWS アカウントにある AWS Glue スキーマレジストリを使用します。

クロスリージョン AWS Glue スキーマレジストリ  
**エラーメッセージ**: `Cross-region Glue Schema Registry ARN not supported.`  
**解決策:** Lambda 関数と同じリージョンにある AWS Glue スキーマレジストリを使用します。

シークレットアクセスの問題  
**エラーメッセージ**: `Lambda received InvalidRequestException from Secrets Manager.`  
**解決策:** 関数の実行ロールにシークレットへのアクセス許可があり、別のアカウントからアクセスする場合、シークレットがデフォルトの AWS KMS キーで暗号化されていないことを確認します。

### 接続エラー
<a name="consume-kafka-events-troubleshooting-connection-errors"></a>

これらのエラーは、Lambda がスキーマレジストリへの接続を確立できない場合に発生します。

VPC 接続の問題  
**エラーメッセージ**: `Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.`  
**解決策:** AWS PrivateLink、NAT ゲートウェイ、または VPC ピアリングを使用してスキーマレジストリへの接続を許可するように VPC ネットワークを設定します。

TLS ハンドシェイクの失敗  
**エラーメッセージ**: `Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.`  
**解決策:** CA 証明書とクライアント証明書 (mTLS 用) が正しく、Secrets Manager で適切に設定されていることを確認します。

スロットリング  
**エラーメッセージ**: `Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.`  
**解決策:** スキーマレジストリの API レート制限を引き上げるか、アプリケーションからのリクエストのレートを減らします。

セルフマネージドスキーマレジストリエラー  
**エラーメッセージ**: `Lambda received an internal server an unexpected error from the provided self-managed schema registry.`  
**解決策:** セルフマネージドスキーマレジストリサーバーの状態と設定を確認します。