

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Tutorial: Utilizzo di Lambda con Flusso di dati Kinesis
<a name="with-kinesis-example"></a>

In questo tutorial, viene creata una funzione Lambda per utilizzare gli eventi da un flusso di dati Amazon Kinesis. 

1. L'app personalizzata scrive record nel flusso.

1. AWS Lambda interroga lo stream e, quando rileva nuovi record nello stream, richiama la funzione Lambda.

1. AWS Lambda esegue la funzione Lambda assumendo il ruolo di esecuzione specificato al momento della creazione della funzione Lambda.

## Prerequisiti
<a name="with-kinesis-prepare"></a>

### Installa AWS Command Line Interface
<a name="install_aws_cli"></a>

Se non l'hai ancora installato AWS Command Line Interface, segui i passaggi indicati in [Installazione o aggiornamento della versione più recente di AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html) per installarlo.

Per eseguire i comandi nel tutorial, sono necessari un terminale a riga di comando o una shell (interprete di comandi). In Linux e macOS, utilizza la shell (interprete di comandi) e il gestore pacchetti preferiti.

**Nota**  
Su Windows, alcuni comandi della CLI Bash utilizzati comunemente con Lambda (ad esempio, `zip`) non sono supportati dai terminali integrati del sistema operativo. Per ottenere una versione integrata su Windows di Ubuntu e Bash, [installa il sottosistema Windows per Linux](https://docs.microsoft.com/en-us/windows/wsl/install-win10). 

## Creazione del ruolo di esecuzione
<a name="with-kinesis-example-create-iam-role"></a>

Crea il [ruolo di esecuzione](lambda-intro-execution-role.md) che autorizza la funzione ad accedere alle AWS risorse.

**Per creare un ruolo di esecuzione**

1. Apri la pagina [Ruoli](https://console.aws.amazon.com/iam/home#/roles) nella console IAM.

1. Scegliere **Crea ruolo**.

1. Creare un ruolo con le seguenti proprietà.
   + **Trusted entity **(Entità attendibile) – **AWS Lambda**.
   + **Autorizzazioni** — **AWSLambdaKinesisExecutionRole**.
   + **Nome ruolo** – **lambda-kinesis-role**.

La **AWSLambdaKinesisExecutionRole**policy dispone delle autorizzazioni necessarie alla funzione per leggere gli elementi da Kinesis e scrivere i log in CloudWatch Logs.

## Creazione della funzione
<a name="with-kinesis-example-create-function"></a>

Crea una funzione Lambda che elabora i messaggi Kinesis. Il codice funzione registra l'ID dell'evento e i dati dell'evento del record CloudWatch Kinesis in Logs.

Questo tutorial utilizza il runtime Node.js 24, ma abbiamo anche fornito codice di esempio in altri linguaggi di runtime. Per visualizzare il codice per il runtime che ti interessa, seleziona la scheda corrispondente nella casella seguente. Il JavaScript codice che utilizzerai in questo passaggio si trova nel primo esempio mostrato nella **JavaScript**scheda.

------
#### [ .NET ]

**SDK per .NET**  
 C'è altro su GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite .NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK per Go V2**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK per Java 2.x**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK per JavaScript (v3)**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Consumo di un evento Kinesis con Lambda utilizzando. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Consumo di un evento Kinesis con Lambda utilizzando. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK per PHP**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK per Python (Boto3)**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK per Ruby**  
 C'è dell'altro GitHub. Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK per Rust**  
 C'è altro da fare. GitHub Trova l’esempio completo e scopri come eseguire la configurazione e l’esecuzione nel repository di [Esempi serverless](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Utilizzo di un evento Kinesis con Lambda tramite Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Creazione della funzione**

1. Crea una directory per il progetto, quindi passa a quella directory.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Copia il JavaScript codice di esempio in un nuovo file denominato`index.js`.

1. Crea un pacchetto di implementazione.

   ```
   zip function.zip index.js
   ```

1. Creare una funzione Lambda con il comando `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Test della funzione Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Richiama la funzione Lambda manualmente utilizzando il comando `invoke` AWS Lambda CLI e un evento Kinesis di esempio.

**Verifica della funzione Lambda**

1. Copiare il codice JSON seguente in un file e salvarlo con nome `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Utilizzare il comando `invoke` per inviare l'evento alla funzione.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   L'**cli-binary-format**opzione è obbligatoria se utilizzi la versione 2. AWS CLI Per rendere questa impostazione come predefinita, esegui `aws configure set cli-binary-format raw-in-base64-out`. Per ulteriori informazioni, consulta la pagina [AWS CLI supported global command line options](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) nella *Guida per l'utente di AWS Command Line Interface versione 2*.

   La risposta viene salvata in `out.txt`.

## Creare un flusso Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Utilizzare il comando `create-stream ` per creare un flusso.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Eseguire il seguente comando `describe-stream` per ottenere il flusso ARN.

```
aws kinesis describe-stream --stream-name lambda-stream
```

Verrà visualizzato l’output seguente:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Nella fase successiva utilizzare l'ARN del flusso per associare il flusso alla funzione Lambda.

## Aggiungi una fonte di eventi in AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Eseguire il seguente comando AWS CLI `add-event-source`.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Annotare l'ID della mappatura per utilizzarlo in futuro. Mediante l'esecuzione del comando `list-event-source-mappings` è possibile ottenere un elenco di mappature delle origini eventi.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

Nella risposta è possibile verificare che il valore dello stato sia `enabled` (attivato). Le mappature dell'origine eventi possono essere disabilitate per sospendere il polling temporaneamente senza perdere alcuni record.

## Eseguire il test della configurazione
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Per testare la mappatura dell'origine eventi, aggiungere i record dell'evento al flusso Kinesis. Il valore `--data` è una stringa che il CLI codifica in base64 prima di inviarla a Kinesis. È possibile eseguire lo stesso comando più volte per aggiungere più record al flusso.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda usa il ruolo di esecuzione per leggere i record dal flusso. Quindi richiama la funzione Lambda, passando ai batch dei record. La funzione decodifica i dati di ogni record e li registra, inviando l'output a CloudWatch Logs. Visualizza i log nella [console CloudWatch ](https://console.aws.amazon.com/cloudwatch).

## Pulizia delle risorse
<a name="cleanup"></a>

Ora è possibile eliminare le risorse create per questo tutorial, a meno che non si voglia conservarle. Eliminando AWS le risorse che non utilizzi più, eviti addebiti inutili a tuo carico. Account AWS

**Come eliminare il ruolo di esecuzione**

1. Aprire la pagina [Ruoli](https://console.aws.amazon.com/iam/home#/roles) della console IAM.

1. Selezionare il ruolo di esecuzione creato.

1. Scegliere **Elimina**.

1. Inserisci il nome del ruolo nel campo di immissione testo e seleziona **Delete** (Elimina).

**Per eliminare la funzione Lambda**

1. Aprire la pagina [Functions (Funzioni)](https://console.aws.amazon.com/lambda/home#/functions) della console Lambda.

1. Selezionare la funzione creata.

1. Scegliere **Operazioni**, **Elimina**.

1. Digita **confirm** nel campo di immissione testo e scegli **Delete** (Elimina).

**Per eliminare il flusso Kinesis**

1. [Accedi Console di gestione AWS e apri la console Kinesis all'indirizzo https://console.aws.amazon.com /kinesis.](https://console.aws.amazon.com/kinesis)

1. Selezionare il flusso creato.

1. Scegli **Operazioni** > **Elimina**.

1. Inserisci **delete** nel campo di immissione del testo.

1. Scegli **Delete** (Elimina).