View a markdown version of this page

Streaming de resultados de consultas com o Gremlin - Amazon Neptune

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Streaming de resultados de consultas com o Gremlin

Quando você executa uma travessia do Gremlin que retorna um grande número de resultados, o Neptune os envia de volta ao cliente em lotes pela conexão. WebSocket O Neptune envia lotes de resultados à medida que são produzidos, sem esperar que o cliente solicite mais. Isso pode ser vantajoso se você quiser processar os resultados à medida que eles são retornados do servidor, mas requer o uso de padrões de iteração lenta para evitar a coleta do conjunto completo de resultados na memória.

O Neptune envia resultados em lotes de 64 WebSocket por quadro por padrão. Você não pode alterar esse padrão do lado do servidor, mas o tamanho do lote pode ser substituído por solicitação do cliente usando a opção de solicitação (chamada Tokens.ARGS_BATCH_SIZE no driver Java ou connectionPool.resultIterationBatchSize como padrão no nível do driver). batchSize

Para obter detalhes sobre a configuração de drivers batchSize em outros idiomas, consulte a seção Configuração de cada driver na documentação de drivers e variantes do Apache TinkerPop Gremlin.

Como o servidor envia os resultados automaticamente, a contrapressão do lado do cliente é tratada implicitamente por meio de TCP e controle de fluxo. WebSocket Se o cliente demorar para ler do soquete, as gravações do servidor acabarão sendo bloqueadas até que o cliente se atualize.

Importante

O streaming é mais eficaz com travessias que podem produzir resultados incrementalmente. Percursos que incluamorder(),,, groupCount() group()dedup(), ou outras etapas que exijam que a travessia completa seja concluída antes de emitir resultados farão com que o Neptune materialize todo o conjunto de resultados na memória antes do início da transmissão. Nesses casos, o agrupamento em lotes ainda reduz a sobrecarga de serialização por quadro, mas não reduz o uso da memória do lado do servidor.

Consumindo resultados de forma incremental

Para processar os resultados à medida que eles chegam, repita lentamente usandohasNext()/next()ou equivalente, APIs em vez de coletar todos os resultados em uma lista. Você pode usar next(batchSize) para obter resultados em lotes no nível do aplicativo, permitindo que você execute um trabalho intermediário entre os lotes enquanto o servidor continua produzindo resultados.

exemplo Java (bytecode GLV)
GraphTraversalSource g = traversal().withRemote(connection); int batchSize = 10; int batchNum = 0; var traversal = g.V().hasLabel("movie").values("title").limit(1000); while (traversal.hasNext()) { var batch = traversal.next(batchSize); batchNum++; for (var title : batch) { System.out.println(" " + title); } // Do other intermediary work here between batch calls System.out.println("Batch " + batchNum + " processing complete\n"); }
exemplo Python
g = traversal().with_remote(connection) BATCH_SIZE = 10 batch_num = 0 t = g.V().has_label('movie').values('title').limit(1000) while t.has_next(): batch = t.next(BATCH_SIZE) batch_num += 1 for title in batch: print(f" {title}") # Do other intermediary work here between batch calls print(f"Batch {batch_num} processing complete\n")
exemplo Go
// The Go driver does not support next(n), so batches are accumulated manually. g := gremlingo.Traversal_().WithRemote(connection) resultSet, err := g.V().HasLabel("movie").Values("title").Limit(1000).GetResultSet() if err != nil { log.Fatal(err) } batchSize := 10 batchNum := 0 for { var batch []interface{} for i := 0; i < batchSize; i++ { result, ok, err := resultSet.One() // returns (value, ok, error); ok is false when results are exhausted if err != nil { log.Fatal(err) } if !ok { break } batch = append(batch, result) } if len(batch) == 0 { break } batchNum++ for _, v := range batch { fmt.Printf(" %v\n", v) } // Do other intermediary work here between batch calls fmt.Printf("Batch %d processing complete\n\n", batchNum) }
exemplo.NET
var g = Traversal().WithRemote(connection); var batchSize = 10; var batchNum = 0; var traversal = g.V().HasLabel("movie").Values<string>("title").Limit<string>(1000); while (traversal.HasNext()) { var batch = traversal.Next(batchSize); batchNum++; foreach (var title in batch) { Console.WriteLine($" {title}"); } // Do other intermediary work here between batch calls Console.WriteLine($"Batch {batchNum} processing complete\n"); }
exemplo Node.js
// The Node.js driver does not support next(n), so batches are accumulated manually. const g = traversal().withRemote(connection); const batchSize = 10; let batchNum = 0; const t = g.V().hasLabel('movie').values('title').limit(1000); while (true) { const batch = []; for (let i = 0; i < batchSize; i++) { const result = await t.next(); if (result.done) break; batch.push(result.value); } if (batch.length === 0) break; batchNum++; for (const title of batch) { console.log(` ${title}`); } // Do other intermediary work here between batch calls console.log(`Batch ${batchNum} processing complete\n`); }

Consumo ávido versus consumo incremental

O streaming permite que você processe os resultados de forma incremental à medida que dados adicionais são buscados e retornados. Os métodos a seguir são bloqueados até que todo o conjunto de resultados seja coletado na memória, impedindo que seu aplicativo atue nos resultados à medida que eles chegam:

  • Java: toList() ou toSet()

  • Python: ou toList() toSet()

  • Vá:ToList(),ToSet(), ou GetResultSet().GetAll()

  • .NET: ToList() ou Promise()

  • Node.js: toList()

nota

Os dados ainda fluem incrementalmente pela WebSocket conexão, mesmo ao usar esses métodos. A diferença é que sua inscrição não pode processar resultados individuais até que toda a coleção seja concluída. Para processar os resultados à medida que eles chegam, use a iteração lenta ou os padrões de lote mostrados nos exemplos acima.