$ ChangeStream - Amazon DocumentDB

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

$ ChangeStream

Non pris en charge par le cluster Elastic.

La phase $changeStream d'agrégation ouvre un curseur de flux de modifications pour surveiller les modifications en temps réel apportées à une collection. Il renvoie les documents relatifs aux événements de modification lorsque des opérations d'insertion, de mise à jour, de remplacement ou de suppression ont lieu.

Paramètres

  • fullDocument: Spécifie s'il faut renvoyer le document complet pour les opérations de mise à jour. Les options sont default ou updateLookup.

  • resumeAfter: Facultatif. Jeton de reprise pour continuer à partir d'un point spécifique du flux de modifications.

  • startAtOperationTime: Facultatif. Horodatage à partir duquel démarrer le flux de modifications.

  • allChangesForCluster: Facultatif. Valeur booléenne. Quandtrue, surveille toutes les modifications apportées au sein du cluster (pour la base de données d'administration). Lorsque false (par défaut), surveille uniquement la collection spécifiée.

Exemple (MongoDB Shell)

L'exemple suivant montre comment utiliser le $changeStream stage pour surveiller les modifications apportées à une collection.

Exemple de requête

// Open change stream first const changeStream = db.inventory.aggregate([ { $changeStream: { fullDocument: "updateLookup" } } ]); // In another session, insert a document db.inventory.insertOne({ _id: 1, item: "Widget", qty: 10 }); // Back in the first session, read the change event if (changeStream.hasNext()) { print(tojson(changeStream.next())); }

Sortie

{ _id: { _data: '...' }, operationType: 'insert', clusterTime: Timestamp(1, 1234567890), fullDocument: { _id: 1, item: 'Widget', qty: 10 }, ns: { db: 'test', coll: 'inventory' }, documentKey: { _id: 1 } }

Exemples de code

Pour afficher un exemple de code relatif à l'utilisation de la phase d'$changeStreamagrégation, choisissez l'onglet correspondant à la langue que vous souhaitez utiliser :

Node.js
const { MongoClient } = require('mongodb'); async function example() { const client = await MongoClient.connect('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false'); const db = client.db('test'); const collection = db.collection('inventory'); // Open change stream const changeStream = collection.watch([]); changeStream.on('change', (change) => { console.log('Change detected:', change); }); // Simulate insert in another operation setTimeout(async () => { await collection.insertOne({ _id: 1, item: 'Widget', qty: 10 }); }, 1000); // Keep connection open to receive changes // In production, handle cleanup appropriately } example();
Python
from pymongo import MongoClient import threading import time def example(): client = MongoClient('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false') db = client['test'] collection = db['inventory'] # Open change stream change_stream = collection.watch([]) # Insert document in separate thread after delay def insert_doc(): time.sleep(1) collection.insert_one({'_id': 1, 'item': 'Widget', 'qty': 10}) threading.Thread(target=insert_doc).start() # Watch for changes for change in change_stream: print('Change detected:', change) break # Exit after first change client.close() example()