Intégration de DynamoDB à Amazon Managed Streaming for Apache Kafka - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Intégration de DynamoDB à Amazon Managed Streaming for Apache Kafka

Amazon Managed Streaming for Apache Kafka (Amazon MSK) facilite l’ingestion et le traitement des données de streaming en temps réel grâce à un service Apache Kafka entièrement géré.

Apache Kafka est un magasin de données distribué optimisé pour l’ingestion et le traitement de données de streaming en temps réel. Kafka peut traiter des flux d’enregistrements, stocker efficacement des flux d’enregistrements dans l’ordre dans lequel les enregistrements ont été générés, et publier des flux d’enregistrements et s’abonner à des flux d’enregistrements.

Grâce à ces fonctionnalités, Apache Kafka est souvent utilisé pour créer des pipelines de données de streaming en temps réel. Un pipeline de données traite et déplace les données de manière fiable d’un système à un autre et peut jouer un rôle important dans l’adoption d’une stratégie de base de données sur mesure en facilitant l’utilisation de plusieurs bases de données qui prennent chacune en charge différents cas d’utilisation.

Amazon DynamoDB est une cible courante dans ces pipelines de données pour prendre en charge les applications qui utilisent des modèles de données clé-valeur ou des modèles de données de document et qui recherchent une capacité de mise à l’échelle illimitée avec des performances constantes à une milliseconde à un chiffre.

Fonctionnement

Une intégration entre Amazon MSK et DynamoDB utilise une fonction Lambda pour utiliser les enregistrements d’Amazon MSK et les écrire dans DynamoDB.

Une intégration entre Amazon MSK et DynamoDB utilise une fonction Lambda pour utiliser les enregistrements d’Amazon MSK et les écrire dans DynamoDB.

Lambda interroge en interne les nouveaux messages de la source d’événement, puis invoque de manière synchrone la fonction Lambda cible. Les données utiles des événements de la fonction Lambda contient des lots de messages provenant d’Amazon MSK. Pour l’intégration entre Amazon MSK et DynamoDB, la fonction Lambda écrit ces messages dans DynamoDB.

Configuration d’une intégration entre Amazon MSK et DynamoDB

Note

Vous pouvez télécharger les ressources utilisées dans cet exemple dans le référentiel GitHub suivant.

Les étapes ci-dessous montrent comment configurer un exemple d’intégration entre Amazon MSK et Amazon DynamoDB. L’exemple représente des données générées par des appareils IoT (Internet des objets) et ingérées dans Amazon MSK. Lorsque les données sont ingérées dans Amazon MSK, elles peuvent être intégrées à des services analytiques ou à des outils tiers compatibles avec Apache Kafka, ce qui permet divers cas d’utilisation de l’analytique. L’intégration de DynamoDB permet également de rechercher des valeurs clés dans des enregistrements de d’appareils individuels.

Cet exemple montre comment un script Python écrit les données des capteurs IoT sur Amazon MSK. Ensuite, une fonction Lambda écrit des éléments avec la clé de partition « deviceid » dans DynamoDB.

Le modèle CloudFormation fourni créera les ressources suivantes : un compartiment Amazon S3, un VPC Amazon, un cluster Amazon MSK et un AWS CloudShell pour tester les opérations de données.

Pour générer des données de test, créez une rubrique Amazon MSK, puis créez une table DynamoDB. Vous pouvez utiliser le gestionnaire de session depuis la console de gestion pour vous connecter au système d’exploitation de CloudShell et exécuter des scripts Python.

