Kafka 주제를 실패 시 대상으로 사용
Kafka 주제를 Kafka 이벤트 소스 매핑의 실패 시 대상으로 구성할 수 있습니다. Lambda가 재시도 횟수를 소진한 후 레코드를 처리할 수 없거나 레코드가 최대 수명을 초과하는 경우 Lambda는 나중에 처리할 수 있도록 실패한 레코드를 지정된 Kafka 주제로 보냅니다.
Kafka 실패 시 대상 작동 방식
Kafka 주제를 실패 시 대상으로 구성하면 Lambda는 Kafka 생산자 역할을 하고 실패한 레코드를 대상 주제에 기록합니다. 그러면 Kafka 인프라 내에 배달 못한 편지 주제(DLT) 패턴이 생성됩니다.
-
동일한 클러스터 요구 사항 - 대상 주제는 소스 주제와 동일한 Kafka 클러스터에 있어야 합니다.
-
실제 레코드 콘텐츠 - Kafka 대상은 실패 메타데이터와 함께 실제로 실패한 레코드를 수신합니다.
-
재귀 방지 - Lambda는 소스 및 대상 주제가 동일한 구성을 차단하여 무한 루프를 방지합니다.
Kafka 실패 시 대상 구성
Kafka 이벤트 소스 매핑을 생성하거나 업데이트할 때 Kafka 주제를 실패 시 대상으로 구성할 수 있습니다.
Kafka 대상 구성(콘솔)
Kafka 주제를 실패 시 대상으로 구성(콘솔)
-
Lambda 콘솔의 함수 페이지
를 엽니다. -
함수 이름을 선택합니다.
-
다음 중 하나를 수행하세요.
-
새 Kafka 트리거를 추가하려면 함수 개요에서 트리거 추가를 선택합니다.
-
기존 Kafka 트리거를 수정하려면 트리거를 선택하고 편집을 선택합니다.
-
-
추가 설정의 실패 시 대상에서 Kafka 주제를 선택합니다.
-
주제 이름에 실패한 레코드를 전송할 Kafka 주제의 이름을 입력합니다.
-
추가 또는 저장을 선택합니다.
Kafka 대상 구성(AWS CLI)
kafka:// 접두사를 사용하여 Kafka 주제를 실패 시 대상으로 지정합니다.
Kafka 대상을 사용하여 이벤트 소스 매핑 생성
다음 예제에서는 Kafka 주제를 실패 시 대상으로 사용하여 Amazon MSK 이벤트 소스 매핑을 생성합니다.
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
자체 관리형 Kafka의 경우 동일한 구문을 사용합니다.
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Kafka 대상 업데이트
update-event-source-mapping 명령을 사용하여 Kafka 대상을 추가 또는 수정합니다.
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --destination-config '{"OnFailure":{"Destination":"kafka://failed-records-topic"}}'
Kafka 대상의 레코드 형식
Lambda가 실패한 레코드를 Kafka 주제로 전송할 때 각 메시지에는 실패에 대한 메타데이터와 실제 레코드 콘텐츠가 모두 포함됩니다.
실패 메타데이터
메타데이터에는 레코드가 실패한 이유에 대한 정보와 원본 배치에 대한 세부 정보가 포함됩니다.
{
"requestContext": {
"requestId": "e4b46cbf-b738-xmpl-8880-a18cdf61200e",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:my-function:$LATEST",
"condition": "RetriesExhausted",
"approximateInvokeCount": 3
},
"responseContext": {
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T18:16:05.568Z",
"KafkaBatchInfo": {
"batchSize": 1,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"payloadSize": 1162,
"recordInfo": {
"offset": "49601189658422359378836298521827638475320189012309704722",
"timestamp": "2019-11-14T18:16:04.835Z"
}
},
"payload": {
"bootstrapServers": "b-1.mycluster.abc123.kafka.us-east-1.amazonaws.com:9098",
"eventSource": "aws:kafka",
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123",
"records": {
"my-topic-0": [
{
"headers": [],
"key": "dGVzdC1rZXk=",
"offset": 100,
"partition": 0,
"timestamp": 1749116692330,
"timestampType": "CREATE_TIME",
"topic": "my-topic",
"value": "dGVzdC12YWx1ZQ=="
}
]
}
}
}
파티션 키 동작
Lambda는 대상 주제에 생성할 때 원본 레코드와 동일한 파티션 키를 사용합니다. 원본 레코드에 키가 없는 경우 Lambda는 대상 주제에서 사용 가능한 모든 파티션에 대해 Kafka의 기본 라운드 로빈 파티셔닝을 사용합니다.
요구 사항 및 제한 사항
-
프로비저닝 모드 필요 - Kafka 실패 시 대상은 프로비저닝 모드가 활성화된 이벤트 소스 매핑에만 사용할 수 있습니다.
-
동일한 클러스터만 - 대상 주제는 소스 주제와 동일한 Kafka 클러스터에 있어야 합니다.
-
주제 권한 - 이벤트 소스 매핑에는 대상 주제에 대한 쓰기 권한이 있어야 합니다. 예제:
{ "Version": "2012-10-17", "Statement": [ { "Sid": "ClusterPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeCluster", "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:cluster/*" ] }, { "Sid": "TopicPermissions", "Effect": "Allow", "Action": [ "kafka-cluster:DescribeTopic", "kafka-cluster:WriteData", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:*:*:topic/*/*" ] }, { "Effect": "Allow", "Action": [ "kafka:DescribeCluster", "kafka:GetBootstrapBrokers", "kafka:Produce" ], "Resource": "arn:aws:kafka:*:*:cluster/*" }, { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] } -
재귀 없음 - 대상 주제 이름은 소스 주제 이름과 같을 수 없습니다.