翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
$changeStream
Elastic クラスターではサポートされていません。
$changeStream 集約ステージは変更ストリームカーソルを開き、コレクションへのリアルタイムの変更をモニタリングします。挿入、更新、置換、または削除オペレーションが発生すると、変更イベントドキュメントが返されます。
パラメータ
-
fullDocument: 更新オペレーションのために完全なドキュメントを返すかどうかを指定します。オプションは default または updateLookup です。
-
resumeAfter: オプション。トークンを再開して、変更ストリームの特定のポイントから続行します。
-
startAtOperationTime: オプション。変更ストリームを開始するタイムスタンプ。
-
allChangesForCluster: オプション。ブール値。の場合true、 はクラスター全体のすべての変更を監視します (管理者データベース用)。false (デフォルト) の場合、 は指定されたコレクションのみを監視します。
例 (MongoDB シェル)
次の例は、 $changeStreamステージを使用してコレクションの変更をモニタリングする方法を示しています。
クエリの例
// Open change stream first
const changeStream = db.inventory.aggregate([
{ $changeStream: { fullDocument: "updateLookup" } }
]);
// In another session, insert a document
db.inventory.insertOne({ _id: 1, item: "Widget", qty: 10 });
// Back in the first session, read the change event
if (changeStream.hasNext()) {
print(tojson(changeStream.next()));
}
出力
{
_id: { _data: '...' },
operationType: 'insert',
clusterTime: Timestamp(1, 1234567890),
fullDocument: { _id: 1, item: 'Widget', qty: 10 },
ns: { db: 'test', coll: 'inventory' },
documentKey: { _id: 1 }
}
コードの例
$changeStream 集約ステージを使用するコード例を表示するには、使用する言語のタブを選択します。
- Node.js
-
const { MongoClient } = require('mongodb');
async function example() {
const client = await MongoClient.connect('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false');
const db = client.db('test');
const collection = db.collection('inventory');
// Open change stream
const changeStream = collection.watch([]);
changeStream.on('change', (change) => {
console.log('Change detected:', change);
});
// Simulate insert in another operation
setTimeout(async () => {
await collection.insertOne({ _id: 1, item: 'Widget', qty: 10 });
}, 1000);
// Keep connection open to receive changes
// In production, handle cleanup appropriately
}
example();
- Python
-
from pymongo import MongoClient
import threading
import time
def example():
client = MongoClient('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')
db = client['test']
collection = db['inventory']
# Open change stream
change_stream = collection.watch([])
# Insert document in separate thread after delay
def insert_doc():
time.sleep(1)
collection.insert_one({'_id': 1, 'item': 'Widget', 'qty': 10})
threading.Thread(target=insert_doc).start()
# Watch for changes
for change in change_stream:
print('Change detected:', change)
break # Exit after first change
client.close()
example()