Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Programación basada en eventos con Amazon DocumentDB y Java
La programación basada en eventos en el contexto de Amazon DocumentDB representa un patrón arquitectónico potente en el que los cambios en la base de datos actúan como los principales generadores de eventos que desencadenan la lógica y los procesos empresariales posteriores. Cuando se insertan, actualizan o eliminan registros en una colección de DocumentDB, estos cambios actúan como eventos que inician automáticamente varios procesos posteriores, notificaciones o tareas de sincronización de datos. Este patrón es particularmente valioso en los sistemas distribuidos modernos, donde aplicaciones o servicios múltiples deben reaccionar a los cambios en los datos en tiempo real. El mecanismo principal de implementación de la programación basada en eventos en DocumentDB es mediante flujos de cambios.
nota
En esta guía se asume que habilitó los flujos de cambios en una colección con la que está trabajando. Consulte Uso de flujos de cambios con Amazon DocumentDB para obtener información sobre cómo habilitar los flujos de cambios en la colección.
Trabajo con flujos de cambios desde la aplicación Java
El método watch() del controlador Java de MongoDB es el mecanismo principal para supervisar los cambios de datos en tiempo real en Amazon DocumentDB. Los objetos MongoClientMongoDatabaseMongoCollectionwatch().
El método watch() devuelve una instancia de ChangeStreamIterable
ChangeStreamIterableIterable y puede usarse con forEach(). Para capturar eventos mediante forEach(), transfiera una función de devolución de llamada a forEach() que procese el evento cambiado. En el siguiente fragmento de código se muestra cómo abrir un flujo de cambios en una colección para iniciar la supervisión de eventos de cambio:
ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });
Otra forma de recorrer todos los eventos de cambio consiste en abrir un cursor que mantenga una conexión con el clúster y reciba continuamente nuevos eventos de cambio a medida que se producen. Para obtener un cursor de flujo de cambios, utilice el método cursor() del objeto ChangeStreamIterable
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }
Como práctica recomendada, puede crearla MongoChangeStreamCursorcursor() en ChangeStreamIterableMongoChangeStreamCursor creado sobre un objeto de ChangeStreamDocument
La clase ChangeStreamDocumentgetOperationType() para determinar el tipo de cambio, getFullDocument() para acceder al estado completo del documento y getDocumentKey() para identificar el documento modificado.
El objeto ChangeStreamDocument
Los tokens de reanudación y las operaciones basadas en el tiempo en los flujos de cambios de DocumentDB proporcionan mecanismos cruciales para mantener la continuidad y administrar el acceso a los datos históricos. Un token de reanudación es un identificador único que se genera para cada evento de cambio y sirve como marcador que permite que las aplicaciones reinicien el procesamiento del flujo de cambios desde un punto específico tras una desconexión o un fallo. Cuando se crea un cursor de flujo de cambios, se puede usar un token de reanudación previamente almacenado mediante la opción resumeAfter(), lo que permite que el flujo continúe desde donde dejó en lugar de empezar desde el principio o perder los eventos.
Las operaciones basadas en el tiempo en los flujos de cambios ofrecen diferentes enfoques para gestionar el punto de partida de la supervisión de los eventos de cambio. La opción startAtOperationTime() le permite empezar a observar los cambios que ocurrieron en una marca de tiempo específica o después de esta. Estas funciones basadas en el tiempo son particularmente valiosas en escenarios que requieren el procesamiento, la point-in-time recuperación o la sincronización de datos históricos entre sistemas.
El siguiente ejemplo de código recupera el evento asociado al documento de inserción, captura su token de reanudación y, a continuación, proporciona ese token para empezar a supervisar los eventos posteriores al evento de inserción. El evento se asocia al evento de actualización y, a continuación, obtiene la hora del clúster en que se produjo la actualización y utiliza esa marca de tiempo como punto de partida para el procesamiento posterior.
BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }
De manera predeterminada, el evento de cambio de actualización no incluye el documento completo, sino tan solo los cambios realizados. Si necesita acceder al documento completo que se actualizó, puede llamar al método fullDocument() en el objeto ChangeStreamIterable
Este método toma un enumerador FullDocumentUPDATE_LOOKUP. El siguiente fragmento de código muestra cómo solicitar el documento completo para los eventos de actualización cuando empieza a observar los cambios:
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())