$changeStream - Amazon DocumentDB

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

$changeStream

No es compatible con el clúster de Elastic.

La etapa de $changeStream agregación abre un cursor de flujo de cambios para monitorear los cambios en tiempo real en una colección. Devuelve los documentos de los eventos de cambio cuando se realizan operaciones de inserción, actualización, reemplazo o eliminación.

Parámetros

  • fullDocument: Especifica si se debe devolver el documento completo para las operaciones de actualización. Las opciones son default y updateLookup.

  • resumeAfter: opcional. Reanude el token para continuar desde un punto específico del flujo de cambios.

  • startAtOperationTime: opcional. Marca de tiempo desde la que se inicia el flujo de cambios.

  • allChangesForCluster: opcional. Valor booleano. Cuandotrue, observa todos los cambios en el clúster (para la base de datos de administración). Cuando false (predeterminado), solo observa la colección especificada.

Ejemplo (MongoDB Shell)

El siguiente ejemplo demuestra el uso del $changeStream escenario para supervisar los cambios en una colección.

Ejemplo de consulta

// Open change stream first const changeStream = db.inventory.aggregate([ { $changeStream: { fullDocument: "updateLookup" } } ]); // In another session, insert a document db.inventory.insertOne({ _id: 1, item: "Widget", qty: 10 }); // Back in the first session, read the change event if (changeStream.hasNext()) { print(tojson(changeStream.next())); }

Salida

{ _id: { _data: '...' }, operationType: 'insert', clusterTime: Timestamp(1, 1234567890), fullDocument: { _id: 1, item: 'Widget', qty: 10 }, ns: { db: 'test', coll: 'inventory' }, documentKey: { _id: 1 } }

Ejemplos de código

Para ver un ejemplo de código para usar la etapa de $changeStream agregación, elija la pestaña correspondiente al idioma que desee usar:

Node.js
const { MongoClient } = require('mongodb'); async function example() { const client = await MongoClient.connect('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false'); const db = client.db('test'); const collection = db.collection('inventory'); // Open change stream const changeStream = collection.watch([]); changeStream.on('change', (change) => { console.log('Change detected:', change); }); // Simulate insert in another operation setTimeout(async () => { await collection.insertOne({ _id: 1, item: 'Widget', qty: 10 }); }, 1000); // Keep connection open to receive changes // In production, handle cleanup appropriately } example();
Python
from pymongo import MongoClient import threading import time def example(): client = MongoClient('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false') db = client['test'] collection = db['inventory'] # Open change stream change_stream = collection.watch([]) # Insert document in separate thread after delay def insert_doc(): time.sleep(1) collection.insert_one({'_id': 1, 'item': 'Widget', 'qty': 10}) threading.Thread(target=insert_doc).start() # Watch for changes for change in change_stream: print('Change detected:', change) break # Exit after first change client.close() example()