Utilizzo dei registri degli schemi con le sorgenti di eventi Kafka in Lambda - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo dei registri degli schemi con le sorgenti di eventi Kafka in Lambda

I registri degli schemi ti aiutano a definire e gestire gli schemi dei flussi di dati. Uno schema definisce la struttura e il formato di un registro di dati. Nel contesto delle mappature delle sorgenti degli eventi Kafka, puoi configurare un registro degli schemi per convalidare la struttura e il formato dei messaggi Kafka rispetto a schemi predefiniti prima che raggiungano la tua funzione Lambda. Ciò aggiunge un livello di governance dei dati all'applicazione e consente di gestire in modo efficiente i formati di dati, garantire la conformità allo schema e ottimizzare i costi tramite il filtraggio degli eventi.

Questa funzionalità è compatibile con tutti i linguaggi di programmazione, ma considera questi punti importanti:

  • Powertools for Lambda fornisce supporto specifico per Java, Python e mantiene la coerenza con i modelli di sviluppo Kafka esistenti TypeScript e consente l'accesso diretto agli oggetti aziendali senza codice di deserializzazione personalizzato

  • Questa funzionalità è disponibile solo per le mappature delle sorgenti degli eventi che utilizzano la modalità provisioned. Il registro degli schemi non supporta i mapping delle sorgenti degli eventi in modalità su richiesta. Se si utilizza la modalità provisioning e si dispone di un registro degli schemi configurato, non è possibile passare alla modalità su richiesta a meno che non si rimuova prima la configurazione del registro dello schema. Per ulteriori informazioni, consulta Modalità provisioning

  • È possibile configurare un solo registro degli schemi per Event Source Mapping (ESM). L'utilizzo di un registro degli schemi con l'origine degli eventi Kafka può aumentare l'utilizzo della Lambda Event Poller Unit (EPU), che è una dimensione tariffaria per la modalità Provisioned.

Opzioni del registro dello schema

Lambda supporta le seguenti opzioni di registro dello schema:

Il registro degli schemi supporta la convalida dei messaggi nei seguenti formati di dati:

  • Apache Avro

  • Buffer di protocollo (Protobuf)

  • Schema JSON (JSON-SE)

Per utilizzare un registro degli schemi, assicurati innanzitutto che la mappatura delle sorgenti degli eventi sia in modalità provisioning. Quando si utilizza un registro dello schema, Lambda aggiunge i metadati relativi allo schema al payload. Per ulteriori informazioni, consulta Formati di payload e comportamento di deserializzazione.

Come Lambda esegue la convalida dello schema per i messaggi Kafka

Quando configuri un registro degli schemi, Lambda esegue i seguenti passaggi per ogni messaggio Kafka:

  1. Lambda analizza il record di Kafka dal tuo cluster.

  2. Lambda convalida gli attributi dei messaggi selezionati nel record rispetto a uno schema specifico nel registro degli schemi.

    • Se lo schema associato al messaggio non viene trovato nel registro, Lambda invia il messaggio a un DLQ con codice motivo. SCHEMA_NOT_FOUND

  3. Lambda deserializza il messaggio in base alla configurazione del registro dello schema per convalidarlo. Se il filtraggio degli eventi è configurato, Lambda esegue quindi il filtraggio in base ai criteri di filtro configurati.

    • Se la deserializzazione fallisce, Lambda invia il messaggio a un DLQ con il codice motivo. DESERIALIZATION_ERROR Se non è configurato alcun DLQ, Lambda rilascia il messaggio.

  4. Se il messaggio è convalidato dal registro dello schema e non viene filtrato in base ai criteri di filtro, Lambda richiama la funzione con il messaggio.

Questa funzionalità ha lo scopo di convalidare i messaggi già prodotti utilizzando client Kafka integrati con un registro degli schemi. Ti consigliamo di configurare i tuoi produttori Kafka in modo che utilizzino il registro degli schemi per creare messaggi formattati correttamente.

Configurazione di un registro degli schemi Kafka

I seguenti passaggi della console aggiungono una configurazione del registro dello schema Kafka alla mappatura delle sorgenti degli eventi.