Après avoir exécuté le modèle CloudFormation, vous pouvez terminer la création de cette architecture en effectuant les opérations suivantes.

  1. Exécutez le modèle CloudFormation S3bucket.yaml pour créer un compartiment S3. Pour tous les scripts ou opérations suivants, exécutez-les dans la même région. Entrez ForMSKTestS3 comme nom de la pile CloudFormation.

    Image montrant l’écran de création de la pile de la console CloudFormation.

    Une fois cette opération terminée, notez le nom du compartiment S3 affiché sous Sorties. Vous aurez besoin du nom à l’étape 3.

    CloudFormation stack outputs showing S3 bucket name for MSK and DynamoDB sample.
  2. Chargez le fichier ZIP fromMSK.zip téléchargé dans le compartiment S3 que vous venez de créer.

    Image montrant où vous pouvez charger des fichiers dans la console S3.
  3. Exécutez le modèle CloudFormation VPC.yaml pour créer un VPC, un cluster Amazon MSK et une fonction Lambda. Sur l’écran de saisie des paramètres, entrez le nom du compartiment S3 que vous avez créé à l’étape 1 où il est demandé le compartiment S3. Définissez le nom de la pile CloudFormation sur ForMSKTestVPC.

    Image montrant les champs que vous devez remplir lorsque vous spécifiez les détails de la pile CloudFormation.
  4. Préparez l’environnement pour exécuter des scripts Python dans CloudShell. Vous pouvez utiliser CloudShell sur la AWS Management Console. Pour plus d’informations sur l’utilisation de CloudShell, consultez Getting started with AWS CloudShell. Après avoir démarré CloudShell, créez un CloudShell qui appartient au VPC que vous venez de créer afin de vous connecter au cluster Amazon MSK. Créez le CloudShell dans un sous-réseau privé. Remplissez les champs suivants :

    1. Name : peut être défini sur n’importe quel nom. MSK-VPC en est un exemple

    2. VPC : sélectionnez MSKTest

    3. Subnet : sélectionnez MSKTest Private Subnet (AZ1)

    4. SecurityGroup : sélectionnez ForMSKSecurityGroup

    CloudShell interface showing ap-southeast-1 environment and option to create VPC environment.
    Image illustrant un environnement CloudShell avec les champs que vous devez spécifier.

    Une fois que le CloudShell appartenant au sous-réseau privé a démarré, exécutez la commande suivante :

    pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
  5. Téléchargez des scripts Python à partir du compartiment S3.

    aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
  6. Vérifiez la console de gestion et définissez les variables d’environnement pour l’URL du courtier et la valeur de région dans les scripts Python. Vérifiez le point de terminaison du courtier de clusters Amazon MSK dans la console de gestion.

    Amazon MSKcluster summary showing active status, serverless type, and creation details.
    Liste de tâches.
  7. Définissez les variables d’environnement sur le CloudShell. Si vous utilisez USA Ouest (Oregon) :

    export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
  8. Exécutez les script Python suivants.

    Créez une rubrique Amazon MSK.

    python ./createTopic.py

    Créez une table DynamoDB.

    python ./createTable.py

    Écrivez des données de test dans la rubrique Amazon MSK :

    python ./kafkaDataGen.py
  9. Vérifiez les métriques CloudWatch pour les ressources Amazon MSK, Lambda et DynamoDB créées, et vérifiez les données stockées dans la table device_status à l’aide de l’explorateur de données DynamoDB pour vous assurer que tous les processus se sont exécutés correctement. Si chaque processus est exécuté sans erreur, vous pouvez vérifier que les données de test écrites depuis CloudShell vers Amazon MSK sont également écrites sur DynamoDB.

    Image illustrant la console DynamoDB et expliquant comment certains éléments sont désormais renvoyés lorsque vous effectuez une analyse.
  10. Lorsque vous avez terminé avec cet exemple, supprimez les ressources créées dans ce didacticiel. Supprimez les deux piles CloudFormation : ForMSKTestS3 et ForMSKTestVPC. Si la suppression de la pile aboutit, toutes les ressources seront supprimées.

Étapes suivantes

Note

Si vous avez créé des ressources en suivant cet exemple, pensez à les supprimer pour éviter des frais imprévus.

L’intégration a identifié une architecture qui lie Amazon MSK et DynamoDB afin de permettre aux données de flux de prendre en charge les charges de travail OLTP. À partir de là, des recherches plus complexes peuvent être effectuées en liant DynamoDB à OpenSearch Service. Envisagez d’intégrer EventBridge pour des besoins plus complexes liés à des événements, et des extensions telles que le service géré Amazon pour Apache Flink pour un débit plus élevé et des exigences de latence plus faibles.