

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 使用 Bolt 通訊協定串流查詢結果
<a name="access-graph-opencypher-streaming"></a>

當您執行傳回大量記錄的 openCypher 查詢時，您不需要一次將整個結果集載入用戶端記憶體。Neptune 支援透過 Bolt 通訊協定進行結果串流，可讓驅動程式分批擷取記錄，並在記錄送達時遞增處理。如果您想要處理從伺服器傳回的結果，但需要設定`FetchSize`和使用延遲反覆運算，以避免將完整的結果集載入記憶體，這會很有幫助。

這可透過 [Bolt v4.0 PULL 機制運作。](https://neo4j.com/docs/bolt/current/bolt/message/#messages-pull)驅動程式不會在單一訊息中請求所有記錄，而是傳送批次大小的 PULL 請求。例如，「提供我接下來的 50 筆記錄。」 Neptune 會傳回該筆記錄數量，以及指出是否有更多可用的旗標。然後，當用戶端就緒時，驅動程式會請求下一個批次，這會持續到結果集用完為止。

您可以使用驅動程式工作階段組態中的 `FetchSize`設定來控制批次大小。結合延遲反覆運算 （在傳回時一次處理一個記錄），這會限制用戶端記憶體用量，無論總結果集的大小為何。

**重要**  
`FetchSize` 控制伺服器和用戶端之間的資料流程。它不會影響 Neptune 執行查詢的方式。Neptune 在收到 RUN 訊息後立即開始執行查詢並產生結果。結果會在伺服器端緩衝，直到用戶端使用 PULL 訊息請求結果為止。如果用戶端未在設定的查詢逾時內取用結果，Neptune 會終止查詢、捨棄緩衝的結果，並釋出伺服器端資源。沒有單獨的伺服器端緩衝區限制；緩衝會繼續進行，直到用戶端提取結果或達到查詢逾時為止。  
串流對於可以遞增產生結果的非彙總查詢最有效。查詢包含 `ORDER BY`、彙總函數 （例如 `count()`、 或 `sum()`)`collect()`，或`DISTINCT`要求 Neptune 在傳回任何記錄之前計算完整的結果集。在這些情況下， `FetchSize` 仍會限制每個批次的用戶端記憶體用量，但伺服器必須先將整個結果集保留在記憶體中，才能開始串流。

**注意**  
openCypher HTTPS 端點使用 HTTP 區塊傳輸編碼交付結果，其會在產生資料時將資料串流至用戶端。不過，用戶端無法控制交付速度。HTTPS 端點沒有同等`FetchSize`的 。若要使用大型結果集控制用戶端記憶體耗用，請使用已`FetchSize`設定的 Bolt 驅動程式連線。

## 分批設定 FetchSize 和處理結果
<a name="access-graph-opencypher-streaming-usage"></a>

在工作階段或驅動程式組態`FetchSize`上設定 ，以控制每個 PULL 訊息從 Neptune 請求多少筆記錄。請注意， `FetchSize`控制driver-to-server的流程，而非應用程式層級批次處理。若要處理應用程式層級批次的結果 （例如，若要在記錄群組之間執行中繼工作），請使用協助程式方法，從驅動程式的延遲迭代器累積記錄。

下列範例顯示兩者：在工作階段`FetchSize`上設定 ，以及使用協助程式方法來將記錄分組為應用程式層級批次進行處理。

**Example Java**  

```
static void processBatches(Result result, int size,
        Consumer<List<Record>> handler) {
    var batch = new ArrayList<Record>();
    while (result.hasNext()) {
        batch.add(result.next());
        if (batch.size() >= size) {
            handler.accept(batch);
            batch = new ArrayList<>();
        }
    }
    if (!batch.isEmpty()) handler.accept(batch);
}

// Usage:
try (Session session = driver.session(SessionConfig.builder()
        .withFetchSize(50)
        .withDefaultAccessMode(AccessMode.READ)
        .build())) {

    Result result = session.run("MATCH (m:movie) RETURN m.title AS title");
    processBatches(result, 10, batch -> {
        for (var record : batch) {
            System.out.println(record.get("title").asString());
        }

        // Do other intermediary work here between batch calls
    });
    result.consume();
}
```