Per aggiungere una configurazione del registro dello schema Kafka alla mappatura delle sorgenti degli eventi (console)
  1. Apri la pagina Funzione della console Lambda.

  2. Scegliere Configuration (Configurazione).

  3. Scegli Triggers.

  4. Seleziona la mappatura della sorgente degli eventi Kafka per cui desideri configurare un registro dello schema e scegli Modifica.

  5. In Event poller configuration, scegli Configura il registro dello schema. La mappatura delle sorgenti degli eventi deve essere in modalità provisioning per visualizzare questa opzione.

  6. Per l'URI del registro degli schemi, inserisci l'ARN del registro degli AWS Glue schemi o l'URL HTTPS del registro degli schemi di Confluent Cloud o del registro degli schemi Confluent Self-Managed.

  7. I seguenti passaggi di configurazione indicano a Lambda come accedere al registro degli schemi. Per ulteriori informazioni, consulta Metodi di autenticazione per il registro degli schemi.

    • Per il tipo di configurazione Access, scegli il tipo di autenticazione che Lambda utilizza per accedere al registro dello schema.

    • Per l'URI di configurazione di Access, inserisci l'ARN del segreto Secrets Manager per l'autenticazione con il registro dello schema, se applicabile. Assicurati che il ruolo di esecuzione della funzione contenga le autorizzazioni corrette.

  8. Il campo Encryption si applica solo se il registro dello schema è firmato da un'Autorità di certificazione (CA) privata o da un'autorità di certificazione (CA) che non si trova nel Lambda Trust Store. Se applicabile, fornisci la chiave segreta contenente il certificato CA privato utilizzato dal registro dello schema per la crittografia TLS.

  9. Per il formato di registrazione degli eventi, scegli come desideri che Lambda fornisca i record della tua funzione dopo la convalida dello schema. Per ulteriori informazioni, consulta Esempi di formato Payload.

    • Se scegli JSON, Lambda fornisce gli attributi selezionati nell'attributo di convalida dello schema riportato di seguito in formato JSON standard. Per gli attributi che non selezioni, Lambda li fornisce così come sono.

    • Se scegli SOURCE, Lambda fornisce gli attributi selezionati nell'attributo di convalida dello schema riportato di seguito nel formato di origine originale.

  10. Per l'attributo di convalida dello schema, seleziona gli attributi del messaggio che desideri che Lambda convalidi e deserializzi utilizzando il registro dello schema. È necessario selezionare almeno uno dei valori KEY o VALUE. Se hai scelto JSON per il formato di registrazione degli eventi, Lambda deserializza anche gli attributi del messaggio selezionati prima di inviarli alla tua funzione. Per ulteriori informazioni, consulta Formati di payload e comportamento di deserializzazione.

  11. Scegli Save (Salva).

Puoi anche utilizzare l'API Lambda per creare o aggiornare la mappatura delle sorgenti degli eventi con una configurazione del registro dello schema. Gli esempi seguenti mostrano come configurare un registro di schemi AWS Glue o Confluent utilizzando AWS CLI, che corrisponde alle operazioni UpdateEventSourceMappinge CreateEventSourceMappingAPI nell'API Reference:AWS Lambda

Importante

Se si sta aggiornando un campo di configurazione del registro dello schema utilizzando AWS CLI o l'update-event-source-mappingAPI, è necessario aggiornare tutti i campi della configurazione del registro dello schema.

Create Event Source Mapping
aws lambda create-event-source-mapping \ --function-name my-schema-validator-function \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/a1b2c3d4-5678-90ab-cdef-11111EXAMPLE \ --topics my-kafka-topic \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update AWS Glue Schema Registry
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/registryName", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry with Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "AccessConfigs": [{ "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:secretName" }], "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Update Confluent Schema Registry without Authentication
aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : { "SchemaRegistryURI": "https://abcd-ef123.us-west-2.aws.confluent.cloud", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "KEY" }, { "Attribute": "VALUE" }] } }'
Remove Schema Registry Configuration

Per rimuovere una configurazione del registro dello schema dalla mappatura dell'origine degli eventi, puoi utilizzare il UpdateEventSourceMappingcomando AWS Lambda CLI nell'API Reference.

aws lambda update-event-source-mapping \ --uuid a1b2c3d4-5678-90ab-cdef-EXAMPLE11111 \ --amazon-managed-kafka-event-source-mapping '{ "SchemaRegistryConfig" : {} }'

Filtraggio per Avro e Protobuf

