

# 在 AWS Glue 交互式会话中使用流式传输操作
<a name="interactive-sessions-streaming"></a>

## 切换串流会话类型
<a name="interactive-sessions-switching-streaming-session-type"></a>

 使用 AWS Glue 交互式会话配置魔法命令 `%streaming` 以定义您正在运行的任务并初始化串流交互式会话。

## 用于交互式开发的采样输入流式传输
<a name="w2aac29c29b7"></a>

 皆在帮助提升 AWS Glue 交互式会话中交互式体验的一种派生工具是在 `GlueContext` 下添加一种新方法，以获取静态 DynamicFrame 中流式传输的快照。`GlueContext` 允许您检查、交互和实施工作流。

 使用 `GlueContext` 类实例，您将能够找到方法 `getSampleStreamingDynamicFrame`。此方法要求的参数为: 
+  `dataFrame`：Spark Streaming DataFrame 
+  `options`：查看以下可用选项 

 可用选项包括: 
+  **windowSize**：这也称为微批处理持续时间。此参数将确定在触发前一批处理后串流查询的等待时间。此参数值必须小于 `pollingTimeInMs`。
+  **pollingTimeInMs**：方法将运行的总时间长度。它将触发至少一个微批处理，以从输入流式传输中获取样本记录。
+  **recordPollingLimit**：此参数帮助您限制从流式传输中轮询的记录的总数。
+  （可选）您也可以使用 `writeStreamFunction` 将此自定义函数应用于每个记录采样函数。有关 Scala 和 Python 中的示例，请参阅以下内容。

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**注意**  
 在采样的 `DynFrame` 为空时，可能是由以下几个原因造成的：  
 流式传输源设置为“Latest（最新）”，并且在采样周期内没有摄入新数据。
 轮询时间不足以处理其摄入的记录。除非整个批处理处理完毕，否则数据不会显示。

## 在交互式会话中运行串流应用程序
<a name="running-streaming-applications-interactive-sessions"></a>

 在 AWS Glue 交互式会话中，您可以运行 AWS Glue 串流应用程序，就像您在 AWS Glue 控制台中创建串流应用程序一样。由于交互式会话基于会话，因此在运行时遇到异常不会导致会话停止。目前,我们具有以迭代方式开发批处理函数的额外优势。例如：

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 我们在以上例子中包括了方法的无效用法，与退出整个应用程序的常规 AWS Glue 任务不同，用户的编码上下文和定义都完全保留，并且会话仍然在运行。无需引导启动新集群和重新运行所有之前的转换。这使您可以专注于快速迭代批处理函数实施以获得理想的结果。

 需要注意的是，交互式会话以阻塞方式评估每个语句，以便会话一次仅能执行一条语句。由于流式传输查询将始终连续且永不结束，具有有效流式传输查询的会话将无法处理任何后续语句，除非这些会话中断。您可以直接从 Jupyter Notebook 发出中断命令，我们的内核将为您处理取消。

 以下列正在等待执行的语句序列为例：

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```