Kafka 주제를 실패 시 대상으로 사용 - AWS Lambda

Kafka 주제를 실패 시 대상으로 사용

Kafka 주제를 Kafka 이벤트 소스 매핑의 실패 시 대상으로 구성할 수 있습니다. Lambda가 재시도 횟수를 소진한 후 레코드를 처리할 수 없거나 레코드가 최대 수명을 초과하는 경우 Lambda는 나중에 처리할 수 있도록 실패한 레코드를 지정된 Kafka 주제로 보냅니다.

Kafka 실패 시 대상 작동 방식

Kafka 주제를 실패 시 대상으로 구성하면 Lambda는 Kafka 생산자 역할을 하고 실패한 레코드를 대상 주제에 기록합니다. 그러면 Kafka 인프라 내에 배달 못한 편지 주제(DLT) 패턴이 생성됩니다.

  • 동일한 클러스터 요구 사항 - 대상 주제는 소스 주제와 동일한 Kafka 클러스터에 있어야 합니다.

  • 실제 레코드 콘텐츠 - Kafka 대상은 실패 메타데이터와 함께 실제로 실패한 레코드를 수신합니다.

  • 재귀 방지 - Lambda는 소스 및 대상 주제가 동일한 구성을 차단하여 무한 루프를 방지합니다.

Kafka 실패 시 대상 구성

Kafka 이벤트 소스 매핑을 생성하거나 업데이트할 때 Kafka 주제를 실패 시 대상으로 구성할 수 있습니다.

Kafka 대상 구성(콘솔)

Kafka 주제를 실패 시 대상으로 구성(콘솔)
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수 이름을 선택합니다.

  3. 다음 중 하나를 수행하세요.

    • 새 Kafka 트리거를 추가하려면 함수 개요에서 트리거 추가를 선택합니다.

    • 기존 Kafka 트리거를 수정하려면 트리거를 선택하고 편집을 선택합니다.

  4. 추가 설정실패 시 대상에서 Kafka 주제를 선택합니다.

  5. 주제 이름에 실패한 레코드를 전송할 Kafka 주제의 이름을 입력합니다.

  6. 추가 또는 저장을 선택합니다.

Kafka 대상 구성(AWS CLI)

kafka:// 접두사를 사용하여 Kafka 주제를 실패 시 대상으로 지정합니다.

Kafka 대상을 사용하여 이벤트 소스 매핑 생성

다음 예제에서는 Kafka 주제를 실패 시 대상으로 사용하여 Amazon MSK 이벤트 소스 매핑을 생성합니다.

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

자체 관리형 Kafka의 경우 동일한 구문을 사용합니다.

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 대상 업데이트

update-event-source-mapping 명령을 사용하여 Kafka 대상을 추가 또는 수정합니다.

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'

Kafka 대상의 레코드 형식

Lambda가 실패한 레코드를 Kafka 주제로 전송할 때 각 메시지에는 실패에 대한 메타데이터와 실제 레코드 콘텐츠가 모두 포함됩니다.

실패 메타데이터

메타데이터에는 레코드가 실패한 이유에 대한 정보와 원본 배치에 대한 세부 정보가 포함됩니다.

{ "requestContext": { "requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST", "condition": "RetriesExhausted", "approximateInvokeCount": 3 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T18:16:05.568Z", "KafkaBatchInfo": { "batchSize": 1, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "payloadSize": 1162, "recordInfo": { "offset": "49601189658422359378836298521827638475320189012309704722", "timestamp": "2019-11-14T18:16:04.835Z" } }, "payload": { "bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098", "eventSource": "aws:kafka", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123", "records": { "my-topic-0": [ { "headers": [], "key": "dGVzdC1rZXk=", "offset": 100, "partition": 0, "timestamp": 1749116692330, "timestampType": "CREATE_TIME", "topic": "my-topic", "value": "dGVzdC12YWx1ZQ==" } ] } } }

파티션 키 동작

Lambda는 대상 주제에 생성할 때 원본 레코드와 동일한 파티션 키를 사용합니다. 원본 레코드에 키가 없는 경우 Lambda는 대상 주제에서 사용 가능한 모든 파티션에 대해 Kafka의 기본 라운드 로빈 파티셔닝을 사용합니다.

요구 사항 및 제한 사항

  • 프로비저닝 모드 필요 - Kafka 실패 시 대상은 프로비저닝 모드가 활성화된 이벤트 소스 매핑에만 사용할 수 있습니다.

  • 동일한 클러스터만 - 대상 주제는 소스 주제와 동일한 Kafka 클러스터에 있어야 합니다.

  • 주제 권한 - 이벤트 소스 매핑에는 대상 주제에 대한 쓰기 권한이 있어야 합니다. 예제:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 재귀 없음 - 대상 주제 이름은 소스 주제 이름과 같을 수 없습니다.