Quando si utilizzano i formati Avro o Protobuf con un registro degli schemi, è possibile applicare il filtro degli eventi alla funzione Lambda. I modelli di filtro vengono applicati alla rappresentazione JSON classica deserializzata dei dati dopo la convalida dello schema. Ad esempio, con uno schema Avro che definisce i dettagli del prodotto, incluso il prezzo, puoi filtrare i messaggi in base al valore del prezzo:

Nota

Quando viene deserializzato, Avro viene convertito in JSON standard, il che significa che non può essere riconvertito direttamente in un oggetto Avro. Se devi eseguire la conversione in un oggetto Avro, usa invece il formato SOURCE.

Per la deserializzazione di Protobuf, i nomi dei campi nel JSON risultante corrispondono a quelli definiti nello schema, anziché essere convertiti in camel case come fa normalmente Protobuf. Tienilo a mente quando crei modelli di filtraggio.

aws lambda create-event-source-mapping \ --function-name myAvroFunction \ --topics myAvroTopic \ --starting-position TRIM_HORIZON \ --kafka-bootstrap-servers '["broker1:9092", "broker2:9092"]' \ --schema-registry-config '{ "SchemaRegistryURI": "arn:aws:glue:us-east-1:123456789012:registry/myAvroRegistry", "EventRecordFormat": "JSON", "SchemaValidationConfigs": [ { "Attribute": "VALUE" } ] }' \ --filter-criteria '{ "Filters": [ { "Pattern": "{ \"value\" : { \"field_1\" : [\"value1\"], \"field_2\" : [\"value2\"] } }" } ] }'

In questo esempio, lo schema di filtro analizza l'valueoggetto, associando i messaggi field_1 con "value1" e field_2 con. "value2" I criteri di filtro vengono valutati rispetto ai dati deserializzati, dopo che Lambda converte il messaggio dal formato Avro a JSON.

Per informazioni più dettagliate sul filtraggio degli eventi, consulta Filtraggio degli eventi Lambda.

Formati di payload e comportamento di deserializzazione

Quando si utilizza un registro degli schemi, Lambda fornisce il payload finale alla funzione in un formato simile al normale payload degli eventi, con alcuni campi aggiuntivi. I campi aggiuntivi dipendono dal parametro. SchemaValidationConfigs Per ogni attributo selezionato per la convalida (chiave o valore), Lambda aggiunge i metadati dello schema corrispondenti al payload.

Nota

Devi aggiornare il tuo aws-lambda-java-eventsalla versione 3.16.0 o successiva per utilizzare i campi dei metadati dello schema.

Ad esempio, se convalidi il value campo, Lambda aggiunge un campo valueSchemaMetadata chiamato al tuo payload. Allo stesso modo, per il key campo, Lambda aggiunge un campo chiamato. keySchemaMetadata Questi metadati contengono informazioni sul formato dei dati e sull'ID dello schema utilizzati per la convalida:

"valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }

Il EventRecordFormat parametro può essere impostato su JSON oSOURCE, il che determina in che modo Lambda gestisce i dati convalidati dallo schema prima di consegnarli alla tua funzione. Ogni opzione offre diverse funzionalità di elaborazione:

  • JSON- Lambda deserializza gli attributi convalidati in formato JSON standard, rendendo i dati pronti per l'uso diretto nelle lingue con supporto JSON nativo. Questo formato è ideale quando non è necessario conservare il formato binario originale o lavorare con classi generate.

  • SOURCE- Lambda conserva il formato binario originale dei dati come stringa con codifica Base64, che consente la conversione diretta in oggetti Avro o Protobuf. Questo formato è essenziale quando si lavora con linguaggi fortemente tipizzati o quando è necessario mantenere tutte le funzionalità degli schemi Avro o Protobuf.

In base a queste caratteristiche di formato e alle considerazioni specifiche della lingua, consigliamo i seguenti formati:

Formati consigliati basati sul linguaggio di programmazione
Lingua Avro Protobuf JSON
Java SOURCE SOURCE SOURCE
Python JSON JSON JSON
NodeJS JSON JSON JSON
.NET SOURCE SOURCE SOURCE
Altri JSON JSON JSON

Le sezioni seguenti descrivono questi formati in dettaglio e forniscono esempi di payload per ogni formato.

Formato JSON

Se scegli JSON comeEventRecordFormat, Lambda convalida e deserializza gli attributi del messaggio che hai selezionato nel campo (gli attributi). SchemaValidationConfigs key and/or value Lambda fornisce questi attributi selezionati come stringhe con codifica base64 della loro rappresentazione JSON standard nella funzione.

