Kafka 이벤트 소스의 오류 처리 제어 구성 - AWS Lambda

Kafka 이벤트 소스의 오류 처리 제어 구성

Lambda가 오류를 처리하고 Kafka 이벤트 소스 매핑을 재시도하는 방법을 구성할 수 있습니다. 이러한 구성을 통해 Lambda가 실패한 레코드를 처리하고 재시도 동작을 관리하는 방식을 제어할 수 있습니다.

사용 가능한 재시도 구성

Amazon MSK 및 자체 관리형 Kafka 이벤트 소스 모두에 다음 재시도 구성을 사용할 수 있습니다.

  • 최대 재시도 횟수 - 함수가 오류를 반환할 때 Lambda에서 재시도하는 최대 횟수입니다. 최초 간접 호출 시도는 포함되지 않습니다. 기본값은 -1(무제한)입니다.

  • 최대 레코드 사용 기간 – Lambda에서 함수로 보내는 최대 레코드 사용 기간입니다. 기본값은 -1(무제한)입니다.

  • 오류 시 배치 분할 - 함수에서 오류가 반환되면 배치를 두 개의 작은 배치로 분할하고 각각을 개별적으로 다시 시도합니다. 문제가 있는 레코드를 격리하는 데 도움이 됩니다.

  • 부분 배치 응답 - Lambda가 실패한 레코드만 재시도할 수 있도록 함수에서 배치 처리 실패 레코드 관련 정보 반환을 허용합니다.

오류 처리 제어 구성(콘솔)

Lambda 콘솔에서 Kafka 이벤트 소스 매핑을 생성하거나 업데이트할 때 재시도 동작을 구성할 수 있습니다.

Kafka 이벤트 소스의 재시도 동작 구성(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수 이름을 선택합니다.

  3. 다음 중 하나를 수행하세요.

    • 새 Kafka 트리거를 추가하려면 함수 개요에서 트리거 추가를 선택합니다.

    • 기존 Kafka 트리거를 수정하려면 트리거를 선택하고 편집을 선택합니다.

  4. 이벤트 폴러 구성에서 프로비저닝 모드를 선택하여 오류 처리 제어를 구성합니다.

    1. 재시도 시도 횟수에 최대 재시도 횟수(0~10,000 또는 무제한으로 시도하려면 -1)를 입력합니다.

    2. 최대 레코드 사용 기간에 최대 사용 기간을 초 단위로 입력합니다(60~604,800 또는 무제한의 경우 -1).

    3. 오류가 발생할 때 배치 분할을 활성화하려면 오류 시 배치 분할을 선택합니다.

    4. 부분 배치 응답을 활성화하려면 ReportBatchItemFailures를 선택합니다.

  5. 추가 또는 저장을 선택합니다.

재시도 동작 구성(AWS CLI)

다음 AWS CLI 명령을 사용하여 Kafka 이벤트 소스 매핑의 재시도 동작을 구성합니다.

재시도 구성 포함 이벤트 소스 매핑 생성

다음 예시에서는 오류 처리 제어가 포함된 자체 관리형 Kafka 이벤트 소스 매핑을 생성합니다.

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

Amazon MSK 이벤트 소스의 경우는 다음과 같습니다.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

재시도 구성 업데이트

update-event-source-mapping 명령을 사용하여 기존 이벤트 소스 매핑의 재시도 구성을 수정합니다.

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

PartialBatchResponse

ReportBatchItemFailures라고도 하는 부분 배치 응답은 Lambda와 Kafka 소스의 통합에서 오류를 처리하는 주요 기능입니다. 이 기능이 없으면 배치의 항목 중 하나에서 오류가 발생할 때 해당 배치의 모든 메시지가 재처리됩니다. 부분 배치 응답이 활성화 및 구현된 경우 핸들러는 실패한 메시지의 식별자만 반환하므로 Lambda는 해당되는 특정 메시지만 재시도할 수 있습니다. 이를 통해 실패한 메시지가 포함된 배치가 처리되는 방식을 더 효과적으로 제어할 수 있습니다.

배치 오류를 보고하려면 다음 JSON 스키마를 사용합니다.

{ "batchItemFailures": [ { "itemIdentifier": { "partition": "topic-partition_number", "offset": 100 } }, ... ] }
중요

유효한 빈 JSON 또는 null을 반환하면 이벤트 소스 매핑은 배치가 성공적으로 처리되었다고 간주합니다. 간접 호출된 이벤트에 존재하지 않은 잘못된 topic-partition_number 또는 offset이 반환되면 실패로 처리되고 전체 배치를 재시도합니다.

다음 코드 예제에서는 Kafka 소스에서 이벤트를 수신하는 Lambda 함수에 대한 부분 배치 응답을 구현하는 방법을 보여줍니다. 이 함수는 응답으로 배치 항목 실패를 보고하고 나중에 해당 메시지를 다시 시도하도록 Lambda에 신호를 보냅니다.

다음은 이 접근 방식을 보여주는 Python Lambda 핸들러 구현입니다.

import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass

다음은 Node.js 버전입니다.

const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "partition": `${topic}-${partition}`, "offset": Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };