Tutorial: Verwenden einer Amazon-MSK-Zuordnung von Ereignisquellen zum Aufrufen einer Lambda-Funktion - AWS Lambda

Tutorial: Verwenden einer Amazon-MSK-Zuordnung von Ereignisquellen zum Aufrufen einer Lambda-Funktion

In diesem Tutorial führen Sie Folgendes durch:

  • Erstellen Sie eine Lambda-Funktion in demselben AWS-Konto wie ein vorhandener Amazon MSK-Cluster.

  • Konfigurieren Sie Netzwerk und Authentifizierung für Lambda für die Kommunikation mit Amazon MSK.

  • Richten Sie eine Lambda Amazon MSK-Zuordnung von Ereignisquellen ein, die Ihre Lambda-Funktion ausführt, wenn Ereignisse im Thema auftauchen.

Nachdem Sie diese Schritte durchgeführt haben, können Sie, wenn Ereignisse an Amazon MSK gesendet werden, eine Lambda-Funktion einrichten, um diese Ereignisse automatisch mit Ihrem eigenen benutzerdefinierten Lambda-Code zu verarbeiten.

Was können Sie mit diesem Feature machen?

Beispiellösung: Verwenden Sie eine MSK-Zuordnung von Ereignisquellen, um Ihren Kunden Live-Ergebnisse zu liefern.

Stellen Sie sich das folgende Szenario vor: Ihr Unternehmen hostet eine Webanwendung, mit der Ihre Kunden Informationen über Live-Ereignisse, z. B. Sportspiele, abrufen können. Aktuelle Informationen aus dem Spiel werden Ihrem Team über ein Kafka-Thema auf Amazon MSK zur Verfügung gestellt. Sie möchten eine Lösung entwerfen, die Aktualisierungen aus dem MSK-Thema abruft, um den Kunden in einer von Ihnen entwickelten Anwendung eine aktualisierte Ansicht des Live-Ereignisses zu bieten. Sie haben sich für den folgenden Designansatz entschieden: Ihre Client-Anwendungen werden mit einem Serverless-Backend kommunizieren, das in AWS gehostet wird. Die Clients verbinden sich über Websocket-Sitzungen unter Verwendung der Amazon API Gateway WebSocket-API.

Bei dieser Lösung benötigen Sie eine Komponente, die MSK-Ereignisse liest, eine benutzerdefinierte Logik ausführt, um diese Ereignisse für die Anwendungsschicht vorzubereiten und diese Informationen dann an die API-Gateway-API weiterleitet. Sie können diese Komponente mit AWS Lambda implementieren, indem Sie Ihre benutzerdefinierte Logik in einer Lambda-Funktion bereitstellen und diese dann mit einer AWS Lambda-Amazon-MSK-Zuordnung von Ereignisquellen aufrufen.

Weitere Informationen zur Implementierung von Lösungen mit der Amazon API Gateway WebSocket-API finden Sie in den WebSocket API-Tutorials in der API Gateway-Dokumentation.

Voraussetzungen

Ein AWS-Konto mit den folgenden vorkonfigurierten Ressourcen:

