Configuración de los controles de gestión de errores para orígenes de eventos de Kafka - AWS Lambda

Configuración de los controles de gestión de errores para orígenes de eventos de Kafka

Puede configurar la forma en que Lambda administra los errores y los reintentos de las asignaciones de orígenes de eventos de Kafka. Estas configuraciones lo ayudan a controlar la forma en que Lambda procesa los registros fallidos y administra el comportamiento de los reintentos.

Configuraciones de reintento disponibles

Las siguientes configuraciones de reintento están disponibles para orígenes de eventos de Amazon MSK y autoadministrados de Kafka:

  • Número máximo de reintentos: número máximo de reintentos que Lambda realiza cuando la función devuelve un error. Esto no cuenta el intento de invocación inicial. El valor predeterminado es -1 (infinito).

  • Antigüedad máxima de registro: antigüedad máxima de un registro que Lambda envía a su función. El valor predeterminado es -1 (infinito).

  • División del lote en caso de error: cuando la función devuelve un error, divida el lote en dos lotes más pequeños y reintente cada uno por separado. Esto ayuda a aislar los registros problemáticos.

  • Respuesta de lote parcial: permita que la función devuelva información sobre los registros de un lote que no se procesaron correctamente, de modo que Lambda solo pueda reintentar los registros con errores.

Configuración de los controles de gestión de errores (consola)

Puede configurar el comportamiento de reintento cuando crea o actualiza una asignación de orígenes de eventos de Kafka en la consola de Lambda.

Cómo configurar el comportamiento de reintento de un origen de eventos de Kafka (consola)
  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija el nombre de su función.

  3. Realice una de las siguientes acciones:

    • Para añadir un nuevo desencadenador de Kafka, en Descripción general de la función, seleccione Añadir desencadenador.

    • Para modificar un desencadenador de Kafka existente, selecciónelo y, a continuación, elija Editar.

  4. En Configuración del sondeador de eventos, seleccione el modo aprovisionado para configurar los controles de gestión de errores:

    1. En Reintentos, introduzca el número máximo de reintentos (0-10000 o -1 para infinito).

    2. En Antigüedad máxima registrada, introduzca la antigüedad máxima en segundos (60-604 800 o -1 para infinito).

    3. Para habilitar la división por lotes cuando se produzcan errores, seleccione Dividir lote en caso de error.

    4. Para habilitar la respuesta parcial por lotes, seleccione ReportBatchItemFailures.

  5. Elija Añadir o Guardar.

Cómo configurar el comportamiento de reintento (AWS CLI)

Utilice los siguientes comandos de la AWS CLI para configurar el comportamiento de los reintentos en las asignaciones de orígenes de eventos de Kafka.

Creación de una asignación de orígenes de eventos con configuraciones de reintento

En el siguiente ejemplo, se crea una asignación de orígenes de eventos de Kafka autoadministrado con controles de gestión de errores:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

En el caso de orígenes de eventos de Amazon MSK:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

Actualización de las configuraciones de reintento

Utilice el comando update-event-source-mapping para modificar las configuraciones de reintentos de una asignación de orígenes de eventos existente:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

PartialBatchResponse

La respuesta parcial por lotes, también conocida como ReportBatchItemFailures, es una característica clave para la gestión de errores en la integración de Lambda con los orígenes de Kafka. Sin esta característica, cuando se produce un error en uno de los elementos de un lote, se vuelven a procesar todos los mensajes de ese lote. Con la respuesta parcial por lotes habilitada e implementada, el controlador devuelve los identificadores solo de los mensajes fallidos, lo que permite que Lambda reintente solo esos elementos específicos. Esto proporciona un mayor control sobre cómo se procesan los lotes que contienen mensajes fallidos.

Para informar los errores de lote, utilizará este esquema JSON:

{ "batchItemFailures": [ { "itemIdentifier": { "partition": "topic-partition_number", "offset": 100 } }, ... ] }
importante

Si devuelve una JSON válida vacía o nula, la asignación de orígenes de eventos considerará que el lote se ha procesado correctamente. Cualquier topic-partition_number o desplazamiento inválido devuelto que no estuviera presente en el evento invocado se considerará un error, y se reintentará procesar todo el lote.

Los siguientes ejemplos de código muestran cómo implementar una respuesta parcial por lotes para las funciones de Lambda que reciben eventos de orígenes de Kafka. La función informa los errores de los elementos del lote en la respuesta y le indica a Lambda que vuelva a intentar esos mensajes más adelante.

Esta es una implementación del controlador de Lambda de Python que muestra este enfoque:

import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass

Esta es una versión de Node.js:

const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "partition": `${topic}-${partition}`, "offset": Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };