Anleitung: Verwenden von Lambda mit Kinesis Data Streams - AWS Lambda

Anleitung: Verwenden von Lambda mit Kinesis Data Streams

In dieser Anleitung erstellen Sie eine Lambda-Funktion, um Ereignisse aus einem Amazon Kinesis Data Streams zu nutzen.

  1. Die benutzerdefinierte Anwendung schreibt die Datensätze zum Stream.

  2. AWS Lambda fragt den Stream ab und ruft bei Erkennung neuer Datensätze im Stream Ihre Lambda-Funktion auf.

  3. AWS Lambda führt die Lambda-Funktion aus. Dabei übernimmt der Service die Ausführungsrolle, die bei der Erstellung der Lambda-Funktion angegeben wurde.

Voraussetzungen

Wenn Sie die AWS Command Line Interface noch nicht installiert haben, folgen Sie den Schritten unter Installieren oder Aktualisieren der neuesten Version von AWS CLI, um diese zu installieren.

Das Tutorial erfordert zum Ausführen von Befehlen ein Befehlszeilenterminal oder eine Shell. Verwenden Sie unter Linux und macOS Ihre bevorzugte Shell und Ihren bevorzugten Paketmanager.

Anmerkung

In Windows werden einige Bash-CLI-Befehle, die Sie häufig mit Lambda verwenden (z. B. zip), von den integrierten Terminals des Betriebssystems nicht unterstützt. Um eine in Windows integrierte Version von Ubuntu und Bash zu erhalten, installieren Sie das Windows-Subsystem für Linux.

Erstellen der Ausführungsrolle

Erstellen Sie die Ausführungsrolle die Ihrer Funktion die Berechtigung für den Zugriff auf AWS-Ressourcen erteilt.

