

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Bekerja dengan operasi streaming dalam sesi AWS Glue interaktif
<a name="interactive-sessions-streaming"></a>

## Mengalihkan jenis sesi streaming
<a name="interactive-sessions-switching-streaming-session-type"></a>

 Gunakan sihir konfigurasi sesi AWS Glue interaktif,`%streaming`, untuk menentukan pekerjaan yang Anda jalankan dan menginisialisasi sesi interaktif streaming. 

## Sampling input stream untuk pengembangan interaktif
<a name="w2aac29c29b7"></a>

 Salah satu alat yang kami peroleh untuk membantu meningkatkan pengalaman AWS Glue interaktif dalam sesi interaktif adalah penambahan metode baru `GlueContext` untuk mendapatkan snapshot aliran dalam DynamicFrame statis. `GlueContext`memungkinkan Anda untuk memeriksa, berinteraksi, dan mengimplementasikan alur kerja Anda. 

 Dengan instance `GlueContext` kelas, Anda akan dapat menemukan metode`getSampleStreamingDynamicFrame`. Argumen yang diperlukan untuk metode ini adalah: 
+  `dataFrame`: Streaming Spark DataFrame 
+  `options`: Lihat opsi yang tersedia di bawah 

 Pilihan yang tersedia meliputi: 
+  **WindowSize**: Ini juga disebut Durasi Microbatch. Parameter ini akan menentukan berapa lama kueri streaming akan menunggu setelah batch sebelumnya dipicu. Nilai parameter ini harus lebih kecil dari`pollingTimeInMs`. 
+  **pollingTimeInMs**: Total panjang waktu metode akan berjalan. Ini akan menembakkan setidaknya satu batch mikro untuk mendapatkan catatan sampel dari aliran input. 
+  **recordPollingLimit**: Parameter ini membantu Anda membatasi jumlah total catatan yang akan Anda polling dari aliran. 
+  (Opsional) Anda juga dapat menggunakan `writeStreamFunction` untuk menerapkan fungsi kustom ini ke setiap fungsi pengambilan sampel rekaman. Lihat di bawah untuk contoh di Scala dan Python. 

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**catatan**  
 Ketika sampel `DynFrame` kosong, itu bisa disebabkan oleh beberapa alasan:   
 Sumber Streaming diatur ke “Terbaru” dan tidak ada data baru yang tertelan selama periode pengambilan sampel. 
 Waktu pemungutan suara tidak cukup untuk memproses catatan yang dicerna. Data tidak akan muncul kecuali seluruh batch telah diproses. 

## Menjalankan aplikasi streaming dalam sesi interaktif
<a name="running-streaming-applications-interactive-sessions"></a>

 Dalam sesi AWS Glue interaktif, Anda dapat menjalankan aplikasi AWS Glue streaming seperti bagaimana Anda akan membuat aplikasi streaming di AWS Glue Konsol. Karena sesi interaktif berbasis sesi, menghadapi pengecualian di runtime tidak menyebabkan sesi berhenti. Kami sekarang memiliki manfaat tambahan untuk mengembangkan fungsi batch Anda secara iteratif. Contoh: 

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 Dalam contoh di atas, kami menyertakan penggunaan metode yang tidak valid dan tidak seperti AWS Glue pekerjaan biasa yang akan keluar dari seluruh aplikasi, konteks dan definisi pengkodean pengguna sepenuhnya dipertahankan dan sesi masih beroperasi. Tidak perlu mem-bootstrap cluster baru dan menjalankan kembali semua transformasi sebelumnya. Ini memungkinkan Anda untuk fokus pada iterasi implementasi fungsi batch Anda dengan cepat untuk mendapatkan hasil yang diinginkan. 

 Penting untuk dicatat bahwa Sesi Interaktif mengevaluasi setiap pernyataan dengan cara pemblokiran sehingga sesi hanya akan mengeksekusi satu pernyataan pada satu waktu. Karena kueri streaming terus menerus dan tidak pernah berakhir, sesi dengan kueri streaming aktif tidak akan dapat menangani pernyataan tindak lanjut apa pun kecuali jika terputus. Anda dapat mengeluarkan perintah interupsi langsung dari Jupyter Notebook dan kernel kami akan menangani pembatalan untuk Anda. 

 Ambil urutan pernyataan berikut yang menunggu eksekusi sebagai contoh: 

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```