Integrieren von DynamoDB in Amazon Managed Streaming für Apache Kafka - Amazon DynamoDB

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Integrieren von DynamoDB in Amazon Managed Streaming für Apache Kafka

Amazon Managed Streaming für Apache Kafka (Amazon MSK) erleichtert das Erfassen und Verarbeiten von Streaming-Daten in Echtzeit mithilfe eines vollständig verwalteten, hochverfügbaren Apache Kafka-Service.

Apache Kafka ist ein verteilter Datenspeicher, der für die Aufnahme und Verarbeitung von Streaming-Daten in Echtzeit optimiert ist. Kafka kann Streams von Datensätzen verarbeiten, sie effektiv in der Reihenfolge speichern, in sie generiert wurden, und sie veröffentlichen und abonnieren.

Aufgrund dieser Funktionen wird Apache Kafka häufig zur Erstellung von Echtzeit-Streaming-Datenpipelines verwendet. Eine Datenpipeline verarbeitet und verschiebt Daten zuverlässig von einem System in ein anderes. Sie kann ein wichtiger Bestandteil einer speziell entwickelten Datenbankstrategie sein, da sie die Verwendung mehrerer Datenbanken erleichtert, die jeweils unterschiedliche Anwendungsfälle unterstützen.

Amazon DynamoDB wird in diesen Datenpipelines häufig zur Unterstützung von Anwendungen eingesetzt, die Schlüssel-Wert- oder Dokumentdatenmodelle verwenden und für die eine grenzenlose Skalierbarkeit mit konsistenter Leistung im einstelligen Millisekundenbereich gewünscht ist.

Funktionsweise

Bei der Integration von Amazon MSK in DynamoDB wird eine Lambda-Funktion verwendet, die Datensätze von Amazon MSK verarbeitet und in DynamoDB schreibt.

Darstellung einer Integration von Amazon MSK in DynamoDB mit Verwendung einer Lambda-Funktion, die Datensätze von Amazon MSK verarbeitet und in DynamoDB schreibt.

Lambda fragt intern neue Nachrichten von Amazon MSK ab und ruft dann synchron die Ziel-Lambda-Funktion auf. Die Ereignisnutzlast der Lambda-Funktion enthält Stapel von Nachrichten von Amazon MSK. Für die Integration von Amazon MSK in DynamoDB schreibt die Lambda-Funktion diese Nachrichten in DynamoDB.

Einrichten einer Integration von Amazon MSK in DynamoDB

Anmerkung

Sie können die in diesem Beispiel verwendeten Ressourcen im folgenden GitHub-Repository herunterladen.

Die folgenden Schritte zeigen, wie Sie eine Beispielintegration von Amazon MSK in Amazon DynamoDB einrichten. Das Beispiel enthält Daten, die von IoT-Geräten (Internet der Dinge) generiert und in Amazon MSK erfasst wurden. In Amazon MSK erfasste Daten können in Analysedienste oder Tools von Drittanbietern integriert werden, die mit Apache Kafka kompatibel sind, was verschiedene Analyseanwendungsfälle ermöglicht. DynamoDB-Integration ermöglicht zudem auch die Suche nach Schlüssel-Wert-Paaren einzelner Gerätedatensätze.

Dieses Beispiel zeigt, wie ein Python-Skript Daten eines IoT-Sensors in Amazon MSK schreibt. Anschließend werden durch eine Lambda-Funktion Elemente mit dem Partitionsschlüssel "deviceid" in DynamoDB geschrieben.

Durch die bereitgestellte CloudFormation-Vorlage werden die folgenden Ressourcen erstellt: ein Amazon-S3-Bucket, eine Amazon-VPC, ein Amazon-MSK-Cluster und eine AWS CloudShell zum Testen von Datenoperationen.

Um Testdaten zu generieren, erstellen Sie ein Amazon-MSK-Thema und anschließend eine DynamoDB-Tabelle. Sie können sich mit Session Manager aus der Managementkonsole am CloudShell-Betriebssystem an und Python-Skripts ausführen.

