$changeStream - Amazon DocumentDB

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

$changeStream

Elastic クラスターではサポートされていません。

$changeStream 集約ステージは変更ストリームカーソルを開き、コレクションへのリアルタイムの変更をモニタリングします。挿入、更新、置換、または削除オペレーションが発生すると、変更イベントドキュメントが返されます。

パラメータ

  • fullDocument: 更新オペレーションのために完全なドキュメントを返すかどうかを指定します。オプションは default または updateLookup です。

  • resumeAfter: オプション。トークンを再開して、変更ストリームの特定のポイントから続行します。

  • startAtOperationTime: オプション。変更ストリームを開始するタイムスタンプ。

  • allChangesForCluster: オプション。ブール値。の場合true、 はクラスター全体のすべての変更を監視します (管理者データベース用)。false (デフォルト) の場合、 は指定されたコレクションのみを監視します。

例 (MongoDB シェル)

次の例は、 $changeStreamステージを使用してコレクションの変更をモニタリングする方法を示しています。

クエリの例

// Open change stream first const changeStream = db.inventory.aggregate([ { $changeStream: { fullDocument: "updateLookup" } } ]); // In another session, insert a document db.inventory.insertOne({ _id: 1, item: "Widget", qty: 10 }); // Back in the first session, read the change event if (changeStream.hasNext()) { print(tojson(changeStream.next())); }

出力

{ _id: { _data: '...' }, operationType: 'insert', clusterTime: Timestamp(1, 1234567890), fullDocument: { _id: 1, item: 'Widget', qty: 10 }, ns: { db: 'test', coll: 'inventory' }, documentKey: { _id: 1 } }

コードの例

$changeStream 集約ステージを使用するコード例を表示するには、使用する言語のタブを選択します。

Node.js
const { MongoClient } = require('mongodb'); async function example() { const client = await MongoClient.connect('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false'); const db = client.db('test'); const collection = db.collection('inventory'); // Open change stream const changeStream = collection.watch([]); changeStream.on('change', (change) => { console.log('Change detected:', change); }); // Simulate insert in another operation setTimeout(async () => { await collection.insertOne({ _id: 1, item: 'Widget', qty: 10 }); }, 1000); // Keep connection open to receive changes // In production, handle cleanup appropriately } example();
Python
from pymongo import MongoClient import threading import time def example(): client = MongoClient('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false') db = client['test'] collection = db['inventory'] # Open change stream change_stream = collection.watch([]) # Insert document in separate thread after delay def insert_doc(): time.sleep(1) collection.insert_one({'_id': 1, 'item': 'Widget', 'qty': 10}) threading.Thread(target=insert_doc).start() # Watch for changes for change in change_stream: print('Change detected:', change) break # Exit after first change client.close() example()