Um diese Voraussetzungen zu erfüllen, empfehlen wir den Abschnitt Erste Schritte mit Amazon MSK in der Amazon MSK-Dokumentation.

  • Ein Amazon-MSK-Cluster. Siehe Erstellen eines Amazon MSK-Clusters in Erste Schritte mit Amazon MSK.

  • Die folgende Konfiguration:

    • Vergewissern Sie sich, dass die rollenbasierte IAM-Authentifizierung in den Sicherheitseinstellungen Ihres Clusters aktiviert ist. Dies verbessert Ihre Sicherheit, da Ihre Lambda-Funktion nur auf die benötigten Amazon MSK-Ressourcen zugreifen kann. Dies ist bei neuen Amazon MSK-Clustern standardmäßig aktiviert.

    • Vergewissern Sie sich, dass der öffentliche Zugang in den Netzwerkeinstellungen Ihres Clusters deaktiviert ist. Wenn Sie den Zugang Ihres Amazon MSK Clusters zum Internet einschränken, erhöht sich Ihre Sicherheit, da die Anzahl der Vermittler, die Ihre Daten verarbeiten, begrenzt wird. Dies ist bei neuen Amazon MSK-Clustern standardmäßig aktiviert.

  • Ein Kafka-Thema in Ihrem Amazon MSK-Cluster, das Sie für diese Lösung verwenden können. Weitere Informationen finden Sie unter Erstellen eines Themas unter Erste Schritte mit Amazon MSK.

  • Ein Kafka-Admin-Host, der so eingerichtet ist, dass er Informationen von Ihrem Kafka-Cluster abruft und Kafka-Ereignisse zu Testzwecken an Ihr Thema sendet, z. B. eine Amazon EC2-Instance mit der Kafka-Admin-CLI und der installierten Amazon-MSK-IAM-Bibliothek. Siehe Erstellen eines Client-Rechners in Erste Schritte mit Amazon MSK.

Sobald Sie diese Ressourcen eingerichtet haben, holen Sie die folgenden Informationen von Ihrem AWS-Konto ein, um zu bestätigen, dass Sie bereit sind, fortzufahren.

  • Der Name Ihres Amazon MSK-Clusters. Sie finden diese Informationen in der Amazon-MSK-Konsole.

  • Die Cluster-UUID, Teil des ARN für Ihren Amazon MSK-Cluster, den Sie in der Amazon MSK-Konsole finden können. Folgen Sie den Verfahren unter Cluster auflisten in der Amazon MSK-Dokumentation, um diese Informationen zu finden.

  • Die mit Ihrem Amazon MSK-Cluster verbundenen Sicherheitsgruppen. Sie finden diese Informationen in der Amazon-MSK-Konsole. In den folgenden Schritten bezeichnen Sie diese als Ihre ClusterSecurityGroups.

  • Die ID der Amazon VPC, die Ihren Amazon MSK-Cluster enthält. Sie können diese Informationen finden, indem Sie in der Amazon MSK-Konsole die mit Ihrem Amazon MSK-Cluster verbundenen Subnetze identifizieren und dann in der Amazon VPC-Konsole die mit dem Subnetz verbundene Amazon VPC identifizieren.

  • Der Name des in Ihrer Lösung verwendeten Kafka-Themas. Sie können diese Informationen finden, indem Sie Ihren Amazon MSK-Cluster mit der Kafka topics CLI von Ihrem Kafka-Administrationshost aus aufrufen. Weitere Informationen zur Themen-CLI finden Sie in der Kafka-Dokumentation unter Themen hinzufügen und entfernen.

  • Der Name einer Verbrauchergruppe für Ihr Kafka-Thema, geeignet für die Verwendung durch Ihre Lambda-Funktion. Diese Gruppe kann automatisch von Lambda erstellt werden, Sie müssen sie also nicht mit der Kafka-CLI erstellen. Wenn Sie Ihre Verbrauchergruppen verwalten müssen, finden Sie weitere Informationen über die Verbrauchergruppen-CLI unter Verwalten von Verbrauchergruppen in der Kafka-Dokumentation.

Die folgenden Berechtigungen in Ihrem AWS-Konto:

  • Berechtigung zur Erstellung und Verwaltung einer Lambda-Funktion.

  • Erlaubnis, IAM-Richtlinien zu erstellen und sie mit Ihrer Lambda-Funktion zu verknüpfen.

  • Berechtigung zum Erstellen von Amazon VPC-Endpunkten und zum Ändern der Netzwerkkonfiguration in der Amazon VPC, die Ihren Amazon MSK-Cluster hostet.

Wenn Sie die AWS Command Line Interface noch nicht installiert haben, folgen Sie den Schritten unter Installieren oder Aktualisieren der neuesten Version von AWS CLI, um diese zu installieren.

