本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Gremlin 串流查詢結果
當您執行傳回大量結果的 Gremlin 周遊時,Neptune 會透過 WebSocket 連線將它們批次串流回用戶端。Neptune 會在產生結果批次時傳送結果批次,而無需等待用戶端請求更多結果。如果您想要處理從伺服器傳回的結果,但需要使用延遲反覆運算模式,以避免將完整的結果集收集到記憶體中,這會很有幫助。
Neptune 預設會以每個 WebSocket 影格 64 個批次傳送結果。您無法變更此伺服器端預設值,但可以使用 batchSizeTokens.ARGS_BATCH_SIZE中稱為 ,或connectionPool.resultIterationBatchSize作為驅動程式層級預設值) 從用戶端依請求覆寫批次大小。
如需在其他語言驅動程式batchSize中設定 的詳細資訊,請參閱 Apache TinkerPop Gremlin 驅動程式和變體文件中每個驅動程式的
由於伺服器會自動推送結果,透過 TCP 和 WebSocket 流程控制隱含地處理用戶端背壓。如果用戶端從通訊端讀取速度緩慢,伺服器的寫入最終會封鎖,直到用戶端趕上進度為止。
重要
串流對於可以逐步產生結果的周遊最有效。在發出結果之前,包含 order()、groupCount()、dedup()、 group()或其他需要完整周遊的步驟的周遊會導致 Neptune 在串流開始之前具體化記憶體中的整個結果集。在這些情況下,批次處理仍會減少每個影格序列化的額外負荷,但不會減少伺服器端記憶體使用量。
逐步使用結果
若要在結果到達時處理結果,請使用 hasNext() / next()或同等 APIs 來延遲重複,而不是將所有結果收集到清單中。您可以使用 next(batchSize) 在應用程式層級批次中提取結果,讓您在伺服器繼續產生結果時,在批次之間執行中繼工作。
範例 Java (GLV 位元組碼)
GraphTraversalSource g = traversal().withRemote(connection); int batchSize = 10; int batchNum = 0; var traversal = g.V().hasLabel("movie").values("title").limit(1000); while (traversal.hasNext()) { var batch = traversal.next(batchSize); batchNum++; for (var title : batch) { System.out.println(" " + title); } // Do other intermediary work here between batch calls System.out.println("Batch " + batchNum + " processing complete\n"); }
範例 Python
g = traversal().with_remote(connection) BATCH_SIZE = 10 batch_num = 0 t = g.V().has_label('movie').values('title').limit(1000) while t.has_next(): batch = t.next(BATCH_SIZE) batch_num += 1 for title in batch: print(f" {title}") # Do other intermediary work here between batch calls print(f"Batch {batch_num} processing complete\n")
範例 Go
// The Go driver does not support next(n), so batches are accumulated manually. g := gremlingo.Traversal_().WithRemote(connection) resultSet, err := g.V().HasLabel("movie").Values("title").Limit(1000).GetResultSet() if err != nil { log.Fatal(err) } batchSize := 10 batchNum := 0 for { var batch []interface{} for i := 0; i < batchSize; i++ { result, ok, err := resultSet.One() // returns (value, ok, error); ok is false when results are exhausted if err != nil { log.Fatal(err) } if !ok { break } batch = append(batch, result) } if len(batch) == 0 { break } batchNum++ for _, v := range batch { fmt.Printf(" %v\n", v) } // Do other intermediary work here between batch calls fmt.Printf("Batch %d processing complete\n\n", batchNum) }
範例.NET
var g = Traversal().WithRemote(connection); var batchSize = 10; var batchNum = 0; var traversal = g.V().HasLabel("movie").Values<string>("title").Limit<string>(1000); while (traversal.HasNext()) { var batch = traversal.Next(batchSize); batchNum++; foreach (var title in batch) { Console.WriteLine($" {title}"); } // Do other intermediary work here between batch calls Console.WriteLine($"Batch {batchNum} processing complete\n"); }
範例 Node.js
// The Node.js driver does not support next(n), so batches are accumulated manually. const g = traversal().withRemote(connection); const batchSize = 10; let batchNum = 0; const t = g.V().hasLabel('movie').values('title').limit(1000); while (true) { const batch = []; for (let i = 0; i < batchSize; i++) { const result = await t.next(); if (result.done) break; batch.push(result.value); } if (batch.length === 0) break; batchNum++; for (const title of batch) { console.log(` ${title}`); } // Do other intermediary work here between batch calls console.log(`Batch ${batchNum} processing complete\n`); }
Eager 與增量消耗
串流可讓您在擷取和傳回其他資料時,逐步處理結果。下列方法會封鎖 ,直到將整個結果集收集到記憶體中,讓您的應用程式無法在結果到達時對其採取行動:
Java:
toList()或toSet()Python:
toList()或toSet()Go:
ToSet()、ToList()或GetResultSet().GetAll().NET:
ToList()或Promise()Node.js:
toList()
注意
即使使用這些方法,資料仍會累加流經 WebSocket 連線。差別在於在整個集合完成之前,您的應用程式無法處理個別結果。若要在結果到達時進行處理,請使用上述範例中顯示的延遲反覆運算或批次模式。