View a markdown version of this page

Streaming des résultats de requêtes avec le protocole Bolt - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Streaming des résultats de requêtes avec le protocole Bolt

Lorsque vous exécutez une requête OpenCypher qui renvoie un grand nombre d'enregistrements, il n'est pas nécessaire de charger l'ensemble des résultats dans la mémoire du client en une seule fois. Neptune prend en charge le streaming des résultats via le protocole Bolt, qui permet au pilote de récupérer les enregistrements par lots et de les traiter progressivement au fur et à mesure qu'ils arrivent. Cela peut être avantageux si vous souhaitez traiter les résultats au fur et à mesure qu'ils sont renvoyés par le serveur, mais cela nécessite de configurer FetchSize et d'utiliser une itération différée pour éviter de charger le jeu de résultats complet en mémoire.

Cela fonctionne grâce au mécanisme BOLT v4.0 PULL. Au lieu de demander tous les enregistrements dans un seul message, le pilote envoie une demande PULL avec une taille de lot. Par exemple, « donnez-moi les 50 prochains enregistrements ». Neptune renvoie ce nombre d'enregistrements avec un indicateur indiquant si d'autres sont disponibles. Le pilote demande ensuite le lot suivant lorsque le client est prêt, et cela continue jusqu'à ce que le jeu de résultats soit entièrement consommé.

Vous pouvez contrôler la taille du lot à l'aide des FetchSize paramètres définis dans la configuration de session du pilote. Combiné à une itération lente (traitement des enregistrements un par un au fur et à mesure qu'ils sont renvoyés), cela limite l'utilisation de la mémoire côté client, quelle que soit la taille du jeu de résultats total.

Important

FetchSizecontrôle le flux de données entre le serveur et le client. Cela n'a aucune incidence sur la façon dont Neptune exécute la requête. Neptune commence à exécuter la requête et à produire des résultats dès qu'il reçoit le message RUN. Les résultats sont mis en mémoire tampon côté serveur jusqu'à ce que le client les demande avec un message PULL. Si le client ne consomme pas les résultats dans le délai de requête configuré, Neptune met fin à la requête, supprime les résultats mis en mémoire tampon et libère les ressources côté serveur. Il n'y a pas de limite de mémoire tampon distincte côté serveur ; la mise en mémoire tampon continue jusqu'à ce que le client extrait les résultats ou que le délai d'expiration de la requête soit atteint.

Le streaming est particulièrement efficace avec des requêtes non agrégantes qui peuvent produire des résultats de manière incrémentielle. Requêtes qui incluent des ORDER BY fonctions d'agrégation (telles que count()collect(), ousum()) ou qui DISTINCT nécessitent que Neptune calcule le jeu de résultats complet avant de renvoyer des enregistrements. Dans ces cas, l'utilisation de la mémoire côté client par lot est FetchSize toujours limitée, mais le serveur doit conserver l'ensemble des résultats en mémoire avant le début de la diffusion.

Note

Le point de terminaison HTTPS OpenCypher fournit des résultats à l'aide du codage de transfert HTTP fragmenté, qui diffuse les données au client au fur et à mesure de leur production. Cependant, le client ne peut pas contrôler le rythme de livraison. Il n'existe pas d'équivalent FetchSize pour le point de terminaison HTTPS. Pour contrôler la consommation de mémoire côté client avec de grands ensembles de résultats, utilisez une connexion au pilote Bolt configurée. FetchSize

Configuration FetchSize et traitement des résultats par lots

Définissez FetchSize la session ou la configuration du pilote pour contrôler le nombre d'enregistrements que le pilote demande à Neptune par message PULL. Notez que cela FetchSize contrôle le driver-to-server flux, et non le traitement par lots au niveau de l'application. Pour traiter les résultats par lots au niveau de l'application (par exemple, pour effectuer un travail intermédiaire entre des groupes d'enregistrements), utilisez une méthode d'assistance qui accumule les enregistrements provenant de l'itérateur paresseux du pilote.

Les exemples suivants montrent à la fois la configuration FetchSize sur la session et l'utilisation d'une méthode d'assistance pour regrouper les enregistrements en lots au niveau de l'application à des fins de traitement.

Exemple Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
Exemple Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
Exemple Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
Exemple.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
Exemple Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }

Consommation rapide ou consommation incrémentielle

Les méthodes suivantes bloquent jusqu'à ce que la totalité du jeu de résultats soit collectée en mémoire, empêchant ainsi votre application d'agir sur les résultats lorsqu'ils arrivent :

  • Java : result.list()

  • Python :result.data(), list(result)

  • Allez :result.Collect(ctx), neo4j.EagerResultTransformer

  • .NET : result.ToListAsync()

  • Node.js : const { records } = await result

Pour traiter les résultats de manière incrémentielle, utilisez les modèles d'itération paresseuse et d'assistance par lots illustrés dans les exemples ci-dessus.