Das Tutorial erfordert zum Ausführen von Befehlen ein Befehlszeilenterminal oder eine Shell. Verwenden Sie unter Linux und macOS Ihre bevorzugte Shell und Ihren bevorzugten Paketmanager.

Anmerkung

In Windows werden einige Bash-CLI-Befehle, die Sie häufig mit Lambda verwenden (z. B. zip), von den integrierten Terminals des Betriebssystems nicht unterstützt. Um eine in Windows integrierte Version von Ubuntu und Bash zu erhalten, installieren Sie das Windows-Subsystem für Linux.

Konfigurieren Sie die Netzwerkkonnektivität für Lambda zur Kommunikation mit Amazon MSK

Verwenden Sie AWS PrivateLink, um Lambda und Amazon MSK zu verbinden. Sie können dies tun, indem Sie Amazon VPC-Endpunkte in der Amazon VPC-Konsole erstellen. Weitere Informationen zur Netzwerkkonfiguration finden Sie unter Konfiguration Ihres Amazon-MSK-Clusters und Ihres Amazon-VPC-Netzwerks für Lambda.

Wenn eine Amazon MSK-Zuordnung von Ereignisquellen im Namen einer Lambda-Funktion ausgeführt wird, übernimmt es die Ausführungsrolle der Lambda-Funktion. Diese IAM-Rolle autorisiert die Zuordnung für den Zugriff auf durch IAM gesicherte Ressourcen, wie z. B. Ihren Amazon MSK-Cluster. Obwohl die Komponenten eine gemeinsame Ausführungsrolle haben, haben das Amazon MSK-Mapping und Ihre Lambda-Funktion separate Konnektivitätsanforderungen für ihre jeweiligen Aufgaben, wie im folgenden Diagramm dargestellt.

Eine Lambda-Funktion fragt einen Cluster ab und kommuniziert mit Lambda über AWS STS.

Ihre Zuordnung von Ereignisquellen gehört zu Ihrer Amazon MSK-Cluster-Sicherheitsgruppe. In diesem Vernetzungsschritt erstellen Sie Amazon VPC-Endpunkte von Ihrer Amazon MSK-Cluster-VPC, um die Zuordnung von Ereignisquellen mit den Lambda- und STS-Services zu verbinden. Sichern Sie diese Endpunkte, damit sie Datenverkehr von Ihrer Amazon MSK-Cluster-Sicherheitsgruppe akzeptieren. Passen Sie dann die Sicherheitsgruppen des Amazon MSK-Clusters an, damit die Zuordnung von Ereignisquellen mit dem Amazon MSK-Cluster kommunizieren kann.

Sie können die folgenden Schritte mit dem AWS-Managementkonsole.

So konfigurieren Sie Schnittstellen-Amazon-VPC-Endpunkte, um Lambda und Amazon MSK zu verbinden
  1. Erstellen Sie eine Sicherheitsgruppe für Ihre Schnittstellen-Amazon-VPC-Endpunkte, endpointSecurityGroup, die eingehenden TCP-Verkehr auf 443 von clusterSecurityGroups erlaubt. Befolgen Sie das Verfahren unter Erstellen einer Sicherheitsgruppe in der Amazon EC2-Dokumentation, um eine Sicherheitsgruppe zu erstellen. Folgen Sie dann dem Verfahren unter Hinzufügen von Regeln zu einer Sicherheitsgruppe in der Amazon EC2-Dokumentation, um entsprechende Regeln hinzuzufügen.

    Erstellen Sie eine Sicherheitsgruppe mit den folgenden Informationen:

    Wenn Sie Ihre Eingangsregeln hinzufügen, erstellen Sie eine Regel für jede Sicherheitsgruppe in clusterSecurityGroups. Für jede Regel:

    • Wählen Sie für Type (Typ) HTTPS aus.

    • Wählen Sie als Quelle eine der clusterSecurityGroups aus.

  2. Erstellen Sie einen Endpunkt, der den Lambda-Service mit der Amazon VPC verbindet, die Ihren Amazon MSK-Cluster enthält. Folgen Sie dem Verfahren unter Schnittstellenendpunkt erstellen.

    Erstellen Sie einen Schnittstellenendpunkt mit den folgenden Informationen:

    • Wählen Sie com.amazonaws.regionName.lambda als Dienstname, wobei regionName Ihre Lambda-Funktion hostet.

    • Wählen Sie für VPC die Amazon VPC aus, die Ihren Amazon MSK-Cluster enthält.

    • Wählen Sie unter Sicherheitsgruppen die zuvor erstellte Gruppe endpointSecurityGroup aus.

    • Wählen Sie für Subnetze die Subnetze aus, die Ihren Amazon MSK-Cluster hosten.

    • Geben Sie für die Richtlinie das folgende Richtliniendokument an, das den Endpunkt für die Verwendung durch den Lambda-Serviceprinzipal für die lambda:InvokeFunction-Aktion sichert.

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Stellen Sie sicher, dass DNS-Name aktivieren weiterhin aktiviert ist.

  3. Erstellen Sie einen Endpunkt, der den AWS STS-Service mit der Amazon VPC verbindet, die Ihren Amazon MSK-Cluster enthält. Folgen Sie dem Verfahren unter Schnittstellenendpunkt erstellen.

    Erstellen Sie einen Schnittstellenendpunkt mit den folgenden Informationen:

    • Wählen Sie als Dienstname die Option AWS STS aus.

    • Wählen Sie für VPC die Amazon VPC aus, die Ihren Amazon MSK-Cluster enthält.

    • Wählen Sie für Sicherheitsgruppen endpointSecurityGroup aus.

    • Wählen Sie für Subnetze die Subnetze aus, die Ihren Amazon MSK-Cluster hosten.

    • Geben Sie für die Richtlinie das folgende Richtliniendokument an, das den Endpunkt für die Verwendung durch den Lambda-Serviceprinzipal für die sts:AssumeRole-Aktion sichert.

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Stellen Sie sicher, dass DNS-Name aktivieren weiterhin aktiviert ist.

  4. Für jede Sicherheitsgruppe, die mit Ihrem Amazon MSK-Cluster verbunden ist, d. h. in clusterSecurityGroups, erlauben Sie Folgendes:

    • Erlaubt allen ein- und ausgehenden TCP-Verkehr auf 9098 zu allen clusterSecurityGroups, auch innerhalb der Gruppe selbst.

    • Den gesamten ausgehenden TCP-Verkehr auf 443 zulassen.

    Ein Teil dieses Datenverkehrs wird durch die Standardregeln der Sicherheitsgruppe zugelassen. Wenn Ihr Cluster also nur einer einzigen Sicherheitsgruppe angehört und diese Gruppe über Standardregeln verfügt, sind zusätzliche Regeln nicht erforderlich. Um die Regeln für Sicherheitsgruppen anzupassen, befolgen Sie die Verfahren unter Hinzufügen von Regeln zu einer Sicherheitsgruppe in der Amazon EC2-Dokumentation.

    Fügen Sie Ihren Sicherheitsgruppen Regeln mit den folgenden Informationen hinzu:

    • Geben Sie für jede eingehende oder ausgehende Regel für Port 9098 Folgendes an

      • Wählen Sie für Type (Typ) Custom TCP (Benutzerdefiniertes TCP).

      • Geben Sie als Portbereich 9098 an.

      • Stellen Sie als Quelle eine der clusterSecurityGroups bereit.

    • Wählen Sie für jede eingehende Regel für Port 443 als Typ die Option HTTPS aus.

Erstellen Sie eine IAM-Rolle, die Lambda aus Ihrem Amazon-MSK-Thema lesen kann

