

# 为 Kafka 事件源配置错误处理控件
<a name="kafka-retry-configurations"></a>

您可以配置 Lambda 为 Kafka 事件源映射处理错误及重试情况的方式。这些配置可帮助您控制 Lambda 如何处理失败的记录和管理重试行为。

## 可用重试配置
<a name="kafka-retry-options"></a>

以下重试配置适用于 Amazon MSK 和自行管理的 Kafka 事件源：
+ **最大重试次数**：您函数返回错误时 Lambda 重试的最大次数。这不包括初始的调用尝试。默认值为 -1（无限）。当您同时配置无限重试和[失败目标](kafka-on-failure-destination.md)时，Lambda 会自动应用最多 10 次重试尝试。
+ **最长记录期限**：Lambda 发送到您的函数的记录的最长期限。默认值为 -1（无限）。
+ **出现错误时拆分批**：当您的函数返回错误时，将批次分成两个较小的批次，然后分别对每个批次进行重试。这有助于隔离有问题的记录。
+ **部分批次响应**：使您的函数能够返回有关批次中哪些记录在处理过程中出现错误的信息，这样 Lambda 就可以仅对那些失败的记录进行重试。

## 配置错误处理控件（控制台）
<a name="kafka-retry-console"></a>

在 Lambda 控制台中创建或更新 Kafka 事件源映射时，您可以配置重试行为。

**要为 Kafka 事件源配置重试行为（控制台）**

1. 打开 Lamba 控制台的[函数](https://console.aws.amazon.com/lambda/home#/functions)页面。

1. 选择您的函数名称。

1. 请执行以下操作之一：
   + 要添加新的 Kafka 触发器，请在**函数概述**下选择**添加触发器**。
   + 要修改现有的 Kafka 触发器，请选择该触发器，然后选择**编辑**。

1. 在**事件轮询器配置**下，选择预置模式以配置错误处理控件：

   1. 对于**重试尝试**，请输入最大重试次数（0-10000，或者 -1 表示无限次）。

   1. 对于**最长记录期限**，输入以秒为单位的最长期限（60-604800，或 -1 表示无限期）。

   1. 要在出现错误时启用批次拆分，请选择**出现错误时拆分批**。

   1. 要启用部分批次响应，请选择 **ReportBatchItemFailures**。

1. 选择**添加**或**保存**。

## 配置重试行为（AWS CLI）
<a name="kafka-retry-cli"></a>

使用以下 AWS CLI 命令为 Kafka 事件源映射配置重试行为。

### 使用重试配置创建事件源映射
<a name="kafka-retry-cli-create"></a>

以下示例创建了一个带有错误处理控件的自行管理 Kafka 事件源映射：

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics my-kafka-topic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

对于 Amazon MSK 事件源：

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSMSKKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

### 更新重试配置
<a name="kafka-retry-cli-update"></a>

使用 `update-event-source-mapping` 命令修改现有事件源映射的重试配置：

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --maximum-retry-attempts 5 \
  --maximum-record-age-in-seconds 7200 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

## PartialBatchResponse
<a name="kafka-partial-batch-response"></a>

部分批次响应，也称为 ReportBatchItemFailures，是 Lambda 与 Kafka 源集成时错误处理的关键功能。如果没有此功能，当批次中的某一项目出现错误时，就会导致需要重新处理该批次中的所有消息。启用并实施部分批次响应功能后，处理程序仅返回失败消息的标识符，从而使得 Lambda 只能重试这些特定的项目。这样可以更好地控制对包含失败消息的批次的处理方式。

要报告批次错误，您将使用以下 JSON 架构：

```
{
  "batchItemFailures": [
    {
      "itemIdentifier": {
        "partition": "topic-partition_number",
        "offset": 100
      }
    },
    ...
  ]
}
```

**重要**  
如果您返回空的有效 JSON 或 null，则事件源映射会将批次视为已成功处理。如果返回的任何无效的 topic-partition\$1number 或偏移量并未包含在调用事件中，则将被视为失败，并且整个批次将重新尝试执行。

以下代码示例显示如何为接收来自 Kafka 源的事件的 Lambda 函数实现部分批次响应。该函数在响应中报告批处理项目失败，并指示 Lambda 稍后重试这些消息。

以下是展示该方法的 Python Lambda 处理程序实现示例：

```
import base64
from typing import Any, Dict, List

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]:
    failures: List[Dict[str, Dict[str, Any]]] = []
    records_dict = event.get("records", {})
    
    for topic_partition, records_list in records_dict.items():
        for record in records_list:
            topic = record.get("topic")
            partition = record.get("partition")
            offset = record.get("offset")
            value_b64 = record.get("value")
            
            try:
                data = base64.b64decode(value_b64).decode("utf-8")
                process_message(data)
            except Exception as exc:
                print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}")
                item_identifier: Dict[str, Any] = {
                    "partition": f"{topic}-{partition}",
                    "offset": int(offset) if offset is not None else None,
                }
                failures.append({"itemIdentifier": item_identifier})
    
    return {"batchItemFailures": failures}

def process_message(data: str) -> None:
    # Your business logic for a single message
    pass
```

以下是 Node.js 版本：

```
const { Buffer } = require("buffer");

const handler = async (event) => {
  const failures = [];
  
  for (let topicPartition in event.records) {
    const records = event.records[topicPartition];
    
    for (const record of records) {
      const topic = record.topic;
      const partition = record.partition;
      const offset = record.offset;
      const valueBase64 = record.value;
      const data = Buffer.from(valueBase64, "base64").toString("utf8");
      
      try {
        await processMessage(data);
      } catch (error) {
        console.error("Failed to process record", { topic, partition, offset, error });
        const itemIdentifier = {
          "partition": `${topic}-${partition}`,
          "offset": Number(offset),
        };
        failures.push({ itemIdentifier });
      }
    }
  }
  
  return { batchItemFailures: failures };
};

async function processMessage(payload) {
  // Your business logic for a single message
}

module.exports = { handler };
```