Programação orientada a eventos com Amazon DocumentDB e Java - Amazon DocumentDB

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Programação orientada a eventos com Amazon DocumentDB e Java

A programação orientada por eventos no contexto do Amazon DocumentDB representa um poderoso padrão arquitetônico em que as alterações no banco de dados servem como os principais geradores de eventos que acionam a lógica e os processos comerciais subsequentes. Quando os registros são inseridos, atualizados ou excluídos em uma coleção do DocumentDB, essas alterações agem como eventos que iniciam automaticamente vários processos posteriores, notificações ou tarefas de sincronização de dados. Esse padrão é particularmente valioso em sistemas distribuídos modernos, nos quais vários aplicativos ou serviços precisam reagir às mudanças de dados em tempo real. O principal mecanismo de implementação da programação orientada a eventos no DocumentDB é por meio de fluxos de mudança.

nota

Este guia pressupõe que você tenha habilitado fluxos de alteração em uma coleção com a qual você está trabalhando. Consulte Usar fluxos de alterações com o Amazon DocumentDB para saber como habilitar fluxos de mudança na coleção.

Trabalhando com fluxos de mudança do aplicativo Java

O watch() método no driver Java do MongoDB é o principal mecanismo para monitorar alterações de dados em tempo real no Amazon DocumentDB. O watch() método pode ser chamado por MongoClient, MongoDatabase, e MongoCollectionobjetos.

O watch() método retorna uma instância do ChangeStreamIterableque oferece suporte a várias opções de configuração, incluindo pesquisa completa de documentos para atualizações, fornecimento de tokens de currículo e registro de data e hora para fins de confiabilidade e estágios de agregação de pipeline para filtrar alterações.

ChangeStreamIterableimplementa a interface Java principal Iterable e pode ser usada comforEach(). Para capturar eventos usandoforEach(), passe uma função de retorno de chamada para forEach() que processe o evento alterado. O trecho de código a seguir mostra como abrir um fluxo de alterações em uma coleção para iniciar o monitoramento de eventos de alteração:

ChangeStreamIterable < Document > iterator = collection.watch(); iterator.forEach(event - > { System.out.println("Received a change: " + event); });

Outra forma de percorrer todos os eventos de mudança é abrir um cursor que mantém uma conexão com o cluster e recebe continuamente novos eventos de alteração à medida que eles ocorrem. Para obter um cursor de fluxos de alteração, use o cursor() método do ChangeStreamIterableobjeto. O exemplo de código a seguir mostra como monitorar eventos de alteração usando o cursor:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println(cursor.tryNext()); }

Como prática recomendada, crie o MongoChangeStreamCursorem uma try-with-resource instrução ou feche manualmente o cursor. Chamar o cursor() método on ChangeStreamIterableretorna um MongoChangeStreamCursor que é criado sobre um ChangeStreamDocumentobjeto.

A ChangeStreamDocumentclasse é um componente crucial que representa eventos de mudança individuais no fluxo. Ele contém informações detalhadas sobre cada modificação, incluindo o tipo de operação (inserir, atualizar, excluir, substituir), a chave do documento, as informações do namespace e o conteúdo completo do documento, quando disponível. A classe fornece métodos para acessar vários aspectos do evento de alteração, como getOperationType() determinar o tipo de alteração, getFullDocument() acessar o estado completo do documento e getDocumentKey() identificar o documento modificado.

O ChangeStreamDocumentobjeto fornece duas informações importantes, um token de currículo e a hora do evento de mudança.

Os tokens de retomada e as operações baseadas em tempo nos fluxos de mudança do DocumentDB fornecem mecanismos cruciais para manter a continuidade e gerenciar o acesso histórico aos dados. Um token de resumo é um identificador exclusivo gerado para cada evento de alteração, servindo como um marcador que permite que os aplicativos reiniciem o processamento do fluxo de alterações a partir de um ponto específico após desconexões ou falhas. Quando um cursor de fluxo de alteração é criado, ele pode usar um token de resumo armazenado anteriormente por meio da resumeAfter() opção, permitindo que o fluxo continue de onde parou, em vez de começar do início ou perder eventos.

As operações baseadas no tempo em fluxos de mudança oferecem diferentes abordagens para gerenciar o ponto de partida do monitoramento de eventos de mudança. A startAtOperationTime() opção permite que você comece a observar as alterações que ocorreram em ou após um registro de data e hora específico. Esses recursos baseados em tempo são particularmente valiosos em cenários que exigem processamento, point-in-time recuperação ou sincronização de dados históricos entre sistemas.

O exemplo de código a seguir recupera o evento associado ao documento inserido, captura seu token de currículo e, em seguida, fornece esse token para iniciar o monitoramento de eventos após o evento de inserção. O evento é associado ao evento de atualização e, em seguida, obtém a hora do cluster em que a atualização aconteceu e usa esse carimbo de data/hora como ponto de partida para processamento adicional.

BsonDocument resumeToken; BsonTimestamp resumeTime; try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) { System.out.println("****************** Insert Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeToken = insertChange.getResumeToken(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .resumeAfter(resumeToken) .cursor()) { System.out.println("****************** Update Document *******************"); ChangeStreamDocument < Document > insertChange = cursor.tryNext(); resumeTime = insertChange.getClusterTime(); printJson(cursor.tryNext()); } try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch() .startAtOperationTime(resumeTime) .cursor()) { System.out.println("****************** Delete Document *******************"); printJson(cursor.tryNext()); }

Por padrão, o evento de alteração de atualização não inclui o documento completo e inclui apenas as alterações que foram feitas. Se precisar acessar o documento completo que foi atualizado, você pode chamar o fullDocument() método no ChangeStreamIterableobjeto. Lembre-se de que, quando você solicita a devolução de um documento completo para um evento de atualização, ele retorna o documento que existe no momento em que a chamada para alterar fluxos é feita.

Esse método usa uma FullDocumentenumeração como parâmetro. Atualmente, o Amazon DocumentDB só oferece suporte a DEFAULT e UPDATE_LOOKUP valores. O trecho de código a seguir mostra como solicitar o documento completo para eventos de atualização ao começar a observar as alterações:

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())