Usar registros de esquema com origens de eventos do Kafka no Lambda - AWS Lambda

Usar registros de esquema com origens de eventos do Kafka no Lambda

Registros de esquema ajudam a definir e gerenciar esquemas de fluxos de dados. O esquema define a estrutura e o formato de um registro de dados. No contexto de mapeamentos de origens de eventos do Kafka, é possível configurar um registro de esquema para validar a estrutura e o formato das mensagens do Kafka em relação a esquemas predefinidos antes que estas cheguem à sua função do Lambda. Isso acrescenta uma camada de governança de dados à sua aplicação e permite gerenciar formatos de dados com eficiência, garantir a conformidade do esquema e otimizar os custos por meio da filtragem de eventos.

Embora esse recurso funciona com todas as linguagens de programação, considere os seguintes pontos importantes:

  • O Powertools para Lambda fornece suporte específico para Java, Python e TypeScript, mantendo a consistência com padrões de desenvolvimento existentes do Kafka e possibilitando o acesso direto a objetos de negócios sem um código de desserialização personalizado

  • Esse recurso está disponível apenas para mapeamentos de origens de eventos que usam o modo provisionado. O registro de esquema não oferece suporte a mapeamentos de origens de eventos no modo sob demanda. Se estiver usando o modo provisionado e tiver um registro de esquema configurado, você não poderá mudar para o modo sob demanda, a não ser que remova primeiro a configuração desse registro de esquema. Para obter mais informações, consulte Modo provisionado.

  • É possível configurar apenas um registro de esquema por mapeamento de origem do evento (ESM). Usar um registro de esquema com uma origem de evento do Kafka tem o potencial de aumentar o uso da Event Poller Unit (EPU) do Lambda, que é uma dimensão de preço para o modo Provisionado.

Opções para registros de esquema

O Lambda é compatível com as seguintes opções de registros de esquema:

Seu registro de esquema é compatível com a validação de mensagens nos seguintes formatos de dados:

  • Apache Avro

  • Buffers de protocolo (Protobuf)

  • Esquema JSON (JSON-SE)

Para utilizar um registro de esquema, verifique primeiro se o mapeamento da origem do evento está no modo provisionado. Quando você utiliza um registro de esquema, o Lambda adiciona metadados sobre o esquema à carga útil. Para obter mais informações, consulte Formatos de carga útil e comportamento de desserialização.

Como o Lambda realiza a validação do esquema para mensagens do Kafka

Quando você configura um registro de esquema, o Lambda realiza as seguintes etapas para cada mensagem do Kafka:

  1. O Lambda faz a sondagem do registro do Kafka do seu cluster.

  2. O Lambda valida os atributos de mensagem selecionados no registro em relação a um esquema específico no registro de esquema.

    • Se o esquema associado à mensagem não for encontrado no registro, o Lambda enviará a mensagem a uma DLQ com o código do motivo SCHEMA_NOT_FOUND.

  3. O Lambda desserializa a mensagem de acordo com a configuração do registro do esquema para validar a mensagem. Se a filtragem de eventos estiver configurada, o Lambda realizará a filtragem com base nos critérios de filtros configurados.

    • Se a desserialização falhar, o Lambda enviará a mensagem a uma DLQ com o código do motivo DESERIALIZATION_ERROR. Se não houver uma DLQ configurada, o Lambda descartará a mensagem.

  4. Se a mensagem for validada pelo registro do esquema e não for filtrada por seus critérios de filtro, o Lambda invocará sua função com a mensagem.

O objetivo desse recurso é validar mensagens que já foram produzidas usando clientes Kafka integrados a um registro de esquema. Recomendamos configurar seus produtores do Kafka para funcionarem com seu registro de esquema a fim de criar mensagens formatadas corretamente.

Configurar um registro do esquema do Kafka

As seguintes etapas do console adicionam uma configuração de registro do esquema do Kafka ao seu mapeamento da origem do evento.

Para adicionar uma configuração de registro do esquema do Kafka ao mapeamento da origem do evento (console)
  1. Abra a página Funções no console do Lambda.

  2. Escolher configuração.

  3. Escolha Gatilhos.

  4. Selecione o mapeamento da origem do evento do Kafka para o qual você deseja configurar um registro de esquema e, em seguida, escolha Editar.

  5. Em Configuração do agente de sondagem de eventos, escolha Configurar registro de esquema. Seu mapeamento da origem do evento deve estar no modo provisionado para você ver essa opção.

  6. Em URI do registro do esquema, insira o ARN do registro de esquema AWS Glue ou o URL HTTPS do Confluent Cloud Schema Registry ou do Registro Confluent Schema Registry autogerenciado.

  7. As etapas de configuração abaixo instruem o Lambda a acessar seu registro de esquema. Para obter mais informações, consulte Métodos de autenticação do registro de esquema.

    • Em Tipo de configuração de acesso, escolha o tipo de autenticação usado pelo Lambda para acessar seu registro de esquema.

    • Em URI de configuração de acesso, insira o ARN do segredo do Secrets Manager para autenticação com seu registro de esquema, se aplicável. Certifique-se de que o perfil de execução da sua função contenha as permissões corretas.

  8. O campo Criptografia apenas será aplicável se o registro do esquema for assinado por uma autoridade de certificação (CA) privada ou por uma autoridade de certificação (CA) que não esteja no repositório confiável do Lambda. Se aplicável, forneça a chave do segredo que contém o certificado da CA privada usado pelo registro de esquema para criptografia TLS.

  9. Para Formato do registro de eventos, escolha como você deseja que o Lambda forneça os registros da sua função após a validação do esquema. Para obter mais informações, consulte Exemplos de formatos de carga útil.

    • Se você escolher JSON, o Lambda fornecerá os atributos selecionados no atributo de validação do esquema abaixo, no formato JSON padrão. Para os atributos não selecionados, o Lambda os fornece como estão.

    • Se você escolher SOURCE, o Lambda fornecerá os atributos selecionados no atributo de validação de esquema abaixo, no formato JSON padrão.

  10. Para Atributo de validação de esquema, selecione os atributos de mensagens que você deseja que o Lambda valide e desserialize usando seu registro de esquema. É necessário selecionar pelo menos uma das opções KEY ou VALUE. Se você escolheu o JSON para o formato de registro de eventos, o Lambda também desserializa os atributos de mensagem selecionados antes de os enviar para sua função. Para obter mais informações, consulte Formatos de carga útil e comportamento de desserialização.

  11. Escolha Salvar.

Você também pode usar a API do Lambda para criar ou atualizar seu mapeamento de origem do evento com uma configuração de registro de esquema. Os exemplos a seguir demonstram como configurar um registro do esquema AWS Glue ou Confluent Schema Registry usando o AWS CLI, que corresponde às operações de API UpdateEventSourceMapping e CreateEventSourceMapping na Referência de APIs do AWS Lambda:

Importante

Se estiver atualizando qualquer campo de configuração do registro do esquema usando a API AWS CLI ou update-event-source-mapping, você deverá atualizar todos os campos da configuração do registro de esquema.

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

Para remover uma configuração de registro de esquema do mapeamento da origem do evento, é possível usar o comando da CLI UpdateEventSourceMapping na Referência a APIs do AWS Lambda.

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Filtragem para Avro e Protobuf

Ao usar os formatos Avro ou Protobuf com um registro de esquema, você pode aplicar a filtragem de eventos à sua função do Lambda. Os padrões de filtro são aplicados à representação JSON clássica desserializada dos dados após a validação do esquema. Por exemplo, com um esquema Avro definindo detalhes de produtos, incluindo preço, você pode filtrar mensagens com base no valor de preço:

nota

Ao ser desserializado, o Avro é convertido em JSON padrão, o que significa que não pode ser reconvertido diretamente em um objeto Avro. Se precisar converter em um objeto Avro, use o formato SOURCE em vez disso.

Para desserialização do Protobuf, os nomes dos campos no JSON resultante correspondem aos definidos no seu esquema, em vez de serem convertidos em camel case, como o Protobuf normalmente faz. Lembre-se disso ao criar padrões de filtragem.

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

