

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 在 AWS Glue 互動式工作階段中使用串流操作
<a name="interactive-sessions-streaming"></a>

## 切換串流工作階段類型
<a name="interactive-sessions-switching-streaming-session-type"></a>

 使用 AWS Glue 互動式工作階段組態魔術命令 `%streaming`，定義您正在執行的任務並初始化串流互動式工作階段。

## 為互動式開發抽樣輸入串流
<a name="w2aac29c29b7"></a>

 為幫助增強 AWS Glue 互動式工作階段中的互動式體驗，我們開發出的一個工具是在 `GlueContext` 下新增了一種新方法，該方法用於獲取靜態 DynamicFrame 中的串流快照。`GlueContext` 讓您能夠檢查、互動並實作您的工作流程。

 使用 `GlueContext` 類執行個體，您將能夠找到方法 `getSampleStreamingDynamicFrame`。此方法所需的引數為：
+  `dataFrame`：Spark Streaming DataFrame 
+  `options`：請參閱下列可用的選項 

 可用選項包括：
+  **windowSize**：這也稱為微量批持續時間。此參數將判定觸發前一個批次後串流查詢將等待的時長。此參數值必須小於 `pollingTimeInMs`。
+  **pollingTimeInMs**：方法將執行的總時長。它將至少觸發一個微批次，以從輸入串流中獲取樣本記錄。
+  **recordPollingLimit**：此參數可幫助您限制將從串流輪詢的記錄總數。
+  (選用) 您也可以使用 `writeStreamFunction` 將此自訂函數應用至每個記錄抽樣函數。請參閱下列 Scala 和 Python 中的範例。

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**注意**  
 當抽樣 `DynFrame` 為空白時，它可能是由以下幾個原因造成的：  
 串流來源設定為「最新」，而且在抽樣週期期間沒有擷取任何新資料。
 輪詢時間不足，無法處理它擷取的記錄。除非整個批次處理完畢，否則資料不會顯示。

## 在互動式工作階段中執行串流應用程式
<a name="running-streaming-applications-interactive-sessions"></a>

 在 AWS Glue 互動式工作階段中，您可以執行 AWS Glue 串流應用程式，就像您在 AWS Glue 主控台中建立串流應用程式一樣。由於互動式工作階段是以工作階段為基礎，因此在執行時間遇到異常情形不會導致工作階段停止。反覆開發批次函數現在帶來了額外好處。例如：

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 上面的範例中包含了對方法的無效使用，與將退出整個應用程式的常規 AWS Glue 任務不同的是，使用者的編碼內容和定義得到完全保留，且工作階段仍然可運作。無需引導新叢集並重新執行之前的所有轉換。這樣，您就可以專注於快速逐一查看批次函數實作，以獲得理想的結果。

 需要注意的是，互動式工作階段會以封鎖的方式評估每個陳述句，以便讓工作階段一次只執行一個陳述句。由於串流查詢是連續的並且永不結束，因此具有作用中串流查詢的工作階段將無法處理任何後續陳述句，除非它們被中斷。您可以直接從 Jupyter 筆記本發出中斷命令，我們的核心會為您處理取消操作。

 以下列正在等待執行的陳述句序列為範例：

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```