

# Kafka 이벤트 소스의 오류 처리 제어 구성
<a name="kafka-retry-configurations"></a>

Lambda가 오류를 처리하고 Kafka 이벤트 소스 매핑을 재시도하는 방법을 구성할 수 있습니다. 이러한 구성을 통해 Lambda가 실패한 레코드를 처리하고 재시도 동작을 관리하는 방식을 제어할 수 있습니다.

## 사용 가능한 재시도 구성
<a name="kafka-retry-options"></a>

Amazon MSK 및 자체 관리형 Kafka 이벤트 소스 모두에 다음 재시도 구성을 사용할 수 있습니다.
+ **최대 재시도 횟수** - 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 최초 간접 호출 시도는 포함되지 않습니다. 기본값은 -1(무제한)입니다. 무한 재시도와 [실패 시 대상](kafka-on-failure-destination.md)을 모두 구성하면 Lambda에서는 자동으로 최대 10회의 재시도를 적용합니다.
+ **최대 레코드 사용 기간** – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다. 기본값은 -1(무제한)입니다.
+ **오류 시 배치 분할** - 함수에서 오류가 반환되면 배치를 두 개의 작은 배치로 분할하고 각각을 개별적으로 다시 시도합니다. 문제가 있는 레코드를 격리하는 데 도움이 됩니다.
+ **부분 배치 응답** - Lambda가 실패한 레코드만 재시도할 수 있도록 함수에서 배치 처리 실패 레코드 관련 정보 반환을 허용합니다.

## 오류 처리 제어 구성(콘솔)
<a name="kafka-retry-console"></a>

Lambda 콘솔에서 Kafka 이벤트 소스 매핑을 생성하거나 업데이트할 때 재시도 동작을 구성할 수 있습니다.

**Kafka 이벤트 소스의 재시도 동작 구성(콘솔)**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 함수 이름을 선택합니다.

1. 다음 중 하나를 수행하세요.
   + 새 Kafka 트리거를 추가하려면 **함수 개요**에서 **트리거 추가**를 선택합니다.
   + 기존 Kafka 트리거를 수정하려면 트리거를 선택하고 **편집**을 선택합니다.

1. **이벤트 폴러 구성**에서 프로비저닝 모드를 선택하여 오류 처리 제어를 구성합니다.

   1. **재시도 시도 횟수**에 최대 재시도 횟수(0\$110,000 또는 무제한으로 시도하려면 -1)를 입력합니다.

   1. **최대 레코드 사용 기간**에 최대 사용 기간을 초 단위로 입력합니다(60\$1604,800 또는 무제한의 경우 -1).

   1. 오류가 발생할 때 배치 분할을 활성화하려면 **오류 시 배치 분할**을 선택합니다.

   1. 부분 배치 응답을 활성화하려면 **ReportBatchItemFailures**를 선택합니다.

1. **추가** 또는 **저장**을 선택합니다.

## 재시도 동작 구성(AWS CLI)
<a name="kafka-retry-cli"></a>

다음 AWS CLI 명령을 사용하여 Kafka 이벤트 소스 매핑의 재시도 동작을 구성합니다.

### 재시도 구성 포함 이벤트 소스 매핑 생성
<a name="kafka-retry-cli-create"></a>

다음 예시에서는 오류 처리 제어가 포함된 자체 관리형 Kafka 이벤트 소스 매핑을 생성합니다.

```
aws lambda create-event-source-mapping \
  --function-name my-kafka-function \
  --topics my-kafka-topic \
  --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \
  --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \
  --starting-position LATEST \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

Amazon MSK 이벤트 소스의 경우는 다음과 같습니다.

```
aws lambda create-event-source-mapping \
  --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \
  --topics AWSMSKKafkaTopic \
  --starting-position LATEST \
  --function-name my-kafka-function \
  --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \
  --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \
  --maximum-retry-attempts 3 \
  --maximum-record-age-in-seconds 3600 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

### 재시도 구성 업데이트
<a name="kafka-retry-cli-update"></a>

