기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
$changeStream
Elastic 클러스터에서는 지원되지 않습니다.
$changeStream 집계 단계에서는 변경 스트림 커서를 열어 컬렉션에 대한 실시간 변경 사항을 모니터링합니다. 삽입, 업데이트, 교체 또는 삭제 작업이 발생할 때 변경 이벤트 문서를 반환합니다.
파라미터
-
fullDocument: 업데이트 작업을 위해 전체 문서를 반환할지 여부를 지정합니다. 옵션은 default 또는 updateLookup입니다.
-
resumeAfter: 선택 사항입니다. 토큰을 재개하여 변경 스트림의 특정 지점에서 계속합니다.
-
startAtOperationTime: 선택 사항입니다. 변경 스트림을 시작할 타임스탬프입니다.
-
allChangesForCluster: 선택 사항입니다. 부울 값입니다. 인 경우 true는 클러스터 전체의 모든 변경 사항을 감시합니다(관리자 데이터베이스의 경우). false (기본값)인 경우는 지정된 컬렉션만 감시합니다.
예제(MongoDB 쉘)
다음 예제에서는 $changeStream 스테이지를 사용하여 컬렉션에 대한 변경 사항을 모니터링하는 방법을 보여줍니다.
쿼리 예제
// Open change stream first
const changeStream = db.inventory.aggregate([
{ $changeStream: { fullDocument: "updateLookup" } }
]);
// In another session, insert a document
db.inventory.insertOne({ _id: 1, item: "Widget", qty: 10 });
// Back in the first session, read the change event
if (changeStream.hasNext()) {
print(tojson(changeStream.next()));
}
출력
{
_id: { _data: '...' },
operationType: 'insert',
clusterTime: Timestamp(1, 1234567890),
fullDocument: { _id: 1, item: 'Widget', qty: 10 },
ns: { db: 'test', coll: 'inventory' },
documentKey: { _id: 1 }
}
코드 예제
$changeStream 집계 단계 사용에 대한 코드 예제를 보려면 사용하려는 언어의 탭을 선택합니다.
- Node.js
-
const { MongoClient } = require('mongodb');
async function example() {
const client = await MongoClient.connect('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false');
const db = client.db('test');
const collection = db.collection('inventory');
// Open change stream
const changeStream = collection.watch([]);
changeStream.on('change', (change) => {
console.log('Change detected:', change);
});
// Simulate insert in another operation
setTimeout(async () => {
await collection.insertOne({ _id: 1, item: 'Widget', qty: 10 });
}, 1000);
// Keep connection open to receive changes
// In production, handle cleanup appropriately
}
example();
- Python
-
from pymongo import MongoClient
import threading
import time
def example():
client = MongoClient('mongodb://<username>:<password>@<cluster-endpoint>:27017/?tls=true&tlsCAFile=global-bundle.pem&replicaSet=rs0&readPreference=secondaryPreferred&retryWrites=false')
db = client['test']
collection = db['inventory']
# Open change stream
change_stream = collection.watch([])
# Insert document in separate thread after delay
def insert_doc():
time.sleep(1)
collection.insert_one({'_id': 1, 'item': 'Widget', 'qty': 10})
threading.Thread(target=insert_doc).start()
# Watch for changes
for change in change_stream:
print('Change detected:', change)
break # Exit after first change
client.close()
example()