Uso de un tema de Kafka como destino en caso de error - AWS Lambda

Uso de un tema de Kafka como destino en caso de error

Puede configurar un tema de Kafka como destino en caso de error para las asignaciones de orígenes de eventos de Kafka. Cuando Lambda no puede procesar los registros tras agotar los reintentos o cuando los registros superan la antigüedad máxima, Lambda envía los registros fallidos al tema de Kafka especificado para su posterior procesamiento.

Cómo funciona un destino en caso de error de Kafka

Cuando configura un tema de Kafka como destino en caso de error, Lambda actúa como productor de Kafka y escribe los registros fallidos en el tema de destino. Esto crea un patrón de temas de mensajes fallidos (DLT) en su infraestructura de Kafka.

  • Mismo requisito de clúster: el tema de destino debe estar en el mismo clúster de Kafka que los temas de origen.

  • Contenido real del registro: los destinos de Kafka reciben los registros fallidos reales junto con los metadatos del error.

  • Prevención de recursión: Lambda evita los bucles infinitos porque bloquea las configuraciones en las que los temas de origen y destino son los mismos.

Configuración de un destino en caso de error de Kafka

Puede configurar un tema de Kafka como un destino en caso de error al crear o actualizar una asignación de orígenes de eventos de Kafka.

Configuración de un destino de Kafka (consola)

Cómo configurar un tema de Kafka como destino en caso de error (consola)
  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija el nombre de su función.

  3. Realice una de las siguientes acciones:

    • Para añadir un nuevo desencadenador de Kafka, en Descripción general de la función, seleccione Añadir desencadenador.

    • Para modificar un desencadenador de Kafka existente, selecciónelo y, a continuación, elija Editar.

  4. En Configuración adicional, en Destino en caso de error, elija Tema de Kafka.

  5. En Nombre del tema, escriba el nombre del tema de Kafka al que quiere enviar los registros fallidos.

  6. Elija Añadir o Guardar.

Configuración de un destino de Kafka (AWS CLI)

Utilice el prefijo kafka:// para especificar un tema de Kafka como destino en caso de error.

Creación de una asignación de orígenes de eventos con el destino de Kafka

En el siguiente ejemplo, se crea una asignación de orígenes de eventos de Amazon MSK con un tema de Kafka como destino en caso de error:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

En el caso de Kafka autoadministrado, utilice la misma sintaxis:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Actualización de un destino de Kafka

Utilice el comando update-event-source-mapping para añadir o modificar un destino de Kafka:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Formato de registro para un destino de Kafka

Cuando Lambda envía registros fallidos a un tema de Kafka, cada mensaje contiene metadatos sobre el error y el contenido real del registro.

Metadatos de error

Los metadatos incluyen información sobre el motivo por el que se produjo un error en el registro y detalles sobre el lote original:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

Comportamiento de claves de partición

Lambda usa la misma clave de partición del registro original cuando produce para el tema de destino. Si el registro original no tenía ninguna clave, Lambda utiliza la partición por turnos predeterminada de Kafka en todas las particiones disponibles en el tema de destino.

Requisitos y limitaciones

  • Se requiere el modo aprovisionado: un destino en caso de error de Kafka solo está disponible para las asignaciones de orígenes de eventos con el modo aprovisionado habilitado.

  • Solo el mismo clúster: el tema de destino debe estar en el mismo clúster de Kafka que los temas de origen.

  • Permisos de tema: la asignación de orígenes de eventos debe tener permisos de escritura en el tema de destino. Ejemplo:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Sin recursión: el nombre del tema de destino no puede coincidir con ninguno de los nombres del tema de origen.