Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Utilizzo delle operazioni di streaming in sessioni AWS Glue interattive
Commutazione del tipo di sessione streaming
Utilizzo della configurazione magic sessioni interattive AWS Glue, %streaming
, per definire il processo che si sta eseguendo e inizializzare una sessione interattiva in streaming.
Flusso di input di campionamento per lo sviluppo interattivo
Uno strumento che abbiamo sviluppato per migliorare l'esperienza interattiva nelle sessioni AWS Glue interattive è l'aggiunta di un nuovo metodo GlueContext
per ottenere un'istantanea di uno stream in modalità statica DynamicFrame. GlueContext
consente di ispezionare, interagire e implementare il flusso di lavoro.
Con l'istanza di classe GlueContext
, sarai in grado di localizzare il metodo getSampleStreamingDynamicFrame
. Gli argomenti richiesti per questo metodo sono:
-
dataFrame
: Lo Spark Streaming DataFrame -
options
: vedi le opzioni disponibili di seguito
Le opzioni disponibili includono:
-
windowSize: viene chiamato anche durata del microbatch. Questo parametro determinerà la durata di attesa di una query di streaming dopo l'attivazione del batch precedente. Il valore del parametro deve essere inferiore a
pollingTimeInMs
. -
pollingTimeInMs: La durata totale dell'esecuzione del metodo. Esso spedirà almeno un micro batch per ottenere registri campione dal flusso di input.
-
recordPollingLimit: Questo parametro consente di limitare il numero totale di record che verranno esaminati dallo stream.
-
(Facoltativo) È possibile utilizzare anche
writeStreamFunction
per applicare questa funzione personalizzata a ogni funzione di campionamento del registro. Vedi di seguito alcuni esempi in Scala e Python.
Nota
Se il DynFrame
d'esempio è vuoto, le ragioni possono essere varie:
-
La fonte di streaming è impostata su "Più recente" e non sono stati inseriti nuovi dati durante il periodo di campionamento.
-
Il tempo del polling non è sufficiente per elaborare i registri importati. I dati non verranno visualizzati a meno che l'intero batch non sia stato elaborato.
Esecuzione di applicazioni di streaming in sessioni interattive
Nelle sessioni interattive AWS Glue, è possibile eseguire un'applicazione di streaming AWS Glue nello stesso modo in cui si creerebbe un'applicazione di streaming nella Console AWS Glue. Poiché le sessioni interattive sono basate su sessione, l'individuazione di eccezioni nel runtime non provoca l'interruzione della sessione. Ora abbiamo l'ulteriore vantaggio di sviluppare iterativamente la funzione batch. Ad esempio:
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
Nell'esempio precedente, abbiamo incluso un utilizzo non valido di un metodo e a differenza dei normali processi AWS Glue che usciranno dall'intera applicazione, le definizioni e il contesto di codifica dell'utente vengono conservati interamente e la sessione è ancora operativa. Non è necessario avviare un nuovo cluster e rieseguire tutte le trasformazioni precedenti. Ciò consente di concentrarsi sull'iterazione rapida delle implementazioni delle funzioni batch per ottenere risultati desiderati.
È importante notare che le sessioni interattive valutano ogni istruzione in modo bloccante per far sì che la sessione esegua una sola istruzione alla volta. Poiché le query di streaming sono continue e senza fine, le sessioni con query di streaming attive non saranno in grado di gestire alcuna istruzione di follow-up a meno che non vengano interrotte. Puoi emettere il comando di interruzione direttamente da Jupyter Notebook e il nostro kernel gestirà la cancellazione per te.
Prendi come esempio la seguente sequenza di istruzioni in attesa dell'esecuzione:
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely