

# 使用 Kafka 主题作为失败时的目标
<a name="kafka-on-failure-destination"></a>

您可以将 Kafka 主题配置为 Kafka 事件源映射失败时的目标。当 Lambda 在重试次数用尽后仍无法处理记录，或者当记录的保存时间超过最大期限时，Lambda 会将这些失败的记录发送至指定的 Kafka 主题，以便后续进行处理。当您同时配置[无限重试](kafka-retry-configurations.md)和失败目标时，Lambda 会自动应用最多 10 次重试尝试。

## Kafka 失败时的目标的工作原理
<a name="kafka-ofd-overview"></a>

当您将 Kafka 主题配置为失败时的目标时，Lambda 将充当 Kafka 生产者，并将失败的记录写入目标主题。这会在您的 Kafka 基础设施中创建死信主题（DLT）模式。
+ **相同的集群要求**：目标主题必须与源主题位于同一个 Kafka 集群中。
+ **实际记录内容**：Kafka 目标会收到实际的失败记录以及故障元数据。
+ **递归防护**：Lambda 通过阻止源主题和目标主题相同的配置来避免无限循环。

## 配置 Kafka 失败时的目标
<a name="kafka-ofd-configure"></a>

在创建或更新 Kafka 事件源映射时，您可以将 Kafka 主题配置为失败时的目标。

### 配置 Kafka 目标（控制台）
<a name="kafka-ofd-console"></a>

**要将 Kafka 主题配置为失败时的目标（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择您的函数名称。

1. 请执行以下操作之一：
   + 要添加新的 Kafka 触发器，请在**函数概述**下选择**添加触发器**。
   + 要修改现有的 Kafka 触发器，请选择该触发器，然后选择**编辑**。

1. 在**其他设置**下，对于**失败时的目标**，选择 **Kafka 主题**。

1. 对于**主题名称**，输入要将失败记录发送至的 Kafka 主题的名称。

1. 选择**添加**或**保存**。

### 配置 Kafka 目标（AWS CLI）
<a name="kafka-ofd-cli"></a>

使用 `kafka://` 前缀将 Kafka 主题指定为失败时的目标。

#### 使用 Kafka 目标创建事件源映射
<a name="kafka-ofd-cli-create"></a>

以下示例将 Kafka 主题作为失败时的目标创建了一个 Amazon MSK 事件源映射：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

对于自行管理的 Kafka，请使用相同的语法：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

#### 更新 Kafka 目标
<a name="kafka-ofd-cli-update"></a>

使用 `update-event-source-mapping` 命令添加或修改 Kafka 目标：

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

## Kafka 目标的记录格式
<a name="kafka-ofd-record-format"></a>

当 Lambda 向 Kafka 主题发送失败的记录时，每条消息都包含有关失败的元数据和实际的记录内容。

### 失败元数据
<a name="kafka-ofd-metadata"></a>

元数据包含记录失败的原因以及原始批次的详细信息：

```
{
  "requestContext": {
    "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
    "condition": "RetriesExhausted",
    "approximateInvokeCount": 3
  },
  "responseContext": {
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2019-11-14T18:16:05.568Z",
  "KafkaBatchInfo": {
    "batchSize": 1,
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "payloadSize": 1162,
    "recordInfo": {
      "offset": "49601189658422359378836298521827638475320189012309704722",
      "timestamp": "2019-11-14T18:16:04.835Z"
    }
  },
  "payload": {
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "records": {
      "my-topic-0": [
        {
          "headers": [],
          "key": "dGVzdC1rZXk=",
          "offset": 100,
          "partition": 0,
          "timestamp": 1749116692330,
          "timestampType": "CREATE_TIME",
          "topic": "my-topic",
          "value": "dGVzdC12YWx1ZQ=="
        }
      ]
    }
  }
}
```

### 分区键行为
<a name="kafka-ofd-partitioning"></a>

Lambda 在将数据发送至目标主题时，会使用原始记录中的相同分区键。如果原始记录中没有键，则 Lambda 会使用 Kafka 的默认轮询分区机制，在目标主题的所有可用分区中进行分区。

## 要求和限制
<a name="kafka-ofd-requirements"></a>
+ **需要预置模式**：Kafka 失败时的目标仅适用于启用了预置模式的事件源映射。
+ **仅限同一集群**：目标主题必须与源主题位于同一个 Kafka 集群中。
+ **主题权限**：您的事件源映射必须对目标主题具有写入权限。示例：

  ```
  {
      "Version": "2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "ClusterPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:Connect",
                  "kafka-cluster:DescribeCluster",
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:cluster/*"
              ]
          },
          {
              "Sid": "TopicPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:topic/*/*"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "kafka:DescribeCluster",
                  "kafka:GetBootstrapBrokers",
                  "kafka:Produce"
              ],
              "Resource": "arn:aws:kafka:*:*:cluster/*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DeleteNetworkInterface",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+ **无递归**：目标主题名称不能与您的任何源主题名称相同。