Nota

Quando viene deserializzato, Avro viene convertito in JSON standard, il che significa che non può essere riconvertito direttamente in un oggetto Avro. Se devi eseguire la conversione in un oggetto Avro, usa invece il formato SOURCE.

Per la deserializzazione di Protobuf, i nomi dei campi nel JSON risultante corrispondono a quelli definiti nello schema, anziché essere convertiti in camel case come fa normalmente Protobuf. Tienilo a mente quando crei modelli di filtraggio.

Di seguito viene mostrato un esempio di payload, supponendo che tu scelga JSON come e che entrambi gli attributi key e value siano: EventRecordFormat SchemaValidationConfigs

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", //Base64 encoded string of JSON "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value":"abcDEFghiJKLmnoPQRstuVWXyz1234", //Base64 encoded string of JSON "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

In questo esempio:

  • Entrambi key value sono stringhe codificate in base64 della loro rappresentazione JSON dopo la deserializzazione.

  • Lambda include i metadati dello schema per entrambi gli attributi in e. keySchemaMetadata valueSchemaMetadata

  • La tua funzione può decodificare le value stringhe key and per accedere ai dati JSON deserializzati.

Il formato JSON è consigliato per i linguaggi che non sono fortemente tipizzati, come Python o Node.js. Questi linguaggi dispongono del supporto nativo per la conversione di JSON in oggetti.

Formato sorgente

Se scegli SOURCE comeEventRecordFormat, Lambda convalida comunque il record rispetto al registro dello schema, ma fornisce i dati binari originali alla tua funzione senza deserializzazione. Questi dati binari vengono forniti come stringa codificata in Base64 dei dati in byte originali, con i metadati aggiunti dal produttore rimossi. Di conseguenza, potete convertire direttamente i dati binari non elaborati in oggetti Avro e Protobuf all'interno del codice della funzione. Ti consigliamo di utilizzare Powertools for AWS Lambda, che deserializzerà i dati binari grezzi e ti fornirà direttamente gli oggetti Avro e Protobuf.

Ad esempio, se configuri Lambda per convalidare entrambi value gli attributi key e ma utilizzi il SOURCE formato, la tua funzione riceve un payload come questo:

