View a markdown version of this page

Hasil kueri streaming dengan protokol Bolt - Amazon Neptune

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Hasil kueri streaming dengan protokol Bolt

Saat Anda menjalankan kueri OpenCypher yang mengembalikan sejumlah besar catatan, Anda tidak perlu memuat seluruh hasil yang disetel ke memori klien sekaligus. Neptunus mendukung streaming hasil melalui protokol Bolt, yang memungkinkan pengemudi mengambil catatan dalam batch dan memprosesnya secara bertahap saat mereka tiba. Ini bisa menguntungkan jika Anda ingin memproses hasil saat dikembalikan dari server, tetapi memerlukan konfigurasi FetchSize dan menggunakan iterasi lambat untuk menghindari memuat hasil lengkap yang disetel ke dalam memori.

Ini bekerja melalui mekanisme Bolt v4.0 PULL. Alih-alih meminta semua catatan dalam satu pesan, driver mengirimkan permintaan PULL dengan ukuran batch. Misalnya, “beri saya 50 catatan berikutnya.” Neptunus mengirimkan kembali jumlah catatan itu bersama dengan bendera yang menunjukkan apakah lebih banyak yang tersedia. Pengemudi kemudian meminta batch berikutnya ketika klien siap, dan ini berlanjut hingga set hasil sepenuhnya dikonsumsi.

Anda mengontrol ukuran batch menggunakan FetchSize pengaturan dalam konfigurasi sesi pengemudi. Dikombinasikan dengan iterasi malas (memproses catatan satu per satu saat dikembalikan), ini membatasi penggunaan memori sisi klien terlepas dari seberapa besar total kumpulan hasil.

penting

FetchSizemengontrol aliran data antara server dan klien. Itu tidak mempengaruhi bagaimana Neptunus mengeksekusi kueri. Neptunus mulai mengeksekusi query dan menghasilkan hasil segera setelah menerima pesan RUN. Hasil buffer sisi server sampai klien memintanya dengan pesan PULL. Jika klien tidak menggunakan hasil dalam batas waktu kueri yang dikonfigurasi, Neptunus menghentikan kueri, membuang hasil buffer, dan melepaskan sumber daya sisi server. Tidak ada batas buffer sisi server yang terpisah; buffering berlanjut sampai klien menarik hasil atau batas waktu kueri tercapai.

Streaming paling efektif dengan kueri non-agregasi yang dapat menghasilkan hasil secara bertahap. Kueri yang menyertakanORDER BY, fungsi agregasi (seperticount(),, atausum())collect(), atau mengharuskan DISTINCT Neptunus untuk menghitung set hasil lengkap sebelum mengembalikan catatan apa pun. Dalam kasus ini, FetchSize masih membatasi penggunaan memori sisi klien per batch, tetapi server harus menyimpan seluruh hasil yang disetel dalam memori sebelum streaming dimulai.

catatan

Titik akhir OpenCypher HTTPS memberikan hasil menggunakan encoding transfer chunked HTTP, yang mengalirkan data ke klien saat diproduksi. Namun, klien tidak dapat mengontrol kecepatan pengiriman. Tidak ada yang setara dengan FetchSize untuk titik akhir HTTPS. Untuk mengontrol konsumsi memori sisi klien dengan set hasil yang besar, gunakan koneksi driver Bolt dengan FetchSize konfigurasi.

Mengkonfigurasi FetchSize dan memproses hasil dalam batch

Tetapkan FetchSize pada sesi atau konfigurasi driver untuk mengontrol berapa banyak catatan permintaan driver dari Neptunus per pesan PULL. Perhatikan bahwa FetchSize mengontrol driver-to-server aliran, bukan batching tingkat aplikasi. Untuk memproses hasil dalam batch tingkat aplikasi (misalnya, untuk melakukan pekerjaan perantara antara kelompok catatan), gunakan metode pembantu yang mengakumulasi catatan dari iterator malas pengemudi.

Contoh berikut menunjukkan keduanya: mengonfigurasi FetchSize pada sesi, dan menggunakan metode pembantu untuk mengelompokkan catatan ke dalam batch tingkat aplikasi untuk diproses.