`update-event-source-mapping` 명령을 사용하여 기존 이벤트 소스 매핑의 재시도 구성을 수정합니다.

```
aws lambda update-event-source-mapping \
  --uuid 12345678-1234-1234-1234-123456789012 \
  --maximum-retry-attempts 5 \
  --maximum-record-age-in-seconds 7200 \
  --bisect-batch-on-function-error \
  --function-response-types "ReportBatchItemFailures"
```

## PartialBatchResponse
<a name="kafka-partial-batch-response"></a>

ReportBatchItemFailures라고도 하는 부분 배치 응답은 Lambda와 Kafka 소스의 통합에서 오류를 처리하는 주요 기능입니다. 이 기능이 없으면 배치의 항목 중 하나에서 오류가 발생할 때 해당 배치의 모든 메시지가 재처리됩니다. 부분 배치 응답이 활성화 및 구현된 경우 핸들러는 실패한 메시지의 식별자만 반환하므로 Lambda는 해당되는 특정 메시지만 재시도할 수 있습니다. 이를 통해 실패한 메시지가 포함된 배치가 처리되는 방식을 더 효과적으로 제어할 수 있습니다.

배치 오류를 보고하려면 다음 JSON 스키마를 사용합니다.

```
{
  "batchItemFailures": [
    {
      "itemIdentifier": {
        "partition": "topic-partition_number",
        "offset": 100
      }
    },
    ...
  ]
}
```

**중요**  
유효한 빈 JSON 또는 null을 반환하면 이벤트 소스 매핑은 배치가 성공적으로 처리되었다고 간주합니다. 간접 호출된 이벤트에 존재하지 않은 잘못된 topic-partition\$1number 또는 offset이 반환되면 실패로 처리되고 전체 배치를 재시도합니다.

다음 코드 예제에서는 Kafka 소스에서 이벤트를 수신하는 Lambda 함수에 대한 부분 배치 응답을 구현하는 방법을 보여줍니다. 이 함수는 응답으로 배치 항목 실패를 보고하고 나중에 해당 메시지를 다시 시도하도록 Lambda에 신호를 보냅니다.

다음은 이 접근 방식을 보여주는 Python Lambda 핸들러 구현입니다.

```
import base64
from typing import Any, Dict, List

def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]:
    failures: List[Dict[str, Dict[str, Any]]] = []
    records_dict = event.get("records", {})
    
    for topic_partition, records_list in records_dict.items():
        for record in records_list:
            topic = record.get("topic")
            partition = record.get("partition")
            offset = record.get("offset")
            value_b64 = record.get("value")
            
            try:
                data = base64.b64decode(value_b64).decode("utf-8")
                process_message(data)
            except Exception as exc:
                print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}")
                item_identifier: Dict[str, Any] = {
                    "partition": f"{topic}-{partition}",
                    "offset": int(offset) if offset is not None else None,
                }
                failures.append({"itemIdentifier": item_identifier})
    
    return {"batchItemFailures": failures}

def process_message(data: str) -> None:
    # Your business logic for a single message
    pass
```

다음은 Node.js 버전입니다.

```
const { Buffer } = require("buffer");

const handler = async (event) => {
  const failures = [];
  
  for (let topicPartition in event.records) {
    const records = event.records[topicPartition];
    
    for (const record of records) {
      const topic = record.topic;
      const partition = record.partition;
      const offset = record.offset;
      const valueBase64 = record.value;
      const data = Buffer.from(valueBase64, "base64").toString("utf8");
      
      try {
        await processMessage(data);
      } catch (error) {
        console.error("Failed to process record", { topic, partition, offset, error });
        const itemIdentifier = {
          "partition": `${topic}-${partition}`,
          "offset": Number(offset),
        };
        failures.push({ itemIdentifier });
      }
    }
  }
  
  return { batchItemFailures: failures };
};

async function processMessage(payload) {
  // Your business logic for a single message
}

module.exports = { handler };
```