

# 자습서: Kinesis Data Streams에서 Lambda 사용
<a name="with-kinesis-example"></a>

이 자습서에서는 Amazon Kinesis 데이터 스트림의 이벤트를 소비하기 위한 Lambda 함수를 생성합니다.

1. 사용자 지정 앱은 스트림에 레코드를 기록합니다.

1. AWS Lambda는 스트림에서 새로운 레코드가 감지될 때마다 스트림을 폴링하고 Lambda 함수를 간접 호출합니다.

1. AWS Lambda는 Lambda 함수를 생성할 때 지정한 실행 역할을 수임하여 Lambda 함수를 실행합니다.

## 사전 조건
<a name="with-kinesis-prepare"></a>

### AWS Command Line Interface 설치
<a name="install_aws_cli"></a>

아직 AWS Command Line Interface를 설치하지 않은 경우 [AWS CLI의 최신 버전 설치 또는 업데이트](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)에서 설명하는 단계에 따라 설치하세요.

이 자습서에서는 명령을 실행할 셸 또는 명령줄 터미널이 필요합니다. Linux 및 macOS에서는 선호하는 셸과 패키지 관리자를 사용합니다.

**참고**  
Windows에서는 Lambda와 함께 일반적으로 사용하는 일부 Bash CLI 명령(예:`zip`)은 운영 체제의 기본 제공 터미널에서 지원되지 않습니다. Ubuntu와 Bash의 Windows 통합 버전을 가져오려면 [Linux용 Windows Subsystem을 설치](https://docs.microsoft.com/en-us/windows/wsl/install-win10)합니다.

## 실행 역할 생성
<a name="with-kinesis-example-create-iam-role"></a>

함수에 AWS 리소스에 액세스할 수 있는 권한을 제공하는 [실행 역할](lambda-intro-execution-role.md)을 만듭니다.

**실행 역할을 만들려면**

1. IAM 콘솔에서 [역할 페이지](https://console.aws.amazon.com/iam/home#/roles)를 엽니다.

1. **역할 생성**을 선택합니다.

1. 다음 속성을 사용하여 역할을 만듭니다.
   + **신뢰할 수 있는 엔터티** – **AWS Lambda**.
   + **권한** – **AWSLambdaKinesisExecutionRole**.
   + **역할 이름** – **lambda-kinesis-role**.

**AWSLambdaKinesisExecutionRole** 정책은 함수가 Kinesis에서 항목을 읽고 CloudWatch Logs에 로그를 쓰는 데 필요한 권한을 가집니다.

## 함수 생성
<a name="with-kinesis-example-create-function"></a>

Kinesis 메시지를 처리하는 Lambda 함수를 생성합니다. 함수 코드는 Kinesis 레코드의 이벤트 ID 및 이벤트 데이터를 CloudWatch 로그에 기록합니다.

이 자습서에서는 Node.js 24 런타임을 사용하지만 다른 런타임 언어의 예제 코드도 제공했습니다. 다음 상자에서 탭을 선택하여 관심 있는 런타임에 대한 코드를 볼 수 있습니다. 이 단계에서 사용할 JavaScript 코드는 **JavaScript** 탭에 표시된 첫 번째 예제입니다.

------
#### [ .NET ]

**SDK for .NET**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
.NET을 사용하여 Lambda로 Kinesis 이벤트 사용  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Go를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Java를 사용하여 Lambda에서 Kinesis 이벤트를 사용합니다.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
JavaScript를 사용하여 Lambda로 Kinesis 이벤트 사용  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
TypeScript를 사용하여 Lambda로 Kinesis 이벤트 사용  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
PHP를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python(Boto3)**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Python을 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Ruby를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub에 더 많은 내용이 있습니다. [서버리스 예제](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda) 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.
Rust를 사용하여 Lambda로 Kinesis 이벤트를 사용합니다.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**함수를 만들려면**

1. 프로젝트에 대한 디렉터리를 생성하고 해당 디렉터리로 전환합니다.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. 샘플 JavaScript 코드를 새로운 `index.js` 파일에 복사합니다.

1. 배포 패키지를 만듭니다.

   ```
   zip function.zip index.js
   ```

1. `create-function` 명령을 사용해 Lambda 함수를 만듭니다.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Lambda 함수 테스트
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

`invoke` AWS Lambda CLI 명령 및 샘플 Kinesis 이벤트를 사용하여 Lambda 함수를 수동으로 간접 호출합니다.

**Lambda 함수를 테스트하려면**

1. 다음 JSON을 파일에 복사하고 `input.txt`로 저장합니다.

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. `invoke` 명령을 사용하여 함수로 이벤트를 전송합니다.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   **cli-binary-format** 옵션은 AWS CLI 버전 2를 사용할 때 필요합니다. 이 설정을 기본 설정으로 지정하려면 `aws configure set cli-binary-format raw-in-base64-out`을(를) 실행하세요. 자세한 내용은 [AWS CLI 지원되는 글로벌 명령줄 옵션](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)을 AWS Command Line Interface 사용 설명서 버전 2에서 참조하세요.

   응답이 `out.txt`로 저장됩니다.

## Kinesis 스트림 생성
<a name="with-kinesis-example-configure-event-source-create"></a>

`create-stream ` 명령을 사용하여 스트림을 만듭니다.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

다음 `describe-stream` 명령을 실행하여 스트림 ARN을 가져옵니다.

```
aws kinesis describe-stream --stream-name lambda-stream
```

다음 결과가 표시됩니다.

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

다음 단계에서 스트림 ARN을 사용하여 스트림을 Lambda 함수와 연결합니다.

## AWS Lambda에서 이벤트 소스 추가
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

다음 AWS CLI `add-event-source` 명령을 실행합니다.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

나중에 사용할 수 있도록 매핑 ID를 기록해 둡니다. `list-event-source-mappings` 명령을 실행하여 이벤트 소스 매핑 목록을 가져올 수 있습니다.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

응답에서 상태 값이 `enabled`인지 확인할 수 있습니다. 이벤트 소스 매핑을 비활성화하여 폴링을 일시적으로 중지시켜 레코드 손실을 방지할 수 있습니다.

## 설정 테스트
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

이벤트 소스 매핑을 테스트하려면 Kinesis 스트림에 이벤트 레코드를 추가합니다. `--data` 값은 Kinesis로 전송하기 전에 CLI가 base64로 인코딩하는 문자열입니다. 동일한 명령을 두 번 이상 실행하여 여러 레코드를 스트림에 추가할 수 있습니다.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda에서는 실행 역할을 사용하여 스트림에서 레코드를 읽습니다. 그런 다음 Lambda 함수를 간접 호출해 레코드 배치를 전달합니다. 함수는 각 레코드에서 데이터를 디코딩하고 로깅해 CloudWatch Logs에 출력을 전송합니다. 로그는 [CloudWatch 콘솔](https://console.aws.amazon.com/cloudwatch)에서 확인합니다.

## 리소스 정리
<a name="cleanup"></a>

이 자습서 용도로 생성한 리소스를 보관하고 싶지 않다면 지금 삭제할 수 있습니다. 더 이상 사용하지 않는 AWS 리소스를 삭제하면 AWS 계정에 불필요한 요금이 발생하는 것을 방지할 수 있습니다.

**집행 역할 삭제**

1. IAM 콘솔에서 [역할 페이지](https://console.aws.amazon.com/iam/home#/roles)를 엽니다.

1. 생성한 실행 역할을 선택합니다.

1. **삭제**를 선택합니다.

1. 텍스트 입력 필드에 역할의 이름을 입력하고 **Delete**(삭제)를 선택합니다.

**Lambda 함수를 삭제하려면**

1. Lambda 콘솔의 [함수 페이지](https://console.aws.amazon.com/lambda/home#/functions)를 엽니다.

1. 생성한 함수를 선택합니다.

1. **작업**, **삭제**를 선택합니다.

1. 텍스트 입력 필드에 **confirm**를 입력하고 **Delete**(삭제)를 선택합니다.

**Kinesis 스트림을 삭제하려면**

1. AWS Management Console에 로그인하여 [https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)에서 Kinesis 콘솔을 엽니다.

1. 생성한 스트림을 선택합니다.

1. **작업**, **삭제**를 선택합니다.

1. 텍스트 입력 필드에 **delete**을 입력합니다.

1. **삭제**를 선택합니다.