Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Streaming dei risultati delle query con il protocollo Bolt
Quando si esegue una query OpenCypher che restituisce un numero elevato di record, non è necessario caricare l'intero set di risultati nella memoria del client in una sola volta. Neptune supporta lo streaming dei risultati tramite il protocollo Bolt, che consente al driver di recuperare i record in batch ed elaborarli in modo incrementale man mano che arrivano. Ciò può essere vantaggioso se si desidera elaborare i risultati man mano che vengono restituiti dal server, ma richiede la configurazione FetchSize e l'utilizzo dell'iterazione lenta per evitare di caricare l'intero set di risultati in memoria.
Questo funziona tramite il meccanismo BOLT v4.0 PULL.
È possibile controllare la dimensione del batch utilizzando l'FetchSizeimpostazione nella configurazione della sessione del driver. In combinazione con l'iterazione lenta (elaborazione dei record uno alla volta man mano che vengono restituiti), ciò limita l'utilizzo della memoria lato client indipendentemente dalla dimensione totale del set di risultati.
Importante
FetchSizecontrolla il flusso di dati tra server e client. Non influisce sul modo in cui Neptune esegue la query. Neptune inizia a eseguire la query e a produrre risultati non appena riceve il messaggio RUN. I risultati vengono memorizzati nel buffer lato server fino a quando il client non li richiede con un messaggio PULL. Se il client non consuma risultati entro il timeout della query configurato, Neptune termina la query, elimina i risultati memorizzati nel buffer e rilascia le risorse lato server. Non esiste un limite di buffer separato sul lato server; il buffering continua fino a quando il client non ottiene i risultati o non viene raggiunto il timeout della query.
Lo streaming è più efficace con le query non aggregate che possono produrre risultati in modo incrementale. Query che includono ORDER BY funzioni di aggregazione (come count()collect(), osum()) o che richiedono a DISTINCT Neptune di calcolare l'intero set di risultati prima di restituire qualsiasi record. In questi casi, limita FetchSize comunque l'utilizzo della memoria lato client per batch, ma il server deve conservare l'intero set di risultati in memoria prima che inizi lo streaming.
Nota
L'endpoint HTTPS OpenCypher fornisce risultati utilizzando la codifica di trasferimento in blocchi HTTP, che trasmette i dati al client man mano che vengono prodotti. Tuttavia, il cliente non può controllare il ritmo di consegna. Non esiste un equivalente FetchSize per l'endpoint HTTPS. Per controllare il consumo di memoria lato client con set di risultati di grandi dimensioni, utilizza una connessione al driver Bolt configurata. FetchSize
Configurazione FetchSize ed elaborazione dei risultati in batch
Impostato FetchSize sulla configurazione della sessione o del driver per controllare il numero di record richiesti dal driver a Neptune per messaggio PULL. Nota che FetchSize controlla il driver-to-server flusso, non il batch a livello di applicazione. Per elaborare i risultati in batch a livello di applicazione (ad esempio, per eseguire operazioni intermedie tra gruppi di record), utilizzate un metodo di supporto che accumuli i record dall'iteratore pigro del driver.
Gli esempi seguenti mostrano entrambi: la configurazione FetchSize sulla sessione e l'utilizzo di un metodo di supporto per raggruppare i record in batch a livello di applicazione per l'elaborazione.
Esempio Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
Esempio Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
Esempio Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
Esempio.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
Esempio Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }
Consumo impaziente e consumo incrementale
I seguenti metodi si bloccano fino a quando l'intero set di risultati non viene raccolto in memoria, impedendo all'applicazione di agire sui risultati non appena arrivano:
Java:
result.list()Python:,
result.data()list(result)Vai:
result.Collect(ctx),neo4j.EagerResultTransformer.NET:
result.ToListAsync()Node.js:
const { records } = await result
Per elaborare i risultati in modo incrementale, utilizzate i pattern lazy iteration e batch helper mostrati negli esempi precedenti.