

# Uso de registros de esquemas con orígenes de eventos de Kafka en Lambda
<a name="services-consume-kafka-events"></a>

 Los registros de esquemas ayudan a definir y administrar los esquemas de flujo de datos. Un esquema define la estructura y el formato de un registro de datos. En el contexto de las asignaciones de orígenes de eventos de Kafka, puede configurar un registro de esquemas para validar la estructura y el formato de los mensajes de Kafka comparándolos con esquemas predefinidos antes de que lleguen a su función de Lambda. Esto agrega una capa de gobernanza de datos a su aplicación y le permite administrar los formatos de datos de forma eficiente, garantizar el cumplimiento del esquema y optimizar los costos mediante el filtrado de eventos. 

 Esta característica funciona con todos los lenguajes de programación, pero debe tener en cuenta estos puntos importantes: 
+ Powertools para Lambda proporciona soporte específico para Java, Python y TypeScript, mantiene la coherencia con los patrones de desarrollo de Kafka existentes y permite el acceso directo a los objetos de negocio sin necesidad de un código de deserialización personalizado.
+ Esta característica solo está disponible para asignaciones de orígenes de eventos con modo aprovisionado. El registro de esquemas no admite asignaciones de orígenes de eventos en el modo bajo demanda. Si utiliza el modo aprovisionado y tiene un registro de esquemas configurado, no podrá cambiar al modo bajo demanda a menos que elimine primero la configuración del registro de esquemas. Para obtener más información, consulte [Modo aprovisionado](invocation-eventsourcemapping.md#invocation-eventsourcemapping-provisioned-mode)
+ Solo puede configurar un registro de esquemas por asignación de orígenes de eventos (ESM). El uso de un registro de esquemas con los orígenes de eventos de Kafka puede aumentar el uso de la unidad Lambda Event Poller Unit (EPU), que es una dimensión de precios para el modo aprovisionado. 

**Topics**
+ [Opciones del registro de esquemas](#services-consume-kafka-events-options)
+ [Cómo Lambda valida el esquema de los mensajes de Kafka](#services-consume-kafka-events-how)
+ [Configuración de un registro de esquemas de Kafka](#services-consume-kafka-events-config)
+ [Filtrado para Avro y Protobuf](#services-consume-kafka-events-filtering)
+ [Formatos de carga útil y comportamiento de deserialización](#services-consume-kafka-events-payload)
+ [Trabajar con datos deserializados en funciones de Lambda](#services-consume-kafka-events-payload-examples)
+ [Métodos de autenticación para el registro de esquemas](#services-consume-kafka-events-auth)
+ [Gestión de errores y solución de problemas relacionados con el registro de esquemas](#services-consume-kafka-events-troubleshooting)

## Opciones del registro de esquemas
<a name="services-consume-kafka-events-options"></a>

 Lambda es compatible con las siguientes opciones de registro de esquemas: 
+ [AWS Glue Registro de esquemas](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+ [Registro de esquemas de Confluent Cloud](https://docs.confluent.io/platform/current/schema-registry/index.html)
+ [Registro de esquemas de Confluent de administración automática](https://docs.confluent.io/platform/current/schema-registry/index.html)

 El registro de esquemas es compatible con la validación de mensajes en los siguientes formatos de datos: 
+ Apache Avro
+ Búferes de protocolo (Protobuf)
+ Esquema JSON (JSON-SE)

 Para usar un registro de esquemas, primero asegúrese de que la asignación de orígenes de eventos esté en modo aprovisionado. Cuando se utiliza un registro de esquemas, Lambda agrega metadatos sobre el esquema a la carga útil. Para obtener más información, consulte [Formatos de carga útil y comportamiento de deserialización](#services-consume-kafka-events-payload). 

## Cómo Lambda valida el esquema de los mensajes de Kafka
<a name="services-consume-kafka-events-how"></a>

 Al configurar un registro de esquemas, Lambda realiza los siguientes pasos para cada mensaje de Kafka: 

1. Lambda sondea el registro de Kafka de su clúster.

1. Lambda valida los atributos de los mensajes seleccionados en el registro en relación con un esquema específico en su registro de esquemas.
   + Si el esquema asociado al mensaje no se encuentra en el registro, Lambda envía el mensaje a un DLQ con el código de motivo `SCHEMA_NOT_FOUND`.

1. Lambda deserializa el mensaje de acuerdo con la configuración del registro de esquemas para validar el mensaje. Si está configurado el filtrado de eventos, Lambda lo realiza en función de los criterios de filtro configurados.
   + Si se produce un error en la deserialización, Lambda envía el mensaje a un DLQ con el código de motivo `DESERIALIZATION_ERROR`. Si no se configura ningún DLQ, Lambda descarta el mensaje.

1. Si el mensaje es validado por el registro del esquema y no se filtra según sus criterios de filtro, Lambda invoca su función con el mensaje.

 Esta característica está destinada a validar los mensajes que ya se han producido con clientes de Kafka integrados con un registro de esquemas. Recomendamos que configure sus productores de Kafka para que funcionen con el registro de esquemas a fin de crear mensajes con el formato adecuado. 

## Configuración de un registro de esquemas de Kafka
<a name="services-consume-kafka-events-config"></a>

 Los siguientes pasos de la consola añaden una configuración de registro del esquema de Kafka a la asignación de orígenes de eventos. 

**Cómo agregar una configuración de registro de esquema de Kafka a la asignación de orígenes de eventos (consola)**

1. Abra la [Página de función](https://console.aws.amazon.com/lambda/home#/functions) en la consola de Lambda.

1. Elija **Configuración**.

1. Elija **Añadir desencadenador**.

1. Seleccione la asignación de orígenes de eventos de Kafka que desea configurar para un registro de esquemas y elija **Editar**.

1. En **Configuración del sondeo de eventos**, elija **Configurar registro de esquemas**. Su asignación de orígenes de eventos debe estar en modo aprovisionado para ver esta opción.

1. Para el **URI del registro de esquemas**, ingrese el ARN de su registro de esquemas de AWS Glue o la URL HTTPS de su registro de esquemas de Confluent Cloud o registro de esquemas de Confluent de administración automática.

1. Los siguientes pasos de configuración indican a Lambda cómo acceder al registro de esquemas. Para obtener más información, consulte [Métodos de autenticación para el registro de esquemas](#services-consume-kafka-events-auth).
   + En el **tipo de configuración de acceso**, elija el tipo de autenticación que Lambda utiliza para acceder al registro de esquemas.
   + En el **URI de configuración de acceso**, introduzca el ARN del secreto de Secrets Manager para autenticarse con el registro de esquemas, si corresponde. Asegúrese de que el rol de [ejecución de su función](with-msk-permissions.md) contenga los permisos correctos.

1. El campo **Cifrado** solo se aplica si el registro del esquema está firmado por una entidad de certificación (CA) privada o una entidad de certificación (CA) que no se encuentra en el almacén de confianza de Lambda. Si corresponde, proporcione la clave secreta que contiene el certificado de CA privado que utiliza el registro de esquemas para el cifrado TLS.

1. En **Formato de registro de eventos**, elija cómo desea que Lambda entregue los registros a su función después de la validación del esquema. Para obtener más información, consulte [Ejemplos de formato de carga útil](#services-consume-kafka-events-payload).
   + Si elige **JSON**, Lambda entrega los atributos que selecciona en el atributo de validación de esquema que aparece a continuación en formato JSON estándar. En el caso de los atributos que no seleccione, Lambda los entrega tal cual.
   + Si elige **SOURCE**, Lambda entrega los atributos que selecciona en el atributo de validación de esquema que aparece a continuación en formato original.

1. En **Atributo de validación de esquema**, seleccione los atributos de mensaje que desea que Lambda valide y deserialice mediante el registro de esquemas. Debe seleccionar al menos uno de los siguientes: **KEY** o **VALUE**. Si eligió JSON como formato de registro de eventos, Lambda también deserializa los atributos de mensaje seleccionados antes de enviarlos a su función. Para obtener más información, consulte [Formatos de carga útil y comportamiento de deserialización](#services-consume-kafka-events-payload).

1. Seleccione **Save**.

 También puede usar la API de Lambda para crear o actualizar la asignación de orígenes de eventos con una configuración de registro de esquemas. Los siguientes ejemplos muestran cómo configurar un registro de esquema de AWS Glue o de Confluent mediante el AWS CLI, que corresponde a las operaciones de la API de [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) y de [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) en la *Referencia de la API de AWS Lambda* : 

**importante**  
Si está actualizando un campo de configuración del registro de esquemas mediante la API de AWS CLI o la API de `update-event-source-mapping`, debe actualizar todos los campos de la configuración del registro de esquemas.

------
#### [ Create Event Source Mapping ]

```
aws lambda create-event-source-mapping \
  --function-name my-schema-validator-function \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \
  --topics my-kafka-topic \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --amazon-managed-kafka-event-source-mapping '{
      "SchemaRegistryConfig" : {
          "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
          "AccessConfigs": [{
              "Type": "BASIC_AUTH", 
              "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
          }],
          "EventRecordFormat": "JSON",
          "SchemaValidationConfigs": [
          { 
              "Attribute": "KEY" 
          },
          { 
              "Attribute": "VALUE" 
          }]
      }
  }'
```

------
#### [ Update AWS Glue Schema Registry ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry with Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "AccessConfigs": [{
                "Type": "BASIC_AUTH", 
                "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName"
            }],
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Update Confluent Schema Registry without Authentication ]

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {
            "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud",
            "EventRecordFormat": "JSON",
            "SchemaValidationConfigs": [
            { 
                "Attribute": "KEY" 
            },
            { 
                "Attribute": "VALUE" 
            }]
        }
    }'
```

------
#### [ Remove Schema Registry Configuration ]

Para eliminar una configuración de registro de esquemas de la asignación de orígenes de eventos, puede usar el comando de CLI [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) en la *Referencia de la API de AWS Lambda * .

```
aws lambda update-event-source-mapping \
    --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \
    --amazon-managed-kafka-event-source-mapping '{
        "SchemaRegistryConfig" : {}
    }'
```

------

## Filtrado para Avro y Protobuf
<a name="services-consume-kafka-events-filtering"></a>

 Si utiliza los formatos Avro o Protobuf con un registro de esquemas, puede aplicar el filtrado de eventos a la función de Lambda. Los patrones de filtro se aplican a la representación JSON clásica deserializada de sus datos después de la validación del esquema. Por ejemplo, con un esquema Avro que defina los detalles del producto, incluido el precio, puede filtrar los mensajes en función del valor del precio: 

**nota**  
 Cuando se deserializa, Avro se convierte en formato JSON estándar, lo que significa que no se puede volver a convertir directamente en un objeto Avro. Si necesita convertirlo en un objeto Avro, utilice el formato SOURCE.   
 Para la deserialización de Protobuf, los nombres de campo del JSON resultante coinciden con los que están definidos en el esquema, en lugar de convertirse en estilo camel case como suele hacer Protobuf. Tenga esto en cuenta durante la creación de patrones de filtrado. 

```
aws lambda create-event-source-mapping \
    --function-name myAvroFunction \
    --topics myAvroTopic \
    --starting-position TRIM_HORIZON \
    --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \
    --schema-registry-config '{
        "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry",
        "EventRecordFormat": "JSON",
        "SchemaValidationConfigs": [
            { 
                "Attribute": "VALUE" 
            }
        ]
    }' \
    --filter-criteria '{
        "Filters": [
            {
                "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }"
            }
        ]
    }'
```

 En este ejemplo, el patrón de filtro analiza el objeto `value` y hace coincidir los mensajes `field_1` con `"value1"` y `field_2` con `"value2"`. Los criterios del filtro se comparan con los datos deserializados, después de que Lambda convierte el mensaje del formato Avro l formato JSON. 

 Para obtener más información respecto del filtrado de eventos, consulte [Filtrado de eventos de Lambda](invocation-eventfiltering.md). 

## Formatos de carga útil y comportamiento de deserialización
<a name="services-consume-kafka-events-payload"></a>

 Cuando se utiliza un registro de esquemas, Lambda entrega la carga útil final a la función en un formato similar al de la [carga útil de evento regular](with-msk.md#msk-sample-event), con campos adicionales. Los campos adicionales dependen del parámetro `SchemaValidationConfigs`. Para cada atributo que seleccione para la validación (clave o valor), Lambda añade los metadatos del esquema correspondientes a la carga útil. 

**nota**  
Debe actualizar su [aws-lambda-java-events](https://github.com/aws/aws-lambda-java-libs/tree/main/aws-lambda-java-events) a la versión 3.16.0 o una superior para utilizar los campos de metadatos del esquema.

 Por ejemplo, si valida el campo `value`, Lambda añade un campo llamado `valueSchemaMetadata` a su carga útil. Del mismo modo, para el campo `key`, Lambda agrega un campo llamado `keySchemaMetadata`. Estos metadatos contienen información sobre el formato de los datos y el ID del esquema utilizados para la validación: 

```
"valueSchemaMetadata": {
    "dataFormat": "AVRO",
    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
}
```

 El parámetro `EventRecordFormat` se puede establecer en `JSON` o `SOURCE`, lo que determina cómo Lambda gestiona los datos validados por el esquema antes de entregarlos a la función. Cada opción ofrece distintas capacidades de procesamiento: 
+ `JSON`: Lambda deserializa los atributos validados en formato JSON estándar, lo que permite que los datos estén listos para uso directo en lenguajes compatibles con JSON nativo. Este formato es ideal cuando no se necesita conservar el formato binario original ni trabajar con las clases generadas.
+ `SOURCE`: Lambda conserva el formato binario original de los datos como una cadena de codificación Base64, lo que permite la conversión directa a objetos Avro o Protobuf. Este formato es esencial cuando se trabaja con lenguajes fuertemente tipados o cuando se necesitan mantener todas las capacidades de los esquemas de Avro o Protobuf.

En función de estas características del formato y de las consideraciones específicas del lenguaje, recomendamos los siguientes formatos:


**Formatos recomendados según el lenguaje de programación**  

| Idioma | Avro | Protobuf | JSON | 
| --- | --- | --- | --- | 
| Java | SOURCE | SOURCE | SOURCE | 
| Python | JSON | JSON | JSON | 
| NodeJS | JSON | JSON | JSON | 
| .NET | SOURCE | SOURCE | SOURCE | 
| Otros | JSON | JSON | JSON | 

En las siguientes secciones se describen estos formatos en detalle y se proporcionan ejemplos de cargas útiles para cada uno de ellos.

### Formato JSON
<a name="services-consume-kafka-events-payload-json"></a>

 Si elige `JSON` como el `EventRecordFormat`, Lambda valida y deserializa los atributos del mensaje que ha seleccionado en el campo `SchemaValidationConfigs` (los atributos `key` o `value`). Lambda entrega estos atributos seleccionados como cadenas de codificación base64 de su representación JSON estándar en su función. 

**nota**  
 Cuando se deserializa, Avro se convierte en formato JSON estándar, lo que significa que no se puede volver a convertir directamente en un objeto Avro. Si necesita convertirlo en un objeto Avro, utilice el formato SOURCE.   
 Para la deserialización de Protobuf, los nombres de campo del JSON resultante coinciden con los que están definidos en el esquema, en lugar de convertirse en estilo camel case como suele hacer Protobuf. Tenga esto en cuenta durante la creación de patrones de filtrado. 

 A continuación se muestra un ejemplo de carga útil, sobre la suposición de que elige `JSON` como `EventRecordFormat`, y ambos atributos `key` y `value` como `SchemaValidationConfigs`: 

```
{
   "eventSource":"aws:kafka",
   "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
   "records":{
      "mytopic-0":[
         {
            "topic":"mytopic",
            "partition":0,
            "offset":15,
            "timestamp":1545084650987,
            "timestampType":"CREATE_TIME",
            "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON
            "keySchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON
            "valueSchemaMetadata": {
                "dataFormat": "AVRO",
                "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
            },
            "headers":[
               {
                  "headerKey":[
                     104,
                     101,
                     97,
                     100,
                     101,
                     114,
                     86,
                     97,
                     108,
                     117,
                     101
                  ]
               }
            ]
         }
      ]
   }
}
```

 En este ejemplo: 
+ Ambos `key` y `value` son cadenas de codificación base64 de su representación JSON después de la deserialización.
+ Lambda incluye metadatos de esquema para ambos atributos en `keySchemaMetadata` y `valueSchemaMetadata`.
+ La función puede decodificar las cadenas `key` y `value` para acceder a los datos JSON deserializados.

 Se recomienda el formato JSON para los lenguajes que no están fuertemente tipados, como Python o Node.js. Estos lenguajes cuentan con soporte nativo para convertir el formato JSON en objetos. 

### Formato de origen
<a name="services-consume-kafka-events-payload-source"></a>

 Si elige `SOURCE` como `EventRecordFormat`, Lambda igual valida el registro con respecto al registro del esquema, pero entrega los datos binarios originales a la función sin deserialización. Estos datos binarios se entregan como una cadena de codificación Base64 de los datos de bytes originales, sin incluir los metadatos agregados por el productor. Como resultado, puede convertir directamente los datos binarios sin procesar en objetos Avro y Protobuf dentro del código de función. Se recomienda que utilice Powertools para AWS Lambda, que deserializará los datos binarios sin procesar y proporcionará directamente los objetos Avro y Protobuf. 

 Por ejemplo, si configura Lambda para validar los atributos `key` y `value`, pero utiliza el formato `SOURCE`, la función recibe una carga útil como la siguiente: 

```
{
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1",
    "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
    "records": {
        "mytopic-0": [
            {
                "topic": "mytopic",
                "partition": 0,
                "offset": 15,
                "timestamp": 1545084650987,
                "timestampType": "CREATE_TIME",
                "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "keySchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed
                "valueSchemaMetadata": {
                    "dataFormat": "AVRO",
                    "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111"
                },
                "headers": [
                    {
                        "headerKey": [
                            104,
                            101,
                            97,
                            100,
                            101,
                            114,
                            86,
                            97,
                            108,
                            117,
                            101
                        ]
                    }
                ]
            }
        ]
    }
}
```

 En este ejemplo: 
+ Ambos `key` y `value` contienen los datos binarios originales en forma de cadenas de codificación Base64.
+ Su función debe gestionar la deserialización con las bibliotecas adecuadas.

 Se recomienda elegir `SOURCE` para `EventRecordFormat` si utiliza objetos generados por Avro o Protobuf, especialmente con funciones de Java. Esto se debe a que Java está fuertemente tipado y requiere deserializadores específicos para los formatos Avro y Protobuf. En el código de la función, puede usar la biblioteca Avro o Protobuf de su preferencia para deserializar los datos. 

## Trabajar con datos deserializados en funciones de Lambda
<a name="services-consume-kafka-events-payload-examples"></a>

Powertools para AWS Lambda ayuda a deserializar los registros de Kafka en el código de su función según el formato que utilice. Esta utilidad simplifica el trabajo con los registros de Kafka al gestionar la conversión de datos y proporcionar objetos listos para usar.

 Si desea usar Powertools para AWS Lambda en su función, debe agregar Powertools para AWS Lambda como una capa o incluirlo como una dependencia al crear su función de Lambda. Para obtener instrucciones de configuración y más información, consulte la documentación de Powertools para AWS Lambda para su lenguaje preferido: 
+ [Powertools para AWS Lambda (Java)](https://docs.powertools.aws.dev/lambda/java/latest/utilities/kafka/)
+ [Powertools para AWS Lambda (Python)](https://docs.powertools.aws.dev/lambda/python/latest/utilities/kafka/)
+ [Powertools para AWS Lambda (TypeScript)](https://docs.powertools.aws.dev/lambda/typescript/latest/features/kafka/)
+ [Powertools para AWS Lambda (.NET)](https://docs.powertools.aws.dev/lambda/dotnet/utilities/kafka/)

**nota**  
Al trabajar con la integración del registro de esquemas, puede elegir el formato `SOURCE` o `JSON`. Cada opción admite diferentes formatos de serialización, como se muestra a continuación:  


| Formato | Admite | 
| --- | --- | 
|  SOURCE  |  Avro y Protobuf (mediante la integración del registro de esquema de Lambda)  | 
|  JSON  |  Datos JSON  | 

 Al usar el formato `SOURCE` o `JSON`, puede utilizar Powertools para AWS a fin de ayudar a deserializar los datos en el código de la función. Estos son algunos ejemplos de cómo manejar los diferentes formatos de datos: 

------
#### [ AVRO ]

Ejemplo de Java:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.avro.AvroProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_AVRO)
    public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) {
        for (ConsumerRecord<String, AvroProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            AvroProduct product = consumerRecord.value();
            LOGGER.info("AvroProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Ejemplo de Python:

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer
from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

value_schema_str = open("customer_profile.avsc", "r").read()

schema_config = SchemaConfig(
value_schema_type="AVRO",
value_schema=value_schema_str)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

Muestra de TypeScript:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';

import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

const schema = '{   
    "type": "record",   
    "name": "Product",   
    "fields": [     
        { "name": "id", "type": "int" },     
        { "name": "name", "type": "string" },     
        { "name": "price", "type": "double" }   
    ] 
}';

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'avro',
            schema: schema,
        },
    }
);
```

Ejemplo de .NET:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Avro;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ PROTOBUF ]

Ejemplo de Java:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.demo.kafka.protobuf.ProtobufProduct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class ProtobufDeserializationFunction
        implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_PROTOBUF)
    public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) {
        for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            ProtobufProduct product = consumerRecord.value();
            LOGGER.info("ProtobufProduct: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }

}
```

Ejemplo de Python:

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

from user_pb2 import User # protobuf generated class

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(
value_schema_type="PROTOBUF",
value_schema=User)

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

Muestra de TypeScript:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';
import { Product } from './product.generated.js';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
        }
    },
    {
        value: {
            type: 'protobuf',
            schema: Product,
        },
    }
);
```

Ejemplo de .NET:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Protobuf;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))]

namespace ProtoBufClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------
#### [ JSON ]

Ejemplo de Java:

```
package org.demo.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

import software.amazon.lambda.powertools.kafka.Deserialization;
import software.amazon.lambda.powertools.kafka.DeserializationType;
import software.amazon.lambda.powertools.logging.Logging;

public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> {

    private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class);

    @Override
    @Logging
    @Deserialization(type = DeserializationType.KAFKA_JSON)
    public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) {
        for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) {
            LOGGER.info("ConsumerRecord: {}", consumerRecord);

            Product product = consumerRecord.value();
            LOGGER.info("Product: {}", product);

            String key = consumerRecord.key();
            LOGGER.info("Key: {}", key);
        }

        return "OK";
    }
}
```

Ejemplo de Python:

```
from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer

from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig
from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords

from aws_lambda_powertools.utilities.typing import LambdaContext
from aws_lambda_powertools import Logger

logger = Logger(service="kafkaConsumerPowertools")

schema_config = SchemaConfig(value_schema_type="JSON")

@kafka_consumer(schema_config=schema_config)
def lambda_handler(event: ConsumerRecords, context:LambdaContext):

  for record in event.records:
      value = record.value
      logger.info(f"Received value: {value}")
```

Muestra de TypeScript:

```
import { kafkaConsumer } from '@aws-lambda-powertools/kafka';
import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types';
import { Logger } from '@aws-lambda-powertools/logger';
import type { Context } from 'aws-lambda';

const logger = new Logger();

type Value = {
    id: number;
    name: string;
    price: number;
};

export const handler = kafkaConsumer<string, Value>(
    (event: ConsumerRecords<string, Value>, _context: Context) => {
        for (const record of event.records) {
            logger.info(Processing record with key: ${record.key});
            logger.info(Record value: ${JSON.stringify(record.value)});
            // You can add more processing logic here
        }
    },
    {
        value: {
            type: 'json',
        },
    }
);
```

Ejemplo de .NET:

```
using Amazon.Lambda.Core;
using AWS.Lambda.Powertools.Kafka;
using AWS.Lambda.Powertools.Kafka.Json;
using AWS.Lambda.Powertools.Logging;
using Com.Example;

// Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))]

namespace JsonClassLibrary;

public class Function
{
    public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context)
    {
        foreach (var record in records)
        {
            Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic);
            Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset);
            Logger.LogInformation("Produced at: {timestamp}", record.Timestamp);
            
            foreach (var header in record.Headers.DecodedValues())
            {
                Logger.LogInformation($"{header.Key}: {header.Value}");
            }
            
            Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName);
        }
    
        return "Processed " + records.Count() + " records";
    }
}
```

------

## Métodos de autenticación para el registro de esquemas
<a name="services-consume-kafka-events-auth"></a>

 Para utilizar un registro de esquemas, Lambda debe poder acceder a este de forma segura. Si trabaja con un registro de esquemas de AWS Glue, Lambda se basa en la autenticación de IAM. Esto significa que el [rol de ejecución](lambda-intro-execution-role.md) de su función debe tener los siguientes permisos para acceder al registro AWS Glue: 
+ [GetRegistry](https://docs.aws.amazon.com/glue/latest/webapi/API_GetRegistry.html) en la *referencia de la API web de AWS Glue*
+ [GetSchemaVersion](https://docs.aws.amazon.com/glue/latest/webapi/API_GetSchemaVersion.html) en la *referencia de la API web de AWS Glue*

Ejemplo de la política de IAM requerida: 

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "glue:GetRegistry",
                "glue:GetSchemaVersion"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}
```

------

**nota**  
 Para los registros de esquemas de AWS Glue, si proporciona `AccessConfigs` para un registro AWS Glue, Lambda devolverá una excepción de validación. 

Si trabaja con un registro de esquemas de Confluent, puede elegir uno de los tres métodos de autenticación compatibles para el parámetro `Type` de su objeto [KafkaSchemaRegistryAccessConfig](https://docs.aws.amazon.com/lambda/latest/api/API_KafkaSchemaRegistryAccessConfig):
+ **BASIC\$1AUTH**: Lambda utiliza la autenticación de nombre de usuario y contraseña o clave de API y secreto de API para acceder al registro. Si elige esta opción, proporcione el ARN de Secrets Manager que contiene sus credenciales en el campo URI.
+ **CLIENT\$1CERTIFICATE\$1TLS\$1AUTH**: Lambda utiliza la autenticación TLS mutua con los certificados de cliente. Lambda necesita acceder tanto al certificado como a la clave privada para usar esta opción. Proporcione el ARN de Secrets Manager que contiene estas credenciales en el campo URI.
+ **NO\$1AUTH**: El certificado de entidad de certificación pública debe estar firmado por una entidad de certificación que esté en el almacén de confianza de Lambda. Para un certificado autofirmado o de entidad de certificación privada, configure el certificado de entidad de certificación raíz del servidor. Para usar esta opción, se debe omitir el parámetro `AccessConfigs`.

 Además, si Lambda necesita acceder a un certificado de CA privado para verificar el certificado TLS del registro de esquemas, seleccione `SERVER_ROOT_CA_CERT` como `Type` y proporcione el ARN de Secrets Manager al certificado en el campo URI. 

**nota**  
 Para configurar la opción `SERVER_ROOT_CA_CERT` en la consola, proporcione el ARN secreto que contiene el certificado en el campo **Cifrado**. 

 La configuración de autenticación del registro de esquemas es independiente de cualquier autenticación que haya configurado para el clúster de Kafka. Debe configurar ambas por separado, incluso si utilizan métodos de autenticación que son similares. 

## Gestión de errores y solución de problemas relacionados con el registro de esquemas
<a name="services-consume-kafka-events-troubleshooting"></a>

Al utilizar un registro de esquemas con los orígenes de eventos de Amazon MSK, es posible que se produzcan varios errores. En esta sección se proporciona orientación sobre los problemas habituales y cómo resolverlos.

### Errores de configuración
<a name="consume-kafka-events-troubleshooting-configuration-errors"></a>

Estos errores se producen al configurar el registro de esquemas.

Se requiere el modo aprovisionado  
**Mensaje de error:** `SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.`  
**Solución:** habilite el modo aprovisionado para la asignación de orígenes de eventos mediante la configuración del parámetro `MinimumPollers` en `ProvisionedPollerConfig`. 

URL del registro de esquemas no válida  
**Mensaje de error:** `Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.`  
**Solución:** proporcione una URL HTTPS válida para el registro de esquemas de Confluent o un ARN válido para el registro de esquemas de AWS Glue.

Falta el formato de registro de eventos o este no es válido  
**Mensaje de error:** `EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.`  
**Solución:** especifique SOURCE o JSON como EventRecordFormat en la configuración del registro de esquemas.

Atributos de validación duplicados  
**Mensaje de error:** `Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.`  
**Solución:** elimine los atributos KEY o VALUE duplicados de sus SchemaValidationConfigs. Cada tipo de atributo solo puede aparecer una vez.

Falta la configuración de validación  
**Mensaje de error:** `SchemaValidationConfigs is a required field for SchemaRegistryConfig.`  
**Solución:** Añada SchemaValidationConfigs a su configuración y especifique al menos un atributo de validación (CLAVE o VALOR).

### Accesos y errores de permisos
<a name="consume-kafka-events-troubleshooting-access-errors"></a>

Estos errores ocurren cuando Lambda no puede acceder al registro de esquemas debido a problemas de permisos o autenticación.

Acceso denegado al registro de esquemas de AWS Glue  
**Mensaje de error:** `Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.`  
**Solución:** Agregue una política con los permisos necesarios (`glue:GetRegistry` y `glue:GetSchemaVersion`) al rol de ejecución de la función.

Acceso denegado al registro de esquemas de Confluent  
**Mensaje de error:** `Cannot access Confluent Schema with the provided access configuration.`  
**Solución:** Verifique que sus credenciales de autenticación (almacenadas en Secrets Manager) sean correctas y tengan los permisos necesarios para acceder al registro de esquemas.

Registro de esquemas de AWS Glue entre cuentas  
**Mensaje de error:** `Cross-account Glue Schema Registry ARN not supported.`  
**Solución:** Utilice un registro de esquemas de AWS Glue que esté en la misma cuenta de AWS que su función de Lambda.

Registro de esquemasAWS Glue entre regiones  
**Mensaje de error:** `Cross-region Glue Schema Registry ARN not supported.`  
**Solución:** Utilice un registro de esquemas que esté en la misma región que su función de Lambda.AWS Glue

Problemas de acceso a secretos  
**Mensaje de error:** `Lambda received InvalidRequestException from Secrets Manager.`  
**Solución:** Verifique que el rol de ejecución de su función tenga permiso para acceder al secreto y que este no esté cifrado con una clave AWS KMS predeterminada si accede desde una cuenta diferente.

### Errores de conexión
<a name="consume-kafka-events-troubleshooting-connection-errors"></a>

Estos errores ocurren cuando Lambda no puede establecer una conexión con el registro de esquemas.

Problemas de conectividad de VPC  
**Mensaje de error:** `Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.`  
**Solución:** Configure la red de VPC para permitir conexiones al registro de esquemas mediante AWS PrivateLink, una puerta de enlace NAT o un emparejamiento de VPC.

Error de protocolo de enlace TLS  
**Mensaje de error:** `Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.`  
**Solución:** Verifique que sus certificados de CA y de cliente (para mTLS) sean correctos y estén correctamente configurados en Secrets Manager.

Limitación  
**Mensaje de error:** `Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.`  
**Solución:** Aumente los límites de tasa de API para el registro de esquemas o reduzca la tasa de las solicitudes de su aplicación.

Errores de registro de esquemas de administración automática  
**Mensaje de error:** `Lambda received an internal server an unexpected error from the provided self-managed schema registry.`  
**Solución:** Verifique el estado y la configuración de su servidor de registro de esquemas de administración automática.