Utilizzo delle operazioni di streaming in sessioni AWS Glue interattive - AWS Glue

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo delle operazioni di streaming in sessioni AWS Glue interattive

Commutazione del tipo di sessione streaming

Utilizzo della configurazione magic sessioni interattive AWS Glue, %streaming, per definire il processo che si sta eseguendo e inizializzare una sessione interattiva in streaming.

Flusso di input di campionamento per lo sviluppo interattivo

Uno strumento che abbiamo sviluppato per migliorare l'esperienza interattiva nelle sessioni AWS Glue interattive è l'aggiunta di un nuovo metodo GlueContext per ottenere un'istantanea di uno stream in modalità statica DynamicFrame. GlueContextconsente di ispezionare, interagire e implementare il flusso di lavoro.

Con l'istanza di classe GlueContext, sarai in grado di localizzare il metodo getSampleStreamingDynamicFrame. Gli argomenti richiesti per questo metodo sono:

  • dataFrame: Lo Spark Streaming DataFrame

  • options: vedi le opzioni disponibili di seguito

Le opzioni disponibili includono:

  • windowSize: viene chiamato anche durata del microbatch. Questo parametro determinerà la durata di attesa di una query di streaming dopo l'attivazione del batch precedente. Il valore del parametro deve essere inferiore a pollingTimeInMs.

  • pollingTimeInMs: La durata totale dell'esecuzione del metodo. Esso spedirà almeno un micro batch per ottenere registri campione dal flusso di input.

  • recordPollingLimit: Questo parametro consente di limitare il numero totale di record che verranno esaminati dallo stream.

  • (Facoltativo) È possibile utilizzare anche writeStreamFunction per applicare questa funzione personalizzata a ogni funzione di campionamento del registro. Vedi di seguito alcuni esempi in Scala e Python.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
Nota

Se il DynFrame d'esempio è vuoto, le ragioni possono essere varie:

  • La fonte di streaming è impostata su "Più recente" e non sono stati inseriti nuovi dati durante il periodo di campionamento.

  • Il tempo del polling non è sufficiente per elaborare i registri importati. I dati non verranno visualizzati a meno che l'intero batch non sia stato elaborato.

Esecuzione di applicazioni di streaming in sessioni interattive

Nelle sessioni interattive AWS Glue, è possibile eseguire un'applicazione di streaming AWS Glue nello stesso modo in cui si creerebbe un'applicazione di streaming nella Console AWS Glue. Poiché le sessioni interattive sono basate su sessione, l'individuazione di eccezioni nel runtime non provoca l'interruzione della sessione. Ora abbiamo l'ulteriore vantaggio di sviluppare iterativamente la funzione batch. Ad esempio:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

Nell'esempio precedente, abbiamo incluso un utilizzo non valido di un metodo e a differenza dei normali processi AWS Glue che usciranno dall'intera applicazione, le definizioni e il contesto di codifica dell'utente vengono conservati interamente e la sessione è ancora operativa. Non è necessario avviare un nuovo cluster e rieseguire tutte le trasformazioni precedenti. Ciò consente di concentrarsi sull'iterazione rapida delle implementazioni delle funzioni batch per ottenere risultati desiderati.

È importante notare che le sessioni interattive valutano ogni istruzione in modo bloccante per far sì che la sessione esegua una sola istruzione alla volta. Poiché le query di streaming sono continue e senza fine, le sessioni con query di streaming attive non saranno in grado di gestire alcuna istruzione di follow-up a meno che non vengano interrotte. Puoi emettere il comando di interruzione direttamente da Jupyter Notebook e il nostro kernel gestirà la cancellazione per te.

Prendi come esempio la seguente sequenza di istruzioni in attesa dell'esecuzione:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely