Uso de um tópico do Kafka como destino em caso de falha
É possível configurar um tópico do Kafka como um destino do Kafka em caso de falha para os seus mapeamentos da origem do evento do Kafka. Quando o Lambda não consegue processar registros após exaurir as novas tentativas ou quando os registros excedem a idade máxima, o Lambda envia os registros com falha para o tópico especificado do Kafka para processamento posterior.
Como funciona um destino do Kafka em caso de falha
Quando você configura um tópico do Kafka como um destino em caso de falha, o Lambda atua como produtor do Kafka e grava registros com falha no tópico de destino. Isso cria um padrão de tópico de mensagem não entregue (DLT) em sua infraestrutura do Kafka.
-
Mesmo requisito do cluster: o tópico de destino deve existir no mesmo cluster do Kafka que seus tópicos de origem.
-
Conteúdo real do registro: os destinos do Kafka recebem os registros reais com falha junto com os metadados da falha.
-
Prevenção de recursão: o Lambda evita loops infinitos bloqueando configurações em que os tópicos de origem e destino são os mesmos.
Configuração de um destino do Kafka em caso de falha
É possível configurar um tópico do Kafka como um destino em caso de falha ao criar ou atualizar um mapeamento da origem do evento do Kafka.
Configuração de um destino do Kafka (console)
Para configurar um tópico do Kafka como um destino em caso de falha (console)
-
Abra a página Funções
do console do Lambda. -
Escolha o nome da sua função.
-
Execute um destes procedimentos:
-
Para adicionar um novo acionador do Kafka, em Visão geral da função, escolha Adicionar acionador.
-
Para modificar um acionador existente do Kafka, escolha o acionador e, em seguida, escolha Editar.
-
-
Em Configurações adicionais, para Destino em caso de falha, escolha Tópico do Kafka.
-
Em Nome do tópico, insira o nome do tópico do Kafka para o qual os registros que falharem serão enviados.
-
Escolha Adicionar ou Salvar.
Configuração de um destino do Kafka (AWS CLI)
Use o prefixo kafka:// para especificar um tópico do Kafka como destino em caso de falha.
Criação de um mapeamento da origem do evento com destino do Kafka
O exemplo a seguir cria um mapeamento da origem do evento do Amazon MSK com um tópico do Kafka como destino em caso de falha:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Para o Kafka autogerenciado, use a mesma sintaxe:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Atualização de um destino do Kafka
Use o comando update-event-source-mapping para adicionar ou modificar um destino do Kafka:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Formato de registro para um destino do Kafka
Quando o Lambda envia registros com falha para um tópico do Kafka, cada mensagem contém metadados sobre a falha e o conteúdo real do registro.
Metadados com falha
Os metadados incluem informações sobre por que o registro falhou e detalhes sobre o lote original:
{
"requestContext": {
"requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
"condition": "RetriesExhausted",
"approximateInvokeCount": 3
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T18:16:05.568Z",
"KafkaBatchInfo": {
"batchSize": 1,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"payloadSize": 1162,
"recordInfo": {
"offset": "49601189658422359378836298521827638475320189012309704722",
"timestamp": "2019-11-14T18:16:04.835Z"
}
},
"payload": {
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"records": {
"my-topic-0": [
{
"headers": [],
"key": "dGVzdC1rZXk=",
"offset": 100,
"partition": 0,
"timestamp": 1749116692330,
"timestampType": "CREATE_TIME",
"topic": "my-topic",
"value": "dGVzdC12YWx1ZQ=="
}
]
}
}
}
Comportamento das chaves de partição
O Lambda usa a mesma chave de partição do registro original ao produzir para o tópico de destino. Se o registro original não tiver chave, o Lambda usará o particionamento round-robin padrão do Kafka em todas as partições disponíveis no tópico de destino.
Requisitos e limitações
-
Modo provisionado obrigatório: um destino em caso de falha do Kafka só está disponível para mapeamentos da origem do evento com o modo provisionado habilitado.
-
Mesmo cluster apenas: o tópico de destino deve existir no mesmo cluster do Kafka que seus tópicos de origem.
-
Permissões de tópico: o mapeamento da origem do evento deve ter permissões de gravação no tópico de destino. Exemplo:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] } -
Sem recursão: o nome do tópico de destino não pode ser o mesmo de nenhum dos nomes dos tópicos de origem.