View a markdown version of this page

使用 Bolt 协议流式传输查询结果 - Amazon Neptune

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Bolt 协议流式传输查询结果

当您运行返回大量记录的 OpenCypher 查询时,不必同时将整个结果集加载到客户端内存中。Neptune 支持通过 Bolt 协议传输结果,该协议允许驱动程序批量获取记录,并在记录到达时对其进行增量处理。如果你想在从服务器返回结果时处理结果,但需要配置FetchSize和使用延迟迭代来避免将完整的结果集加载到内存中,这可能很有好处。

这通过 Bolt v4.0 PULL 机制起作用。驱动程序不会在一条消息中请求所有记录,而是发送批量大小的 PULL 请求。例如,“把接下来的 50 条记录给我。” Neptune 发回了该数量的记录,并附上一个标志,表示是否还有更多记录可用。然后,当客户端准备就绪时,驱动程序会请求下一批,并一直持续到结果集被完全消耗为止。

您可以使用驱动程序会话配置中的FetchSize设置来控制批次大小。再加上延迟迭代(在返回记录时一次处理一个记录),无论结果集总量有多大,这都会限制客户端的内存使用量。

重要

FetchSize控制服务器和客户端之间的数据流。它不会影响 Neptune 执行查询的方式。Neptune 在收到 RUN 消息后立即开始执行查询并生成结果。结果在服务器端缓存,直到客户端通过 PULL 消息请求结果。如果客户端未在配置的查询超时时间内使用结果,Neptune 将终止查询,丢弃缓冲的结果并释放服务器端资源。没有单独的服务器端缓冲区限制;缓冲会一直持续到客户端提取结果或达到查询超时为止。

对于可以逐步生成结果的非聚合查询,流式处理最为有效。包含ORDER BY聚合函数(例如count()collect()、或sum())或DISTINCT要求 Neptune 在返回任何记录之前计算完整结果集的查询。在这些情况下,FetchSize仍会限制每批次的客户端内存使用量,但是在开始流式传输之前,服务器必须将整个结果集保存在内存中。

注意

OpenCypher HTTPS 端点使用 HTTP 分块传输编码提供结果,该编码在生成数据时将数据流式传输到客户端。但是,客户无法控制交付的速度。HTTPS 终端节点没有等效项。FetchSize要控制大型结果集的客户端内存消耗,请使用FetchSize已配置的 Bolt 驱动程序连接。

批量配置 FetchSize 和处理结果

设置FetchSize会话或驱动程序配置,以控制每条 PULL 消息记录来自 Neptune 的驱动程序请求的数量。请注意,它FetchSize控制的是 driver-to-server流程,而不是应用程序级批处理。要处理应用程序级批处理结果(例如,在记录组之间执行中间工作),请使用从驱动程序的懒惰迭代器中累积记录的辅助方法。

以下示例显示了两者:在会话FetchSize上进行配置,以及使用辅助方法将记录分组为应用程序级批次进行处理。

例 Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
例 Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
例 Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
例.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
例 Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }

急需消费与增量消费

以下方法会一直阻塞,直到将整个结果集收集到内存中,从而防止您的应用程序在结果到达时对其进行操作:

  • Java:result.list()

  • Python:result.data()list(result)

  • 去:result.Collect(ctx)neo4j.EagerResultTransformer

  • .NET:result.ToListAsync()

  • Node.js:const { records } = await result

要以增量方式处理结果,请使用上面示例中显示的延迟迭代和批处理帮助器模式。