

# AWS Glue 대화형 세션에서 스트리밍 작업 수행
<a name="interactive-sessions-streaming"></a>

## 스트리밍 세션 유형 전환
<a name="interactive-sessions-switching-streaming-session-type"></a>

 AWS Glue 대화형 세션 구성 매직(`%streaming`)을 사용하여 실행 중인 작업을 정의하고 스트리밍 대화형 세션을 초기화합니다.

## 대화형 개발을 위한 샘플링 입력 스트림
<a name="w2aac29c29b7"></a>

 AWS Glue 대화형 세션의 대화형 경험을 개선하기 위해 도출한 한 가지 방법은 정적 DynamicFrame에서 스트림의 스냅샷을 얻을 수 있도록 `GlueContext`에 새로운 방법을 추가하는 것입니다. `GlueContext`는 워크플로를 검사, 상호 작용, 구현할 수 있도록 합니다.

 `GlueContext` 클래스 인스턴스를 사용하면 `getSampleStreamingDynamicFrame` 메서드를 찾을 수 있습니다. 이 메서드의 필수 인수는 다음과 같습니다.
+  `dataFrame`: Spark Streaming Dataframe 
+  `options`: 아래 사용 가능한 옵션 참조 

 사용 가능한 옵션은 다음과 같습니다.
+  **windowSize**: 마이크로 배치 기간이라고도 합니다. 이 파라미터는 이전 배치가 트리거된 후 스트리밍 쿼리가 대기하는 시간을 결정합니다. 이 파라미터 값은 `pollingTimeInMs`보다 작아야 합니다.
+  **pollingTimeInMs**: 메서드가 실행될 총 시간입니다. 입력 스트림에서 샘플 레코드를 얻기 위해 적어도 하나의 마이크로 배치를 실행합니다.
+  **recordPollingLimit**: 이 파라미터를 사용하면 스트림에서 폴링할 총 레코드 수를 제한할 수 있습니다.
+  (선택 사항) `writeStreamFunction`을 사용하여 모든 레코드 샘플링 함수에 이 사용자 지정 함수를 적용할 수도 있습니다. Scala 및 Python 예제는 아래를 참조하세요.

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**참고**  
 몇 가지 이유로 인해 샘플링된 `DynFrame`이 비어 있는 경우가 발생할 수 있습니다.  
 스트리밍 소스가 ‘최신’으로 설정되어 있으며 샘플링 기간 동안 새 데이터가 수집되지 않았습니다.
 폴링 시간이 충분하지 않아 수집된 레코드를 처리할 수 없습니다. 전체 배치가 처리되지 않으면 데이터가 표시되지 않습니다.

## 대화형 세션에서 스트리밍 애플리케이션 실행
<a name="running-streaming-applications-interactive-sessions"></a>

 AWS Glue 대화형 세션에서는 AWS Glue 콘솔에서 스트리밍 애플리케이션을 생성하는 것처럼 AWS Glue 스트리밍 애플리케이션을 실행할 수 있습니다. 대화형 세션은 세션 기반이므로 런타임에 예외가 발생해도 세션이 중지되지 않습니다. 이제 배치 함수를 반복적으로 개발할 수 있다는 추가 이점이 있습니다. 예: 

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 위의 예에서는 잘못된 메서드 사용을 포함했고, 전체 애플리케이션을 종료하는 일반 AWS Glue 작업과는 달리 사용자의 코딩 컨텍스트 및 정의가 완전히 보존되며 세션이 여전히 작동 중입니다. 새 클러스터를 부트스트랩하고 모든 이전 변환을 다시 실행할 필요가 없습니다. 이를 통해 배치 함수 구현을 신속하게 반복하여 바람직한 결과를 얻을 수 있습니다.

 대화형 세션은 세션이 한 번에 하나의 문만 실행하도록 각 문을 차단 방식으로 평가한다는 점에 유의해야 합니다. 스트리밍 쿼리는 지속적이고 끝나지 않으므로 활성 스트리밍 쿼리가 포함된 세션은 중단되지 않는 한 어떤 후속 문도 처리할 수 없습니다. Jupyter Notebook에서 직접 중단 명령을 실행할 수 있으며 커널이 취소를 처리할 것입니다.

 실행 대기 중인 다음 일련의 문을 예로 들어 보겠습니다.

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```