

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Kafka 主題做為失敗時的目的地
<a name="kafka-on-failure-destination"></a>

您可以將 Kafka 主題設定為 Kafka 事件來源映射的失敗時目的地。當 Lambda 在耗盡重試嘗試後或記錄超過最長存留期後無法處理記錄時，Lambda 會將失敗的記錄傳送至指定的 Kafka 主題，以供稍後處理。當您同時設定[無限次重試](kafka-retry-configurations.md)和失敗時的目的地時，Lambda 會自動套用最多 10 次重試嘗試。

## Kafka 失敗時目的地的運作方式
<a name="kafka-ofd-overview"></a>

當您將 Kafka 主題設定為失敗時的目的地時，Lambda 會充當 Kafka 生產者，並將失敗的記錄寫入目的地主題。這會在 Kafka 基礎設施內建立無效字母主題 (DLT) 模式。
+ **相同的叢集需求** – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。
+ **實際記錄內容** – Kafka 目的地會收到實際失敗的記錄以及失敗中繼資料。
+ **防止遞迴** – Lambda 透過封鎖來源和目的地主題相同的組態來防止無限迴圈。

## 設定 Kafka 失敗時的目的地
<a name="kafka-ofd-configure"></a>

您可以在建立或更新 Kafka 事件來源映射時，將 Kafka 主題設定為失敗時的目的地。

### 設定 Kafka 目的地 （主控台）
<a name="kafka-ofd-console"></a>

**將 Kafka 主題設定為失敗時的目的地 （主控台）**

1. 開啟 Lambda 主控台中的[函數頁面](https://console.aws.amazon.com/lambda/home#/functions)。

1. 選擇您的函數名稱。

1. 執行以下任意一項：
   + 若要新增新的 Kafka 觸發條件，請在**函數概觀**下，選擇**新增觸發條件**。
   + 若要修改現有的 Kafka 觸發條件，請選擇觸發條件，然後選擇**編輯**。

1. 在**其他設定**下，針對**失敗目的地**，選擇 **Kafka 主題**。

1. 針對**主題名稱**，輸入您要傳送失敗記錄的 Kafka 主題名稱。

1. 選擇**新增**或**儲存**。

### 設定 Kafka 目的地 (AWS CLI)
<a name="kafka-ofd-cli"></a>

使用 `kafka://`字首將 Kafka 主題指定為失敗時的目的地。

#### 使用 Kafka 目的地建立事件來源映射
<a name="kafka-ofd-cli-create"></a>

下列範例會建立 Amazon MSK 事件來源映射，並以 Kafka 主題做為失敗時的目的地：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

對於自我管理的 Kafka，請使用相同的語法：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics AWSKafkaTopic \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

#### 更新 Kafka 目的地
<a name="kafka-ofd-cli-update"></a>

使用 `update-event-source-mapping`命令來新增或修改 Kafka 目的地：

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
```

## Kafka 目的地的記錄格式
<a name="kafka-ofd-record-format"></a>

當 Lambda 將失敗的記錄傳送到 Kafka 主題時，每個訊息都會包含有關失敗和實際記錄內容的中繼資料。

### 失敗中繼資料
<a name="kafka-ofd-metadata"></a>

中繼資料包含記錄失敗原因的相關資訊，以及原始批次的詳細資訊：

```
{
  "requestContext": {
    "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
    "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
    "condition": "RetriesExhausted",
    "approximateInvokeCount": 3
  },
  "responseContext": {
    "statusCode": 200,
    "executedVersion": "$LATEST",
    "functionError": "Unhandled"
  },
  "version": "1.0",
  "timestamp": "2019-11-14T18:16:05.568Z",
  "KafkaBatchInfo": {
    "batchSize": 1,
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "payloadSize": 1162,
    "recordInfo": {
      "offset": "49601189658422359378836298521827638475320189012309704722",
      "timestamp": "2019-11-14T18:16:04.835Z"
    }
  },
  "payload": {
    "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
    "eventSource": "aws:kafka",
    "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
    "records": {
      "my-topic-0": [
        {
          "headers": [],
          "key": "dGVzdC1rZXk=",
          "offset": 100,
          "partition": 0,
          "timestamp": 1749116692330,
          "timestampType": "CREATE_TIME",
          "topic": "my-topic",
          "value": "dGVzdC12YWx1ZQ=="
        }
      ]
    }
  }
}
```

### 分割區索引鍵行為
<a name="kafka-ofd-partitioning"></a>

在產生目的地主題時，Lambda 會使用原始記錄中的相同分割區索引鍵。如果原始記錄沒有金鑰，Lambda 會在目的地主題中的所有可用分割區中使用 Kafka 的預設循環配置分割。

## 要求與限制
<a name="kafka-ofd-requirements"></a>
+ **需要佈建模式** – Kafka 故障時目的地僅適用於啟用佈建模式的事件來源映射。
+ **僅限相同叢集** – 目的地主題必須與來源主題存在於相同的 Kafka 叢集中。
+ **主題許可** – 您的事件來源映射必須具有目的地主題的寫入許可。範例：

  ```
  {
      "Version": "2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "ClusterPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:Connect",
                  "kafka-cluster:DescribeCluster",
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:cluster/*"
              ]
          },
          {
              "Sid": "TopicPermissions",
              "Effect": "Allow",
              "Action": [
                  "kafka-cluster:DescribeTopic",
                  "kafka-cluster:WriteData",
                  "kafka-cluster:ReadData"
              ],
              "Resource": [
                  "arn:aws:kafka:*:*:topic/*/*"
              ]
          },
          {
              "Effect": "Allow",
              "Action": [
                  "kafka:DescribeCluster",
                  "kafka:GetBootstrapBrokers",
                  "kafka:Produce"
              ],
              "Resource": "arn:aws:kafka:*:*:cluster/*"
          },
          {
              "Effect": "Allow",
              "Action": [
                  "ec2:CreateNetworkInterface",
                  "ec2:DescribeNetworkInterfaces",
                  "ec2:DeleteNetworkInterface",
                  "ec2:DescribeSubnets",
                  "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+ **無遞迴** – 目的地主題名稱不能與來源主題名稱相同。