Intégration de DynamoDB à Amazon Managed Streaming pour Apache Kafka Kafka - Amazon DynamoDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Intégration de DynamoDB à Amazon Managed Streaming pour Apache Kafka Kafka

Amazon Managed Streaming for Apache Kafka (Amazon MSK) facilite l'ingestion et le traitement des données de streaming en temps réel grâce à un service Apache Kafka entièrement géré et hautement disponible.

Apache Kafka est un magasin de données distribué optimisé pour l'ingestion et le traitement de données de streaming en temps réel. Kafka peut traiter des flux d'enregistrements, stocker efficacement des flux d'enregistrements dans l'ordre dans lequel les enregistrements ont été générés, et publier des flux d'enregistrements et s'abonner à des flux d'enregistrements.

Grâce à ces fonctionnalités, Apache Kafka est souvent utilisé pour créer des pipelines de données de streaming en temps réel. Un pipeline de données traite et déplace les données de manière fiable d'un système à un autre et peut jouer un rôle important dans l'adoption d'une stratégie de base de données spécialement conçue en facilitant l'utilisation de plusieurs bases de données qui prennent chacune en charge différents cas d'utilisation.

Amazon DynamoDB est une cible courante dans ces pipelines de données pour prendre en charge les applications qui utilisent des modèles de données clé-valeur ou des modèles de données documentaires et qui recherchent une évolutivité illimitée avec des performances constantes à une milliseconde à un chiffre.

Fonctionnement

Une intégration entre Amazon MSK et DynamoDB utilise une fonction Lambda pour utiliser les enregistrements d'Amazon MSK et les écrire dans DynamoDB.

Schéma illustrant une intégration entre Amazon MSK et DynamoDB, et montrant comment Amazon MSK utilise une fonction Lambda pour consommer des enregistrements et les écrire dans DynamoDB.

Lambda interroge en interne les nouveaux messages provenant d'Amazon MSK, puis invoque de manière synchrone la fonction Lambda cible. La charge utile des événements de la fonction Lambda contient des lots de messages provenant d'Amazon MSK. Pour l'intégration entre Amazon MSK et DynamoDB, la fonction Lambda écrit ces messages dans DynamoDB.

Configurer une intégration entre Amazon MSK et DynamoDB

Note

Vous pouvez télécharger les ressources utilisées dans cet exemple dans le GitHub référentiel suivant.

Les étapes ci-dessous montrent comment configurer un exemple d'intégration entre Amazon MSK et Amazon DynamoDB. L'exemple représente les données générées par les appareils de l'Internet des objets (IoT) et ingérées dans Amazon MSK. Lorsque les données sont ingérées dans Amazon MSK, elles peuvent être intégrées à des services d'analyse ou à des outils tiers compatibles avec Apache Kafka, ce qui permet divers cas d'utilisation de l'analyse. L'intégration de DynamoDB permet également de rechercher des valeurs clés dans des enregistrements de périphériques individuels.

Cet exemple montre comment un script Python écrit les données des capteurs IoT sur Amazon MSK. Ensuite, une fonction Lambda écrit des éléments avec la clé de partition « deviceid » dans DynamoDB.

Le CloudFormation modèle fourni créera les ressources suivantes : un compartiment Amazon S3, un Amazon VPC, un cluster Amazon MSK et un AWS CloudShell pour tester les opérations de données.

Pour générer des données de test, créez une rubrique Amazon MSK, puis créez une table DynamoDB. Vous pouvez utiliser le gestionnaire de session depuis la console de gestion pour vous connecter au système CloudShell d'exploitation et exécuter des scripts Python.

Après avoir exécuté le CloudFormation modèle, vous pouvez terminer la création de cette architecture en effectuant les opérations suivantes.

  1. Exécutez le CloudFormation modèle S3bucket.yaml pour créer un compartiment S3. Pour tous les scripts ou opérations suivants, veuillez les exécuter dans la même région. Entrez ForMSKTestS3 comme nom de CloudFormation pile.

    Image montrant l'écran de création de la pile de CloudFormation consoles.

    Une fois cette opération terminée, notez le nom du compartiment S3 affiché sous Sorties. Vous aurez besoin du nom à l'étape 3.

    AWS CloudFormation stack outputs showing S3 bucket name for MSK and DynamoDB sample.
  2. Téléchargez le fichier ZIP téléchargé dans fromMSK.zip le compartiment S3 que vous venez de créer.

    Image montrant où vous pouvez télécharger des fichiers dans la console S3.
  3. Exécutez le CloudFormation modèle VPC.yaml pour créer un VPC, un cluster Amazon MSK et une fonction Lambda. Sur l'écran de saisie des paramètres, entrez le nom du compartiment S3 que vous avez créé à l'étape 1 où il est demandé le compartiment S3. Définissez le nom de la CloudFormation pile surForMSKTestVPC.

    Image montrant les champs que vous devez remplir lorsque vous spécifiez les détails de la CloudFormation pile.
  4. Préparez l'environnement dans lequel les scripts Python seront exécutés CloudShell. Vous pouvez utiliser CloudShell sur le AWS Management Console. Pour plus d'informations sur l'utilisation CloudShell, consultez Getting started with AWS CloudShell. Après avoir démarré CloudShell, créez un VPC appartenant au VPC CloudShell que vous venez de créer afin de vous connecter au cluster Amazon MSK. Créez-le CloudShell dans un sous-réseau privé. Remplissez les champs suivants :

    1. Nom - peut être défini sur n'importe quel nom. MSK-VPC en est un exemple.

    2. VPC - sélectionnez MSKTest

    3. Sous-réseau : sélectionnez Sous-réseau MSKTest privé () AZ1

    4. SecurityGroup- sélectionnez Pour le MSKSecurity groupe

    CloudShell interface showing ap-southeast-1 environment and option to create VPC environment.
    Image montrant un CloudShell environnement avec les champs que vous devez spécifier.

    Une fois que l' CloudShell appartenance au sous-réseau privé a commencé, exécutez la commande suivante :

    pip install boto3 kafka-python aws-msk-iam-sasl-signer-python
  5. Téléchargez des scripts Python depuis le compartiment S3.

    aws s3 cp s3://[YOUR-BUCKET-NAME]/pythonScripts.zip ./ unzip pythonScripts.zip
  6. Vérifiez la console de gestion et définissez les variables d'environnement pour l'URL du courtier et la valeur de région dans les scripts Python. Vérifiez le point de terminaison du courtier de clusters Amazon MSK dans la console de gestion.

    Amazon MSKcluster summary showing active status, serverless type, and creation details.
    TODO.
  7. Définissez les variables d'environnement sur CloudShell. Si vous utilisez l'ouest des États-Unis (Oregon) :

    export AWS_REGION="us-west-2" export MSK_BROKER="boot-YOURMSKCLUSTER.c3.kafka-serverless.ap-southeast-1.amazonaws.com:9098"
  8. Exécutez les scripts Python suivants.

    Créez une rubrique Amazon MSK :

    python ./createTopic.py

    Créez une table DynamoDB :

    python ./createTable.py

    Rédigez les données de test dans la rubrique Amazon MSK :

    python ./kafkaDataGen.py
  9. Vérifiez les CloudWatch métriques des ressources Amazon MSK, Lambda et DynamoDB créées, et vérifiez les données stockées dans la device_status table à l'aide de l'explorateur de données DynamoDB pour vous assurer que tous les processus se sont exécutés correctement. Si chaque processus est exécuté sans erreur, vous pouvez vérifier que les données de test écrites depuis CloudShell Amazon MSK sont également écrites sur DynamoDB.

    Image illustrant la console DynamoDB et expliquant comment certains éléments sont désormais renvoyés lorsque vous effectuez une analyse.
  10. Lorsque vous aurez terminé avec cet exemple, supprimez les ressources créées dans ce didacticiel. Supprimez les deux CloudFormation piles : ForMSKTestS3 et. ForMSKTestVPC Si la suppression de la pile aboutit, toutes les ressources seront supprimées.

Étapes suivantes

Note

Si vous avez créé des ressources en suivant cet exemple, pensez à les supprimer pour éviter des frais imprévus.

L'intégration a identifié une architecture qui lie Amazon MSK et DynamoDB afin de permettre aux données de flux de prendre en charge les charges de travail OLTP. À partir de là, des recherches plus complexes peuvent être effectuées en liant DynamoDB à Service OpenSearch . Envisagez l'intégration EventBridge pour les besoins plus complexes liés aux événements, ainsi que des extensions telles qu'Amazon Managed Service pour Apache Flink pour un débit plus élevé et des exigences de latence plus faibles.