View a markdown version of this page

Streaming de resultados de consultas com o protocolo Bolt - Amazon Neptune

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Streaming de resultados de consultas com o protocolo Bolt

Ao executar uma consulta OpenCypher que retorna um grande número de registros, você não precisa carregar todo o conjunto de resultados na memória do cliente de uma só vez. O Neptune suporta streaming de resultados pelo protocolo Bolt, que permite que o driver busque registros em lotes e os processe incrementalmente à medida que chegam. Isso pode ser vantajoso se você quiser processar os resultados à medida que eles são retornados do servidor, mas requer a configuração FetchSize e o uso de iteração lenta para evitar o carregamento do conjunto completo de resultados na memória.

Isso funciona por meio do mecanismo BOLT v4.0 PULL. Em vez de solicitar todos os registros em uma única mensagem, o driver envia uma solicitação PULL com um tamanho de lote. Por exemplo, “me dê os próximos 50 registros”. Neptune envia de volta esse número de registros junto com uma bandeira indicando se há mais registros disponíveis. O driver então solicita o próximo lote quando o cliente estiver pronto, e isso continua até que o conjunto de resultados seja totalmente consumido.

Você controla o tamanho do lote usando a FetchSize configuração da sessão do driver. Combinado com a iteração lenta (processando registros um de cada vez à medida que são retornados), isso limita o uso da memória do lado do cliente, independentemente do tamanho total do conjunto de resultados.

Importante

FetchSizecontrola o fluxo de dados entre o servidor e o cliente. Isso não afeta a forma como o Neptune executa a consulta. O Neptune começa a executar a consulta e a produzir resultados assim que recebe a mensagem RUN. Os resultados são armazenados em buffer no lado do servidor até que o cliente os solicite com uma mensagem PULL. Se o cliente não consumir resultados dentro do tempo limite de consulta configurado, o Neptune encerra a consulta, descarta os resultados armazenados em buffer e libera recursos do lado do servidor. Não há limite de buffer separado no lado do servidor; o buffer continua até que o cliente obtenha os resultados ou o tempo limite da consulta seja atingido.

O streaming é mais eficaz com consultas não agregadas que podem produzir resultados incrementalmente. Consultas que incluem ORDER BY funções de agregação (como count()collect(), ousum()) ou exigem que o DISTINCT Neptune calcule o conjunto completo de resultados antes de retornar qualquer registro. Nesses casos, FetchSize ainda limita o uso da memória do lado do cliente por lote, mas o servidor deve manter todo o conjunto de resultados na memória antes do início da transmissão.

nota

O endpoint HTTPS OpenCypher fornece resultados usando a codificação de transferência fragmentada HTTP, que transmite dados para o cliente à medida que são produzidos. No entanto, o cliente não pode controlar o ritmo da entrega. Não há equivalente FetchSize para o endpoint HTTPS. Para controlar o consumo de memória do lado do cliente com grandes conjuntos de resultados, use uma conexão de driver Bolt configurada. FetchSize

Configurando FetchSize e processando resultados em lotes

FetchSizeDefina a configuração da sessão ou do driver para controlar quantos registros o driver solicita do Neptune por mensagem PULL. Observe que FetchSize controla o driver-to-server fluxo, não o agrupamento em lotes no nível do aplicativo. Para processar resultados em lotes no nível do aplicativo (por exemplo, para realizar um trabalho intermediário entre grupos de registros), use um método auxiliar que acumule registros do iterador lento do driver.

Os exemplos a seguir mostram ambos: a configuração FetchSize na sessão e o uso de um método auxiliar para agrupar registros em lotes no nível do aplicativo para processamento.

exemplo Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
exemplo Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
exemplo Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
exemplo.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
exemplo Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }

Consumo ávido versus consumo incremental

Os métodos a seguir são bloqueados até que todo o conjunto de resultados seja coletado na memória, impedindo que seu aplicativo atue nos resultados à medida que eles chegam:

  • Java: result.list()

  • Python:, result.data() list(result)

  • Vá:result.Collect(ctx), neo4j.EagerResultTransformer

  • .NET: result.ToListAsync()

  • Node.js: const { records } = await result

Para processar os resultados de forma incremental, use a iteração lenta e os padrões auxiliares em lote mostrados nos exemplos acima.