Identifizieren Sie die Authentifizierungsanforderungen für Lambda, um aus Ihrem Amazon MSK-Thema zu lesen und definieren Sie sie dann in einer Richtlinie. Erstellen Sie eine Rolle, lambdaAuthRole, die Lambda zur Verwendung dieser Berechtigungen ermächtigt. Autorisieren Sie Aktionen in Ihrem Amazon MSK-Cluster mithilfe von kafka-cluster IAM-Aktionen. Autorisieren Sie dann Lambda für die Durchführung von Amazon MSK kafka- und Amazon EC2-Aktionen, die für die Erkennung und Verbindung mit Ihrem Amazon MSK-Cluster erforderlich sind, sowie für CloudWatch-Aktionen, damit Lambda protokollieren kann, was es getan hat.

So beschreiben Sie die Authentifizierungsanforderungen für Lambda, um von Amazon MSK zu lesen
  1. Schreiben Sie ein IAM-Richtliniendokument (ein JSON-Dokument), clusterAuthPolicy, das es Lambda ermöglicht, mithilfe Ihrer Kafka-Verbrauchergruppe aus Ihrem Kafka-Thema in Ihrem Amazon MSK-Cluster zu lesen. Für Lambda muss beim Lesen eine Kafka-Verbrauchergruppe festgelegt werden.

    Ändern Sie die folgende Vorlage, um sie an Ihre Voraussetzungen anzupassen:

    JSON
    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:us-east-1:111122223333:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:us-east-1:111122223333:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:us-east-1:111122223333:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    Weitere Informationen erhalten Sie von Konfigurieren von Rollenberechtigungen für Lambda-Ausführungen. Wenn Sie Ihre Richtlinie schreiben:

    • Ersetzen Sie us-east-1 und 111122223333 durch die AWS-Region und das AWS-Konto Ihres Amazon-MSK-Clusters.

    • Geben Sie für mskClusterName den Namen Ihres Amazon MSK-Clusters ein.

    • Geben Sie für cluster-uuid die UUID im ARN für Ihren Amazon MSK-Cluster an.

    • Geben Sie für mskTopicName den Namen Ihres Kafka-Themas ein.

    • Geben Sie für mskGroupName den Namen Ihrer Kafka-Verbrauchergruppe an.

  2. Identifizieren Sie die Amazon MSK-, Amazon EC2- und CloudWatch-Berechtigungen, die für Lambda erforderlich sind, um Ihren Amazon MSK-Cluster zu erkennen und zu verbinden und protokollieren Sie diese Ereignisse.

    Die AWSLambdaMSKExecutionRole-verwaltete Richtlinie definiert die erforderlichen Berechtigungen auf unzulässige Weise. Verwenden Sie es in den folgenden Schritten.

    Beurteilen Sie in einer Produktionsumgebung AWSLambdaMSKExecutionRole, um Ihre Ausführungsrollenrichtlinie auf der Grundlage des Prinzips der geringsten Berechtigung einzuschränken und schreiben Sie dann eine Richtlinie für Ihre Rolle, die diese verwaltete Richtlinie ersetzt.

Weitere Informationen zur IAM-Richtliniensprache finden Sie unter IAM-Dokumentation.

Nachdem Sie nun Ihr Richtliniendokument verfasst haben, erstellen Sie eine IAM-Richtlinie, die Sie mit Ihrer Rolle verknüpfen können. Sie können dies über die Konsole mit dem folgenden Verfahren tun.

So erstellen Sie eine IAM-Richtlinie aus Ihrem Richtliniendokument
  1. Melden Sie sich bei der AWS-Managementkonsole an, und öffnen Sie die IAM-Konsole unter https://console.aws.amazon.com/iam/.

  2. Wählen Sie im Navigationsbereich auf der linken Seite Policies (Richtlinien).

  3. Wählen Sie Richtlinie erstellen aus.

  4. Wählen Sie im Bereich Policy editor (Richtlinien-Editor) die Option JSON aus.

  5. Fügen Sie clusterAuthPolicy ein.

  6. Wenn Sie mit dem Hinzufügen von Berechtigungen zur Richtlinie fertig sind, wählen Sie Next (Weiter) aus.

  7. Geben Sie auf der Seite Review and create (Überprüfen und erstellen) unter Name einen Namen und unter Description (Beschreibung) (optional) eine Beschreibung für die Richtlinie ein, die Sie erstellen. Überprüfen Sie Permissions defined in this policy (In dieser Richtlinie definierte Berechtigungen), um die Berechtigungen einzusehen, die von Ihrer Richtlinie gewährt werden.

  8. Wählen Sie Create policy (Richtlinie erstellen) aus, um Ihre neue Richtlinie zu speichern.

Weitere Informationen finden Sie unter Erstellen von IAM-Richtlinien in der IAM-Dokumentation.

Nachdem Sie nun über die entsprechenden IAM-Richtlinien verfügen, erstellen Sie eine Rolle und ordnen Sie sie dieser zu. Sie können dies über die Konsole mit dem folgenden Verfahren tun.

So erstellen Sie eine Ausführungsrolle in der IAM-Konsole
  1. Öffnen Sie die Seite Roles (Rollen) in der IAM-Konsole.

  2. Wählen Sie Create role aus.

  3. Wählen Sie unter Typ der vertrauenswürdigen Entität die Option AWS-Service aus.

  4. Wählen Sie unter Anwendungsfall Lambda aus.

  5. Wählen Sie Weiter aus.

  6. Wählen Sie die folgenden Richtlinien:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. Wählen Sie Weiter aus.

  8. Geben Sie als Rollenname lambdaAuthRole ein und wählen Sie dann Rolle erstellen.

Weitere Informationen finden Sie unter Definieren von Lambda-Funktionsberechtigungen mit einer Ausführungsrolle.

Erstellen Sie eine Lambda-Funktion zum Lesen aus Ihrem Amazon MSK-Thema

Erstellen Sie eine Lambda-Funktion, die für die Verwendung Ihrer IAM-Rolle konfiguriert ist. Sie können Ihre Lambda-Funktion über die Konsole erstellen.

So erstellen Sie eine Lambda-Funktion mit Ihrer Authentifizierungskonfiguration
  1. Öffnen Sie die Lambda-Konsole und wählen Sie im Header die Option Funktion erstellen aus.

  2. Wählen Sie Verfassen von Grund auf aus.

  3. Geben Sie als Funktionsnamen einen geeigneten Namen Ihrer Wahl an.

  4. Wählen Sie für Laufzeit die neueste unterstützte Version von Node.js, um den in diesem Tutorial bereitgestellten Code zu verwenden.

  5. Wählen Sie Standardeausführungsrolle ändern aus.

  6. Wählen Sie Eine vorhandene Rolle verwenden aus.

  7. Wählen Sie für Vorhandene Rolle die Option lambdaAuthRole.

In einer Produktionsumgebung müssen Sie der Ausführungsrolle für Ihre Lambda-Funktion normalerweise weitere Richtlinien hinzufügen, um Ihre Amazon MSK-Ereignisse sinnvoll verarbeiten zu können. Weitere Informationen zum Hinzufügen von Richtlinien zu Ihrer Rolle finden Sie unter Hinzufügen oder Entfernen von Identitätsberechtigungen in der IAM-Dokumentation.

Erstellen einer Zuordnung der Ereignisquelle zu Ihrer Lambda-Funktion

Ihre Amazon MSK-Zuordnung von Ereignisquellen liefert dem Lambda-Service die notwendigen Informationen, um Ihr Lambda aufzurufen, wenn entsprechende Amazon MSK-Ereignisse auftreten. Sie können über die Konsole eine Amazon MSK-Zuordnung erstellen. Erstellen Sie einen Lambda-Trigger, dann wird die Zuordnung von Ereignisquellen automatisch eingerichtet.

