기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Bolt 프로토콜을 사용하여 쿼리 결과 스트리밍
많은 수의 레코드를 반환하는 openCypher 쿼리를 실행할 때 전체 결과 세트를 클라이언트 메모리에 한 번에 로드할 필요가 없습니다. Neptune은 Bolt 프로토콜을 통한 결과 스트리밍을 지원하므로 드라이버는 레코드를 배치로 가져와 도착 시 점진적으로 처리할 수 있습니다. 이는 서버에서 반환되는 결과를 처리하려는 경우 유리할 수 있지만 전체 결과 세트를 메모리에 로드하지 않도록 지연 반복을 구성FetchSize하고 사용해야 합니다.
이는 Bolt v4.0 PULL 메커니즘
드라이버의 세션 구성에서 FetchSize 설정을 사용하여 배치 크기를 제어합니다. 지연 반복(레코드가 반환될 때 한 번에 하나씩 레코드 처리)과 결합하면 총 결과 집합의 크기에 관계없이 클라이언트 측 메모리 사용량이 제한됩니다.
중요
FetchSize는 서버와 클라이언트 간의 데이터 흐름을 제어합니다. Neptune이 쿼리를 실행하는 방식에는 영향을 주지 않습니다. Neptune은 RUN 메시지를 수신하는 즉시 쿼리를 실행하고 결과를 생성하기 시작합니다. 클라이언트가 PULL 메시지와 함께 요청할 때까지 서버 측에서 결과가 버퍼링됩니다. 클라이언트가 구성된 쿼리 제한 시간 내에 결과를 사용하지 않는 경우 Neptune은 쿼리를 종료하고 버퍼링된 결과를 삭제하며 서버 측 리소스를 릴리스합니다. 별도의 서버 측 버퍼 제한은 없습니다. 클라이언트가 결과를 가져오거나 쿼리 제한 시간에 도달할 때까지 버퍼링이 계속됩니다.
스트리밍은 결과를 점진적으로 생성할 수 있는 비집계 쿼리에 가장 효과적입니다. ORDER BY, 집계 함수(예: count(), collect()또는 sum())를 포함하거나 레코드를 반환하기 전에 Neptune이 전체 결과 세트를 계산DISTINCT해야 하는 쿼리입니다. 이러한 경우는 여전히 배치당 클라이언트 측 메모리 사용량을 FetchSize 제한하지만 스트리밍이 시작되기 전에 서버가 메모리에 전체 결과 세트를 보유해야 합니다.
참고
openCypher HTTPS 엔드포인트는 데이터가 생성될 때 클라이언트로 데이터를 스트리밍하는 HTTP 청크 전송 인코딩을 사용하여 결과를 전송합니다. 그러나 클라이언트는 전송 속도를 제어할 수 없습니다. HTTPS 엔드포인트에 FetchSize 대해와 동등한 것은 없습니다. 대규모 결과 집합으로 클라이언트 측 메모리 소비를 제어하려면가 FetchSize 구성된 Bolt 드라이버 연결을 사용합니다.
FetchSize 구성 및 배치로 결과 처리
세션 또는 드라이버 구성FetchSize에서를 설정하여 PULL 메시지당 Neptune의 드라이버 요청을 기록하는 수를 제어합니다. 는 애플리케이션 수준 일괄 처리가 아닌 driver-to-server 흐름을 FetchSize 제어합니다. 결과를 애플리케이션 수준 배치로 처리하려면(예: 레코드 그룹 간 중간 작업 수행) 드라이버의 지연 반복자에서 레코드를 누적하는 헬퍼 메서드를 사용합니다.
다음 예제는 세션FetchSize에서를 구성하고 헬퍼 메서드를 사용하여 처리를 위해 레코드를 애플리케이션 수준 배치로 그룹화하는 것을 보여줍니다.
예 Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
예 Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
예 Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
예.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
예 Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }
Eager와 증분 소비 비교
다음 메서드는 전체 결과 집합이 메모리에 수집될 때까지 차단하여 애플리케이션이 결과가 도착할 때 결과에 대한 작업을 수행하지 못하게 합니다.
Java:
result.list()Python:
result.data(),list(result)이동:
result.Collect(ctx),neo4j.EagerResultTransformer.NET:
result.ToListAsync()Node.js:
const { records } = await result
결과를 점진적으로 처리하려면 위 예제에 표시된 지연 반복 및 배치 헬퍼 패턴을 사용합니다.