{ "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/a1b2c3d4-5678-90ab-cdef-EXAMPLE11111-1", "bootstrapServers": "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records": { "mytopic-0": [ { "topic": "mytopic", "partition": 0, "offset": 15, "timestamp": 1545084650987, "timestampType": "CREATE_TIME", "key": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "keySchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "value": "abcDEFghiJKLmnoPQRstuVWXyz1234==", // Base64 encoded string of Original byte data, producer-appended metadata removed "valueSchemaMetadata": { "dataFormat": "AVRO", "schemaId": "a1b2c3d4-5678-90ab-cdef-EXAMPLE11111" }, "headers": [ { "headerKey": [ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

In questo esempio:

  • Entrambi value contengono key i dati binari originali come stringhe codificate in Base64.

  • La tua funzione deve gestire la deserializzazione utilizzando le librerie appropriate.

La scelta di SOURCE for EventRecordFormat è consigliata se utilizzate oggetti generati da AVRO o da ProtoBuf, specialmente con le funzioni Java. Questo perché Java è fortemente tipizzato e richiede deserializzatori specifici per i formati Avro e Protobuf. Nel codice della funzione, puoi usare la tua libreria Avro o Protobuf preferita per deserializzare i dati.

Utilizzo di dati deserializzati nelle funzioni Lambda

Powertools for AWS Lambda consente di deserializzare i record Kafka nel codice della funzione in base al formato utilizzato. Questa utilità semplifica l'utilizzo dei record di Kafka gestendo la conversione dei dati e fornendo oggetti. ready-to-use

Per utilizzare Powertools for AWS Lambda nella tua funzione, devi aggiungere Powertools come livello o includerlo come dipendenza quando crei la tua funzione Lambda. AWS Lambda Per istruzioni di configurazione e ulteriori informazioni, consultate Powertools per la AWS Lambda documentazione relativa alla vostra lingua preferita:

Nota

Quando si lavora con l'integrazione del registro dello schema, è possibile scegliere SOURCE o JSON formattare. Ogni opzione supporta diversi formati di serializzazione, come illustrato di seguito:

Formato Supporta

SOURCE

Avro e Protobuf (utilizzando l'integrazione del Lambda Schema Registry)

JSON

Dati JSON

Quando si utilizza il JSON formato SOURCE o, è possibile utilizzare Powertools per aiutare AWS a deserializzare i dati nel codice della funzione. Ecco alcuni esempi di come gestire diversi formati di dati:

AVRO

Esempio di Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.avro.AvroProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class AvroDeserializationFunction implements RequestHandler<ConsumerRecords<String, AvroProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(AvroDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_AVRO) public String handleRequest(ConsumerRecords<String, AvroProduct> records, Context context) { for (ConsumerRecord<String, AvroProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); AvroProduct product = consumerRecord.value(); LOGGER.info("AvroProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Esempio di Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") value_schema_str = open("customer_profile.avsc", "r").read() schema_config = SchemaConfig( value_schema_type="AVRO", value_schema=value_schema_str) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript esempio:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; const schema = '{ "type": "record", "name": "Product", "fields": [ { "name": "id", "type": "int" }, { "name": "name", "type": "string" }, { "name": "price", "type": "double" } ] }'; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'avro', schema: schema, }, } );

Esempio.NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Avro; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaAvroSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
PROTOBUF

Esempio di Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.demo.kafka.protobuf.ProtobufProduct; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class ProtobufDeserializationFunction implements RequestHandler<ConsumerRecords<String, ProtobufProduct>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(ProtobufDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_PROTOBUF) public String handleRequest(ConsumerRecords<String, ProtobufProduct> records, Context context) { for (ConsumerRecord<String, ProtobufProduct> consumerRecord : records) { LOGGER.info("ConsumerRecord: {}", consumerRecord); ProtobufProduct product = consumerRecord.value(); LOGGER.info("ProtobufProduct: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Esempio di Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger from user_pb2 import User # protobuf generated class logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig( value_schema_type="PROTOBUF", value_schema=User) @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript esempio:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; import { Product } from './product.generated.js'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); } }, { value: { type: 'protobuf', schema: Product, }, } );

Esempio.NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Protobuf; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaProtobufSerializer))] namespace ProtoBufClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }
JSON

Esempio di Java:

package org.demo.kafka; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import software.amazon.lambda.powertools.kafka.Deserialization; import software.amazon.lambda.powertools.kafka.DeserializationType; import software.amazon.lambda.powertools.logging.Logging; public class JsonDeserializationFunction implements RequestHandler<ConsumerRecords<String, Product>, String> { private static final Logger LOGGER = LoggerFactory.getLogger(JsonDeserializationFunction.class); @Override @Logging @Deserialization(type = DeserializationType.KAFKA_JSON) public String handleRequest(ConsumerRecords<String, Product> consumerRecords, Context context) { for (ConsumerRecord<String, Product> consumerRecord : consumerRecords) { LOGGER.info("ConsumerRecord: {}", consumerRecord); Product product = consumerRecord.value(); LOGGER.info("Product: {}", product); String key = consumerRecord.key(); LOGGER.info("Key: {}", key); } return "OK"; } }

Esempio di Python:

from aws_lambda_powertools.utilities.kafka_consumer.kafka_consumer import kafka_consumer from aws_lambda_powertools.utilities.kafka_consumer.schema_config import SchemaConfig from aws_lambda_powertools.utilities.kafka_consumer.consumer_records import ConsumerRecords from aws_lambda_powertools.utilities.typing import LambdaContext from aws_lambda_powertools import Logger logger = Logger(service="kafkaConsumerPowertools") schema_config = SchemaConfig(value_schema_type="JSON") @kafka_consumer(schema_config=schema_config) def lambda_handler(event: ConsumerRecords, context:LambdaContext): for record in event.records: value = record.value logger.info(f"Received value: {value}")

TypeScript esempio:

import { kafkaConsumer } from '@aws-lambda-powertools/kafka'; import type { ConsumerRecords } from '@aws-lambda-powertools/kafka/types'; import { Logger } from '@aws-lambda-powertools/logger'; import type { Context } from 'aws-lambda'; const logger = new Logger(); type Value = { id: number; name: string; price: number; }; export const handler = kafkaConsumer<string, Value>( (event: ConsumerRecords<string, Value>, _context: Context) => { for (const record of event.records) { logger.info(Processing record with key: ${record.key}); logger.info(Record value: ${JSON.stringify(record.value)}); // You can add more processing logic here } }, { value: { type: 'json', }, } );

Esempio.NET:

using Amazon.Lambda.Core; using AWS.Lambda.Powertools.Kafka; using AWS.Lambda.Powertools.Kafka.Json; using AWS.Lambda.Powertools.Logging; using Com.Example; // Assembly attribute to enable the Lambda function's Kafka event to be converted into a .NET class. [assembly: LambdaSerializer(typeof(PowertoolsKafkaJsonSerializer))] namespace JsonClassLibrary; public class Function { public string FunctionHandler(ConsumerRecords<string, CustomerProfile> records, ILambdaContext context) { foreach (var record in records) { Logger.LogInformation("Processing messagem from topic: {topic}", record.Topic); Logger.LogInformation("Partition: {partition}, Offset: {offset}", record.Partition, record.Offset); Logger.LogInformation("Produced at: {timestamp}", record.Timestamp); foreach (var header in record.Headers.DecodedValues()) { Logger.LogInformation($"{header.Key}: {header.Value}"); } Logger.LogInformation("Processing order for: {fullName}", record.Value.FullName); } return "Processed " + records.Count() + " records"; } }

Metodi di autenticazione per il registro degli schemi

Per utilizzare un registro degli schemi, Lambda deve essere in grado di accedervi in modo sicuro. Se lavori con un registro di AWS Glue schemi, Lambda si affida all'autenticazione IAM. Ciò significa che il ruolo di esecuzione della funzione deve disporre delle seguenti autorizzazioni per accedere al registro: AWS Glue

Esempio della policy IAM richiesta:

JSON
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "glue:GetRegistry", "glue:GetSchemaVersion" ], "Resource": [ "*" ] } ] }
Nota

Per i registri AWS Glue dello schema, se fornisci AccessConfigs un AWS Glue registro, Lambda restituirà un'eccezione di convalida.

Se lavori con un registro di schemi Confluent, puoi scegliere uno dei tre metodi di autenticazione supportati per il parametro del tuo oggetto: Type KafkaSchemaRegistryAccessConfig

  • BASIC_AUTH — Lambda utilizza nome utente e password o chiave API e autenticazione segreta API per accedere al registro. Se scegli questa opzione, fornisci l'ARN di Secrets Manager contenente le tue credenziali nel campo URI.

  • CLIENT_CERTIFICATE_TLS_AUTH — Lambda utilizza l'autenticazione TLS reciproca con i certificati client. Per utilizzare questa opzione, Lambda deve accedere sia al certificato che alla chiave privata. Fornisci l'ARN di Secrets Manager contenente queste credenziali nel campo URI.

  • NO_AUTH — Il certificato CA pubblico deve essere firmato da un'autorità di certificazione (CA) presente nel Lambda Trust Store. Per un certificato CA privato/autofirmato, si configura il certificato CA principale del server. Per utilizzare questa opzione, ometti il parametro. AccessConfigs

Inoltre, se Lambda necessita dell'accesso a un certificato CA privato per verificare il certificato TLS del registro dello schema, scegli SERVER_ROOT_CA_CERT Type e fornisci l'ARN di Secrets Manager al certificato nel campo URI.

Nota

Per configurare l'SERVER_ROOT_CA_CERTopzione nella console, fornisci l'ARN segreto contenente il certificato nel campo Crittografia.

La configurazione di autenticazione per il registro degli schemi è separata da qualsiasi autenticazione configurata per il cluster Kafka. È necessario configurarli entrambi separatamente, anche se utilizzano metodi di autenticazione simili.

Gestione degli errori e risoluzione dei problemi relativi al registro dello schema

Quando si utilizza un registro degli schemi con l'origine degli eventi Amazon MSK, è possibile che si verifichino diversi errori. Questa sezione fornisce indicazioni sui problemi più comuni e su come risolverli.

Errori di configurazione

Questi errori si verificano durante l'impostazione della configurazione del registro dello schema.

È richiesta la modalità di provisioning

Messaggio di errore: SchemaRegistryConfig is only available for Provisioned Mode. To configure Schema Registry, please enable Provisioned Mode by specifying MinimumPollers in ProvisionedPollerConfig.

Risoluzione: abilita la modalità provisioning per la mappatura delle sorgenti degli eventi configurando il parametro in. MinimumPollers ProvisionedPollerConfig

URL del registro dello schema non valido

Messaggio di errore: Malformed SchemaRegistryURI provided. Please provide a valid URI or ARN. For example, https://schema-registry.example.com:8081 or arn:aws:glue:us-east-1:123456789012:registry/ExampleRegistry.

Risoluzione: fornire un URL HTTPS valido per Confluent Schema Registry o un ARN valido per Schema Registry. AWS Glue

Formato di registrazione degli eventi non valido o mancante

Messaggio di errore: EventRecordFormat is a required field for SchemaRegistryConfig. Please provide one of supported format types: SOURCE, JSON.

Risoluzione: specifica SOURCE o JSON come file EventRecordFormat nella configurazione del registro dello schema.

Attributi di convalida duplicati

Messaggio di errore: Duplicate KEY/VALUE Attribute in SchemaValidationConfigs. SchemaValidationConfigs must contain at most one KEY/VALUE Attribute.

Risoluzione: rimuovi gli attributi KEY o VALUE duplicati dal tuo. SchemaValidationConfigs Ogni tipo di attributo può apparire solo una volta.

Configurazione di convalida mancante

Messaggio di errore: SchemaValidationConfigs is a required field for SchemaRegistryConfig.

Risoluzione: aggiungi SchemaValidationConfigs alla tua configurazione, specificando almeno un attributo di convalida (KEY o VALUE).

Errori di accesso e autorizzazione

Questi errori si verificano quando Lambda non può accedere al registro dello schema a causa di problemi di autorizzazione o autenticazione.

AWS Glue Accesso al registro degli schemi negato

Messaggio di errore: Cannot access Glue Schema with provided role. Please ensure the provided role can perform the GetRegistry and GetSchemaVersion Actions on your schema.

Risoluzione: aggiungi le autorizzazioni richieste (glue:GetRegistryeglue:GetSchemaVersion) al ruolo di esecuzione della funzione.

Accesso al Confluent Schema Registry negato

Messaggio di errore: Cannot access Confluent Schema with the provided access configuration.

Risoluzione: verifica che le credenziali di autenticazione (archiviate in Secrets Manager) siano corrette e dispongano delle autorizzazioni necessarie per accedere al registro dello schema.

Registro degli schemi tra account AWS Glue

Messaggio di errore: Cross-account Glue Schema Registry ARN not supported.

Risoluzione: utilizza un registro AWS Glue degli schemi che si trova nello stesso AWS account della funzione Lambda.

Registro degli schemi tra regioni AWS Glue

Messaggio di errore: Cross-region Glue Schema Registry ARN not supported.

Risoluzione: utilizza un registro AWS Glue degli schemi che si trova nella stessa area della funzione Lambda.

Problemi di accesso segreto

Messaggio di errore: Lambda received InvalidRequestException from Secrets Manager.

Risoluzione: verifica che il ruolo di esecuzione della funzione sia autorizzato ad accedere al segreto e che il segreto non sia crittografato con una AWS KMS chiave predefinita se si accede da un account diverso.

Errori di connessione

Questi errori si verificano quando Lambda non riesce a stabilire una connessione al registro dello schema.

Problemi di connettività VPC

Messaggio di errore: Cannot connect to your Schema Registry. Your Kafka cluster's VPC must be able to connect to the schema registry. You can provide access by configuring AWS PrivateLink or a NAT Gateway or VPC Peering between Kafka Cluster VPC and the schema registry VPC.

Risoluzione: configura la rete VPC per consentire le connessioni al registro dello schema utilizzando AWS PrivateLink un gateway NAT o un peering VPC.

Errore di handshake TLS

Messaggio di errore: Unable to establish TLS handshake with the schema registry. Please provide correct CA-certificate or client certificate using Secrets Manager to access your schema registry.

Risoluzione: verifica che i certificati CA e i certificati client (per MTL) siano corretti e configurati correttamente in Secrets Manager.

Throttling

Messaggio di errore: Receiving throttling errors when accessing the schema registry. Please increase API TPS limits for your schema registry.

Risoluzione: aumenta i limiti di velocità delle API per il registro degli schemi o riduci la frequenza delle richieste provenienti dall'applicazione.

Errori del registro dello schema autogestiti

Messaggio di errore: Lambda received an internal server an unexpected error from the provided self-managed schema registry.

Risoluzione: verifica lo stato e la configurazione del server del registro degli schemi autogestito.