Neste exemplo, o padrão de filtro analisa o objeto value, combinando as mensagens no field_1 com "value1" e no field_2 com "value2". Os critérios do filtro são avaliados em relação aos dados desserializados depois que o Lambda converte a mensagem do formato Avro em JSON.

Para obter informações mais detalhadas sobre a filtragem de eventos, consulte Filtragem de eventos do Lambda.

Formatos de cargas úteis e comportamento de desserialização

Ao usar um registro de esquema, o Lambda entrega a carga útil final para sua função em um formato semelhante à carga útil do evento regular, com alguns campos adicionais. Os campos adicionais dependem do parâmetro SchemaValidationConfigs. Para cada atributo selecionado para validação (chave ou valor), o Lambda adiciona os metadados do esquema correspondentes à carga útil.

nota

Você deve atualizar aws-lambda-java-events para a versão 3.16.0 ou superior para usar campos de metadados de esquema.

Por exemplo, se você validar o campo value, o Lambda adicionará um campo chamado valueSchemaMetadata à sua carga útil. Da mesma forma, para o campo key, o Lambda adiciona um campo chamado keySchemaMetadata. Esses metadados contêm informações sobre o formato dos dados e o ID do esquema usado para validação:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

O parâmetro EventRecordFormat pode ser definido como JSON ou SOURCE, o que determina como o Lambda lida com dados validados pelo esquema antes de os entregar à função. Cada opção oferece diferentes recursos de processamento:

  • JSON: o Lambda desserializa os atributos validados no formato JSON padrão, preparando os dados para uso direto em linguagens com suporte nativo para JSON. Esse formato é ideal quando não é necessário preservar o formato binário original ou trabalhar com classes geradas.

  • SOURCE: o Lambda preserva o formato binário original dos dados como uma string codificada em Base64, possibilitando a conversão direta em objetos Avro ou Protobuf. Esse formato é essencial ao trabalhar com linguagens de tipagem forte ou quando você precisa manter todos os recursos de esquemas Avro ou Protobuf.

Com base nessas características de formato e considerações específicas de linguagem, recomendamos os seguintes formatos:

Formatos recomendados com base na linguagem de programação
Idioma Avro Protobuf JSON
Java SOURCE SOURCE SOURCE
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SOURCE SOURCE SOURCE
Outros JSON JSON JSON

As seções a seguir descrevem esses formatos em detalhes e fornecem exemplos de cargas úteis para cada um.

Formato JSON

Se você escolher JSON como EventRecordFormat, o Lambda validará e desserializará os atributos da mensagem selecionados no campo SchemaValidationConfigs (os atributos key e/ou value). O Lambda fornece esses atributos selecionados como strings codificadas em base64 da sua representação JSON padrão na sua função.

nota

Ao ser desserializado, o Avro é convertido em JSON padrão, o que significa que não pode ser reconvertido diretamente em um objeto Avro. Se precisar converter em um objeto Avro, use o formato SOURCE em vez disso.

Para desserialização do Protobuf, os nomes dos campos no JSON resultante correspondem aos definidos no seu esquema, em vez de serem convertidos em camel case, como o Protobuf normalmente faz. Lembre-se disso ao criar padrões de filtragem.

O seguinte exemplo mostra uma carga útil, supondo que você escolha JSON como EventRecordFormat, e os atributos key e value como SchemaValidationConfigs:

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Neste exemplo:

  • Tanto key quanto value são strings codificadas em base64 da sua representação JSON após a desserialização.

  • O Lambda inclui metadados de esquema para ambos os atributos em keySchemaMetadata e valueSchemaMetadata.

  • Sua função pode decodificar as strings key e value para acessar os dados JSON desserializados.

O formato JSON é recomendado para linguagens sem tipo forte, como Python ou Node.js. Essas linguagens têm suporte nativo para converter JSON em objetos.

Formato de origem

Se você escolher SOURCE como EventRecordFormat, o Lambda ainda validará o registro em relação ao registro de esquema, mas entregará os dados binários originais para sua função sem desserialização. Esses dados binários são fornecidos como uma string codificada em Base64 dos dados de bytes originais, com metadados anexados pelo produtor removidos. Como resultado, é possível converter diretamente os dados binários brutos em objetos Avro e Protobuf dentro do código da função. Recomendamos o uso do Powertools para AWS Lambda, que desserializará os dados binários brutos e fornecerá objetos Avro e Protobuf diretamente.

