使用 Kafka 主题作为失败时的目标 - AWS Lambda

使用 Kafka 主题作为失败时的目标

您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录,或者当记录的保存时间超过最大期限时,Lambda 会将这些失败的记录发送至指定的 Kafka 主题,以便后续进行处理。

Kafka 失败时的目标的工作原理

当您将 Kafka 主题配置为失败时的目标时,Lambda 将充当 Kafka 生产者,并将失败的记录写入目标主题。这会在您的 Kafka 基础设施中创建死信主题(DLT)模式。

  • 相同的集群要求:目标主题必须与源主题位于同一个 Kafka 集群中。

  • 实际记录内容:Kafka 目标会收到实际的失败记录以及故障元数据。

  • 递归防护:Lambda 通过阻止源主题和目标主题相同的配置来避免无限循环。

配置 Kafka 失败时的目标

在创建或更新 Kafka 事件源映射时,您可以将 Kafka 主题配置为失败时的目标。

配置 Kafka 目标(控制台)

要将 Kafka 主题配置为失败时的目标(控制台)
  1. 打开 Lamba 控制台的函数页面。

  2. 选择您的函数名称。

  3. 请执行以下操作之一:

    • 要添加新的 Kafka 触发器,请在函数概述下选择添加触发器

    • 要修改现有的 Kafka 触发器,请选择该触发器,然后选择编辑

  4. 其他设置下,对于失败时的目标,选择 Kafka 主题

  5. 对于主题名称,输入要将失败记录发送至的 Kafka 主题的名称。

  6. 选择添加保存

配置 Kafka 目标(AWS CLI)

使用 kafka:// 前缀将 Kafka 主题指定为失败时的目标。

使用 Kafka 目标创建事件源映射

以下示例将 Kafka 主题作为失败时的目标创建了一个 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

对于自行管理的 Kafka,请使用相同的语法:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

更新 Kafka 目标

使用 update-event-source-mapping 命令添加或修改 Kafka 目标:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 目标的记录格式

当 Lambda 向 Kafka 主题发送失败的记录时,每条消息都包含有关失败的元数据和实际的记录内容。

失败元数据

元数据包含记录失败的原因以及原始批次的详细信息:

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

分区键行为

Lambda 在将数据发送至目标主题时,会使用原始记录中的相同分区键。如果原始记录中没有键,则 Lambda 会使用 Kafka 的默认轮询分区机制,在目标主题的所有可用分区中进行分区。

要求和限制

  • 需要预置模式:Kafka 失败时的目标仅适用于启用了预置模式的事件源映射。

  • 仅限同一集群:目标主题必须与源主题位于同一个 Kafka 集群中。

  • 主题权限:您的事件源映射必须对目标主题具有写入权限。示例:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 无递归:目标主题名称不能与您的任何源主题名称相同。