

# Trabalhar com operações de streaming em sessões do AWS Glue interativas
<a name="interactive-sessions-streaming"></a>

## Alterar o tipo de sessão de transmissão
<a name="interactive-sessions-switching-streaming-session-type"></a>

 Use a mágica de configuração das sessões interativas do AWS Glue, `%streaming`, para definir o trabalho que você está executando e inicializar uma sessão interativa de transmissão. 

## Amostrar o fluxo de entrada para desenvolvimento interativo
<a name="w2aac29c29b7"></a>

 Uma ferramenta que obtivemos para ajudar a aprimorar a experiência interativa em sessões interativas do AWS Glue é a adição de um novo método em `GlueContext` para obter um snapshot de um fluxo em um DynamicFrame estático. `GlueContext` permite inspecionar, interagir e implementar seu fluxo de trabalho. 

 Com a instância de classe `GlueContext`, você poderá localizar o método `getSampleStreamingDynamicFrame`. Os argumentos necessários para esse método são: 
+  `dataFrame`: o DataFrame de streaming do Spark 
+  `options`: veja as opções disponíveis abaixo 

 As opções disponíveis incluem: 
+  **windowSize**: também chamado de Microbatch Duration (Duração de microlote). Esse parâmetro determinará quanto tempo uma consulta de transmissão aguardará após o acionamento do lote anterior. Esse valor de parâmetro deve ser inferior a `pollingTimeInMs`. 
+  **pollingTimeInMs**: o período total do tempo de execução do método. Ele acionará pelo menos um microlote para obter registros de amostra do fluxo de entrada. 
+  **recordPollingLimit**: esse parâmetro ajuda você a limitar o número total de registros que você vai sondar no fluxo. 
+  (Opcional) Você também pode usar `writeStreamFunction` para aplicar essa função personalizada a cada função de amostragem de registro. Veja abaixo exemplos em Scala e Python. 

****  
  

```
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here}
val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}"""
val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction)
dynFrame.show()
```

```
def sample_batch_function(batch_df, batch_id):
       //Optional but you can replace your own forEachBatch function here
options = {
            "pollingTimeInMs": "10000",
            "windowSize": "5 seconds",
        }
glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
```

**nota**  
 Quando o `DynFrame` amostrado estiver vazio, isso pode acontecer por alguns motivos:   
 A fonte de transmissão está definida como “Latest” (Mais recente), e nenhum novo dado foi ingerido durante o período de amostragem. 
 O tempo de sondagem não é suficiente para processar os registros ingeridos. Os dados não serão exibidos, a menos que o lote inteiro tenha sido processado. 

## Executar aplicativos de transmissão em sessões interativas
<a name="running-streaming-applications-interactive-sessions"></a>

 Em sessões interativas do AWS Glue, você pode executar o aplicativo de transmissão do AWS Glue assim como criaria um aplicativo de transmissão no console do AWS Glue. Como as sessões interativas são baseadas em sessão, encontrar exceções no runtime não faz com que a sessão seja interrompida. Agora temos o benefício adicional de desenvolver sua função em lote iterativamente. Por exemplo: 

```
def batch_function(data_frame, batch_id):
    log.info(data_frame.count())
    invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
```

 No exemplo acima, incluímos um uso inválido de um método e, ao contrário dos trabalhos normais do AWS Glue que sairão de toda a aplicação, o contexto de codificação e as definições do usuário são totalmente preservados e a sessão permanece operacional. Não há necessidade de fazer o bootstrap de um novo cluster e executar novamente toda a transformação anterior. Isso permite que você mantenha o foco em iterar rapidamente suas implementações de função em lote para obter resultados desejáveis. 

 É importante observar que o Interactive Session avalia cada instrução de maneira bloqueadora para que a sessão execute apenas uma instrução por vez. Como as consultas de transmissão são contínuas e infinitas, as sessões com consultas de transmissão ativa não poderão processar instruções de acompanhamento a menos que sejam interrompidas. Você pode emitir o comando de interrupção diretamente do Jupyter Notebook e nosso kernel processará o cancelamento para você. 

 Considere como exemplo a seguinte sequência de instruções que estão aguardando a execução: 

```
Statement 1:
      val number = df.count() 
      #Spark Action with deterministic result
      Result: 5
      
Statement 2:
      streamingQuery.start().awaitTermination()
      #Spark Streaming Query that will be executing continously
      Result: Constantly updated with each microbatch
      
Statement 3:
      val number2 = df.count()
      #This will not be executed as previous statement will be running indefinitely
```