

# チュートリアル: Lambda を Kinesis Data Streams で使用する
<a name="with-kinesis-example"></a>

このチュートリアルでは、Amazon Kinesis データストリームのイベントを処理する Lambda 関数を作成します。

1. カスタムアプリケーションがストリームにレコードを書き込みます。

1. AWS Lambda はストリームをポーリングし、ストリームで新しいレコードを検出すると Lambda 関数を呼び出します。

1. AWS Lambda は、Lambda 関数の作成時に指定した実行ロールを引き受けることにより、Lambda 関数を実行します。

## 前提条件
<a name="with-kinesis-prepare"></a>

### AWS Command Line Interface のインストール
<a name="install_aws_cli"></a>

AWS Command Line Interface をまだインストールしていない場合は、「[最新バージョンの AWS CLI のインストールまたは更新](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html)」にある手順に従ってインストールしてください。

このチュートリアルでは、コマンドを実行するためのコマンドラインターミナルまたはシェルが必要です。Linux および macOS では、任意のシェルとパッケージマネージャーを使用してください。

**注記**  
Windows では、Lambda でよく使用される一部の Bash CLI コマンド (`zip` など) が、オペレーティングシステムの組み込みターミナルでサポートされていません。Ubuntu および Bash の Windows 統合バージョンを取得するには、[Windows Subsystem for Linux をインストール](https://docs.microsoft.com/en-us/windows/wsl/install-win10)します。

## 実行ロールを作成する
<a name="with-kinesis-example-create-iam-role"></a>

AWS リソースにアクセスするためのアクセス権限を関数に付与する[実行ロール](lambda-intro-execution-role.md)を作成します。

**実行ロールを作成するには**

1. IAM コンソールの [[ロールページ](https://console.aws.amazon.com/iam/home#/roles)] を開きます。

1. [**ロールの作成**] を選択します。

1. 次のプロパティでロールを作成します。
   + **信頼されたエンティティ** - **AWS Lambda**
   + **アクセス許可** - **AWSLambdaKinesisExecutionRole**。
   + **Role name** – **lambda-kinesis-role**。

**AWSLambdaKinesisExecutionRole** ポリシーには、Kinesis から項目を読み取り、CloudWatch Logs にログを書き込むために関数が必要とするアクセス許可があります。

## 関数を作成する
<a name="with-kinesis-example-create-function"></a>

Kinesis メッセージを処理する Lambda 関数を作成します。この関数コードは、Kinesis レコードのイベント ID とイベントデータを CloudWatch Logs にログ記録します。

このチュートリアルでは Node.js 24 ランタイムを使用しますが、他のランタイム言語のサンプルコードも提供しています。次のボックスでタブを選択すると、関心のあるランタイムのコードが表示されます。このステップで使用する JavaScript コードは、**[JavaScript]** タブに表示されている最初のサンプルにあります。

------
#### [ .NET ]

**SDK for .NET**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
.NET を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK for Go V2**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Go を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK for Java 2.x**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Java を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK for JavaScript (v3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
JavaScript を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
TypeScript を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK for PHP**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
PHP を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK for Python (Boto3)**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Python を使用した Lambda での Kinesis イベントの消費。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK for Ruby**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Ruby を使用した Lambda での Kinesis イベントの消費。  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 GitHub には、その他のリソースもあります。[サーバーレスサンプル](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda)リポジトリで完全な例を見つけて、設定と実行の方法を確認してください。
Rust を使用した Lambda での Kinesis イベントの消費。  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**関数を作成するには**

1. プロジェクト用のディレクトリを作成し、そのディレクトリに切り替えます。

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. サンプル JavaScript コードを `index.js` という名前の新しいファイルにコピーします。

1. デプロイパッケージを作成します。

   ```
   zip function.zip index.js
   ```

1. `create-function` コマンドを使用して Lambda 関数を作成します。

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Lambda 関数をテストする
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

`invoke`AWS Lambda CLI コマンドおよびサンプルの Kinesis イベントを使用して、手動で Lambda 関数を呼び出します。

**Lambda 関数をテストするには**

1. 以下の JSON をファイルにコピーし、`input.txt` という名前で保存します。

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. `invoke` コマンドを使用して、関数にイベントを送信します。

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   AWS CLI バージョン 2 を使用している場合、**cli-binary-format** オプションは必須です。これをデフォルト設定にするには、`aws configure set cli-binary-format raw-in-base64-out` を実行します。詳細については、バージョン 2 の AWS Command Line Interface ユーザーガイドの「[AWS CLI でサポートされているグローバルコマンドラインオプション](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list)」を参照してください。

   レスポンスは `out.txt` に保存されます。

## Kinesis Stream を作成する
<a name="with-kinesis-example-configure-event-source-create"></a>

`create-stream ` コマンドを使用して、スキーマを作成します。

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

次の `describe-stream` コマンドを実行して、ストリーム ARN を取得します。

```
aws kinesis describe-stream --stream-name lambda-stream
```

次のような出力が表示されます。

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

次のステップで Lambda 関数にストリームを関連付けるために、ストリーム ARN を使用します。

## AWS Lambda でイベントソースを追加する
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

次の AWS CLI `add-event-source` コマンドを実行します。

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

後で使用するために、マッピング ID をメモしておきます。`list-event-source-mappings` コマンドを実行して、イベントソースマッピングのリストを取得できます。

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

レスポンスでは、ステータス値が `enabled` であることを確認できます。イベントソースマッピングを無効にすると、レコードを失うことなくポーリングを一時停止できます。

## セットアップをテストする
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

イベントソースマッピングをテストするには、イベントレコードを Kinesis ストリームに追加します。`--data` 値は、文字列を Kinesis に送信する前に CLI で base64 にエンコードされる文字列です。同じコマンドを複数回実行して、複数のレコードをストリームに追加することができます。

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda は実行ロールを使用して、ストリームからレコードを読み取ります。次に、Lambda 関数を呼び出し、レコードのバッチを渡します。この関数は、各レコードからデータをデコードしてログ記録し、出力を CloudWatch Logs に送信します。[CloudWatch コンソール](https://console.aws.amazon.com/cloudwatch)でログを表示する

## リソースのクリーンアップ
<a name="cleanup"></a>

このチュートリアル用に作成したリソースは、保持しない場合は削除できます。使用しなくなった AWS リソースを削除することで、AWS アカウント アカウントに請求される料金の発生を防ぎます。

**実行ロールを削除する**

1. IAM コンソールの [[ロール]](https://console.aws.amazon.com/iam/home#/roles) ページを開きます。

1. 作成した実行ロールを選択します。

1. **[削除]** を選択します。

1. テキスト入力フィールドにロールの名前を入力し、**[削除]** を選択します。

**Lambda 関数を削除するには**

1. Lambda コンソールの [[関数]](https://console.aws.amazon.com/lambda/home#/functions) ページを開きます。

1. 作成した関数を選択します。

1. **[アクション]** で、**[削除]** を選択します。

1. テキスト入力フィールドに **confirm** と入力し、**[Delete]** (削除) を選択します。

**Kinesis ストリームを削除するには**

1. AWS マネジメントコンソール にサインインし、Kinesis コンソール ([https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)) を開きます。

1. 作成したストリームを選択します。

1. [** Actions**] で、[**Delete **] を選択します。

1. テキスト入力フィールドに **delete** を入力します。

1. **[Delete]** (削除) をクリックします。