View a markdown version of this page

使用 Bolt 通訊協定串流查詢結果 - Amazon Neptune

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Bolt 通訊協定串流查詢結果

當您執行傳回大量記錄的 openCypher 查詢時,您不需要一次將整個結果集載入用戶端記憶體。Neptune 支援透過 Bolt 通訊協定進行結果串流,可讓驅動程式分批擷取記錄,並在記錄送達時遞增處理。如果您想要處理從伺服器傳回的結果,但需要設定FetchSize和使用延遲反覆運算,以避免將完整的結果集載入記憶體,這會很有幫助。

這可透過 Bolt v4.0 PULL 機制運作。驅動程式不會在單一訊息中請求所有記錄,而是傳送批次大小的 PULL 請求。例如,「提供我接下來的 50 筆記錄。」 Neptune 會傳回該筆記錄數量,以及指出是否有更多可用的旗標。然後,當用戶端就緒時,驅動程式會請求下一個批次,這會持續到結果集用完為止。

您可以使用驅動程式工作階段組態中的 FetchSize設定來控制批次大小。結合延遲反覆運算 (在傳回時一次處理一個記錄),這會限制用戶端記憶體用量,無論總結果集的大小為何。

重要

FetchSize 控制伺服器和用戶端之間的資料流程。它不會影響 Neptune 執行查詢的方式。Neptune 在收到 RUN 訊息後立即開始執行查詢並產生結果。結果會在伺服器端緩衝,直到用戶端使用 PULL 訊息請求結果為止。如果用戶端未在設定的查詢逾時內取用結果,Neptune 會終止查詢、捨棄緩衝的結果,並釋出伺服器端資源。沒有單獨的伺服器端緩衝區限制;緩衝會繼續進行,直到用戶端提取結果或達到查詢逾時為止。

串流對於可以遞增產生結果的非彙總查詢最有效。查詢包含 ORDER BY、彙總函數 (例如 count()、 或 sum())collect(),或DISTINCT要求 Neptune 在傳回任何記錄之前計算完整的結果集。在這些情況下, FetchSize 仍會限制每個批次的用戶端記憶體用量,但伺服器必須先將整個結果集保留在記憶體中,才能開始串流。

注意

openCypher HTTPS 端點使用 HTTP 區塊傳輸編碼交付結果,其會在產生資料時將資料串流至用戶端。不過,用戶端無法控制交付速度。HTTPS 端點沒有同等FetchSize的 。若要使用大型結果集控制用戶端記憶體耗用,請使用已FetchSize設定的 Bolt 驅動程式連線。

分批設定 FetchSize 和處理結果

在工作階段或驅動程式組態FetchSize上設定 ,以控制每個 PULL 訊息從 Neptune 請求多少筆記錄。請注意, FetchSize控制driver-to-server的流程,而非應用程式層級批次處理。若要處理應用程式層級批次的結果 (例如,若要在記錄群組之間執行中繼工作),請使用協助程式方法,從驅動程式的延遲迭代器累積記錄。

下列範例顯示兩者:在工作階段FetchSize上設定 ,以及使用協助程式方法來將記錄分組為應用程式層級批次進行處理。

範例 Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
範例 Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
範例 Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
範例.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
範例 Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }

Eager 與增量消耗

下列方法會封鎖 ,直到將整個結果集收集到記憶體中,讓您的應用程式無法在結果到達時對其採取行動:

  • Java:result.list()

  • Python:result.data()list(result)

  • Go:result.Collect(ctx)neo4j.EagerResultTransformer

  • .NET:result.ToListAsync()

  • Node.js: const { records } = await result

若要逐步處理結果,請使用上述範例中顯示的延遲反覆運算和批次協助程式模式。