Por exemplo, se você configurar o Lambda para validar os atributos key e value, mas usar o formato SOURCE, sua função receberá uma carga útil como esta:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Neste exemplo:

  • Tanto key quanto value contêm os dados binários originais como strings codificadas em Base64.

  • Sua função precisa lidar com a desserialização usando as bibliotecas apropriadas.

É recomendável escolher SOURCE para EventRecordFormat se você estiver usando objetos gerados pelo Avro ou pelo Protobuf, especialmente com funções Java. Isso porque o Java é uma linguagem com tipagem forte e requer desserializadores específicos para os formatos Avro e Protobuf. No código da função, você pode usar sua biblioteca Avro ou Protobuf preferida para desserializar os dados.

Trabalhar com dados desserializados em funções do Lambda

O Powertools para AWS Lambda ajuda a desserializar registros do Kafka no código da função com base no formato em uso. Esse utilitário simplifica o trabalho com registros do Kafka, manipulando a conversão dos dados e fornecendo objetos prontos para uso.

Para usar o Powertools para AWS Lambda em uma função, você precisa adicionar o Powertools para AWS Lambda como uma camada ou incluí-lo como uma dependência ao criar sua função do Lambda. Para obter instruções de configuração e mais informações, consulte a documentação do Powertools para AWS Lambda para sua linguagem preferida:

nota

Ao trabalhar com a integração de registros de esquemas, você pode escolher o formato SOURCE ou JSON. Cada opção oferece suporte a diferentes formatos de serialização, conforme mostrado a seguir:

Formato Suportes

SOURCE

Avro e Protobuf (usando a integração de registro de esquema do Lambda)

JSON

Dados JSON

Ao usar o formato SOURCE ou JSON, você pode usar o Powertools para AWS para ajudar a desserializar os dados no código da função. Estes são alguns exemplos de como lidar com diferentes formatos de dados:

AVRO

Exemplo de Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Exemplo do Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

Exemplo com TypeScript:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

Exemplo com .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Exemplo de Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Exemplo do Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

Exemplo com TypeScript:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

Exemplo com .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Exemplo de Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Exemplo do Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

Exemplo com TypeScript:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

Exemplo com .NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

Métodos de autenticação do registro de esquema

Para usar um registro de esquema, o Lambda precisa ser capaz de acessá-lo com segurança. Se estiver trabalhando com um registro de esquema do AWS Glue, o Lambda depende da autenticação do IAM. Isso significa que a função de execução da função deve ter as seguintes permissões para acessar o registro do AWS Glue:

Exemplo de política do IAM necessária:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
nota

Para registros de esquema do AWS Glue, se você fornecer AccessConfigs para um registro do AWS Glue, o Lambda retornará uma exceção de validação.

Se você estiver trabalhando com um registro de esquema Confluent, poderá escolher um dos três métodos de autenticação compatíveis para o parâmetro Type do objeto KafkaSchemaRegistryAccessConfig:

  • BASIC_AUTH: o Lambda usa nome de usuário e a senha ou a autenticação de chave de API e o segredo de API para acessar seu registro. Se você escolher essa opção, forneça o ARN do Secrets Manager contendo suas credenciais no campo de URI.

  • CLIENT_CERTIFICATE_TLS_AUTH: o Lambda usa autenticação mútua TLS com certificados de cliente. Para usar essa opção, o Lambda precisa acessar o certificado e a chave privada. Forneça o ARN do Secrets Manager contendo essas credenciais no campo de URI.

  • NO_AUTH: o certificado de CA pública deve ser assinado por uma autoridade de certificação (CA) que esteja no repositório de confiança do Lambda. Para um certificado de CA privada/autoassinado, configure o certificado de CA raiz do servidor. Para usar essa opção, omita o parâmetro AccessConfigs.

Além disso, se o Lambda precisar acessar um certificado de CA privada para verificar o certificado TLS do registro de esquema, escolha SERVER_ROOT_CA_CERT como Type e forneça o ARN do Secrets Manager ao certificado no campo de URI.