So erstellen Sie eine Ausführungsrolle
  1. Öffnen Sie die Seite Roles (Rollen) in der IAM-Konsole.

  2. Wählen Sie Create role aus.

  3. Erstellen Sie eine Rolle mit den folgenden Eigenschaften.

    • Trusted entity (Vertrauenswürdige EntitätAWS Lambda.

    • BerechtigungenAWSLambdaKinesisExecutionRole.

    • Role name (Name der Rollelambda-kinesis-role.

Die Richtlinie AWSLambdaKinesisExecutionRole verfügt über die Berechtigungen, die die Funktion zum Lesen von Elementen aus Kinesis und zum Schreiben von Protokollen in CloudWatch Logs benötigt.

Erstellen der Funktion

Erstellen Sie eine Lambda-Funktion, die Ihre Kinesis-Nachrichten verarbeitet. Der Funktionscode protokolliert die Ereignis-ID und die Ereignisdaten des Kinesis-Datensatzes in CloudWatch Logs.

In diesem Tutorial wird die Node.js 22-Laufzeit verwendet. Es stehen aber auch Beispielcodes für andere Laufzeitensprachen zur Verfügung. Sie können die Registerkarte im folgenden Feld auswählen, um Code für die gewünschte Laufzeit anzusehen. Der JavaScript-Code, den Sie in diesem Schritt verwenden werden, ist im ersten Beispiel auf der Registerkarte JavaScript zu sehen.

.NET
SDK für .NET
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
Go
SDK für Go V2
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "log" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error { if len(kinesisEvent.Records) == 0 { log.Printf("empty Kinesis event received") return nil } for _, record := range kinesisEvent.Records { log.Printf("processed Kinesis event with EventId: %v", record.EventID) recordDataBytes := record.Kinesis.Data recordDataText := string(recordDataBytes) log.Printf("record data: %v", recordDataText) // TODO: Do interesting work based on the new data } log.Printf("successfully processed %v records", len(kinesisEvent.Records)) return nil } func main() { lambda.Start(handler) }
Java
SDK für Java 2.x
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package example; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.LambdaLogger; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; public class Handler implements RequestHandler<KinesisEvent, Void> { @Override public Void handleRequest(final KinesisEvent event, final Context context) { LambdaLogger logger = context.getLogger(); if (event.getRecords().isEmpty()) { logger.log("Empty Kinesis Event received"); return null; } for (KinesisEvent.KinesisEventRecord record : event.getRecords()) { try { logger.log("Processed Event with EventId: "+record.getEventID()); String data = new String(record.getKinesis().getData().array()); logger.log("Data:"+ data); // TODO: Do interesting work based on the new data } catch (Exception ex) { logger.log("An error occurred:"+ex.getMessage()); throw ex; } } logger.log("Successfully processed:"+event.getRecords().size()+" records"); return null; } }
JavaScript
SDK für JavaScript (v3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); throw err; } } console.log(`Successfully processed ${event.Records.length} records.`); }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<void> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); throw err; } logger.info(`Successfully processed ${event.Records.length} records.`); } }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK für PHP
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von PHP.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Kinesis\KinesisHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends KinesisHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleKinesis(KinesisEvent $event, Context $context): void { $this->logger->info("Processing records"); $records = $event->getRecords(); foreach ($records as $record) { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK für Python (Boto3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")
Ruby
SDK für Ruby
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue => err $stderr.puts "An error occurred #{err}" raise err end end puts "Successfully processed #{event['Records'].length} records." end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('UTF-8') # Placeholder for actual async work # You can use Ruby's asynchronous programming tools like async/await or fibers here. return data end
Rust
SDK für Rust
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Kinesis-Ereignisses mit Lambda unter Verwendung von Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::event::kinesis::KinesisEvent; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> { if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } event.payload.records.iter().for_each(|record| { tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default()); let record_data = std::str::from_utf8(&record.kinesis.data); match record_data { Ok(data) => { // log the record data tracing::info!("Data: {}", data); } Err(e) => { tracing::error!("Error: {}", e); } } }); tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }
So erstellen Sie die Funktion
  1. Erstellen Sie ein Verzeichnis für das Projekt und wechseln Sie dann zu diesem Verzeichnis.

    mkdir kinesis-tutorial cd kinesis-tutorial
  2. Kopieren Sie den JavaScript-Beispielcode in eine neue Datei mit dem Namen index.js.

  3. Erstellen Sie ein Bereitstellungspaket.

    zip function.zip index.js
  4. Erstellen Sie eine Lambda-Funktion mit dem Befehl create-function.

    aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs22.x \ --role arn:aws:iam::111122223333:role/lambda-kinesis-role

Lambda-Funktion testen

Rufen Sie die Lambda-Funktion mit dem invoke-CLI-Befehl AWS Lambda und einem Kinesis-Beispielereignis manuell auf.

Lambda-Funktion testen
  1. Kopieren Sie das folgende JSON in eine Datei und speichern Sie sie unter input.txt.

    { "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
  2. Verwenden Sie den invoke-Befehl, um das Ereignis an die Funktion zu senden.

    aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

    Die cli-binary-format-Option ist erforderlich, wenn Sie AWS CLI Version 2 verwenden. Um dies zur Standardeinstellung zu machen, führen Sie aws configure set cli-binary-format raw-in-base64-out aus. Weitere Informationen finden Sie unter Von AWS CLI unterstützte globale Befehlszeilenoptionen im AWS Command Line Interface-Benutzerhandbuch für Version 2.

    Die Antwort wird in gespeicher out.txt.

Kinesis-Stream erstellen

Verwenden Sie den Befehl create-stream , um einen Stream zu erstellen.

aws kinesis create-stream --stream-name lambda-stream --shard-count 1

Führen Sie den folgenden describe-stream-Befehl aus, um den Stream-ARN zu erhalten.

aws kinesis describe-stream --stream-name lambda-stream

Die Ausgabe sollte folgendermaßen aussehen:

{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}

Sie benötigen im nächsten Schritt den Stream-ARN, um die Lambda-Funktion dem Stream zuzuordnen.

Hinzufügen einer Ereignisquelle in AWS Lambda

Führen Sie den folgenden AWS CLI-add-event-source-Befehl aus.

aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST

Notieren Sie die Zuweisungs-ID zur späteren Verwendung. Sie erhalten eine Liste der Zuweisungen von Ereignisquellen, indem Sie den list-event-source-mappings-Befehl ausführen.

aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream

In der Antwort können Sie überprüfen, ob der Statuswert is enabled. Ereignisquellen-Zuweisungen können deaktiviert werden, um Abfragen vorübergehend zu pausieren, ohne dass Datensätze verloren gehen.

Testen der Einrichtung

Zum Testen der Ereignisquellen-Zuweisung fügen Sie Ihrem Kinesis-Stream Ereignisdatensätze hinzu. Der Wert --data ist eine Zeichenfolge, die die CLI vor dem Senden an Kinesis mit base64 verschlüsselt. Der gleiche Befehl kann mehrmals ausgeführt werden, um dem Stream mehrere Datensätze hinzuzufügen.

aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."

Lambda verwendet die Ausführungsrolle, um Datensätze aus dem Stream zu lesen. Anschließend wird Ihre Lambda-Funktion aufgerufen und Datensatzbatches werden übergeben. Die Funktion decodiert Daten aus jedem Datensatz und protokolliert sie, indem die Ausgabe an CloudWatch Logs gesendet wird. Zeigen Sie die Protokolle in der CloudWatch-Konsole an.

Bereinigen Ihrer Ressourcen

Sie können jetzt die Ressourcen, die Sie für dieses Tutorial erstellt haben, löschen, es sei denn, Sie möchten sie behalten. Durch das Löschen von AWS-Ressourcen, die Sie nicht mehr verwenden, können Sie verhindern, dass unnötige Gebühren in Ihrem AWS-Konto-Konto anfallen.

So löschen Sie die Ausführungsrolle
  1. Öffnen Sie die Seite Roles in der IAM-Konsole.

  2. Wählen Sie die von Ihnen erstellte Ausführungsrolle aus.

  3. Wählen Sie Löschen aus.

  4. Geben Sie den Namen der Rolle in das Texteingabefeld ein und wählen Sie Delete (Löschen) aus.

So löschen Sie die Lambda-Funktion:
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie die Funktion aus, die Sie erstellt haben.

  3. Wählen Sie Actions, Delete.

  4. Geben Sie confirm in das Texteingabefeld ein und wählen Sie Delete (Löschen) aus.

So löschen Sie den Kinesis-Stream
  1. Melden Sie sich bei AWS-Managementkonsole an und öffnen Sie die Kinesis-Konsole unter https://console.aws.amazon.com/kinesis.

  2. Wählen Sie den von Ihnen erstellten Stream aus

  3. Wählen Sie Actions, Delete.

  4. Geben Sie delete in das Texteingabefeld ein.

  5. Wählen Sie Delete (Löschen).