contoh Java
static void processBatches(Result result, int size, Consumer<List<Record>> handler) { var batch = new ArrayList<Record>(); while (result.hasNext()) { batch.add(result.next()); if (batch.size() >= size) { handler.accept(batch); batch = new ArrayList<>(); } } if (!batch.isEmpty()) handler.accept(batch); } // Usage: try (Session session = driver.session(SessionConfig.builder() .withFetchSize(50) .withDefaultAccessMode(AccessMode.READ) .build())) { Result result = session.run("MATCH (m:movie) RETURN m.title AS title"); processBatches(result, 10, batch -> { for (var record : batch) { System.out.println(record.get("title").asString()); } // Do other intermediary work here between batch calls }); result.consume(); }
contoh Python
async def batched(result, size): batch = [] async for record in result: batch.append(record) if len(batch) >= size: yield batch batch = [] if batch: yield batch # Usage: async with driver.session(fetch_size=50) as session: result = await session.run("MATCH (m:movie) RETURN m.title AS title") async for batch in batched(result, 10): for record in batch: print(record["title"]) # Do other intermediary work here between batch calls
contoh Go
func processBatches(ctx context.Context, result neo4j.ResultWithContext, size int, handler func([]*neo4j.Record)) error { var batch []*neo4j.Record for result.Next(ctx) { batch = append(batch, result.Record()) if len(batch) >= size { handler(batch) batch = nil } } if len(batch) > 0 { handler(batch) } return result.Err() } // Usage: session := driver.NewSession(ctx, neo4j.SessionConfig{ AccessMode: neo4j.AccessModeRead, FetchSize: 50, }) defer session.Close(ctx) result, err := session.Run(ctx, "MATCH (m:movie) RETURN m.title AS title", nil) if err != nil { log.Fatal(err) } if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) { for _, record := range batch { title, _ := record.Get("title") fmt.Println(title) } // Do other intermediary work here between batch calls }); err != nil { log.Fatal(err) }
contoh.NET
static async IAsyncEnumerable<List<IRecord>> Batched( IResultCursor result, int size) { var batch = new List<IRecord>(); while (await result.FetchAsync()) { batch.Add(result.Current); if (batch.Count >= size) { yield return batch; batch = new List<IRecord>(); } } if (batch.Count > 0) yield return batch; } // Usage: await using var session = driver.AsyncSession(o => o .WithFetchSize(50) .WithDefaultAccessMode(AccessMode.Read)); var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title"); await foreach (var batch in Batched(result, 10)) { foreach (var record in batch) { Console.WriteLine(record["title"].As<string>()); } // Do other intermediary work here between batch calls }
contoh Node.js
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+). // This is pull-based: the driver pauses fetching while the loop body // executes, providing natural backpressure. This is preferred over // result.subscribe(), which is push-based and continues delivering // records during async processing. async function* batched(result, size) { let batch = []; for await (const record of result) { batch.push(record); if (batch.length >= size) { yield batch; batch = []; } } if (batch.length > 0) yield batch; } // Usage: const session = driver.session({ defaultAccessMode: neo4j.session.READ, fetchSize: 50 }); try { const result = session.run('MATCH (m:movie) RETURN m.title AS title'); for await (const batch of batched(result, 10)) { for (const record of batch) { console.log(record.get('title')); } // Do other intermediary work here between batch calls } } finally { await session.close(); }

Konsumsi bersemangat vs. tambahan

Metode berikut memblokir hingga seluruh kumpulan hasil dikumpulkan ke dalam memori, mencegah aplikasi Anda bertindak berdasarkan hasil saat tiba:

  • Jawa: result.list()

  • Python:, result.data() list(result)

  • Pergi:result.Collect(ctx), neo4j.EagerResultTransformer

  • .NET: result.ToListAsync()

  • Node.js: const { records } = await result

Untuk memproses hasil secara bertahap, gunakan pola lazy iteration dan batch helper yang ditunjukkan pada contoh di atas.