View a markdown version of this page

使用 Gremlin 直播查询结果 - Amazon Neptune

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Gremlin 直播查询结果

当你运行返回大量结果的 Gremlin 遍历时,Neptune 会通过连接将它们分批流回客户端。 WebSocket Neptune 在生成结果批次时发送结果,无需等待客户提出更多请求。如果你想在从服务器返回结果时处理结果,但需要使用延迟迭代模式来避免将完整的结果集收集到内存中,这可能很有好处。

默认情况下,Neptune 会按每 WebSocket 帧 64 的批次发送结果。您无法更改此服务器端默认值,但是可以使用请求选项(在 Java 驱动程序中调Tokens.ARGS_BATCH_SIZE用,或connectionPool.resultIterationBatchSize作为驱动程序级别的默认值)从客户端根据每个batchSize请求覆盖批处理大小。

有关使用其他语言配置驱动程序batchSize的详细信息,请参阅 Apache TinkerPop Gremlin 驱动程序和变体文档中每个驱动程序的 “配置” 部分。

由于服务器会自动推送结果,因此客户端的背压是通过 TCP 和 WebSocket 流量控制隐式处理的。如果客户端从套接字读取速度很慢,则服务器的写入最终将阻塞,直到客户端赶上。

重要

对于可以逐步生成结果的遍历,流式传输最为有效。如果遍历包含order()、、groupCount()group()dedup()、或其他需要在发出结果之前完成完整遍历的步骤,则会导致 Neptune 在流式传输开始之前在内存中实现整个结果集。在这些情况下,批处理仍然可以减少每帧的序列化开销,但不会减少服务器端的内存使用量。

逐步消耗结果

要在结果到达时对其进行处理,请使用hasNext()/next()或等效项进行延迟迭代, APIs 而不是将所有结果收集到列表中。您可以使用next(batchSize)在应用程序级批处理中提取结果,从而允许您在服务器继续生成结果的同时在批次之间执行中间工作。

例 Java(GLV 字节码)
GraphTraversalSource g = traversal().withRemote(connection); int batchSize = 10; int batchNum = 0; var traversal = g.V().hasLabel("movie").values("title").limit(1000); while (traversal.hasNext()) { var batch = traversal.next(batchSize); batchNum++; for (var title : batch) { System.out.println(" " + title); } // Do other intermediary work here between batch calls System.out.println("Batch " + batchNum + " processing complete\n"); }
例 Python
g = traversal().with_remote(connection) BATCH_SIZE = 10 batch_num = 0 t = g.V().has_label('movie').values('title').limit(1000) while t.has_next(): batch = t.next(BATCH_SIZE) batch_num += 1 for title in batch: print(f" {title}") # Do other intermediary work here between batch calls print(f"Batch {batch_num} processing complete\n")
例 Go
// The Go driver does not support next(n), so batches are accumulated manually. g := gremlingo.Traversal_().WithRemote(connection) resultSet, err := g.V().HasLabel("movie").Values("title").Limit(1000).GetResultSet() if err != nil { log.Fatal(err) } batchSize := 10 batchNum := 0 for { var batch []interface{} for i := 0; i < batchSize; i++ { result, ok, err := resultSet.One() // returns (value, ok, error); ok is false when results are exhausted if err != nil { log.Fatal(err) } if !ok { break } batch = append(batch, result) } if len(batch) == 0 { break } batchNum++ for _, v := range batch { fmt.Printf(" %v\n", v) } // Do other intermediary work here between batch calls fmt.Printf("Batch %d processing complete\n\n", batchNum) }
例.NET
var g = Traversal().WithRemote(connection); var batchSize = 10; var batchNum = 0; var traversal = g.V().HasLabel("movie").Values<string>("title").Limit<string>(1000); while (traversal.HasNext()) { var batch = traversal.Next(batchSize); batchNum++; foreach (var title in batch) { Console.WriteLine($" {title}"); } // Do other intermediary work here between batch calls Console.WriteLine($"Batch {batchNum} processing complete\n"); }
例 Node.js
// The Node.js driver does not support next(n), so batches are accumulated manually. const g = traversal().withRemote(connection); const batchSize = 10; let batchNum = 0; const t = g.V().hasLabel('movie').values('title').limit(1000); while (true) { const batch = []; for (let i = 0; i < batchSize; i++) { const result = await t.next(); if (result.done) break; batch.push(result.value); } if (batch.length === 0) break; batchNum++; for (const title of batch) { console.log(` ${title}`); } // Do other intermediary work here between batch calls console.log(`Batch ${batchNum} processing complete\n`); }

急需消费与增量消费

流式传输允许您在获取和返回更多数据时以增量方式处理结果。以下方法会一直阻塞,直到将整个结果集收集到内存中,从而防止您的应用程序在结果到达时对其进行操作:

  • Java:toList()toSet()

  • Python:toList()toSet()

  • 去:ToList()ToSet()、或 GetResultSet().GetAll()

  • .NET:ToList()Promise()

  • Node.js:toList()

注意

即使使用这些方法,数据仍会以增量方式通过 WebSocket 连接流动。不同之处在于,在整个收集完成之前,您的应用程序无法处理单个结果。要在结果到达时对其进行处理,请使用上面示例中显示的延迟迭代或批处理模式。