nota

Para configurar a opção SERVER_ROOT_CA_CERT no console, forneça o ARN do segredo contendo o certificado no campo Criptografia.

A configuração de autenticação para o registro de esquema é separada de qualquer autenticação que você tenha configurado para o cluster Kafka. É necessário configurar os dois separadamente, mesmo que eles usem métodos de autenticação semelhantes.

Tratamento de erros e solução de problemas de registro de esquema

Ao usar um registro de esquema com a sua origem de evento do Amazon MSK, você pode se deparar com vários erros. Esta seção fornece orientação sobre problemas comuns e como resolvê-los.

Erros de configuração

Esses erros ocorrem ao configurar o registro de esquema.

Modo provisionado necessário

A mensagem de erro: SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

Resolução: habilite o modo provisionado para o mapeamento da origem do evento configurando o parâmetro MinimumPollers em ProvisionedPollerConfig.

URL de registro de esquema inválido

A mensagem de erro: Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

Resolução: forneça um URL HTTPS válido para o Confluent Schema Registry ou um ARN válido para o AWS Glue Schema Registry.

Formato de registro de evento inválido ou ausente

A mensagem de erro: EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

Resolução: especifique SOURCE ou JSON como o EventRecordFormat na configuração do registro de esquema.

Atributos de validação duplicados

A mensagem de erro: Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

Resolução: remova os atributos KEY ou VALUE duplicados dos seus SchemaValidationConfigs. Cada tipo de atributo somente pode aparecer uma vez.

Configuração de validação ausente

A mensagem de erro: SchemaValidationConfigs is a required field for SchemaRegistryConfig.

Resolução: adicione SchemaValidationConfigs à sua configuração, especificando pelo menos um atributo de validação (KEY ou VALUE).

Erros de acesso e permissões

Esses erros ocorrem quando o Lambda não consegue acessar o registro de esquema devido a problemas de permissão ou autenticação.

Acesso ao Registro do esquema do AWS Glue negado

A mensagem de erro: Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

Resolução: adicione as permissões necessárias (glue:GetRegistry e glue:GetSchemaVersion) ao perfil de execução da sua função.

Acesso ao Confluent Schema Registry negado

A mensagem de erro: Cannot access Confluent Schema with the provided access configuration.

Resolução: verifique se as suas credenciais de autenticação (armazenadas no Secrets Manager) estão corretas e se têm as permissões necessárias para acessar o registro de esquema.

Registro de esquemas do AWS Glue entre contas

A mensagem de erro: Cross-account Glue Schema Registry ARN not supported.

Resolução: use um Registro de esquema do AWS Glue que esteja na mesma conta da AWS da sua função do Lambda.

Registro de esquema do AWS Glue entre regiões

A mensagem de erro: Cross-region Glue Schema Registry ARN not supported.

Resolução: use um Registro de esquema do AWS Glue que esteja na mesma região da sua função do Lambda.

Problemas de acesso a segredos

A mensagem de erro: Lambda received InvalidRequestException from Secrets Manager.

Resolução: verifique se o perfil de execução da função tem permissão para acessar o segredo e se o este não está criptografado com uma chave padrão do AWS KMS ao acessar de uma conta diferente.

Erros de conexão

Esses erros ocorrem quando o Lambda não consegue estabelecer uma conexão com o registro de esquema.

Problemas de conectividade com a VPC

A mensagem de erro: Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

Resolução: configure sua rede VPC para permitir conexões com o registro de esquema usando um AWS PrivateLink, um gateway NAT ou emparelhamento de VPCs.

Falha do handshake TLS

A mensagem de erro: Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

Resolução: verifique se seus certificados de CA e certificados de cliente (para mTLS) estão corretos e configurados corretamente no Secrets Manager.

Controle de utilização

A mensagem de erro: Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

Resolução: aumente os limites de taxas de API para o registro de esquema ou reduza a taxa de solicitações provenientes da sua aplicação.

Erros de registro do esquema autogerenciado

A mensagem de erro: Lambda received an internal server an unexpected error from the provided self-managed schema registry.

Resolução: verifique a integridade e a configuração do seu servidor de registro de esquema autogerenciado.