Nachdem Sie die CloudFormation-Vorlage ausgeführt haben, können Sie mit folgenden Vorgängen die Erstellung dieser Architektur abschließen.

  1. Führen Sie die CloudFormation-Vorlage S3bucket.yaml aus, um einen S3-Bucket zu erstellen. Führen Sie sie für alle nachfolgenden Skripts oder Operationen in derselben Region aus. Geben Sie als Namen des CloudFormation-Stacks „ForMSKTestS3“ ein.

    Abbildung mit dem Bildschirm des erstellten Stacks in der CloudFormation-Konsole.

    Wenn der Vorgang abgeschlossen ist, notieren Sie sich den unter Ausgaben angezeigten Namen des S3-Buckets. Sie benötigen den Namen in Schritt 3.

    CloudFormation stack outputs showing S3 bucket name for MSK and DynamoDB sample.
  2. Laden Sie die heruntergeladene ZIP-Datei fromMSK.zip in den soeben erstellten S3-Bucket hoch.

    Abbildung mit Informationen zum Hochladen von Dateien in der S3-Konsole.
  3. Führen Sie die CloudFormation-Vorlage VPC.yaml aus, um eine VPC, einen Amazon-MSK-Cluster und eine Lambda-Funktion zu erstellen. Geben Sie auf dem Bildschirm für die Parametereingabe auf Anforderung den Namen des S3-Buckets ein, den Sie in Schritt 1 erstellt haben. Geben Sie als Namen für den CloudFormation-Stacks ForMSKTestVPC an.

    Das Bild zeigt die Felder, die Sie bei der Angabe der Details zum CloudFormation-Stack ausfüllen müssen.
  4. Bereiten Sie die Umgebung für die Ausführung von Python-Skripten in CloudShell vor. Sie können CloudShell in der AWS-Managementkonsole verwenden. Weitere Informationen zur Verwendung von CloudShell finden Sie unter Erste Schritte mit AWS CloudShell. Erstellen Sie nach dem Start von CloudShell eine CloudShell, die zur soeben erstellten VPC gehört, und stellen Sie eine Verbindung zum Amazon MSK-Cluster her. Erstellen Sie die CloudShell in einem privaten Subnetz. Füllen Sie die folgenden Felder aus:

    1. Name: Ein beliebiger Name. Zum Beispiel: MSK-VPC.

    2. VPC: Wählen Sie MSKTest aus

    3. Subnetz: Wählen Sie MSKTest Private Subnet (AZ1) aus

    4. SecurityGroup: Wählen Sie ForMSKSecurityGroup aus

    CloudShell interface showing ap-southeast-1 environment and option to create VPC environment.
    Abbildung einer CloudShell-Umgebung mit den anzugebenden Feldern.

    Sobald die zum privaten Subnetz gehörende CloudShell gestartet wurde, führen Sie den folgenden Befehl aus:

    pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
  5. Laden Sie Python-Skripte aus dem S3-Bucket herunter.

    aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
  6. Überprüfen Sie die Management-Konsole und legen Sie die Umgebungsvariablen für die Broker-URL und den Regionswert in den Python-Skripten fest. Überprüfen Sie den Endpunkt des Amazon-MSK-Clusters in der Management-Konsole.

    Amazon MSKCluster summary showing active status, serverless type, and creation details.
    TODO.
  7. Legen Sie die Umgebungsvariablen in der CloudShell fest. Bei Verwendung der Region „USA West (Oregon)“:

    export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
  8. Führen Sie die folgenden Python-Skripte aus.

    Erstellen Sie ein Amazon-MSK-Thema.

    python ./createTopic.py

    Erstellen Sie eine DynamoDB-Tabelle.

    python ./createTable.py

    Schreiben Sie Testdaten in das Amazon MSK-Thema:

    python ./kafkaDataGen.py
  9. Überprüfen Sie die CloudWatch-Metriken für die erstellten Amazon-MSK-, -Lambda- und -DynamoDB-Ressourcen und überprüfen Sie die in der Tabelle device_status gespeicherten Daten im Daten-Explorer von DynamoDB, um sicherzustellen, dass alle Prozesse korrekt ausgeführt wurden. Wenn alle Prozesse fehlerfrei ausgeführt werden, können Sie überprüfen, ob die von CloudShell nach Amazon MSK geschriebenen Testdaten auch in DynamoDB vorhanden sind.

    Abbildung der DynamoDB-Konsole mit zurückgegebenen Elementen bei Durchführung eines Scans.
  10. Wenn Sie mit diesem Beispiel fertig sind, löschen Sie die in diesem Tutorial erstellten Ressourcen. Löschen Sie die beiden CloudFormation-Stacks: ForMSKTestS3 und ForMSKTestVPC. Wenn das Löschen des Stacks erfolgreich war, werden alle Ressourcen gelöscht.

Nächste Schritte

Anmerkung

Wenn Sie im Rahmen dieses Beispiels Ressourcen erstellt haben, denken Sie daran, diese wieder zu löschen, damit keine unerwarteten Gebühren anfallen.

Die Integration hat eine Architektur identifiziert, die Amazon MSK und DynamoDB miteinander verbindet, sodass Stream-Daten OLTP-Workloads unterstützen können. Von hier aus können mittels Verknüpfung von DynamoDB und dem OpenSearch-Service komplexere Suchvorgänge durchgeführt werden. Für komplexere Anforderungen an die Ereignissteuerung sollten Sie die Integration in EventBridge und für höhere Durchsatz- und geringere Latenzanforderungen Erweiterungen wie Amazon Managed Service für Apache Flink in Erwägung ziehen.