So erstellen Sie einen Lambda-Trigger (und eine Zuordnung von Ereignisquellen)
  1. Navigieren Sie zur Übersichtsseite Ihrer Lambda-Funktion.

  2. Wählen Sie im Abschnitt Funktionsübersicht unten links Auslöser hinzufügen.

  3. Wählen Sie in der Dropdownliste Quelle auswählen die Option Amazon MSK aus.

  4. Legen Sie keine Authentifizierung fest.

  5. Wählen Sie für MSK-Cluster den Namen Ihres Clusters aus.

  6. Geben Sie für Batch size (Stapelgröße) 1 ein. Dieser Schritt erleichtert das Testen dieses Features, ist aber für die Produktion nicht ideal.

  7. Geben Sie für Themenname den Namen Ihres Kafka-Themas ein.

  8. Geben Sie unter Verbrauchergruppen-ID die ID Ihrer Kafka-Verbrauchergruppe an.

Aktualisieren Sie Ihre Lambda-Funktion, um Ihre Streaming-Daten zu lesen

Lambda liefert Informationen über Kafka-Ereignisse über den Parameter der Ereignismethode. Ein Beispiel für die Struktur eines Amazon MSK-Ereignisses finden Sie unter Beispielereignis. Nachdem Sie verstanden haben, wie die von Lambda weitergeleiteten Amazon MSK-Ereignisse zu interpretieren sind, können Sie Ihren Lambda-Funktionscode so ändern, dass er die von ihnen bereitgestellten Informationen nutzt.

Fügen Sie den folgenden Code in Ihre Lambda-Funktion ein, um den Inhalt eines Lambda Amazon MSK-Ereignisses zu Testzwecken zu protokollieren:

.NET
SDK für .NET
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von .NET.

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Go
SDK für Go V2
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von Go.

