

# Tutorial: Usar o Lambda com o Kinesis Data Streams
<a name="with-kinesis-example"></a>

Neste tutorial, você criará uma função do Lambda para consumir eventos de um fluxo de dados do Amazon Kinesis. 

1. O aplicativo personalizado grava registros no fluxo.

1. AWS LambdaO sonda a transmissão e, quando detecta novos registros, invoca sua função do Lambda.

1. AWS LambdaO executa a função do Lambda assumindo a função de execução especificada no momento da criação da função do Lambda.

## Pré-requisitos
<a name="with-kinesis-prepare"></a>

### Instalar o AWS Command Line Interface
<a name="install_aws_cli"></a>

Se você ainda não instalou a AWS Command Line Interface, siga as etapas em [Instalar ou atualizar a versão mais recente da AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) para instalá-la.

O tutorial requer um terminal de linha de comando ou um shell para executar os comandos. No Linux e no macOS, use o gerenciador de pacotes e de shell de sua preferência.

**nota**  
No Windows, alguns comandos da CLI do Bash que você costuma usar com o Lambda (como `zip`) não são compatíveis com os terminais integrados do sistema operacional. Para obter uma versão do Ubuntu com o Bash integrada no Windows, [instale o Subsistema do Windows para Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Criar a função de execução
<a name="with-kinesis-example-create-iam-role"></a>

Crie a [função de execução](lambda-intro-execution-role.md) que dá à sua função permissão para acessar recursos do AWS.

**Para criar uma função de execução**

1. Abra a [página Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione **Create role** (Criar função).

1. Crie uma função com as propriedades a seguir.
   + **Trusted entity (Entidade confiável** – **AWS Lambda**.
   + **Permissões**: **AWSLambdaKinesisExecutionRole**.
   + **Role name (Nome da função** – **lambda-kinesis-role**.

A política **AWSLambdaKinesisExecutionRole** tem as permissões necessárias para a função ler itens do Kinesis e gravar logs no CloudWatch Logs.

## Criar a função
<a name="with-kinesis-example-create-function"></a>

Crie uma função do Lambda que processe suas mensagens do Kinesis. O código da função registra o ID do evento e os dados do evento do registro do Kinesis no CloudWatch Logs.

Este tutorial usa o runtime do Node.js 24, mas também fornecemos exemplos de códigos em outras linguagens de runtime. Você pode selecionar a guia na caixa a seguir para ver o código do runtime do seu interesse. O código JavaScript que você usará nesta etapa é o primeiro exemplo mostrado na guia **JavaScript**.

------
#### [ .NET ]

**SDK para .NET**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK para Go V2**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK para Java 2.x**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK para JavaScript (v3)**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando JavaScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Consumir um evento do Kinesis com o Lambda usando TypeScript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK para PHP**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK para Python (Boto3).**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK para Ruby**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consumir um evento do Kinesis com o Lambda usando Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK para Rust**  
 Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos [Exemplos sem servidor](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Consuma um evento do Kinesis com o Lambda usando Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Para criar a função**

1. Crie um diretório para o projeto e depois mude para esse diretório.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Copie o código JavaScript de amostra em um novo arquivo denominado `index.js`.

1. Crie um pacote de implantação.

   ```
   zip function.zip index.js
   ```

1. Crie uma função do Lambda com o comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Testar a função do Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Invoque sua função do Lambda manualmente usando o comando `invoke` da CLI do AWS Lambda e um evento do Kinesis de amostra.

**Para testar a função do Lambda**

1. Copie o JSON a seguir em um arquivo e salve-o como `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Use o comando `invoke` para enviar o evento para a função.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   A opção **cli-binary-format** será necessária se você estiver usando a AWS CLI versão 2. Para que essa seja a configuração padrão, execute `aws configure set cli-binary-format raw-in-base64-out`. Para obter mais informações, consulte [A AWS CLI comporta opções de linha de comando globais](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) no *Guia do usuário da AWS Command Line Interface versão 2*.

   A resposta é salva no `out.txt`.

## Criar uma transmissão do Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Use o comando `create-stream ` para criar um fluxo.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Execute o comando `describe-stream` a seguir para obter o ARN do stream.

```
aws kinesis describe-stream --stream-name lambda-stream
```

A seguinte saída deverá ser mostrada:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Você usa o ARN do stream na próxima etapa para associar o stream à sua função do Lambda.

## Adicionar uma fonte de eventos no AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Execute o comando AWS CLI a seguir da `add-event-source`.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Observe o ID do mapeamento para uso posterior. Você pode obter uma lista de mapeamentos de origem de evento executando o comando `list-event-source-mappings` a seguir.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

Na resposta, pode verificar que o valor do status é `enabled`. Os mapeamentos de origem de eventos podem ser desabilitados para pausar a pesquisa temporariamente, sem perder nenhum registro.

## Testar a configuração
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Para testar o mapeamento da origem do evento, adicione registros de eventos ao seu stream do Kinesis. O valor `--data` é uma string que a CLI codifica em base64 antes de enviá-la ao Kinesis. Você pode executar o mesmo comando mais de uma vez para adicionar vários registros ao fluxo.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

O Lambda usa a função de execução para ler registros da transmissão. Em seguida, ele invoca sua função do Lambda, transmitindo lotes de registros. A função decodifica os dados de cada registro e os registra, enviando a saída para CloudWatch Logs. Visualize os logs no [console do CloudWatch](https://console.aws.amazon.com/cloudwatch).

## Limpe os recursos
<a name="cleanup"></a>

Agora você pode excluir os recursos criados para este tutorial, a menos que queira mantê-los. Excluindo os recursos da AWS que você não está mais usando, você evita cobranças desnecessárias em sua Conta da AWS.

**Para excluir a função de execução**

1. Abra a página [Roles](https://console.aws.amazon.com/iam/home#/roles) (Funções) no console do IAM.

1. Selecione a função de execução que você criou.

1. Escolha **Excluir**.

1. Insira o nome do perfil no campo de entrada de texto e escolha **Delete** (Excluir).

**Como excluir a função do Lambda**

1. Abra a página [Functions](https://console.aws.amazon.com/lambda/home#/functions) (Funções) no console do Lambda.

1. Selecione a função que você criou.

1. Selecione **Ações**, **Excluir**.

1. Digite **confirm** no campo de entrada de texto e escolha **Delete** (Excluir).

**Para excluir a transmissão do Kinesis**

1. Faça login no Console de gerenciamento da AWS e abra o console do Kinesis em [ https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis).

1. Selecione a transmissão criada.

1. Selecione **Ações**, **Excluir**.

1. Digite **delete** no campo de entrada de texto.

1. Escolha **Excluir**.