翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Bolt プロトコルを使用したクエリ結果のストリーミング
多数のレコードを返す openCypher クエリを実行する場合、結果セット全体をクライアントメモリに一度にロードする必要はありません。Neptune は Bolt プロトコルを介した結果ストリーミングをサポートしています。これにより、ドライバーはレコードをバッチで取得し、到着時に段階的に処理できます。これは、サーバーから返される結果を処理するが、結果セット全体をメモリにロードしないように遅延反復を設定FetchSizeして使用する必要がある場合に便利です。
これは Bolt v4.0 PULL メカニズム
バッチサイズは、ドライバーのセッション設定の FetchSize設定を使用して制御します。遅延反復 (返されるたびにレコードを 1 つずつ処理) と組み合わせると、結果セットの合計の大きさに関係なく、クライアント側のメモリ使用量が制限されます。
重要
FetchSize は、サーバーとクライアント間のデータフローを制御します。Neptune がクエリを実行する方法には影響しません。Neptune は、RUN メッセージを受信するとすぐにクエリの実行と結果の生成を開始します。結果は、クライアントが PULL メッセージでリクエストするまで、サーバー側でバッファされます。クライアントが設定されたクエリタイムアウト内に結果を消費しない場合、Neptune はクエリを終了し、バッファされた結果を破棄し、サーバー側のリソースを解放します。サーバー側のバッファには個別の制限はありません。バッファリングは、クライアントが結果を取得するか、クエリのタイムアウトに達するまで続行されます。
ストリーミングは、結果を段階的に生成できる集計されていないクエリで最も効果的です。ORDER BY、集計関数 (count()、、 などsum()) を含むクエリcollect()、またはレコードを返す前に Neptune が完全な結果セットを計算するDISTINCT必要があるクエリ。このような場合でも、 はバッチあたりのクライアント側のメモリ使用量を制限FetchSizeしますが、サーバーはストリーミングを開始する前に結果セット全体をメモリに保持する必要があります。
注記
openCypher HTTPS エンドポイントは、HTTP チャンク転送エンコーディングを使用して結果を配信し、生成されたデータをクライアントにストリーミングします。ただし、クライアントは配信のペースを制御できません。HTTPS エンドポイントFetchSizeに に相当するものはありません。大きな結果セットでクライアント側のメモリ消費量を制御するには、 がFetchSize設定された Bolt ドライバー接続を使用します。
FetchSize の設定とバッチ処理の結果
セッションまたはドライバー設定FetchSizeで を設定し、PULL メッセージごとに Neptune からドライバーがリクエストしたレコードの数を制御します。は、アプリケーションレベルのバッチ処理ではなく、driver-to-serverフローFetchSizeを制御することに注意してください。アプリケーションレベルのバッチで結果を処理するには (レコードのグループ間で中間作業を実行するなど)、ドライバーの遅延イテレーターからレコードを蓄積するヘルパーメソッドを使用します。
次の例は、セッションFetchSizeで を設定する方法と、ヘルパーメソッドを使用してレコードを処理のためにアプリケーションレベルのバッチにグループ化する方法の両方を示しています。
例 Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
例 Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
例 Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
例.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
例 Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }
切望度と増分消費量
次のメソッドは、結果セット全体がメモリに収集されるまでブロックし、アプリケーションが結果に到達したときに動作しないようにします。
Java:
result.list()Python:
result.data()、list(result)Go:
result.Collect(ctx)、neo4j.EagerResultTransformer.NET:
result.ToListAsync()Node.js:
const { records } = await result
結果を段階的に処理するには、上記の例に示す遅延反復パターンとバッチヘルパーパターンを使用します。