**Example Python**  

```
async def batched(result, size):
    batch = []
    async for record in result:
        batch.append(record)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

# Usage:
async with driver.session(fetch_size=50) as session:
    result = await session.run("MATCH (m:movie) RETURN m.title AS title")
    async for batch in batched(result, 10):
        for record in batch:
            print(record["title"])

        # Do other intermediary work here between batch calls
```

**Example Go**  

```
func processBatches(ctx context.Context, result neo4j.ResultWithContext,
        size int, handler func([]*neo4j.Record)) error {
    var batch []*neo4j.Record
    for result.Next(ctx) {
        batch = append(batch, result.Record())
        if len(batch) >= size {
            handler(batch)
            batch = nil
        }
    }
    if len(batch) > 0 {
        handler(batch)
    }
    return result.Err()
}

// Usage:
session := driver.NewSession(ctx, neo4j.SessionConfig{
    AccessMode: neo4j.AccessModeRead,
    FetchSize:  50,
})
defer session.Close(ctx)

result, err := session.Run(ctx,
    "MATCH (m:movie) RETURN m.title AS title", nil)
if err != nil {
    log.Fatal(err)
}

if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) {
    for _, record := range batch {
        title, _ := record.Get("title")
        fmt.Println(title)
    }

    // Do other intermediary work here between batch calls
}); err != nil {
    log.Fatal(err)
}
```

**Example .NET**  

```
static async IAsyncEnumerable<List<IRecord>> Batched(
        IResultCursor result, int size)
{
    var batch = new List<IRecord>();
    while (await result.FetchAsync())
    {
        batch.Add(result.Current);
        if (batch.Count >= size)
        {
            yield return batch;
            batch = new List<IRecord>();
        }
    }
    if (batch.Count > 0) yield return batch;
}

// Usage:
await using var session = driver.AsyncSession(o => o
    .WithFetchSize(50)
    .WithDefaultAccessMode(AccessMode.Read));

var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title");
await foreach (var batch in Batched(result, 10))
{
    foreach (var record in batch)
    {
        Console.WriteLine(record["title"].As<string>());
    }

    // Do other intermediary work here between batch calls
}
```

**Example Node.js**  

```
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+).
// This is pull-based: the driver pauses fetching while the loop body
// executes, providing natural backpressure. This is preferred over
// result.subscribe(), which is push-based and continues delivering
// records during async processing.
async function* batched(result, size) {
    let batch = [];
    for await (const record of result) {
        batch.push(record);
        if (batch.length >= size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) yield batch;
}

// Usage:
const session = driver.session({
    defaultAccessMode: neo4j.session.READ,
    fetchSize: 50
});

try {
    const result = session.run('MATCH (m:movie) RETURN m.title AS title');
    for await (const batch of batched(result, 10)) {
        for (const record of batch) {
            console.log(record.get('title'));
        }

        // Do other intermediary work here between batch calls
    }
} finally {
    await session.close();
}
```

## Eager 與增量消耗
<a name="access-graph-opencypher-streaming-avoid"></a>

下列方法會封鎖 ，直到將整個結果集收集到記憶體中，讓您的應用程式無法在結果到達時對其採取行動：
+ **Java：**`result.list()`
+ **Python：**`result.data()`、 `list(result)`
+ **Go：**`result.Collect(ctx)`、 `neo4j.EagerResultTransformer`
+ **.NET：**`result.ToListAsync()`
+ **Node.js：** `const { records } = await result`

若要逐步處理結果，請使用上述範例中顯示的延遲反覆運算和批次協助程式模式。