Arbeiten mit Streaming-Vorgängen in interaktiven AWS Glue-Sitzungen - AWS Glue

Arbeiten mit Streaming-Vorgängen in interaktiven AWS Glue-Sitzungen

Wechseln des Streaming-Sitzungstyps

Verwenden Sie das Konfigurations-Magic von AWS Glue-interaktiven Sitzungen, %streaming, um den Auftrag zu definieren, den Sie ausführen, und eine interaktive Streaming-Sitzung zu initialisieren.

Sampling des Eingabestreams für die interaktive Entwicklung

Ein Hilfsmittel, das wir abgeleitet haben, um das interaktive Erlebnis in AWS Glue-interaktiven Sitzungen zu verbessern, ist eine neue Methode unter GlueContext, mit der Sie einen Snapshot eines Streams in einem statischen DynamicFrame erhalten. GlueContext ermöglicht Ihnen, Ihren Workflow zu untersuchen, zu implementieren und damit zu interagieren.

Anhand der GlueContext-Klassen-Instance können Sie die Methode getSampleStreamingDynamicFrame finden. Erforderliche Argumente für diese Methode sind:

  • dataFrame: Der Spark-Streaming-DataFrame

  • options: Verfügbare Optionen siehe unten

Verfügbare Optionen:

  • windowSize: Dies wird auch als „Microbatch Duration“ bezeichnet. Dieser Parameter bestimmt, wie lange eine Streaming-Abfrage wartet, nachdem der vorherige Batch ausgelöst wurde. Der Parameterwert muss kleiner sein als pollingTimeInMs.

  • pollingTimeInMs: Die Gesamtlaufzeit der Methode. Sie löst mindestens einen Microbatch aus, um Beispieldatensätze aus dem Eingabestream abzurufen.

  • recordPollingLimit: Dieser Parameter hilft Ihnen, die Gesamtzahl der Datensätze zu begrenzen, die aus dem Stream abgefragt werden.

  • (Optional) Sie können auch writeStreamFunction verwenden, um diese benutzerdefinierte Funktion auf jede Datensatz-Sampling-Funktion anzuwenden. Beispiele in Scala und Python finden Sie unten.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
Anmerkung

Wenn der DynFrame aus dem Sampling leer ist, kann das mehrere Gründe haben:

  • Die Streaming-Quelle ist auf „Neueste“ eingestellt und während des Sampling-Zeitraums wurden keine neuen Daten aufgenommen.

  • Die Abfragezeit reicht nicht aus, um die aufgenommenen Datensätze zu verarbeiten. Daten werden erst angezeigt, wenn der gesamte Batch verarbeitet wurde.

Ausführen von Streaming-Anwendungen in interaktive Sitzungen

In AWS Glue-interaktiven Sitzungen können Sie eine AWS Glue-Streaming-Anwendung so ausführen, wie Sie eine Streaming-Anwendung in der AWS Glue-Konsole erstellen würden. Da interaktive Sitzungen sitzungsbasiert sind, führen Ausnahmen in der Laufzeit nicht dazu, dass die Sitzung beendet wird. Wir haben jetzt den zusätzlichen Vorteil, Ihre Batch-Funktion iterativ entwickeln zu können. Zum Beispiel:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

Im obigen Beispiel haben wir die ungültige Nutzung einer Methode eingebaut. Im Gegensatz zu normalen AWS Glue-Aufträgen, bei denen dadurch die gesamte Anwendung beendet wird, bleiben Codierungskontext und -definitionen des Benutzers vollständig erhalten und die Sitzung ist weiterhin aktiv. Sie müssen keinen Bootstrap für einen neuen Cluster durchführen und die gesamte vorangegangene Transformation erneut ausführen. So können Sie sich auf die schnelle Iteration Ihrer Batch-Funktionsimplementierungen konzentrieren, um die gewünschten Ergebnisse zu erzielen.

Beachten Sie, dass Interactive Sessions jede Anweisung blockierend auswertet, sodass die Sitzung immer nur eine Anweisung nach der anderen ausführt. Da Streaming-Abfragen kontinuierlich ausgeführt werden und niemals enden, können Sitzungen mit aktiven Streaming-Abfragen keine Follow-up-Anweisungen verarbeiten, ohne unterbrochen zu werden. Sie können den Unterbrechungsbefehl direkt in Jupyter Notebook ausgeben und unser Kernel wird den Abbruch für Sie vornehmen.

Das folgende Beispiel enthält eine Abfolge von Anweisungen, die auf die Ausführung warten:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely