使用 Kafka 主题作为失败时的目标
您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录,或者当记录的保存时间超过最大期限时,Lambda 会将这些失败的记录发送至指定的 Kafka 主题,以便后续进行处理。
Kafka 失败时的目标的工作原理
当您将 Kafka 主题配置为失败时的目标时,Lambda 将充当 Kafka 生产者,并将失败的记录写入目标主题。这会在您的 Kafka 基础设施中创建死信主题(DLT)模式。
-
相同的集群要求:目标主题必须与源主题位于同一个 Kafka 集群中。
-
实际记录内容:Kafka 目标会收到实际的失败记录以及故障元数据。
-
递归防护:Lambda 通过阻止源主题和目标主题相同的配置来避免无限循环。
配置 Kafka 失败时的目标
在创建或更新 Kafka 事件源映射时,您可以将 Kafka 主题配置为失败时的目标。
配置 Kafka 目标(控制台)
要将 Kafka 主题配置为失败时的目标(控制台)
-
打开 Lamba 控制台的函数
页面。 -
选择您的函数名称。
-
请执行以下操作之一:
-
要添加新的 Kafka 触发器,请在函数概述下选择添加触发器。
-
要修改现有的 Kafka 触发器,请选择该触发器,然后选择编辑。
-
-
在其他设置下,对于失败时的目标,选择 Kafka 主题。
-
对于主题名称,输入要将失败记录发送至的 Kafka 主题的名称。
-
选择添加或保存。
配置 Kafka 目标(AWS CLI)
使用 kafka:// 前缀将 Kafka 主题指定为失败时的目标。
使用 Kafka 目标创建事件源映射
以下示例将 Kafka 主题作为失败时的目标创建了一个 Amazon MSK 事件源映射:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
对于自行管理的 Kafka,请使用相同的语法:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
更新 Kafka 目标
使用 update-event-source-mapping 命令添加或修改 Kafka 目标:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Kafka 目标的记录格式
当 Lambda 向 Kafka 主题发送失败的记录时,每条消息都包含有关失败的元数据和实际的记录内容。
失败元数据
元数据包含记录失败的原因以及原始批次的详细信息:
{
"requestContext": {
"requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
"condition": "RetriesExhausted",
"approximateInvokeCount": 3
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T18:16:05.568Z",
"KafkaBatchInfo": {
"batchSize": 1,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"payloadSize": 1162,
"recordInfo": {
"offset": "49601189658422359378836298521827638475320189012309704722",
"timestamp": "2019-11-14T18:16:04.835Z"
}
},
"payload": {
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"records": {
"my-topic-0": [
{
"headers": [],
"key": "dGVzdC1rZXk=",
"offset": 100,
"partition": 0,
"timestamp": 1749116692330,
"timestampType": "CREATE_TIME",
"topic": "my-topic",
"value": "dGVzdC12YWx1ZQ=="
}
]
}
}
}
分区键行为
Lambda 在将数据发送至目标主题时,会使用原始记录中的相同分区键。如果原始记录中没有键,则 Lambda 会使用 Kafka 的默认轮询分区机制,在目标主题的所有可用分区中进行分区。
要求和限制
-
需要预置模式:Kafka 失败时的目标仅适用于启用了预置模式的事件源映射。
-
仅限同一集群:目标主题必须与源主题位于同一个 Kafka 集群中。
-
主题权限:您的事件源映射必须对目标主题具有写入权限。示例:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] } -
无递归:目标主题名称不能与您的任何源主题名称相同。