

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Hasil kueri streaming dengan protokol Bolt
<a name="access-graph-opencypher-streaming"></a>

Saat Anda menjalankan kueri OpenCypher yang mengembalikan sejumlah besar catatan, Anda tidak perlu memuat seluruh hasil yang disetel ke memori klien sekaligus. Neptunus mendukung streaming hasil melalui protokol Bolt, yang memungkinkan pengemudi mengambil catatan dalam batch dan memprosesnya secara bertahap saat mereka tiba. Ini bisa menguntungkan jika Anda ingin memproses hasil saat dikembalikan dari server, tetapi memerlukan konfigurasi `FetchSize` dan menggunakan iterasi lambat untuk menghindari memuat hasil lengkap yang disetel ke dalam memori.

Ini bekerja melalui [mekanisme Bolt v4.0 PULL](https://neo4j.com/docs/bolt/current/bolt/message/#messages-pull). Alih-alih meminta semua catatan dalam satu pesan, driver mengirimkan permintaan PULL dengan ukuran batch. Misalnya, “beri saya 50 catatan berikutnya.” Neptunus mengirimkan kembali jumlah catatan itu bersama dengan bendera yang menunjukkan apakah lebih banyak yang tersedia. Pengemudi kemudian meminta batch berikutnya ketika klien siap, dan ini berlanjut hingga set hasil sepenuhnya dikonsumsi.

Anda mengontrol ukuran batch menggunakan `FetchSize` pengaturan dalam konfigurasi sesi pengemudi. Dikombinasikan dengan iterasi malas (memproses catatan satu per satu saat dikembalikan), ini membatasi penggunaan memori sisi klien terlepas dari seberapa besar total kumpulan hasil.

**penting**  
`FetchSize`mengontrol aliran data antara server dan klien. Itu tidak mempengaruhi bagaimana Neptunus mengeksekusi kueri. Neptunus mulai mengeksekusi query dan menghasilkan hasil segera setelah menerima pesan RUN. Hasil buffer sisi server sampai klien memintanya dengan pesan PULL. Jika klien tidak menggunakan hasil dalam batas waktu kueri yang dikonfigurasi, Neptunus menghentikan kueri, membuang hasil buffer, dan melepaskan sumber daya sisi server. Tidak ada batas buffer sisi server yang terpisah; buffering berlanjut sampai klien menarik hasil atau batas waktu kueri tercapai.  
Streaming paling efektif dengan kueri non-agregasi yang dapat menghasilkan hasil secara bertahap. Kueri yang menyertakan`ORDER BY`, fungsi agregasi (seperti`count()`,, atau`sum()`)`collect()`, atau mengharuskan `DISTINCT` Neptunus untuk menghitung set hasil lengkap sebelum mengembalikan catatan apa pun. Dalam kasus ini, `FetchSize` masih membatasi penggunaan memori sisi klien per batch, tetapi server harus menyimpan seluruh hasil yang disetel dalam memori sebelum streaming dimulai.

**catatan**  
Titik akhir OpenCypher HTTPS memberikan hasil menggunakan encoding transfer chunked HTTP, yang mengalirkan data ke klien saat diproduksi. Namun, klien tidak dapat mengontrol kecepatan pengiriman. Tidak ada yang setara dengan `FetchSize` untuk titik akhir HTTPS. Untuk mengontrol konsumsi memori sisi klien dengan set hasil yang besar, gunakan koneksi driver Bolt dengan `FetchSize` konfigurasi.

## Mengkonfigurasi FetchSize dan memproses hasil dalam batch
<a name="access-graph-opencypher-streaming-usage"></a>

Tetapkan `FetchSize` pada sesi atau konfigurasi driver untuk mengontrol berapa banyak catatan permintaan driver dari Neptunus per pesan PULL. Perhatikan bahwa `FetchSize` mengontrol driver-to-server aliran, bukan batching tingkat aplikasi. Untuk memproses hasil dalam batch tingkat aplikasi (misalnya, untuk melakukan pekerjaan perantara antara kelompok catatan), gunakan metode pembantu yang mengakumulasi catatan dari iterator malas pengemudi.

Contoh berikut menunjukkan keduanya: mengonfigurasi `FetchSize` pada sesi, dan menggunakan metode pembantu untuk mengelompokkan catatan ke dalam batch tingkat aplikasi untuk diproses.

**Example Java**  

```
static void processBatches(Result result, int size,
        Consumer<List<Record>> handler) {
    var batch = new ArrayList<Record>();
    while (result.hasNext()) {
        batch.add(result.next());
        if (batch.size() >= size) {
            handler.accept(batch);
            batch = new ArrayList<>();
        }
    }
    if (!batch.isEmpty()) handler.accept(batch);
}

// Usage:
try (Session session = driver.session(SessionConfig.builder()
        .withFetchSize(50)
        .withDefaultAccessMode(AccessMode.READ)
        .build())) {

    Result result = session.run("MATCH (m:movie) RETURN m.title AS title");
    processBatches(result, 10, batch -> {
        for (var record : batch) {
            System.out.println(record.get("title").asString());
        }

        // Do other intermediary work here between batch calls
    });
    result.consume();
}
```

**Example Python**  

```
async def batched(result, size):
    batch = []
    async for record in result:
        batch.append(record)
        if len(batch) >= size:
            yield batch
            batch = []
    if batch:
        yield batch

# Usage:
async with driver.session(fetch_size=50) as session:
    result = await session.run("MATCH (m:movie) RETURN m.title AS title")
    async for batch in batched(result, 10):
        for record in batch:
            print(record["title"])

        # Do other intermediary work here between batch calls
```

**Example Go**  

```
func processBatches(ctx context.Context, result neo4j.ResultWithContext,
        size int, handler func([]*neo4j.Record)) error {
    var batch []*neo4j.Record
    for result.Next(ctx) {
        batch = append(batch, result.Record())
        if len(batch) >= size {
            handler(batch)
            batch = nil
        }
    }
    if len(batch) > 0 {
        handler(batch)
    }
    return result.Err()
}

// Usage:
session := driver.NewSession(ctx, neo4j.SessionConfig{
    AccessMode: neo4j.AccessModeRead,
    FetchSize:  50,
})
defer session.Close(ctx)

result, err := session.Run(ctx,
    "MATCH (m:movie) RETURN m.title AS title", nil)
if err != nil {
    log.Fatal(err)
}

if err := processBatches(ctx, result, 10, func(batch []*neo4j.Record) {
    for _, record := range batch {
        title, _ := record.Get("title")
        fmt.Println(title)
    }

    // Do other intermediary work here between batch calls
}); err != nil {
    log.Fatal(err)
}
```

**Example .NET**  

```
static async IAsyncEnumerable<List<IRecord>> Batched(
        IResultCursor result, int size)
{
    var batch = new List<IRecord>();
    while (await result.FetchAsync())
    {
        batch.Add(result.Current);
        if (batch.Count >= size)
        {
            yield return batch;
            batch = new List<IRecord>();
        }
    }
    if (batch.Count > 0) yield return batch;
}

// Usage:
await using var session = driver.AsyncSession(o => o
    .WithFetchSize(50)
    .WithDefaultAccessMode(AccessMode.Read));

var result = await session.RunAsync("MATCH (m:movie) RETURN m.title AS title");
await foreach (var batch in Batched(result, 10))
{
    foreach (var record in batch)
    {
        Console.WriteLine(record["title"].As<string>());
    }

    // Do other intermediary work here between batch calls
}
```

**Example Node.js**  

```
// Uses for-await-of on the Result's async iterator (Neo4j driver v5+).
// This is pull-based: the driver pauses fetching while the loop body
// executes, providing natural backpressure. This is preferred over
// result.subscribe(), which is push-based and continues delivering
// records during async processing.
async function* batched(result, size) {
    let batch = [];
    for await (const record of result) {
        batch.push(record);
        if (batch.length >= size) {
            yield batch;
            batch = [];
        }
    }
    if (batch.length > 0) yield batch;
}

// Usage:
const session = driver.session({
    defaultAccessMode: neo4j.session.READ,
    fetchSize: 50
});

try {
    const result = session.run('MATCH (m:movie) RETURN m.title AS title');
    for await (const batch of batched(result, 10)) {
        for (const record of batch) {
            console.log(record.get('title'));
        }

        // Do other intermediary work here between batch calls
    }
} finally {
    await session.close();
}
```

## Konsumsi bersemangat vs. tambahan
<a name="access-graph-opencypher-streaming-avoid"></a>

Metode berikut memblokir hingga seluruh kumpulan hasil dikumpulkan ke dalam memori, mencegah aplikasi Anda bertindak berdasarkan hasil saat tiba:
+ **Jawa:** `result.list()`
+ **Python**:, `result.data()` `list(result)`
+ **Pergi:**`result.Collect(ctx)`, `neo4j.EagerResultTransformer`
+ **.NET:** `result.ToListAsync()`
+ **Node.js:** `const { records } = await result`

Untuk memproses hasil secara bertahap, gunakan pola lazy iteration dan batch helper yang ditunjukkan pada contoh di atas.