

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon DocumentDB と Java を使用したイベント駆動型プログラミング
<a name="event-driven-programming"></a>

Amazon DocumentDB のコンテキストにおけるイベント駆動型プログラミングは、データベースの変更が後続のビジネスロジックとプロセスをトリガーする主要なイベントジェネレーターとして機能する強力なアーキテクチャパターンを表します。DocumentDB コレクションでレコードが挿入、更新、または削除されると、これらの変更はさまざまなダウンストリームプロセス、通知、またはデータ同期タスクを自動的に開始するイベントとして機能します。このパターンは、複数のアプリケーションまたはサービスがデータ変更にリアルタイムで対応する必要がある最新の分散システムでは特に重要です。DocumentDB でイベント駆動型プログラミングを実装する主なメカニズムは、変更ストリームです。

**注記**  
このガイドでは、作業中のコレクションで変更ストリームを有効にしていることを前提としています。コレクションで変更ストリームを有効にする方法については、「[Amazon DocumentDB を用いた変更ストリームの使用](change_streams.md)」を参照してください。

**Java アプリケーションからの変更ストリームの使用**

MongoDB の Java ドライバーの `watch()` メソッドは、Amazon DocumentDB でリアルタイムのデータ変更をモニタリングするための主要なメカニズムです。`watch()` メソッドは、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoClient.html)、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#watch(com.mongodb.client.ClientSession,java.lang.Class)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoDatabase.html#watch(com.mongodb.client.ClientSession,java.lang.Class))、および [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#watch()](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoCollection.html#watch()) オブジェクトによって呼び出すことができます。

`watch()` メソッドは、更新の完全なドキュメント検索、信頼性のための再開トークンとタイムスタンプの提供、変更をフィルタリングするためのパイプライン集約ステージなど、さまざまな設定オプションをサポートする [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) のインスタンスを返します。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) はコア Java インターフェイス `Iterable` を実装しており、`forEach()` で使用できます。`forEach()` を使用してイベントをキャプチャするには、変更されたイベントを処理する `forEach()` にコールバック関数を渡します。次のコードスニペットは、コレクションで変更ストリームを開いて変更イベントのモニタリングを開始する方法を示しています。

```
ChangeStreamIterable < Document > iterator = collection.watch();
iterator.forEach(event - > {
    System.out.println("Received a change: " + event);
});
```

すべての変更イベントを通過するもう 1 つの方法は、クラスターへの接続を維持し、発生時に新しい変更イベントを継続的に受信するカーソルを開くことです。変更ストリームカーソルを取得するには、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) オブジェクトの `cursor()` メソッドを使用します。次のコード例は、カーソルを使用して変更イベントをモニタリングする方法を示しています。

```
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) {
    System.out.println(cursor.tryNext());
}
```

ベストプラクティスとして、 try-with-resource ステートメントで [https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html#getResumeToken()](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/MongoChangeStreamCursor.html#getResumeToken()) を作成するか、カーソルを手動で閉じます。[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) で `cursor()` メソッドを呼び出すと、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) オブジェクト上に作成された `MongoChangeStreamCursor` が返されます。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) クラスは、ストリーム内の個々の変更イベントを表す重要なコンポーネントです。これには、オペレーションタイプ (挿入、更新、削除、置換)、ドキュメントキー、名前空間情報、および使用可能な場合の完全なドキュメントコンテンツなど、各変更に関する詳細情報が含まれます。クラスは、変更のタイプを決定する `getOperationType()`、完全なドキュメント状態にアクセスする `getFullDocument()`、変更されたドキュメントを識別する `getDocumentKey()` など、変更イベントのさまざまな側面にアクセスするメソッドを提供します。

[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/ChangeStreamDocument.html) オブジェクトは、再開トークンと変更イベントという、時刻の 2 つの重要な情報を提供します。

DocumentDB 変更ストリームの再開トークンと時間ベースのオペレーションは、継続性を維持し、履歴データアクセスを管理するための重要なメカニズムを提供します。再開トークンは、変更イベントごとに生成される一意の識別子であり、切断または障害後にアプリケーションが特定のポイントから変更ストリーム処理を再開できるようにするブックマークとして機能します。変更ストリームカーソルを作成すると、`resumeAfter()` オプションを使用して以前に保存した再開トークンを使用できます。これにより、ストリームは最初から開始したり、イベントを失ったりするのではなく、中断した場所から続行できます。

変更ストリームの時間ベースのオペレーションは、変更イベントモニタリングの開始点を管理するためのさまざまなアプローチを提供します。`startAtOperationTime()` オプションを使用すると、特定のタイムスタンプ以降に発生した変更の監視を開始できます。これらの時間ベースの機能は、履歴データ処理、ポイントインタイムリカバリ、またはシステム間の同期を必要とするシナリオで特に役立ちます。

次のコード例は、挿入ドキュメントに関連付けられたイベントを取得し、再開トークンをキャプチャしてから、挿入イベントの後にイベントのモニタリングを開始するトークンを提供します。イベントは更新イベントに関連付けられ、更新が発生したクラスター時間を取得し、そのタイムスタンプをさらなる処理の開始点として使用します。

```
BsonDocument resumeToken;
BsonTimestamp resumeTime;

try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().cursor()) {
    System.out.println("****************** Insert Document *******************");
    ChangeStreamDocument < Document > insertChange = cursor.tryNext();
    resumeToken = insertChange.getResumeToken();
    printJson(cursor.tryNext());
}
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch()
    .resumeAfter(resumeToken)
    .cursor()) {
    System.out.println("****************** Update Document *******************");
    ChangeStreamDocument < Document > insertChange = cursor.tryNext();
    resumeTime = insertChange.getClusterTime();
    printJson(cursor.tryNext());
}
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch()
    .startAtOperationTime(resumeTime)
    .cursor()) {
    System.out.println("****************** Delete Document *******************");
    printJson(cursor.tryNext());
  }
```

デフォルトで、変更の更新イベントには、ドキュメント全体は含まれず、加えられた変更のみが含まれます。更新した完全なドキュメントにアクセスする必要がある場合は、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-sync/com/mongodb/client/ChangeStreamIterable.html#startAtOperationTime(org.bson.BsonTimestamp)) オブジェクトで `fullDocument()` メソッドを呼び出すことができます。更新イベントに対して完全なドキュメントが返されるように要求すると、変更ストリームへの呼び出しが行われた時点で存在するドキュメントが返されることに注意してください。

このメソッドは、[https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/FullDocument.html](https://mongodb.github.io/mongo-java-driver/5.3/apidocs/mongodb-driver-core/com/mongodb/client/model/changestream/FullDocument.html) 列挙型をパラメータとして受け取ります。現在、Amazon DocumentDB は DEFAULT および `UPDATE_LOOKUP` 値のみをサポートしています。次のコードスニペットは、変更の監視を開始するときに、更新イベントの完全なドキュメントを要求する方法を示しています。

```
try (MongoChangeStreamCursor < ChangeStreamDocument < Document >> cursor = collection.watch().fullDocument(FullDocument.UPDATE_LOOKUP).cursor())
```