

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Utilisation de registres de schémas avec les sources d’événements Kafka dans Lambda
<a name="services-consume-kafka-events"></a>

 Les registres de schémas vous aident à définir et gérer des schémas de flux de données. Un schéma définit la structure et le format d'un enregistrement de données. Dans le contexte des mappages des sources d’événements Kafka, vous pouvez configurer un registre de schémas pour valider la structure et le format des messages Kafka par rapport à des schémas prédéfinis avant qu’ils atteignent votre fonction Lambda. Cela ajoute une couche de gouvernance des données à votre application et vous permet de gérer efficacement les formats de données, de garantir la conformité des schémas et d’optimiser les coûts grâce au filtrage des événements. 

 Cette fonctionnalité est compatible avec tous les langages de programmation, mais tenez compte des points importants suivants : 
+ Powertools for Lambda fournit un support spécifique pour Java, Python TypeScript et assure la cohérence avec les modèles de développement Kafka existants et permet un accès direct aux objets métier sans code de désérialisation personnalisé.
+ Cette fonctionnalité n’est disponible que pour les mappages des sources d’événements utilisant le mode provisionné. Le registre des schémas ne prend pas en charge les mappages des sources d’événements en mode à la demande. Si vous utilisez le mode provisionné et qu’un registre de schéma est configuré, vous ne pouvez pas passer en mode à la demande, sauf si vous supprimez d’abord la configuration de votre registre de schémas. Pour de plus amples informations, consultez [Mode alloué](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode).
+ Vous ne pouvez configurer qu’un seul registre de schémas par mappage des sources d’événements (ESM). L’utilisation d’un registre de schémas avec votre source d’événements Kafka peut augmenter votre utilisation d’unités de sondeur d’événements (EPU) Lambda, une dimension tarifaire pour le mode provisionné. 

