Configuração dos controles de tratamento de erros para origens de eventos do Kafka
É possível configurar como o Lambda lida com erros e novas tentativas para os mapeamentos da origem do evento do Kafka. Essas configurações ajudam você a controlar como o Lambda processa registros com falha e gerencia o comportamento de novas tentativas.
Configurações disponíveis de novas tentativas
As configurações de novas tentativas a seguir estão disponíveis para origens de eventos do Amazon MSK e do Kafka autogerenciado:
-
Máximo de novas tentativas: o número máximo de vezes que o Lambda tenta novamente quando a função retorna um erro. Isso não conta a tentativa inicial de invocação. O valor padrão é -1 (infinito).
-
Idade máxima do registro: a idade máxima de um registro que o Lambda envia para sua função. O valor padrão é -1 (infinito).
-
Dividir lote em caso de erro: quando a sua função retorna um erro, o lote é dividido em dois lotes menores e cada um é tentado novamente em separado. Isso ajuda a isolar registros problemáticos.
-
Resposta em lote parcial: permite que sua função retorne informações sobre quais registros em um lote falharam no processamento, para que o Lambda possa repetir somente os registros com falha.
Configuração dos controles de tratamento de erro (console)
É possível configurar o comportamento de novas tentativas ao criar ou atualizar um mapeamento da origem do evento do Kafka no console do Lambda.
Para configurar o comportamento de novas tentativas para uma origem de eventos do Kafka (console)
-
Abra a página Funções
do console do Lambda. -
Escolha o nome da sua função.
-
Execute um destes procedimentos:
-
Para adicionar um novo acionador do Kafka, em Visão geral da função, escolha Adicionar acionador.
-
Para modificar um acionador existente do Kafka, escolha o acionador e, em seguida, escolha Editar.
-
-
Em Configuração do agente de sondagem de eventos, selecione o modo provisionado para configurar os controles de tratamento de erros:
-
Em Novas tentativas, insira o número máximo de novas tentativas (0-10000, ou -1 para infinito).
-
Em Idade máxima do registro, insira a idade máxima em segundos (60-604800, ou -1 para infinito).
-
Para habilitar a divisão dos lotes quando ocorrerem erros, selecione Dividir lote em caso de erro.
-
Para habilitar a resposta em lote parcial, selecione ReportBatchItemFailures.
-
-
Escolha Adicionar ou Salvar.
Configuração do comportamento de novas tentativas (AWS CLI)
Use os comandos da AWS CLI a seguir para configurar o comportamento de novas tentativas para seus mapeamentos da origem do evento do Kafka.
Criação de um mapeamento da origem do evento com novas tentativas
O exemplo a seguir cria um mapeamento da origem do evento do Kafka autogerenciado com controles de tratamento de erros:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Para origens de eventos do Amazon MSK:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Atualização das configurações de novas tentativas
Use o comando update-event-source-mapping para modificar as configurações de novas tentativas de um mapeamento da origem do evento existente:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
A resposta em lote parcial, também conhecida como ReportBatchItemFailures, é um atributo fundamental para o tratamento de erros na integração do Lambda com origens do Kafka. Sem esse atributo, quando ocorre um erro em um dos itens de um lote, isso resulta no reprocessamento de todas as mensagens desse lote. Com a resposta em lote parcial habilitada e implementada, o manipulador retorna identificadores somente para as mensagens com falha, permitindo que o Lambda tente novamente apenas esses itens específicos. Isso fornece maior controle sobre como os lotes contendo mensagens com falha são processados.
Para relatar erros de lote, você usará este esquema JSON:
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
Importante
Se você retornar um JSON vazio válido ou nulo, o mapeamento da origem do evento considerará um lote como processado com êxito. Qualquer topic-partition_number ou deslocamento inválido retornado que não esteja presente no evento invocado será tratado como falha, e todo o lote será testado novamente.
Os exemplos de código a seguir mostram como implementar uma resposta parcial em lote para funções do Lambda que recebem eventos de origens do Kafka. A função relata as falhas do item em lote na resposta, sinalizando para o Lambda tentar novamente essas mensagens posteriormente.
Aqui está uma implementação do manipulador do Lambda em Python que mostra essa abordagem:
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
Aqui está uma versão em Node.js:
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };