

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 使用 Bolt 协议流式传输查询结果
<a name="access-graph-opencypher-streaming"></a>

当您运行返回大量记录的 OpenCypher 查询时，不必同时将整个结果集加载到客户端内存中。Neptune 支持通过 Bolt 协议传输结果，该协议允许驱动程序批量获取记录，并在记录到达时对其进行增量处理。如果你想在从服务器返回结果时处理结果，但需要配置`FetchSize`和使用延迟迭代来避免将完整的结果集加载到内存中，这可能很有好处。

这通过 [Bolt v4.0 PULL 机制起](https://neo4j.com/docs/bolt/current/bolt/message/#messages-pull)作用。驱动程序不会在一条消息中请求所有记录，而是发送批量大小的 PULL 请求。例如，“把接下来的 50 条记录给我。” Neptune 发回了该数量的记录，并附上一个标志，表示是否还有更多记录可用。然后，当客户端准备就绪时，驱动程序会请求下一批，并一直持续到结果集被完全消耗为止。

您可以使用驱动程序会话配置中的`FetchSize`设置来控制批次大小。再加上延迟迭代（在返回记录时一次处理一个记录），无论结果集总量有多大，这都会限制客户端的内存使用量。

**重要**  
`FetchSize`控制服务器和客户端之间的数据流。它不会影响 Neptune 执行查询的方式。Neptune 在收到 RUN 消息后立即开始执行查询并生成结果。结果在服务器端缓存，直到客户端通过 PULL 消息请求结果。如果客户端未在配置的查询超时时间内使用结果，Neptune 将终止查询，丢弃缓冲的结果并释放服务器端资源。没有单独的服务器端缓冲区限制；缓冲会一直持续到客户端提取结果或达到查询超时为止。  
对于可以逐步生成结果的非聚合查询，流式处理最为有效。包含`ORDER BY`聚合函数（例如`count()``collect()`、或`sum()`）或`DISTINCT`要求 Neptune 在返回任何记录之前计算完整结果集的查询。在这些情况下，`FetchSize`仍会限制每批次的客户端内存使用量，但是在开始流式传输之前，服务器必须将整个结果集保存在内存中。

**注意**  
OpenCypher HTTPS 端点使用 HTTP 分块传输编码提供结果，该编码在生成数据时将数据流式传输到客户端。但是，客户无法控制交付的速度。HTTPS 终端节点没有等效项。`FetchSize`要控制大型结果集的客户端内存消耗，请使用`FetchSize`已配置的 Bolt 驱动程序连接。

## 批量配置 FetchSize 和处理结果
<a name="access-graph-opencypher-streaming-usage"></a>

设置`FetchSize`会话或驱动程序配置，以控制每条 PULL 消息记录来自 Neptune 的驱动程序请求的数量。请注意，它`FetchSize`控制的是 driver-to-server流程，而不是应用程序级批处理。要处理应用程序级批处理结果（例如，在记录组之间执行中间工作），请使用从驱动程序的懒惰迭代器中累积记录的辅助方法。

以下示例显示了两者：在会话`FetchSize`上进行配置，以及使用辅助方法将记录分组为应用程序级批次进行处理。

**Example Java**  

```
static void processBatches(Result result, int size,
        Consumer<List<Record>> handler) {
    var batch = new ArrayList<Record>();
    while (result.hasNext()) {
        batch.add(result.next());
        if (batch.size() >= size) {
            handler.accept(batch);
            batch = new ArrayList<>();
        }
    }
    if (!batch.isEmpty()) handler.accept(batch);
}

// Usage:
try (Session session = driver.session(SessionConfig.builder()
        .withFetchSize(50)
        .withDefaultAccessMode(AccessMode.READ)
        .build())) {

    Result result = session.run("MATCH (m:movie) RETURN m.title AS title");
    processBatches(result, 10, batch -> {
        for (var record : batch) {
            System.out.println(record.get("title").asString());
        }

        // Do other intermediary work here between batch calls
    });
    result.consume();
}
```

**Example Python**  

```
async def batched(result, size):
    batch = []
    async for record in result:
        batch.append(record)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

# Usage:
async with driver.session(fetch_size=50) as session:
    result = await session.run("MATCH (m:movie) RETURN m.title AS title")
    async for batch in batched(result, 10):
        for record in batch:
            print(record["title"])

        # Do other intermediary work here between batch calls
```

**Example Go**  

```
func processBatches(ctx context.Context, result neo4j.ResultWithContext,
        size int, handler func([]*neo4j.Record)) error {
    var batch []*neo4j.Record
    for result.Next(ctx) {
        batch = append(batch, result.Record())
        if len(batch) >= size {
            handler(batch)
            batch = nil
        }
    }
    if len(batch) > 0 {
        handler(batch)
    }
    return result.Err()
}

// Usage:
session := driver.NewSession(ctx, neo4j.SessionConfig{
    AccessMode: neo4j.AccessModeRead,
    FetchSize:  50,
})
defer session.Close(ctx)

result, err := session.Run(ctx,
    "MATCH (m:movie) RETURN m.title AS title", nil)
if err != nil {
    log.Fatal(err)
}

if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) {
    for _, record := range batch {
        title, _ := record.Get("title")
        fmt.Println(title)
    }

    // Do other intermediary work here between batch calls
}); err != nil {
    log.Fatal(err)
}
```

**Example .NET**  

```
static async IAsyncEnumerable<List<IRecord>> Batched(
        IResultCursor result, int size)
{
    var batch = new List<IRecord>();
    while (await result.FetchAsync())
    {
        batch.Add(result.Current);
        if (batch.Count >= size)
        {
            yield return batch;
            batch = new List<IRecord>();
        }
    }
    if (batch.Count > 0) yield return batch;
}

// Usage:
await using var session = driver.AsyncSession(o => o
    .WithFetchSize(50)
    .WithDefaultAccessMode(AccessMode.Read));

var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title");
await foreach (var batch in Batched(result, 10))
{
    foreach (var record in batch)
    {
        Console.WriteLine(record["title"].As<string>());
    }

    // Do other intermediary work here between batch calls
}
```

**Example Node.js**  

```
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+).
// This is pull-based: the driver pauses fetching while the loop body
// executes, providing natural backpressure. This is preferred over
// result.subscribe(), which is push-based and continues delivering
// records during async processing.
async function* batched(result, size) {
    let batch = [];
    for await (const record of result) {
        batch.push(record);
        if (batch.length >= size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) yield batch;
}

// Usage:
const session = driver.session({
    defaultAccessMode: neo4j.session.READ,
    fetchSize: 50
});

try {
    const result = session.run('MATCH (m:movie) RETURN m.title AS title');
    for await (const batch of batched(result, 10)) {
        for (const record of batch) {
            console.log(record.get('title'));
        }

        // Do other intermediary work here between batch calls
    }
} finally {
    await session.close();
}
```

## 急需消费与增量消费
<a name="access-graph-opencypher-streaming-avoid"></a>

以下方法会一直阻塞，直到将整个结果集收集到内存中，从而防止您的应用程序在结果到达时对其进行操作：
+ **Java：**`result.list()`
+ **Python:**`result.data()`，`list(result)`
+ **去：**`result.Collect(ctx)`，`neo4j.EagerResultTransformer`
+ **.NET：**`result.ToListAsync()`
+ **Node.js：**`const { records } = await result`

要以增量方式处理结果，请使用上面示例中显示的延迟迭代和批处理帮助器模式。