**Topics**
+ [Options du registre de schémas](#services-consume-kafka-events-options)
+ [Comment Lambda effectue la validation du schéma pour les messages Kafka](#services-consume-kafka-events-how)
+ [Configuration d’un registre de schémas Kafka](#services-consume-kafka-events-config)
+ [Filtrage pour Avro et Protobuf](#services-consume-kafka-events-filtering)
+ [Formats de données utiles et comportement de désérialisation](#services-consume-kafka-events-payload)
+ [Utilisation de données désérialisées dans des fonctions Lambda](#services-consume-kafka-events-payload-examples)
+ [Méthodes d’authentification pour votre registre de schémas](#services-consume-kafka-events-auth)
+ [Gestion des erreurs et résolution des problèmes liés au registre de schémas](#services-consume-kafka-events-troubleshooting)

## Options du registre de schémas
<a name="services-consume-kafka-events-options"></a>

 Lambda prend en charge les options de registre de schémas suivantes : 
+ [AWS Glue Registre de schémas](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+ [Registre de schémas Confluent Cloud](https://docs.confluent.io/platform/current/schema-registry/index.html)
+ [Registre de schémas Confluent autogéré](https://docs.confluent.io/platform/current/schema-registry/index.html)

 Votre registre de schémas prend en charge la validation des messages dans les formats de données suivants : 
+ Apache Avro
+ Protocol Buffers (Protobuf)
+ Schéma JSON (JSON-SE)

 Pour utiliser un registre de schémas, assurez-vous d’abord que votre mappage des sources d’événements est en mode provisionné. Lorsque vous utilisez un registre de schémas, Lambda ajoute des métadonnées relatives au schéma aux données utiles. Pour plus d’informations, consultez [Formats de données utiles et comportement de désérialisation](#services-consume-kafka-events-payload). 

## Comment Lambda effectue la validation du schéma pour les messages Kafka
<a name="services-consume-kafka-events-how"></a>

 Lorsque vous configurez un registre de schémas, Lambda exécute les étapes suivantes pour chaque message Kafka : 

1. Lambda interroge l’enregistrement Kafka de votre cluster.

1. Lambda valide les attributs de message sélectionnés dans l’enregistrement par rapport à un schéma spécifique de votre registre de schémas.
   + Si le schéma associé au message n’est pas trouvé dans le registre, Lambda envoie le message à une DLQ avec le code de motif `SCHEMA_NOT_FOUND`.

1. Lambda désérialise le message conformément à la configuration du registre de schémas pour valider le message. Si le filtrage des événements est configuré, Lambda effectue ensuite le filtrage en fonction des critères de filtre configurés.
   + Si la désérialisation échoue, Lambda envoie le message à une DLQ avec le code de motif `DESERIALIZATION_ERROR`. Si aucune DLQ n’est configurée, Lambda abandonne le message.

1. Si le message est validé par le registre de schémas et qu’il n’est pas filtré selon vos critères de filtre, Lambda invoque votre fonction avec le message.

 Cette fonctionnalité est destinée à valider les messages déjà produits à l’aide de clients Kafka intégrés à un registre de schémas. Nous vous recommandons de configurer vos producteurs Kafka pour qu’ils fonctionnent avec votre registre de schémas afin de créer des messages correctement formatés. 

## Configuration d’un registre de schémas Kafka
<a name="services-consume-kafka-events-config"></a>

 Les étapes de console suivantes ajoutent une configuration de registre de schémas Kafka à votre mappage des sources d’événements. 

**Ajouter une configuration de registre de schémas Kafka à votre mappage des sources d’événements (console)**

1. Ouvrez la [page Fonction](https://console.aws.amazon.com/lambda/home#/functions) de la console Lambda.

1. Choisissez **Configuration**.

1. Choisissez **Déclencheurs**.

1. Sélectionnez le mappage des sources d’événements Kafka pour lequel vous souhaitez configurer un registre de schémas, puis choisissez **Modifier**.

1. Sous **Configuration du sondeur d’événements**, choisissez **Configurer le registre de schémas**. Votre mappage des sources d’événements doit être en mode provisionné pour voir cette option.

1. Pour l'**URI du registre de schéma**, entrez l'ARN de votre registre de AWS Glue schémas ou l'URL HTTPS de votre registre de schémas Confluent Cloud ou de votre registre de schémas Confluent autogéré.

1. Les étapes de configuration suivantes indiquent à Lambda comment accéder à votre registre de schémas. Pour de plus amples informations, veuillez consulter [Méthodes d’authentification pour votre registre de schémas](#services-consume-kafka-events-auth).
   + Pour **Type de configuration d’accès**, choisissez le type d’authentification utilisé par Lambda pour accéder à votre registre de schémas.
   + Pour **URI de configuration d’accès**, entrez l’ARN du secret Secrets Manager pour vous authentifier auprès de votre registre de schémas, le cas échéant. Assurez-vous que le [rôle d’exécution](with-msk-permissions.md) de votre fonction contient les autorisations appropriées.

1. Le champ **Chiffrement** s’applique uniquement si votre registre de schémas est signé par une autorité de certification (CA) privée ou une autorité de certification (CA) qui ne figure pas dans le magasin d’approbations Lambda. Le cas échéant, fournissez la clé secrète contenant le certificat CA privé utilisé par votre registre de schémas pour le chiffrement TLS.

1. Pour **Format d’enregistrement de l’événement**, choisissez la manière dont vous souhaitez que Lambda transmette les enregistrements à votre fonction après la validation du schéma. Pour plus d’informations consultez [Exemples de formats de données utiles](#services-consume-kafka-events-payload).
   + Si vous choisissez **JSON**, Lambda fournit les attributs que vous sélectionnez dans Attribut de validation du schéma ci-dessous au format JSON standard. Pour les attributs que vous ne sélectionnez pas, Lambda les fournit en l’état.
   + Si vous choisissez **SOURCE**, Lambda fournit les attributs que vous avez sélectionnés dans Attribut de validation du schéma ci-dessous dans leur format source d’origine.

1. Pour **Attribut de validation du schéma**, sélectionnez les attributs de message que Lambda doit valider et désérialiser à l’aide de votre registre de schémas. Vous devez sélectionner au moins une option entre **KEY** et **VALUE**. Si vous avez choisi JSON pour le format d’enregistrement d’événement, Lambda désérialise également les attributs de message sélectionnés avant de les envoyer à votre fonction. Pour plus d’informations, consultez [Formats de données utiles et comportement de désérialisation](#services-consume-kafka-events-payload).

1. Choisissez **Enregistrer**.

 Vous pouvez également utiliser l’API Lambda pour créer ou mettre à jour votre mappage des sources d’événements avec une configuration de registre de schémas. Les exemples suivants montrent comment configurer un registre de schéma AWS Glue ou un registre de schéma Confluent à l'aide du AWS CLI, qui correspond aux opérations d'[CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)API [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)et de la *référence AWS Lambda d'API :* 

**Important**  
Si vous mettez à jour un champ de configuration du registre de schéma à l'aide de l'`update-event-source-mapping`API AWS CLI ou de l'API, vous devez mettre à jour tous les champs de configuration du registre de schéma.

------
#### [ Create Event Source Mapping ]

```
aws lambda create-event-source-mapping \
  --function-name my-schema-validator-function \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \
  --topics my-kafka-topic \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --amazon-managed-kafka-event-source-mapping '{
      "SchemaRegistryConfig" : {
          "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
          "AccessConfigs": [{
              "Type": "BASIC_AUTH", 
              "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
          }],
          "EventRecordFormat": "JSON",
          "SchemaValidationConfigs": [
          { 
              "Attribute": "KEY" 
          },
          { 
              "Attribute": "VALUE" 
          }]
      }
  }'
```

------
#### [ Update AWS Glue Schema Registry ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry with Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "AccessConfigs": [{
                "Type": "BASIC_AUTH", 
                "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
            }],
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry without Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Remove Schema Registry Configuration ]

Pour supprimer une configuration de registre de schéma de votre mappage de source d'événements, vous pouvez utiliser la commande CLI [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html)dans la *référence AWS Lambda d'API*.

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {}
    }'
```

------

## Filtrage pour Avro et Protobuf
<a name="services-consume-kafka-events-filtering"></a>

 Lorsque vous utilisez les formats Avro ou Protobuf avec un registre de schémas, vous pouvez appliquer le filtrage des événements à votre fonction Lambda. Les modèles de filtre sont appliqués à la représentation JSON classique désérialisée de vos données après validation du schéma. Par exemple, avec un schéma Avro définissant les détails du produit, y compris le prix, vous pouvez filtrer les messages en fonction de la valeur du prix : 

**Note**  
 Lors de la désérialisation, l’objet Avro est converti en JSON standard, ce qui signifie qu’il ne peut pas être reconverti directement en objet Avro. Si vous devez effectuer une conversion en objet Avro, utilisez plutôt le format SOURCE.   
 Pour la désérialisation de Protobuf, les noms de champs dans le JSON obtenu correspondent à ceux définis dans votre schéma, plutôt que d’être convertis en casse mixte comme le fait généralement Protobuf. Gardez cela à l’esprit lorsque vous créez des schémas de filtrage. 

```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [
            { 
                "Attribute": "VALUE" 
            }
        ]
    }' \
    --filter-criteria '{
        "Filters": [
            {
                "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }"
            }
        ]
    }'
```

 Dans cet exemple, le modèle de filtre analyse l’objet `value` en faisant correspondre les messages dans `field_1` avec `"value1"`, et `field_2` avec `"value2"`. Les critères de filtre sont évalués par rapport aux données désérialisées, une fois que Lambda a converti le message du format Avro en JSON. 

 Pour plus d’informations sur le filtrage des événements, consultez la section [Filtrage des événements Lambda](invocation-eventfiltering.md). 

## Formats de données utiles et comportement de désérialisation
<a name="services-consume-kafka-events-payload"></a>

 Lorsque vous utilisez un registre de schémas, Lambda fournit les données utiles finales à votre fonction dans un format similaire aux [données utiles d’événement ordinaires](with-msk.md#msk-sample-event), avec quelques champs supplémentaires. Les champs supplémentaires dépendent du paramètre `SchemaValidationConfigs`. Pour chaque attribut que vous sélectionnez pour validation (clé ou valeur), Lambda ajoute les métadonnées de schéma correspondantes aux données utiles. 

**Note**  
Vous devez passer [aws-lambda-java-events](https://github.com/aws/aws-lambda-java-libs/tree/main/aws-lambda-java-events)à la version 3.16.0 ou supérieure pour utiliser les champs de métadonnées du schéma.

 Par exemple, si vous validez le champ `value`, Lambda ajoute un champ appelé `valueSchemaMetadata` à vos données utiles. De même, pour le champ `key`, Lambda ajoute un champ appelé `keySchemaMetadata`. Ces métadonnées contiennent des informations sur le format des données et l’ID de schéma utilisés pour la validation : 

```
"valueSchemaMetadata": {
    "dataFormat": "AVRO",
    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
}
```

 Le paramètre `EventRecordFormat` peut être défini sur `JSON` ou `SOURCE`, ce qui détermine la manière dont Lambda gère les données validées par le schéma avant de les transmettre à votre fonction. Chaque option offre des capacités de traitement différentes : 
+ `JSON` : Lambda désérialise les attributs validés au format JSON standard, ce qui rend les données prêtes à être utilisées directement dans les langages prenant en charge le format JSON natif. Ce format est idéal lorsque vous n’avez pas besoin de conserver le format binaire d’origine ou de travailler avec les classes générées.
+ `SOURCE` : Lambda préserve le format binaire original des données sous forme de chaîne codée en Base64, ce qui permet une conversion directe en objets Avro ou Protobuf. Ce format est essentiel lorsque vous travaillez avec des langages fortement typés ou lorsque vous devez conserver toutes les fonctionnalités des schémas Avro ou Protobuf.

Sur la base de ces caractéristiques de format et des considérations spécifiques au langage, nous recommandons les formats suivants :


**Formats recommandés selon le langage de programmation**  

| Language | Avro | Protobuf | JSON | 
| --- | --- | --- | --- | 
| Java | SOURCE | SOURCE | SOURCE | 
| Python | JSON | JSON | JSON | 
| NodeJS | JSON | JSON | JSON | 
| .NET | SOURCE | SOURCE | SOURCE | 
| Autres | JSON | JSON | JSON | 

Les sections suivantes décrivent ces formats en détail et fournissent des exemples de données utiles pour chaque format.

### Format JSON
<a name="services-consume-kafka-events-payload-json"></a>

 Si vous choisissez `JSON` comme tel`EventRecordFormat`, Lambda valide et désérialise les attributs de message que vous avez sélectionnés dans le `SchemaValidationConfigs` champ (les attributs). `key` and/or `value` Lambda fournit ces attributs sélectionnés sous forme de chaînes codées en Base64 de leur représentation JSON standard dans votre fonction. 

**Note**  
 Lors de la désérialisation, l’objet Avro est converti en JSON standard, ce qui signifie qu’il ne peut pas être reconverti directement en objet Avro. Si vous devez effectuer une conversion en objet Avro, utilisez plutôt le format SOURCE.   
 Pour la désérialisation de Protobuf, les noms de champs dans le JSON obtenu correspondent à ceux définis dans votre schéma, plutôt que d’être convertis en casse mixte comme le fait généralement Protobuf. Gardez cela à l’esprit lorsque vous créez des schémas de filtrage. 

 Voici un exemple de données utiles, en supposant que vous choisissez `JSON` comme `EventRecordFormat` et les attributs `key` et `value` comme `SchemaValidationConfigs` : 

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON
            "keySchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON
            "valueSchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

 Dans cet exemple : 
+ `key` et `value` sont des chaînes codées en Base64 de leur représentation JSON après désérialisation.
+ Lambda inclut des métadonnées de schéma pour les deux attributs dans `keySchemaMetadata` et `valueSchemaMetadata`.
+ Votre fonction peut décoder les chaînes `key` et `value` pour accéder aux données JSON désérialisées.

 Le format JSON est recommandé pour les langages qui ne sont pas fortement typés, comme Python ou Node.js. Ces langages offrent la prise en charge native de la conversion de JSON en objets. 

### Format source
<a name="services-consume-kafka-events-payload-source"></a>

 Si vous choisissez `SOURCE` comme `EventRecordFormat`, Lambda valide toujours l’enregistrement par rapport au registre de schémas, mais fournit les données binaires d’origine à votre fonction sans désérialisation. Ces données binaires sont fournies sous la forme d’une chaîne codée en Base64 des données d’octets d’origine, les métadonnées ajoutées par le producteur étant supprimées. Par conséquent, vous pouvez directement convertir les données binaires brutes en objets Avro et Protobuf dans le code de votre fonction. Nous vous recommandons d'utiliser Powertools for AWS Lambda, qui désérialisera les données binaires brutes et vous fournira directement les objets Avro et Protobuf. 

 Par exemple, si vous configurez Lambda pour valider à la fois les attributs `key` et `value`, mais que vous utilisez le format `SOURCE`, votre fonction reçoit des données utiles semblables à celles-ci : 

```
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "keySchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "valueSchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "headers": [
                    {
                        "headerKey": [
                            104,
                            101,
                            97,
                            100,
                            101,
                            114,
                            86,
                            97,
                            108,
                            117,
                            101
                        ]
                    }
                ]
            }
        ]
    }
}
```

 Dans cet exemple : 
+ `key` et `value` contiennent tous deux les données binaires d’origine sous forme de chaînes codées en Base64.
+ Votre fonction doit gérer la désérialisation à l’aide des bibliothèques appropriées.

 Il est recommandé de choisir `SOURCE` pour `EventRecordFormat` si vous utilisez des objets générés par Avro ou Protobuf, en particulier avec des fonctions Java. Cela est dû au fait que Java est fortement typé et nécessite des désérialiseurs spécifiques pour les formats Avro et Protobuf. Dans le code de votre fonction, vous pouvez utiliser votre bibliothèque Avro ou Protobuf préférée pour désérialiser les données. 

## Utilisation de données désérialisées dans des fonctions Lambda
<a name="services-consume-kafka-events-payload-examples"></a>

Powertools for vous AWS Lambda aide à désérialiser les enregistrements Kafka dans votre code de fonction en fonction du format que vous utilisez. Cet utilitaire simplifie l'utilisation des enregistrements Kafka en gérant la conversion des données et en fournissant des ready-to-use objets.

 Pour utiliser Powertools AWS Lambda dans votre fonction, vous devez ajouter Powertools en tant que couche AWS Lambda ou en tant que dépendance lors de la création de votre fonction Lambda. Pour obtenir des instructions de configuration et plus d'informations, consultez les Powertools pour obtenir AWS Lambda la documentation correspondant à votre langue préférée : 
+ [Powertools pour AWS Lambda (Java)](https://docs.powertools.aws.dev/lambda/java/latest/utilities/kafka/)
+ [Powertools pour AWS Lambda (Python)](https://docs.powertools.aws.dev/lambda/python/latest/utilities/kafka/)
+ [Outils électriques pour AWS Lambda () TypeScript](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka/)
+ [Powertools pour AWS Lambda (.NET)](https://docs.powertools.aws.dev/lambda/dotnet/utilities/kafka/)

**Note**  
Lorsque vous travaillez avec l’intégration du registre de schémas, vous pouvez choisir le format `SOURCE` ou `JSON`. Chaque option prend en charge différents formats de sérialisation, comme indiqué ci-dessous :  


| Format | Prend en charge | 
| --- | --- | 
|  SOURCE  |  Avro et Protobuf (en utilisant l’intégration du registre de schémas Lambda)  | 
|  JSON  |  Données JSON  | 

 Lorsque vous utilisez le `JSON` format `SOURCE` or, vous pouvez utiliser Powertools pour vous aider AWS à désérialiser les données de votre code de fonction. Voici des exemples de gestion de différents formats de données : 

------
#### [ AVRO ]

Exemple Java :

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
        for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            AvroProduct product = consumerRecord.value();
            LOGGER.info("AvroProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Exemple Python :

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

value_schema_str = open("customer_profile.avsc", "r").read()

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=value_schema_str)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript exemple :

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

const schema = '{   
    "type": "record",   
    "name": "Product",   
    "fields": [     
        { "name": "id", "type": "int" },     
        { "name": "name", "type": "string" },     
        { "name": "price", "type": "double" }   
    ] 
}';

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'avro',
            schema: schema,
        },
    }
);
```

Exemple .NET :

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ PROTOBUF ]

Exemple Java :

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            ProtobufProduct product = consumerRecord.value();
            LOGGER.info("ProtobufProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Exemple Python :

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

from user_pb2 import User # protobuf generated class

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=User)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript exemple :

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';
import { Product } from './product.generated.js';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
        }
    },
    {
        value: {
            type: 'protobuf',
            schema: Product,
        },
    }
);
```

Exemple .NET :

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Protobuf;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ JSON ]

Exemple Java :

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            Product product = consumerRecord.value();
            LOGGER.info("Product: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }
}
```

Exemple Python :

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

TypeScript exemple :

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'json',
        },
    }
);
```

Exemple .NET :

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Json;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))]

namespace JsonClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------

## Méthodes d’authentification pour votre registre de schémas
<a name="services-consume-kafka-events-auth"></a>

 Pour utiliser un registre de schémas, Lambda doit pouvoir y accéder de façon sécurisée. Si vous travaillez avec un registre de AWS Glue schémas, Lambda s'appuie sur l'authentification IAM. Cela signifie que le [rôle d'exécution](lambda-intro-execution-role.md) de votre fonction doit disposer des autorisations suivantes pour accéder au AWS Glue registre : 
+ [GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html)dans la *référence de l'API AWS Glue Web*
+ [GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html)dans la *référence de l'API AWS Glue Web*

Exemple de politique IAM requise : 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**Note**  
 Pour les registres de AWS Glue schémas, si vous fournissez `AccessConfigs` un AWS Glue registre, Lambda renverra une exception de validation. 

Si vous travaillez avec un registre de schémas Confluent, vous pouvez choisir l'une des trois méthodes d'authentification prises en charge pour le `Type` paramètre de votre [KafkaSchemaRegistryAccessConfig](https://docs.aws.amazon.com/lambda/latest/api/API_KafkaSchemaRegistryAccessConfig)objet :
+ **BASIC\$1AUTH** : Lambda utilise le nom d’utilisateur et le mot de passe ou l’authentification par clé d’API et secret d’API pour accéder à votre registre. Si vous choisissez cette option, indiquez l’ARN Secrets Manager contenant vos informations d’identification dans le champ URI.
+ **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH** : Lambda utilise l’authentification TLS mutuelle avec les certificats clients. Pour utiliser cette option, Lambda doit avoir accès à la fois au certificat et à la clé privée. Indiquez l’ARN Secrets Manager contenant ces informations d’identification dans le champ URI.
+ **NO\$1AUTH** : le certificat CA public doit être signé par une autorité de certification située dans le magasin d’approbations Lambda. Pour un certificat CA/autosigné privé, vous configurez le certificat CA racine du serveur. Pour utiliser cette option, omettez le paramètre `AccessConfigs`.

 En outre, si Lambda a besoin d’accéder à un certificat CA privé pour vérifier le certificat TLS de votre registre de schémas, choisissez `SERVER_ROOT_CA_CERT` comme `Type` et fournissez l’ARN Secrets Manager au certificat dans le champ URI. 

**Note**  
 Pour configurer l’option `SERVER_ROOT_CA_CERT` dans la console, fournissez l’ARN secret contenant le certificat dans le champ **Chiffrement**. 

 La configuration d’authentification de votre registre de schémas est distincte de toute authentification que vous avez configurée pour votre cluster Kafka. Vous devez configurer les deux séparément, même s’ils utilisent des méthodes d’authentification similaires. 

## Gestion des erreurs et résolution des problèmes liés au registre de schémas
<a name="services-consume-kafka-events-troubleshooting"></a>

Lorsque vous utilisez un registre de schémas avec votre source d’événements Amazon MSK, vous pouvez rencontrer diverses erreurs. Cette section fournit des conseils sur les problèmes courants et leur résolution.

### Erreurs de configuration
<a name="consume-kafka-events-troubleshooting-configuration-errors"></a>

Ces erreurs se produisent lors de la configuration de votre registre de schémas.

Mode provisionné requis  
**Message d’erreur** : `SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.`  
**Résolution :** activez le mode provisionné pour votre mappage des sources d'événements en configurant le paramètre `MinimumPollers` dans `ProvisionedPollerConfig`.

URL de registre de schémas non valide  
**Message d’erreur** : `Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.`  
**Solution :** Fournissez une URL HTTPS valide pour le registre des schémas Confluent ou un ARN valide pour le registre des AWS Glue schémas.

Format d’enregistrement d’événement non valide ou manquant  
**Message d’erreur** : `EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.`  
**Résolution :** Spécifiez SOURCE ou JSON EventRecordFormat dans la configuration de votre registre de schéma.

Attributs de validation en double  
**Message d’erreur** : `Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.`  
**Résolution :** Supprimez les attributs KEY ou VALUE dupliqués de votre SchemaValidationConfigs. Chaque type d’attribut ne peut apparaître qu’une seule fois.

Configuration de validation manquante  
**Message d’erreur** : `SchemaValidationConfigs is a required field for SchemaRegistryConfig.`  
**Solution :** Ajoutez SchemaValidationConfigs à votre configuration en spécifiant au moins un attribut de validation (KEY ou VALUE).

### Erreurs d’accès et d’autorisation
<a name="consume-kafka-events-troubleshooting-access-errors"></a>

Ces erreurs se produisent lorsque Lambda ne peut pas accéder au registre de schémas en raison de problèmes d’autorisation ou d’authentification.

AWS Glue Accès au registre du schéma refusé  
**Message d’erreur** : `Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.`  
**Résolution :** ajoutez les autorisations requises (`glue:GetRegistry` et `glue:GetSchemaVersion`) au rôle d’exécution de votre fonction.

Accès au registre de schémas Confluent refusé  
**Message d’erreur** : `Cannot access Confluent Schema with the provided access configuration.`  
**Solution :** vérifiez que vos informations d’authentification (stockées dans Secrets Manager) sont correctes et que vous disposez des autorisations nécessaires pour accéder au registre de schémas.

Registre des AWS Glue schémas entre comptes  
**Message d’erreur** : `Cross-account Glue Schema Registry ARN not supported.`  
**Solution :** utilisez un registre de AWS Glue schémas enregistré dans le même AWS compte que votre fonction Lambda.

Registre de AWS Glue schémas interrégional  
**Message d’erreur** : `Cross-region Glue Schema Registry ARN not supported.`  
**Solution :** utilisez un registre de AWS Glue schémas situé dans la même région que votre fonction Lambda.

Problèmes d’accès au secret  
**Message d’erreur** : `Lambda received InvalidRequestException from Secrets Manager.`  
**Solution :** Vérifiez que le rôle d'exécution de votre fonction est autorisé à accéder au secret et que celui-ci n'est pas chiffré avec une AWS KMS clé par défaut si vous y accédez depuis un autre compte.

### Erreurs de connexion
<a name="consume-kafka-events-troubleshooting-connection-errors"></a>

Ces erreurs se produisent lorsque Lambda ne parvient pas à établir une connexion au registre de schémas.

Problèmes de connectivité VPC  
**Message d’erreur** : `Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.`  
**Solution :** configurez votre réseau VPC pour autoriser les connexions au registre de schéma à l'aide AWS PrivateLink d'une passerelle NAT ou d'un peering VPC.

Échec d’établissement d’une liaison TLS  
**Message d’erreur** : `Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.`  
**Solution :** vérifiez que vos certificats CA et vos certificats clients (pour mTLS) sont corrects et bien configurés dans Secrets Manager.

étranglement  
**Message d’erreur** : `Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.`  
**Résolution :** augmentez les limites de débit d’API pour votre registre de schémas ou réduisez le taux de demandes provenant de votre application.

Erreurs de registre de schémas autogéré  
**Message d’erreur** : `Lambda received an internal server an unexpected error from the provided self-managed schema registry.`  
**Solution :** vérifiez l’état et la configuration de votre serveur de registre de schémas autogéré.