Event-driven programming with Amazon DocumentDB and Java
Event-driven programming in the context of Amazon DocumentDB is an architectural pattern in which database changes serve as the primary event generators that trigger subsequent business logic and processes. When records are inserted, updated, or deleted in a DocumentDB collection, these changes act as events that automatically initiate downstream processes, notifications, or data synchronization tasks. This pattern is particularly valuable in distributed systems where multiple applications or services need to react to data changes in real time. The primary mechanism for implementing event-driven programming in DocumentDB is change streams.
Note
This guide assumes you have enabled change streams on a collection that you are working with. See Using change streams with Amazon DocumentDB to learn how to enable change streams on the collection.
Working with change streams from a Java application
The watch() method in MongoDB's Java driver is the primary mechanism for monitoring real-time data changes in Amazon DocumentDB. The watch() method can be called on MongoClient, MongoDatabase, and MongoCollection objects.
The watch() method returns an instance of ChangeStreamIterable. ChangeStreamIterable is an Iterable, so it can be used with forEach(). To capture events using forEach(), pass forEach() a callback function that processes each change event.
The following code snippet shows how to open a change stream on a collection and start monitoring change events:
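```java
// Open a change stream on the collection. forEach() blocks the calling thread
// and invokes the callback for each change event as it arrives.
ChangeStreamIterable<Document> changeStream = collection.watch();
changeStream.forEach(event -> System.out.println("Received a change: " + event));
```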
Another way to traverse change events is to open a cursor that maintains a connection to the cluster and continuously receives new change events as they occur.
To obtain a change stream cursor, use the cursor() method of ChangeStreamIterable:
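```java
// try-with-resources closes the cursor when the block exits
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch().cursor()) {
    // tryNext() returns the next event if one is available, otherwise null
    System.out.println(cursor.tryNext());
}
```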
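Because tryNext() returns null when no event is currently available, a consumer that uses it typically polls in a loop. The cursor's next() method, by contrast, blocks until an event arrives. The following stdlib-only sketch models such a polling loop without the driver. EventSource is a hypothetical stand-in that mirrors only the tryNext() shape of MongoChangeStreamCursor. The idle-poll limit and sleep interval are arbitrary assumptions.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the tryNext() contract of MongoChangeStreamCursor. */
interface EventSource<T> {
    /** Returns the next event, or null if none is available right now. */
    T tryNext();
}

final class PollingLoop {
    /**
     * Hands each available event to the handler. Stops after maxIdlePolls
     * consecutive polls that returned null. Returns the number of events handled.
     */
    static <T> int poll(EventSource<T> source, Consumer<T> handler,
                        int maxIdlePolls, long sleepMillis) {
        int handled = 0;
        int idle = 0;
        while (idle < maxIdlePolls) {
            T event = source.tryNext();
            if (event == null) {
                idle++; // nothing available yet
                try {
                    Thread.sleep(sleepMillis); // back off before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt and stop
                    break;
                }
                continue;
            }
            idle = 0; // any event resets the idle counter
            handler.accept(event);
            handled++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Simulated source: two events, then nothing (Deque.poll() returns null when empty).
        Deque<String> queue = new ArrayDeque<>(List.of("insert", "update"));
        List<String> seen = new ArrayList<>();
        int n = poll(queue::poll, seen::add, 3, 10);
        System.out.println(n + " " + seen);
    }
}
```

When the consuming thread can simply block, next() is the simpler choice. A tryNext() loop like this one suits code that must interleave other work between polls, such as a shutdown check.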
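As a best practice, either create the MongoChangeStreamCursor in a try-with-resources statement, as in the snippet above, or close it explicitly when you are done with it. The cursor() method on ChangeStreamIterable returns a MongoChangeStreamCursor over ChangeStreamDocument objects, and each ChangeStreamDocument represents a single change event. The ChangeStreamDocument class provides methods such as getOperationType() to determine the type of change, getFullDocument() to access the complete document state, and getDocumentKey() to identify the modified document.
The ChangeStreamDocument also carries the event's resume token (getResumeToken()) and the cluster time at which the change occurred (getClusterTime()).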
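Because getOperationType() identifies the kind of change, a common pattern is to route each event to handlers registered for that operation. The following stdlib-only sketch assumes a hypothetical ChangeEvent record that carries only the operation type name and the document key. With the driver, you would derive the key from event.getOperationType().getValue() (for example "insert", "update", or "delete") and pass router::dispatch as the callback to forEach(). ChangeEvent and ChangeEventRouter are illustrative names, not driver types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical, simplified view of a change event: operation type and document key. */
record ChangeEvent(String operationType, String documentKey) {}

final class ChangeEventRouter {
    private final Map<String, List<Consumer<ChangeEvent>>> handlers = new HashMap<>();
    private final List<ChangeEvent> unhandled = new ArrayList<>();

    /** Registers a handler for one operation type, such as "insert". */
    ChangeEventRouter on(String operationType, Consumer<ChangeEvent> handler) {
        handlers.computeIfAbsent(operationType, k -> new ArrayList<>()).add(handler);
        return this;
    }

    /** Invokes every handler registered for the event's operation type. */
    void dispatch(ChangeEvent event) {
        List<Consumer<ChangeEvent>> registered = handlers.get(event.operationType());
        if (registered == null) {
            unhandled.add(event); // keep events nobody subscribed to, for inspection
            return;
        }
        registered.forEach(h -> h.accept(event));
    }

    List<ChangeEvent> unhandled() {
        return unhandled;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        ChangeEventRouter router = new ChangeEventRouter()
                .on("insert", e -> log.add("indexed " + e.documentKey()))
                .on("delete", e -> log.add("evicted " + e.documentKey()));
        List.of(new ChangeEvent("insert", "42"),
                new ChangeEvent("update", "42"),
                new ChangeEvent("delete", "42")).forEach(router::dispatch);
        System.out.println(log);                       // [indexed 42, evicted 42]
        System.out.println(router.unhandled().size()); // 1
    }
}
```

Keeping unhandled events visible, rather than silently dropping them, makes it easier to notice when a new operation type starts appearing in the stream.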
Resume tokens and time-based operations in DocumentDB change streams provide crucial mechanisms for maintaining continuity and managing historical data access.
A resume token is a unique identifier generated for each change event, serving as a bookmark that allows applications to restart change stream processing from a specific point after disconnections or failures.
When a change stream cursor is created, it can use a previously stored resume token through the resumeAfter() option. This lets the stream continue from where it left off, instead of starting from the current time and missing the events that occurred while the application was disconnected.
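A resume token only helps if it survives a restart, so applications typically persist the token of the last successfully processed event. The following minimal sketch assumes the token is stored as its JSON string. With the driver, resumeToken.toJson() produces that string, and BsonDocument.parse(json) restores it before you call resumeAfter(). The file-based ResumeTokenStore class and the file name are hypothetical; a production system might keep the token in a database instead.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Optional;

/** Hypothetical checkpoint store that keeps the latest resume token (as JSON) in a file. */
final class ResumeTokenStore {
    private final Path file;

    ResumeTokenStore(Path file) {
        this.file = file;
    }

    /** Saves the token of an event whose processing has completed. */
    void save(String tokenJson) {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        try {
            // Write to a temporary file, then move it into place, so a crash
            // mid-write does not leave a truncated token (on file systems
            // that support atomic moves).
            Files.writeString(tmp, tokenJson, StandardCharsets.UTF_8);
            Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING,
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the last saved token, or empty if nothing has been checkpointed yet. */
    Optional<String> load() {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(Files.readString(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("resume-demo");
        ResumeTokenStore store = new ResumeTokenStore(dir.resolve("resume-token.json"));
        System.out.println(store.load().isPresent());      // false: no checkpoint yet
        store.save("{\"token\": \"placeholder\"}");       // placeholder, not a real token
        System.out.println(store.load().orElseThrow());
    }
}
```

Save the token only after the event's side effects are complete. If the token is saved first and the process crashes before finishing, that event is skipped on restart.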
Time-based operations in change streams offer different approaches to manage the starting point of change event monitoring.
The startAtOperationTime() option lets you begin watching changes that occurred at or after a specific timestamp.
These time-based features are particularly valuable in scenarios requiring historical data processing, point-in-time recovery, or synchronization between systems.
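startAtOperationTime() takes a BsonTimestamp. This type pairs a seconds-since-epoch value with an increment that orders operations within the same second. BsonTimestamp.getValue() packs the seconds into the high 32 bits and the increment into the low 32 bits of a long. The following stdlib-only sketch mirrors that layout to compute a starting point some minutes in the past; with the driver you would pass new BsonTimestamp(seconds, 0). The OperationTimes helper is hypothetical. The chosen time must also fall within the cluster's change stream log retention period, or the stream cannot start there.

```java
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helpers mirroring the BSON timestamp layout: seconds high, increment low. */
final class OperationTimes {
    static long pack(int seconds, int increment) {
        return ((long) seconds << 32) | (increment & 0xFFFFFFFFL);
    }

    static int seconds(long packed) {
        return (int) (packed >>> 32);
    }

    static int increment(long packed) {
        return (int) packed; // low 32 bits
    }

    /** Starting point `lookback` before `now`, with increment 0 so that whole second is included. */
    static long startingAgo(Instant now, Duration lookback) {
        long epochSeconds = now.minus(lookback).getEpochSecond();
        return pack((int) epochSeconds, 0);
    }

    public static void main(String[] args) {
        long start = startingAgo(Instant.parse("2024-01-01T00:10:00Z"), Duration.ofMinutes(10));
        System.out.println(seconds(start) + " " + increment(start));
    }
}
```

Packing seconds above the increment means that comparing the packed longs orders timestamps first by second, then by position within that second.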
The following code example captures the insert event and saves its resume token. It then opens a second cursor with resumeAfter(), so monitoring continues with the event that follows the insert: the update. The example records the cluster time of the update and uses that timestamp with startAtOperationTime() as the starting point for a third cursor, which reads through to the delete event. The example assumes that an insert, an update, and a delete are performed on the collection, in that order, while it runs.
```java
BsonDocument resumeToken;
BsonTimestamp resumeTime;

// 1. Capture the insert event and save its resume token.
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch().cursor()) {
    System.out.println("****************** Insert Document *******************");
    ChangeStreamDocument<Document> insertChange = cursor.next(); // blocks until an event arrives
    resumeToken = insertChange.getResumeToken();
    printJson(insertChange); // printJson(...) is a helper not shown here
}

// 2. Resume after the insert: the first event returned is the update.
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch()
        .resumeAfter(resumeToken)
        .cursor()) {
    System.out.println("****************** Update Document *******************");
    ChangeStreamDocument<Document> updateChange = cursor.next();
    resumeTime = updateChange.getClusterTime();
    printJson(updateChange);
}

// 3. Start at the update's cluster time. Changes at or after that time are
//    returned, so the update is seen again before the delete.
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor = collection.watch()
        .startAtOperationTime(resumeTime)
        .cursor()) {
    System.out.println("****************** Delete Document *******************");
    cursor.next(); // skip the update event at resumeTime
    printJson(cursor.next());
}
```
By default, an update change event does not include the full document; it includes only the changes that were made.
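If you need to access the complete document that was updated, call the fullDocument() method on the ChangeStreamIterable object. This method takes a FullDocument enum value, such as DEFAULT or UPDATE_LOOKUP. With UPDATE_LOOKUP, update events also include the most recent version of the updated document.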
The following code snippet shows how to request the full document for update events when you start watching for changes:
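```java
try (MongoChangeStreamCursor<ChangeStreamDocument<Document>> cursor =
        collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor()) {
    // With UPDATE_LOOKUP, getFullDocument() is populated for update events too
    System.out.println(cursor.next().getFullDocument());
}
```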
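UPDATE_LOOKUP costs an extra read per update. It returns the document as it is at lookup time, which may already reflect later changes. If a consumer keeps its own copy of each document, an alternative is to apply the change itself. The driver's UpdateDescription, available from getUpdateDescription(), exposes getUpdatedFields() and getRemovedFields() for this purpose. The following sketch applies such a delta to a plain Map. It assumes top-level field names only, so dotted paths into nested documents are not handled. The DeltaApplier class is hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that applies an update delta to a locally cached document. */
final class DeltaApplier {
    /**
     * Returns a new map: the cached document with updatedFields written over it
     * and removedFields deleted. Only top-level field names are supported.
     */
    static Map<String, Object> apply(Map<String, Object> cached,
                                     Map<String, Object> updatedFields,
                                     List<String> removedFields) {
        Map<String, Object> result = new HashMap<>(cached); // leave the input untouched
        result.putAll(updatedFields);                        // set or overwrite changed fields
        removedFields.forEach(result::remove);               // drop fields the update unset
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> cached = Map.of("_id", 1, "status", "new", "note", "tmp");
        Map<String, Object> updated = apply(cached, Map.of("status", "shipped"), List.of("note"));
        System.out.println(updated.get("status") + " " + updated.containsKey("note"));
    }
}
```

The trade-off: applying deltas avoids the lookup and reflects exactly the change in the event, but it requires an accurate local copy to start from. UPDATE_LOOKUP needs no local state.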