Lambda를 사용해 Amazon Kinesis Data Streams의 레코드를 처리 - AWS Lambda

Lambda를 사용해 Amazon Kinesis Data Streams의 레코드를 처리

Lambda 함수를 사용하여 Amazon Kinesis 데이터 스트림의 레코드를 처리할 수 있습니다. Lambda 함수를 Kinesis Data Streams 공유 처리량 소비자(표준 반복기)에 매핑하거나 향상된 팬 아웃 기능이 있는 전용 처리량 소비자에 매핑할 수 있습니다. 표준 반복기의 경우 Lambda는 HTTP 프로토콜을 사용하여 Kinesis 스트림의 각 샤드를 폴링합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

Kinesis 데이터 스트림에 대한 자세한 내용은 Amazon Kinesis Data Streams에서 데이터 읽기를 참조하세요.

참고

Kinesis는 각 샤드에 대해 요금을 부과하고 향상된 팬아웃의 경우 스트림에서 읽은 데이터에 대해 요금을 부과합니다. 요금 세부 정보는 Amazon Kinesis 요금을 참조하세요.

폴링 및 배치 처리 스트림

Lambda는 데이터 스트림에서 레코드를 읽고 스트림 레코드를 포함한 이벤트와 동기적으로 함수를 간접 호출합니다. Lambda는 배치의 레코드를 읽고 함수를 간접 호출하여 배치의 레코드를 처리합니다. 각 배치에는 단일 샤드/데이터 스트림의 레코드가 포함됩니다.

Lambda 함수는 데이터 스트림의 소비자 애플리케이션입니다. 이 함수는 각 샤드에서 한 번에 한 개의 레코드 배치를 처리합니다. Lambda 함수를 공유 처리량 소비자(표준 반복기)에 매핑하거나 향상된 팬 아웃 기능이 있는 전용 처리량 소비자에 매핑할 수 있습니다.

  • 표준 반복자: Lambda는 초당 1회의 속도로 Kinesis 스트림의 각 샤드에서 레코드를 폴링합니다. 더 많은 레코드를 사용할 수 있는 경우 Lambda는 함수가 스트림을 따라잡을 때까지 배치 처리를 유지합니다. 이벤트 소스 매핑은 샤드의 다른 소비자와 읽기 처리량을 공유합니다.

  • 향상된 팬아웃: 지연 시간을 최소화하고 읽기 처리량을 최대화하려면 향상된 팬아웃으로 데이터 스트림 소비자를 생성하세요. 향상된 팬아웃 소비자는 각 샤드에 대해 전용 연결을 설정하므로 스트림에서 읽는 다른 애플리케이션에 영향을 주지 않습니다. 스트림 소비자는 HTTP/2를 사용하여 수명이 긴 연결을 통해 레코드를 Lambda에 푸시하고 요청 헤더를 압축함으로써 지연 시간을 최소화합니다. Kinesis RegisterStreamConsumer API를 사용하여 스트림 소비자를 생성할 수 있습니다.

aws kinesis register-stream-consumer \ --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

다음 결과가 표시됩니다:

{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}

함수가 레코드를 처리하는 속도를 높이려면 데이터 스트림에 샤드를 추가합니다. Lambda는 각 샤드의 레코드를 순서대로 처리합니다. 함수가 오류를 반환하면 샤드는 추가 레코드 처리를 중지합니다. 샤드가 많을수록 한 번에 더 많은 배치가 처리되므로 동시 실행에 대한 오류의 영향이 줄어듭니다.

함수가 총 동시 배치 수를 처리하기 위해 확장할 수 없는 경우 함수에 대한 할당량 증가를 요청하거나 동시성을 예약합니다.

기본적으로, Lambda는 레코드가 사용 가능하게 되는 즉시 함수를 간접 호출합니다. Lambda가 이벤트 소스에서 읽는 배치에 하나의 레코드만 있는 경우, Lambda는 함수에 하나의 레코드만 전송합니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 일괄 처리 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 전체 배치가 수집되거나, 일괄 처리 기간이 만료되거나, 배치가 페이로드 한도인 6MB에 도달할 때까지 이벤트 소스에서 레코드를 계속 읽습니다. 자세한 내용은 일괄 처리 동작 섹션을 참조하세요.

주의

Lambda 이벤트 소스 매핑은 각 이벤트를 한 번 이상 처리하므로 레코드가 중복될 수 있습니다. 중복 이벤트와 관련된 잠재적 문제를 방지하려면 함수 코드를 멱등성으로 만드는 것이 좋습니다. 자세한 내용은 AWS 지식 센터의 함수를 멱등성 Lambda 함수로 만들려면 어떻게 해야 하나요?를 참조하세요.

Lambda는 처리를 위해 다음 배치를 전송하기 전에 구성된 확장이 완료될 때까지 기다리지 않습니다. 즉, Lambda가 다음 레코드 배치를 처리할 때 확장이 계속 실행될 수 있습니다. 이로 인해 계정의 동시성 설정 또는 스로틀링을 위반하는 경우 제한 문제가 발생할 수 있습니다. 이것이 잠재적인 문제인지 여부를 탐지하려면 함수를 모니터링하고 이벤트 소스 매핑에 대해 예상보다 높은 동시성 지표가 표시되는지 확인하세요. 간접 호출 간격이 짧기 때문에 Lambda는 일시적으로 동시성 사용량을 샤드 수보다 더 높게 보고할 수 있습니다. 확장이 없는 Lambda 함수에서도 마찬가지입니다.

Kinesis 데이터 스트림의 한 샤드와 하나 이상의 Lambda 간접 호출을 동시에 처리하도록 ParallelizationFactor 설정을 구성합니다. Lambda가 병렬화 계수를 통해 샤드에서 폴링하는 동시 배치의 수는 1(기본값)부터 10까지 지정할 수 있습니다. 예를 들어 ParallelizationFactor(을)를 2로 설정하는 경우 최대 100개의 Kinesis 데이터 샤드를 처리하기 위한 200번의 동시 Lambda 간접 호출을 보유할 수 있습니다(실제 ConcurrentExecutions 지표의 값은 다를 수 있음). 이는 데이터 볼륨이 일시적이고 IteratorAge가 높을 때 처리량을 확장하는 데 도움을 줍니다. 샤드당 동시 배치 수를 높여도 Lambda는 파티션-키 수준에서의 순차 처리를 계속 보장합니다.

Kinesis 집계와 함께 ParallelizationFactor를 사용할 수도 있습니다. 이벤트 소스 매핑의 동작은 향상된 팬아웃을 사용하는지 여부에 따라 달라집니다.

  • 향상된 팬아웃 사용 안 함: 집계된 이벤트 내의 모든 이벤트는 동일한 파티션 키를 가져야 합니다. 파티션 키도 집계된 이벤트의 파티션 키와 일치해야 합니다. 집계된 이벤트 내의 이벤트가 다른 파티션 키를 가지는 경우 Lambda는 파티션 키를 기준으로 이벤트를 순서대로 처리한다고 보장할 수 없습니다.

  • 향상된 팬아웃 사용: 먼저 Lambda는 집계된 이벤트를 개별 이벤트로 디코딩합니다. 집계된 이벤트는 포함된 이벤트와 다른 파티션 키를 가질 수 있습니다. 하지만 파티션 키가 일치하지 않는 이벤트는 삭제되고 손실됩니다. Lambda는 이러한 이벤트를 처리하지 않으며 구성된 장애 대상으로 전송하지도 않습니다.

예제 이벤트

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }