View a markdown version of this page

Streaming dei risultati delle query con Gremlin - Amazon Neptune

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Streaming dei risultati delle query con Gremlin

Quando si esegue un attraversamento Gremlin che restituisce un gran numero di risultati, Neptune li trasmette al client in batch tramite la connessione. WebSocket Neptune invia i batch di risultati man mano che vengono prodotti, senza attendere che il cliente ne richieda altri. Ciò può essere vantaggioso se si desidera elaborare i risultati man mano che vengono restituiti dal server, ma richiede l'utilizzo di schemi di iterazione lenta per evitare di raccogliere l'intero set di risultati in memoria.

Per impostazione predefinita, Neptune invia i risultati in batch di 64 WebSocket per frame. Non è possibile modificare questa impostazione predefinita sul lato server, ma la dimensione del batch può essere sostituita dal client in base alla richiesta effettuata dal client utilizzando l'opzione batchSizerequest (chiamata Tokens.ARGS_BATCH_SIZE nel driver Java o come impostazione predefinita a livello di driver). connectionPool.resultIterationBatchSize

Per informazioni dettagliate sulla configurazione dei driver batchSize in altre lingue, consultate la sezione Configurazione per ciascun driver nella documentazione relativa ai driver e alle varianti di Apache Gremlin. TinkerPop

Poiché il server invia automaticamente i risultati, la contropressione lato client viene gestita implicitamente tramite TCP e controllo del flusso. WebSocket Se il client è lento a leggere dal socket, le scritture del server finiranno per bloccarsi finché il client non recupera il ritardo.

Importante

Lo streaming è più efficace con trasversali che possono produrre risultati in modo incrementale. Gli attraversamenti che includonoorder(),, groupCount() group()dedup(), o altri passaggi che richiedono il completamento dell'attraversamento completo prima di emettere risultati faranno sì che Neptune materializzi l'intero set di risultati in memoria prima dell'inizio dello streaming. In questi casi, il batch riduce comunque il sovraccarico di serializzazione per frame, ma non riduce l'utilizzo della memoria lato server.

Consumare i risultati in modo incrementale

Per elaborare i risultati non appena arrivano, ripeti pigramente usandohasNext()/next()o uno strumento equivalente APIs anziché raccogliere tutti i risultati in un elenco. È possibile utilizzarlo next(batchSize) per estrarre i risultati in batch a livello di applicazione, in modo da eseguire operazioni intermedie tra i batch mentre il server continua a produrre risultati.

Esempio Java (bytecode GLV)
GraphTraversalSource g = traversal().withRemote(connection); int batchSize = 10; int batchNum = 0; var traversal = g.V().hasLabel("movie").values("title").limit(1000); while (traversal.hasNext()) { var batch = traversal.next(batchSize); batchNum++; for (var title : batch) { System.out.println(" " + title); } // Do other intermediary work here between batch calls System.out.println("Batch " + batchNum + " processing complete\n"); }
Esempio Python
g = traversal().with_remote(connection) BATCH_SIZE = 10 batch_num = 0 t = g.V().has_label('movie').values('title').limit(1000) while t.has_next(): batch = t.next(BATCH_SIZE) batch_num += 1 for title in batch: print(f" {title}") # Do other intermediary work here between batch calls print(f"Batch {batch_num} processing complete\n")
Esempio Go
// The Go driver does not support next(n), so batches are accumulated manually. g := gremlingo.Traversal_().WithRemote(connection) resultSet, err := g.V().HasLabel("movie").Values("title").Limit(1000).GetResultSet() if err != nil { log.Fatal(err) } batchSize := 10 batchNum := 0 for { var batch []interface{} for i := 0; i < batchSize; i++ { result, ok, err := resultSet.One() // returns (value, ok, error); ok is false when results are exhausted if err != nil { log.Fatal(err) } if !ok { break } batch = append(batch, result) } if len(batch) == 0 { break } batchNum++ for _, v := range batch { fmt.Printf(" %v\n", v) } // Do other intermediary work here between batch calls fmt.Printf("Batch %d processing complete\n\n", batchNum) }
Esempio.NET
var g = Traversal().WithRemote(connection); var batchSize = 10; var batchNum = 0; var traversal = g.V().HasLabel("movie").Values<string>("title").Limit<string>(1000); while (traversal.HasNext()) { var batch = traversal.Next(batchSize); batchNum++; foreach (var title in batch) { Console.WriteLine($" {title}"); } // Do other intermediary work here between batch calls Console.WriteLine($"Batch {batchNum} processing complete\n"); }
Esempio Node.js
// The Node.js driver does not support next(n), so batches are accumulated manually. const g = traversal().withRemote(connection); const batchSize = 10; let batchNum = 0; const t = g.V().hasLabel('movie').values('title').limit(1000); while (true) { const batch = []; for (let i = 0; i < batchSize; i++) { const result = await t.next(); if (result.done) break; batch.push(result.value); } if (batch.length === 0) break; batchNum++; for (const title of batch) { console.log(` ${title}`); } // Do other intermediary work here between batch calls console.log(`Batch ${batchNum} processing complete\n`); }

Consumo impaziente e consumo incrementale

Lo streaming consente di elaborare i risultati in modo incrementale man mano che vengono recuperati e restituiti dati aggiuntivi. I seguenti metodi si bloccano fino a quando l'intero set di risultati non viene raccolto in memoria, impedendo all'applicazione di agire sui risultati non appena arrivano:

  • Java: toList() o toSet()

  • Python: o toList() toSet()

  • Vai: ToList()ToSet(), o GetResultSet().GetAll()

  • .NET: ToList() o Promise()

  • Node.js: toList()

Nota

I dati continuano a fluire in modo incrementale sulla WebSocket connessione anche quando si utilizzano questi metodi. La differenza è che l'applicazione non può elaborare i singoli risultati fino al completamento dell'intera raccolta. Per elaborare i risultati non appena arrivano, utilizzate la lazy iteration o gli schemi batch mostrati negli esempi precedenti.