package main import ( "encoding/base64" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(event events.KafkaEvent) { for key, records := range event.Records { fmt.Println("Key:", key) for _, record := range records { fmt.Println("Record:", record) decodedValue, _ := base64.StdEncoding.DecodeString(record.Value) message := string(decodedValue) fmt.Println("Message:", message) } } } func main() { lambda.Start(handler) }
Java
SDK für Java 2.x
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von Java.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK für JavaScript (v3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von JavaScript.

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }

Nutzen eines Amazon-MSK-Ereignisses mit Lambda unter Verwendung von TypeScript.

import { MSKEvent, Context } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "msk-handler-sample", }); export const handler = async ( event: MSKEvent, context: Context ): Promise<void> => { for (const [topic, topicRecords] of Object.entries(event.records)) { logger.info(`Processing key: ${topic}`); // Process each record in the partition for (const record of topicRecords) { try { // Decode the message value from base64 const decodedMessage = Buffer.from(record.value, 'base64').toString(); logger.info({ message: decodedMessage }); } catch (error) { logger.error('Error processing event', { error }); throw error; } }; } }
PHP
SDK für PHP
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von PHP.

<?php // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 // using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kafka\KafkaEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): void { $kafkaEvent = new KafkaEvent($event); $this->logger->info("Processing records"); $records = $kafkaEvent->getRecords(); foreach ($records as $record) { try { $key = $record->getKey(); $this->logger->info("Key: $key"); $values = $record->getValue(); $this->logger->info(json_encode($values)); foreach ($values as $value) { $this->logger->info("Value: $value"); } } catch (Exception $e) { $this->logger->error($e->getMessage()); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK für Python (Boto3)
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von Python.

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
SDK für Ruby
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon MSK-Ereignisses mit Lambda unter Verwendung von Ruby.

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end
Rust
SDK für Rust
Anmerkung

Auf GitHub finden Sie noch mehr. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Nutzen eines Amazon-MSK-Ereignisses mit Lambda unter Verwendung von Rust.

use aws_lambda_events::event::kafka::KafkaEvent; use lambda_runtime::{run, service_fn, tracing, Error, LambdaEvent}; use base64::prelude::*; use serde_json::{Value}; use tracing::{info}; /// Pre-Requisites: /// 1. Install Cargo Lambda - see https://www.cargo-lambda.info/guide/getting-started.html /// 2. Add packages tracing, tracing-subscriber, serde_json, base64 /// /// This is the main body for the function. /// Write your code inside it. /// There are some code example in the following URLs: /// - https://github.com/awslabs/aws-lambda-rust-runtime/tree/main/examples /// - https://github.com/aws-samples/serverless-rust-demo/ async fn function_handler(event: LambdaEvent<KafkaEvent>) -> Result<Value, Error> { let payload = event.payload.records; for (_name, records) in payload.iter() { for record in records { let record_text = record.value.as_ref().ok_or("Value is None")?; info!("Record: {}", &record_text); // perform Base64 decoding let record_bytes = BASE64_STANDARD.decode(record_text)?; let message = std::str::from_utf8(&record_bytes)?; info!("Message: {}", message); } } Ok(().into()) } #[tokio::main] async fn main() -> Result<(), Error> { // required to enable CloudWatch error logging by the runtime tracing::init_default_subscriber(); info!("Setup CW subscriber!"); run(service_fn(function_handler)).await }

Sie können Ihrem Lambda Funktionscode über die Konsole zur Verfügung stellen.

So aktualiseren Sie Ihren Funktionscode mit dem Code-Editor
  1. Öffnen Sie die Funktionsseite der Lambda-Konsole und wählen Sie Ihre Funktion aus.

  2. Wählen Sie die Registerkarte Code aus.

  3. Wählen Sie im Bereich Codequelle Ihre Quellcodedatei aus und bearbeiten Sie sie im integrierten Code-Editor.

  4. Wählen Sie im Abschnitt BEREITSTELLEN die Option Bereitstellen aus, um den Code Ihrer Funktion zu aktualisieren:

    Schaltfläche „Bereitstellen“ im Code-Editor der Lambda-Konsole

Testen Sie Ihre Lambda-Funktion, um zu überprüfen, ob sie mit Ihrem Amazon MSK-Thema verbunden ist

Sie können nun überprüfen, ob Ihr Lambda von der Ereignisquelle aufgerufen wird, indem Sie die CloudWatch-Ereignisprotokolle untersuchen.

So überprüfen Sie, ob Ihre Lambda-Funktion aufgerufen wird
  1. Verwenden Sie Ihren Kafka-Admin-Host, um Kafka-Ereignisse mithilfe der kafka-console-producer-CLI zu generieren. Weitere Informationen finden Sie in der Kafka-Dokumentation unter Einige Ereignisse in das Thema schreiben. Senden Sie so viele Ereignisse, dass der durch die Stapelgröße definierte Stapel für die im vorherigen Schritt definierte Zuordnung von Ereignisquellen gefüllt wird, oder Lambda wartet auf weitere Informationen, um den Aufruf zu starten.

  2. Wenn Ihre Funktion ausgeführt wird, schreibt Lambda an CloudWatch, was passiert ist. Navigieren Sie in der Konsole zur Detailseite Ihrer Lambda-Funktion.

  3. Wählen Sie die Registerkarte Configuration aus.

  4. Wählen Sie in der Seitenleiste die Option Überwachungs- und Betriebstools.

  5. Identifizieren Sie die CloudWatch-Protokollgruppe unter Protokollierungskonfiguration. Die Protokollgruppe sollte mit /aws/lambda beginnen. Wählen Sie den Link zur Protokollgruppe.

  6. Suchen Sie in der CloudWatch-Konsole in den Protokollereignissen nach den Protokollereignissen, die Lambda an den Protokollstream gesendet hat. Stellen Sie fest, ob es Protokollereignisse gibt, die die Nachricht aus Ihrem Kafka-Ereignis enthalten, wie in der folgenden Abbildung dargestellt. Falls ja, haben Sie erfolgreich eine Lambda-Funktion mit einer Lambda-Zuordnung von Ereignisquellen mit Amazon MSK verbunden.

    Ein Protokollereignis in CloudWatch, das Ereignisinformationen anzeigt, die durch den bereitgestellten Code extrahiert wurden.