

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# AWS Glue Streaming-Konzepte
<a name="glue-streaming-concepts"></a>

 Die folgenden Abschnitte enthalten Informationen zu AWS Glue Streaming-Konzepten. 

**Topics**
+ [Aufbau eines AWS Glue Streaming-Jobs](#glue-streaming-anatomy)

## Aufbau eines AWS Glue Streaming-Jobs
<a name="glue-streaming-anatomy"></a>

 AWS Glue Streaming-Jobs basieren auf dem Spark-Streaming-Paradigma und nutzen strukturiertes Streaming aus dem Spark-Framework. Streaming-Aufträge fragen in einem bestimmten Zeitintervall ständig die Streaming-Datenquelle ab, um Datensätze als Micro-Batches abzurufen. In den folgenden Abschnitten werden die verschiedenen Teile eines AWS Glue Streaming-Jobs untersucht. 

![Der Screenshot zeigt ein Amazon CloudWatch Monitoring-Protokoll AWS Glue für das oben angegebene Beispiel. Es berücksichtigt die Anzahl der benötigten Executors (orange Linie) und skaliert die Executors (blaue Linie) entsprechend, ohne dass eine manuelle Anpassung erforderlich ist.](http://docs.aws.amazon.com/de_de/glue/latest/dg/images/glue-streaming-anatomy.png)


### forEachBatch
<a name="glue-streaming-anatomy-batch"></a>

 Die `forEachBatch` Methode ist der Einstiegspunkt eines ausgeführten Streaming-Jobs. AWS Glue AWS Glue Streaming-Jobs verwenden `forEachBatch` diese Methode, um Daten abzufragen. Sie funktioniert wie ein Iterator, der während des gesamten Lebenszyklus des Streaming-Jobs aktiv bleibt, die Streaming-Quelle regelmäßig nach neuen Daten abfragt und die neuesten Daten in Mikrobatches verarbeitet. 

```
glueContext.forEachBatch(
    frame=dataFrame_AmazonKinesis_node1696872487972,    
    batch_function=processBatch,
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": args["TempDir"] + "/" + args["JOB_NAME"] + "/checkpoint/",
    },
)
```

 Konfigurieren Sie die Eigenschaft `frame` von `forEachBatch`, um eine Streaming-Quelle anzugeben. In diesem Beispiel wird der Quellknoten, den Sie bei der Auftragserstellung auf der leeren Arbeitsfläche erstellt haben, mit der Standardeinstellung DataFrame des Jobs gefüllt. Legen Sie die Eigenschaft `batch_function` als die `function` fest, das Sie für jede Micro-Batch-Operation aufrufen möchten. Sie müssen eine Funktion definieren, die die Batch-Transformation der eingehenden Daten übernimmt. 

### Quelle
<a name="glue-streaming-anatomy-source"></a>

 Im ersten Schritt der `processBatch` Funktion überprüft das Programm die Anzahl der Datensätze der Eigenschaft DataFrame , die Sie als Frame definiert haben. `forEachBatch` Das Programm fügt einem Wert, der nicht leer ist, einen Zeitstempel für die Aufnahme an. DataFrame Die `data_frame.count()>0`-Klausel bestimmt, ob der letzte Micro-Batch nicht leer ist und für die weitere Verarbeitung bereit ist. 

```
def processBatch(data_frame, batchId):
    if data_frame.count() >0:       
        AmazonKinesis_node1696872487972 = DynamicFrame.fromDF(
            glueContext.add_ingestion_time_columns(data_frame, "hour"),
            glueContext,
            "from_data_frame",
        )
```

### Mapping (Zuordnung)
<a name="glue-streaming-anatomy-mapping"></a>

 Im nächsten Abschnitt des Programms wird Mapping angewendet. Die `Mapping.apply` Methode auf einem Spark DataFrame ermöglicht es Ihnen, Transformationsregeln für Datenelemente zu definieren. Normalerweise können Sie die Quelldatenspalte umbenennen, den Datentyp ändern oder eine benutzerdefinierte Funktion darauf anwenden und diese den Zielspalten zuordnen. 

```
    #Script generated for node ChangeSchema        
    ChangeSchema_node16986872679326 = ApplyMapping.apply(
        frame =  AmazonKinesis_node1696872487972,
        mappings = [
            ("eventtime", "string", "eventtime", "string"),
            ("manufacturer", "string", "manufacturer", "string"),
            ("minutevolume", "long", "minutevolume", "int"),
            ("o2stats", "long", "OxygenSaturation", "int"),
            ("pressurecontrol", "long", "pressurecontrol", "int"),
            ("serialnumber", "string", "serialnumber", "string"),
            ("ventilatorid", "long", "ventilatorid", "long"),
            ("ingest_year", "string", "ingest_year", "string"),
            ("ingest_month", "string", "ingest_month", "string"),
            ("ingest_day", "string", "ingest_day", "string"),
            ("ingest_hour", "string", "ingest_hour", "string"),
        ],
        transformation_ctx="ChangeSchema_node16986872679326",
    )
        )
```

### Sink
<a name="glue-streaming-anatomy-sink"></a>

 In diesem Abschnitt werden die von der Streaming-Quelle eingehenden Datensätze an einem Zielort gespeichert. In diesem Beispiel werden wir die Daten in einen Amazon-S3-Speicherort schreiben. Die `AmazonS3_node_path`-Eigenschaftsdetails werden entsprechend den Einstellungen, die Sie bei der Auftragserstellung in der Vorlage verwendet haben, vorausgefüllt. Sie können `updateBehavior` auf der Grundlage Ihres Anwendungsfalls einstellen und entscheiden, ob Sie die Datenkatalogtabelle nicht aktualisieren, einen Datenkatalog erstellen und das Datenkatalogschema bei nachfolgenden Ausführungen aktualisieren oder eine Katalogtabelle erstellen und die Schemadefinition bei nachfolgenden Ausführungen nicht aktualisieren. 

 Die Eigenschaft `partitionKeys` definiert die Speicherpartitionsoption. Standardmäßig werden die Daten gemäß dem im Quellabschnitt zur Verfügung gestellten `ingestion_time_columns` aufgeteilt. Mit dieser `compression`-Eigenschaft können Sie den Komprimierungsalgorithmus festlegen, der beim Schreiben des Ziels angewendet werden soll. Sie haben die Möglichkeit, Snappy, LZO oder GZIP als Komprimierungstechnik einzustellen. Die `enableUpdateCatalog`-Eigenschaft bestimmt, ob die AWS Glue -Katalogtabelle aktualisiert werden muss. Verfügbare Optionen für diese Eigenschaft sind `True` oder `False`. 

```
    #Script generated for node Amazon S3        
    AmazonS3_node1696872743449 = glueContext.getSink(
        path =  AmazonS3_node1696872743449_path,
        connection_type = "s3",
        updateBehavior = "UPDATE_IN_DATABASE",
        partitionKeys = ["ingest_year", "ingest_month", "ingest_day", "ingest_hour"],
        compression = "snappy",
        enableUpdateCatalog = True,
        transformation_ctx = "AmazonS3_node1696872743449",
    )
```

### AWS Glue Katalogsenke
<a name="glue-streaming-anatomy-catalog-sink"></a>

 Dieser Abschnitt des Jobs steuert das Verhalten bei der Aktualisierung der AWS Glue Katalogtabelle. Satz `catalogDatabase` und `catalogTableName` Eigenschaft entsprechend Ihrem AWS Glue Katalogdatenbanknamen und dem Tabellennamen, der mit dem AWS Glue Job verknüpft ist, den Sie entwerfen. Sie können das Dateiformat der Zieldaten über die `setFormat`-Eigenschaft festlegen. In diesem Beispiel werden wir die Daten im Parquet-Format speichern. 

 Sobald Sie den AWS Glue Streaming-Job eingerichtet und ausgeführt haben, der auf dieses Tutorial verweist, Amazon Kinesis Data Streams werden die Streaming-Daten, die unter erzeugt wurden, am Amazon S3 S3-Standort in einem Parquet-Format mit schneller Komprimierung gespeichert. Bei erfolgreichen Ausführungen des Streaming-Auftrags können Sie die Daten über Amazon Athena abfragen. 

```
       
    AmazonS3_node1696872743449 = setCatalogInfo(
        catalogDatabase =  "demo", catalogTableName = "demo_stream_transform_result"
    )
    AmazonS3_node1696872743449.setFormat("glueparquet")
    AmazonS3_node1696872743449.writeFormat("ChangeSchema_node16986872679326")
    )
```