

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Bolt プロトコルを使用したクエリ結果のストリーミング
<a name="access-graph-opencypher-streaming"></a>

多数のレコードを返す openCypher クエリを実行する場合、結果セット全体をクライアントメモリに一度にロードする必要はありません。Neptune は Bolt プロトコルを介した結果ストリーミングをサポートしています。これにより、ドライバーはレコードをバッチで取得し、到着時に段階的に処理できます。これは、サーバーから返される結果を処理するが、結果セット全体をメモリにロードしないように遅延反復を設定`FetchSize`して使用する必要がある場合に便利です。

これは [Bolt v4.0 PULL メカニズム](https://neo4j.com/docs/bolt/current/bolt/message/#messages-pull)を介して機能します。1 つのメッセージ内のすべてのレコードをリクエストする代わりに、ドライバーはバッチサイズで PULL リクエストを送信します。たとえば、「次の 50 レコードを渡してください」と入力します。Neptune は、より多くのレコードが利用可能かどうかを示すフラグとともに、その数のレコードを返します。次に、ドライバーはクライアントの準備ができたら次のバッチをリクエストします。これは、結果セットが完全に消費されるまで続きます。

バッチサイズは、ドライバーのセッション設定の `FetchSize`設定を使用して制御します。遅延反復 (返されるたびにレコードを 1 つずつ処理) と組み合わせると、結果セットの合計の大きさに関係なく、クライアント側のメモリ使用量が制限されます。

**重要**  
`FetchSize` は、サーバーとクライアント間のデータフローを制御します。Neptune がクエリを実行する方法には影響しません。Neptune は、RUN メッセージを受信するとすぐにクエリの実行と結果の生成を開始します。結果は、クライアントが PULL メッセージでリクエストするまで、サーバー側でバッファされます。クライアントが設定されたクエリタイムアウト内に結果を消費しない場合、Neptune はクエリを終了し、バッファされた結果を破棄し、サーバー側のリソースを解放します。サーバー側のバッファには個別の制限はありません。バッファリングは、クライアントが結果を取得するか、クエリのタイムアウトに達するまで続行されます。  
ストリーミングは、結果を段階的に生成できる集計されていないクエリで最も効果的です。`ORDER BY`、集計関数 (`count()`、、 など`sum()`) を含むクエリ`collect()`、またはレコードを返す前に Neptune が完全な結果セットを計算する`DISTINCT`必要があるクエリ。このような場合でも、 はバッチあたりのクライアント側のメモリ使用量を制限`FetchSize`しますが、サーバーはストリーミングを開始する前に結果セット全体をメモリに保持する必要があります。

**注記**  
openCypher HTTPS エンドポイントは、HTTP チャンク転送エンコーディングを使用して結果を配信し、生成されたデータをクライアントにストリーミングします。ただし、クライアントは配信のペースを制御できません。HTTPS エンドポイント`FetchSize`に に相当するものはありません。大きな結果セットでクライアント側のメモリ消費量を制御するには、 が`FetchSize`設定された Bolt ドライバー接続を使用します。

## FetchSize の設定とバッチ処理の結果
<a name="access-graph-opencypher-streaming-usage"></a>

セッションまたはドライバー設定`FetchSize`で を設定し、PULL メッセージごとに Neptune からドライバーがリクエストしたレコードの数を制御します。は、アプリケーションレベルのバッチ処理ではなく、driver-to-serverフロー`FetchSize`を制御することに注意してください。アプリケーションレベルのバッチで結果を処理するには (レコードのグループ間で中間作業を実行するなど）、ドライバーの遅延イテレーターからレコードを蓄積するヘルパーメソッドを使用します。

次の例は、セッション`FetchSize`で を設定する方法と、ヘルパーメソッドを使用してレコードを処理のためにアプリケーションレベルのバッチにグループ化する方法の両方を示しています。

**Example Java**  

```
static void processBatches(Result result, int size,
        Consumer<List<Record>> handler) {
    var batch = new ArrayList<Record>();
    while (result.hasNext()) {
        batch.add(result.next());
        if (batch.size() >= size) {
            handler.accept(batch);
            batch = new ArrayList<>();
        }
    }
    if (!batch.isEmpty()) handler.accept(batch);
}

// Usage:
try (Session session = driver.session(SessionConfig.builder()
        .withFetchSize(50)
        .withDefaultAccessMode(AccessMode.READ)
        .build())) {

    Result result = session.run("MATCH (m:movie) RETURN m.title AS title");
    processBatches(result, 10, batch -> {
        for (var record : batch) {
            System.out.println(record.get("title").asString());
        }

        // Do other intermediary work here between batch calls
    });
    result.consume();
}
```

**Example Python**  

```
async def batched(result, size):
    batch = []
    async for record in result:
        batch.append(record)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

# Usage:
async with driver.session(fetch_size=50) as session:
    result = await session.run("MATCH (m:movie) RETURN m.title AS title")
    async for batch in batched(result, 10):
        for record in batch:
            print(record["title"])

        # Do other intermediary work here between batch calls
```

**Example Go**  

```
func processBatches(ctx context.Context, result neo4j.ResultWithContext,
        size int, handler func([]*neo4j.Record)) error {
    var batch []*neo4j.Record
    for result.Next(ctx) {
        batch = append(batch, result.Record())
        if len(batch) >= size {
            handler(batch)
            batch = nil
        }
    }
    if len(batch) > 0 {
        handler(batch)
    }
    return result.Err()
}

// Usage:
session := driver.NewSession(ctx, neo4j.SessionConfig{
    AccessMode: neo4j.AccessModeRead,
    FetchSize:  50,
})
defer session.Close(ctx)

result, err := session.Run(ctx,
    "MATCH (m:movie) RETURN m.title AS title", nil)
if err != nil {
    log.Fatal(err)
}

if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) {
    for _, record := range batch {
        title, _ := record.Get("title")
        fmt.Println(title)
    }

    // Do other intermediary work here between batch calls
}); err != nil {
    log.Fatal(err)
}
```

**Example .NET**  

```
static async IAsyncEnumerable<List<IRecord>> Batched(
        IResultCursor result, int size)
{
    var batch = new List<IRecord>();
    while (await result.FetchAsync())
    {
        batch.Add(result.Current);
        if (batch.Count >= size)
        {
            yield return batch;
            batch = new List<IRecord>();
        }
    }
    if (batch.Count > 0) yield return batch;
}

// Usage:
await using var session = driver.AsyncSession(o => o
    .WithFetchSize(50)
    .WithDefaultAccessMode(AccessMode.Read));

var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title");
await foreach (var batch in Batched(result, 10))
{
    foreach (var record in batch)
    {
        Console.WriteLine(record["title"].As<string>());
    }

    // Do other intermediary work here between batch calls
}
```

**Example Node.js**  

```
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+).
// This is pull-based: the driver pauses fetching while the loop body
// executes, providing natural backpressure. This is preferred over
// result.subscribe(), which is push-based and continues delivering
// records during async processing.
async function* batched(result, size) {
    let batch = [];
    for await (const record of result) {
        batch.push(record);
        if (batch.length >= size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) yield batch;
}

// Usage:
const session = driver.session({
    defaultAccessMode: neo4j.session.READ,
    fetchSize: 50
});

try {
    const result = session.run('MATCH (m:movie) RETURN m.title AS title');
    for await (const batch of batched(result, 10)) {
        for (const record of batch) {
            console.log(record.get('title'));
        }

        // Do other intermediary work here between batch calls
    }
} finally {
    await session.close();
}
```

## 切望度と増分消費量
<a name="access-graph-opencypher-streaming-avoid"></a>

次のメソッドは、結果セット全体がメモリに収集されるまでブロックし、アプリケーションが結果に到達したときに動作しないようにします。
+ **Java:** `result.list()`
+ **Python:** `result.data()`、 `list(result)`
+ **Go:** `result.Collect(ctx)`、 `neo4j.EagerResultTransformer`
+ **.NET:** `result.ToListAsync()`
+ **Node.js:** `const { records } = await result`

結果を段階的に処理するには、上記の例に示す遅延反復パターンとバッチヘルパーパターンを使用します。