Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Streaming-Abfrageergebnisse mit dem Bolt-Protokoll
Wenn Sie eine OpenCypher-Abfrage ausführen, die eine große Anzahl von Datensätzen zurückgibt, müssen Sie nicht die gesamte Ergebnismenge auf einmal in den Client-Speicher laden. Neptune unterstützt das Streamen von Ergebnissen über das Bolt-Protokoll, sodass der Treiber Datensätze stapelweise abrufen und sie inkrementell verarbeiten kann, sobald sie ankommen. Dies kann von Vorteil sein, wenn Sie Ergebnisse so verarbeiten möchten, wie sie vom Server zurückgegeben werden, erfordert jedoch die Konfiguration FetchSize und Verwendung von verzögerter Iteration, um zu verhindern, dass die gesamte Ergebnismenge in den Speicher geladen wird.
Dies funktioniert über den Bolt v4.0-PULL-Mechanismus.
Sie steuern die Batchgröße mithilfe der FetchSize Einstellung in der Sitzungskonfiguration des Treibers. In Kombination mit verzögerter Iteration (die Datensätze werden einzeln verarbeitet, sobald sie zurückgegeben werden), begrenzt dies die clientseitige Speichernutzung, unabhängig davon, wie groß die Gesamtergebnismenge ist.
Wichtig
FetchSizesteuert den Datenfluss zwischen Server und Client. Es hat keinen Einfluss darauf, wie Neptune die Abfrage ausführt. Neptune beginnt mit der Ausführung der Abfrage und der Erzeugung von Ergebnissen, sobald es die RUN-Meldung empfängt. Die Ergebnisse werden serverseitig zwischengespeichert, bis der Client sie mit einer PULL-Nachricht anfordert. Wenn der Client innerhalb des konfigurierten Abfrage-Timeouts keine Ergebnisse verarbeitet, beendet Neptune die Abfrage, verwirft gepufferte Ergebnisse und gibt serverseitige Ressourcen frei. Es gibt kein separates serverseitiges Pufferlimit. Die Pufferung wird fortgesetzt, bis der Client die Ergebnisse abruft oder das Abfrage-Timeout erreicht ist.
Streaming ist am effektivsten bei Abfragen, die nicht aggregieren und inkrementell zu Ergebnissen führen können. AbfragenORDER BY, die Aggregationsfunktionen (wiecount(), collect() odersum()) enthalten oder DISTINCT erfordern, dass Neptune die vollständige Ergebnismenge berechnet, bevor Datensätze zurückgegeben werden. In diesen Fällen wird die clientseitige Speichernutzung pro Batch FetchSize immer noch begrenzt, aber der Server muss die gesamte Ergebnismenge im Speicher speichern, bevor das Streaming beginnt.
Anmerkung
Der OpenCypher HTTPS-Endpunkt liefert Ergebnisse mithilfe der HTTP-Chunked-Transferverschlüsselung, bei der Daten während der Erstellung an den Client gestreamt werden. Der Client kann das Tempo der Lieferung jedoch nicht kontrollieren. Es gibt kein Äquivalent FetchSize für den HTTPS-Endpunkt. Um den clientseitigen Speicherverbrauch bei großen Ergebnismengen zu kontrollieren, verwenden Sie eine Bolt-Treiberverbindung mit FetchSize konfiguriertem.
Konfiguration FetchSize und Verarbeitung von Ergebnissen in Batches
Legen Sie in FetchSize der Sitzungs- oder Treiberkonfiguration fest, wie viele Datensätze die Treiberanfragen von Neptune pro PULL-Nachricht enthalten. Beachten Sie, dass dadurch der driver-to-server Ablauf FetchSize gesteuert wird, nicht die Batchverarbeitung auf Anwendungsebene. Um Ergebnisse in Batches auf Anwendungsebene zu verarbeiten (z. B. um Zwischenarbeiten zwischen Datensatzgruppen durchzuführen), verwenden Sie eine Hilfsmethode, die Datensätze aus dem Lazy-Iterator des Treibers sammelt.
Die folgenden Beispiele zeigen beides: die Konfiguration in der Sitzung und die Verwendung einer Hilfsmethode, um Datensätze zur Verarbeitung in Batches FetchSize auf Anwendungsebene zu gruppieren.
Beispiel Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
Beispiel Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
Beispiel Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
Beispiel.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
Beispiel Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }
Eifriger Konsum versus inkrementeller Konsum
Die folgenden Methoden blockieren, bis die gesamte Ergebnismenge im Speicher erfasst ist. Dadurch wird verhindert, dass Ihre Anwendung auf Ergebnisse reagiert, sobald sie eintreffen:
Java:
result.list()Python:
result.data(),list(result)Gehe:
result.Collect(ctx),neo4j.EagerResultTransformer.NET:
result.ToListAsync()Node.js:
const { records } = await result
Um die Ergebnisse inkrementell zu verarbeiten, verwenden Sie die in den obigen Beispielen gezeigten Lazy-Iterations- und Batch-Helper-Muster.