View a markdown version of this page

Gremlin を使用したクエリ結果のストリーミング - Amazon Neptune

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Gremlin を使用したクエリ結果のストリーミング

多数の結果を返す Gremlin トラバーサルを実行すると、Neptune は WebSocket 接続を介してそれらをバッチでクライアントにストリーミングします。Neptune は、クライアントがそれ以上リクエストするのを待たずに、生成された結果バッチを送信します。これは、サーバーから返される結果を処理するが、結果セット全体をメモリに収集しないように遅延反復パターンを使用する必要がある場合に便利です。

Neptune は、デフォルトで WebSocket フレームあたり 64 のバッチで結果を送信します。このサーバー側のデフォルトを変更することはできませんが、リクエストオプション (Java ドライバーTokens.ARGS_BATCH_SIZEまたはドライバーレベルのデフォルトconnectionPool.resultIterationBatchSize) を使用して、batchSizeリクエストごとにクライアントからバッチサイズを上書きできます。

他の言語ドライバーbatchSizeでの の設定の詳細については、「Apache TinkerPop Gremlin ドライバーとバリアント」ドキュメントの「各ドライバーの設定」セクションを参照してください。

サーバーは自動的に結果をプッシュするため、クライアント側のバックプレッシャーは TCP および WebSocket フロー制御を通じて暗黙的に処理されます。クライアントがソケットから読み取るのが遅い場合、クライアントが追いつくまでサーバーの書き込みは最終的にブロックされます。

重要

ストリーミングは、結果を段階的に生成できるトラバーサルで最も効果的です。、order()、、groupCount()group()、 などのステップを含むトラバーサルはdedup()、結果を生成する前に完全なトラバーサルを完了する必要があるため、Neptune はストリーミングを開始する前に結果セット全体をメモリにマテリアライズします。このような場合、バッチ処理によってフレームごとのシリアル化オーバーヘッドは削減されますが、サーバー側のメモリ使用量は削減されません。

結果を段階的に消費する

到着時に結果を処理するには、すべての結果をリストに収集するのではなく、/ hasNext() next()または同等の APIs を使用して遅延的に反復処理します。next(batchSize) を使用してアプリケーションレベルのバッチで結果を取得できるため、サーバーが結果を生成し続ける間、バッチ間で中間作業を実行できます。

例 Java (GLV バイトコード)
GraphTraversalSource g = traversal().withRemote(connection); int batchSize = 10; int batchNum = 0; var traversal = g.V().hasLabel("movie").values("title").limit(1000); while (traversal.hasNext()) { var batch = traversal.next(batchSize); batchNum++; for (var title : batch) { System.out.println(" " + title); } // Do other intermediary work here between batch calls System.out.println("Batch " + batchNum + " processing complete\n"); }
例 Python
g = traversal().with_remote(connection) BATCH_SIZE = 10 batch_num = 0 t = g.V().has_label('movie').values('title').limit(1000) while t.has_next(): batch = t.next(BATCH_SIZE) batch_num += 1 for title in batch: print(f" {title}") # Do other intermediary work here between batch calls print(f"Batch {batch_num} processing complete\n")
例 Go
// The Go driver does not support next(n), so batches are accumulated manually. g := gremlingo.Traversal_().WithRemote(connection) resultSet, err := g.V().HasLabel("movie").Values("title").Limit(1000).GetResultSet() if err != nil { log.Fatal(err) } batchSize := 10 batchNum := 0 for { var batch []interface{} for i := 0; i < batchSize; i++ { result, ok, err := resultSet.One() // returns (value, ok, error); ok is false when results are exhausted if err != nil { log.Fatal(err) } if !ok { break } batch = append(batch, result) } if len(batch) == 0 { break } batchNum++ for _, v := range batch { fmt.Printf(" %v\n", v) } // Do other intermediary work here between batch calls fmt.Printf("Batch %d processing complete\n\n", batchNum) }
例.NET
var g = Traversal().WithRemote(connection); var batchSize = 10; var batchNum = 0; var traversal = g.V().HasLabel("movie").Values<string>("title").Limit<string>(1000); while (traversal.HasNext()) { var batch = traversal.Next(batchSize); batchNum++; foreach (var title in batch) { Console.WriteLine($" {title}"); } // Do other intermediary work here between batch calls Console.WriteLine($"Batch {batchNum} processing complete\n"); }
例 Node.js
// The Node.js driver does not support next(n), so batches are accumulated manually. const g = traversal().withRemote(connection); const batchSize = 10; let batchNum = 0; const t = g.V().hasLabel('movie').values('title').limit(1000); while (true) { const batch = []; for (let i = 0; i < batchSize; i++) { const result = await t.next(); if (result.done) break; batch.push(result.value); } if (batch.length === 0) break; batchNum++; for (const title of batch) { console.log(` ${title}`); } // Do other intermediary work here between batch calls console.log(`Batch ${batchNum} processing complete\n`); }

切望度と増分消費量

ストリーミングを使用すると、追加のデータが取得されて返されるたびに、結果を段階的に処理できます。次のメソッドは、結果セット全体がメモリに収集されるまでブロックし、アプリケーションが結果に到達したときに動作しないようにします。

  • Java: toList()または toSet()

  • Python: toList()または toSet()

  • Go: ToList()ToSet()、または GetResultSet().GetAll()

  • .NET: ToList()または Promise()

  • Node.js: toList()

注記

これらのメソッドを使用しても、データは WebSocket 接続を介して段階的に流れます。違いは、コレクション全体が完了するまで、アプリケーションが個々の結果を処理できないことです。到着時に結果を処理するには、上記の例に示す遅延反復パターンまたはバッチパターンを使用します。