

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Apache Flink
<a name="emr-flink"></a>

[Apache Flink](https://flink.apache.org/) ist eine Streaming-Datenfluss-Engine, die mittels Datenquellen mit hohem Durchsatz die Echtzeitverarbeitung von Streams ermöglicht. Flink unterstützt Ereigniszeitsemantik für out-of-order Ereignisse, Exactly-Once-Semantik und Gegendruckkontrolle und ist für das Schreiben von Streaming- und APIs Batch-Anwendungen optimiert.

Außerdem verfügt Flink über Konnektoren für Datenquellen von Drittanbieter, wie z. B. die folgenden:
+ [Amazon Kinesis Data Streams](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kinesis.html)
+ [Apache Kafka](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/kafka.html)
+ [Flink-Elasticsearch-Konnektor](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/elasticsearch2.html)
+ [Twitter-Streaming-API](https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/twitter.html)
+ [Cassandra](https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/connectors/cassandra.html)

Amazon EMR unterstützt Flink als YARN-Anwendung, sodass Sie Ressourcen zusammen mit anderen Anwendungen innerhalb eines Clusters verwalten können. Flink-on-YARNermöglicht es Ihnen, vorübergehende Flink-Jobs einzureichen, oder Sie können einen Cluster mit langer Laufzeit erstellen, der mehrere Jobs akzeptiert und Ressourcen entsprechend der gesamten YARN-Reservierung zuweist.

Flink ist in Amazon EMR Version 5.1.0 und höher enthalten.

**Anmerkung**  
Support für die `FlinkKinesisConsumer`-Klasse wurde mit Amazon-EMR-Version 5.2.1 bereitgestellt.

Die folgende Tabelle listet die Version von Flink auf, die in der neuesten Version der Amazon-EMR-7.x-Serie enthalten ist, zusammen mit den Komponenten, die Amazon EMR mit Flink installiert.

[Die Version der Komponenten, die in dieser Version mit Flink installiert wurden, finden Sie unter Komponentenversionen von Version 7.12.0.](emr-7120-release.md)


**Flink-Versionsinformationen für emr-7.12.0**  

| Amazon-EMR-Versionsbezeichnung | Flink-Version | Mit Flink installierte Komponenten | 
| --- | --- | --- | 
| emr-7.12.0 | Flink 1.20.0-amzn-6 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 

Die folgende Tabelle listet die Version von Flink auf, die in der neuesten Version der Amazon-EMR-6.x-Serie enthalten ist, zusammen mit den Komponenten, die Amazon EMR mit Flink installiert.

Die Version der Komponenten, die mit Flink in dieser Version installiert wurden, finden Sie unter [Komponentenversionen der Version 6.15.0](emr-6150-release.md).


**Flink-Versionsinformationen für emr-6.15.0**  

| Amazon-EMR-Versionsbezeichnung | Flink-Version | Mit Flink installierte Komponenten | 
| --- | --- | --- | 
| emr-6.15.0 | Flink 1.17.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 

Die folgende Tabelle listet die Version von Flink auf, die in der neuesten Version der Amazon-EMR-5.x-Serie enthalten ist, zusammen mit den Komponenten, die Amazon EMR mit Flink installiert.

[Informationen zur Version der Komponenten, die in dieser Version mit Flink installiert wurden, finden Sie unter Komponentenversionen von Version 5.36.2.](emr-5362-release.md)


**Flink-Versionsinformationen für emr-5.36.2**  

| Amazon-EMR-Versionsbezeichnung | Flink-Version | Mit Flink installierte Komponenten | 
| --- | --- | --- | 
| emr-5.36.2 | Flink 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 

**Topics**
+ [Erstellen eines Clusters mit Flink](flink-create-cluster.md)
+ [Konfiguration von Flink in Amazon EMR](flink-configure.md)
+ [Arbeiten mit Flink-Jobs in Amazon EMR](flink-jobs.md)
+ [Verwenden der Scala-Shell](flink-scala.md)
+ [Suchen der Flink Webschnittstelle](flink-web-interface.md)
+ [Flink Autoscaler](flink-autoscaler.md)
+ [Optimieren der Neustartzeiten von Aufträgen für die Aufgabenwiederherstellung und -skalierung](flink-restart.md)
+ [Arbeiten mit Flink-Aufträgen von Zeppelin in Amazon EMR](flink-zeppelin.md)
+ [Versionsverlauf von Flink](Flink-release-history.md)

# Erstellen eines Clusters mit Flink
<a name="flink-create-cluster"></a>

Sie können einen Cluster mit dem AWS-Managementkonsole AWS CLI, oder einem AWS SDK starten.<a name="emr-flink-create-console"></a>

**So starten Sie einen Cluster mit Flink über die Konsole**

1. Öffnen Sie die Amazon EMR-Konsole unter [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/).

1. Wählen Sie **Create Cluster (Cluster erstellen)** und **Go to advanced options (Zu erweiterten Optionen)** aus.

1.  Wählen Sie für das Feld **Software Configuration (Softwarekonfiguration)** die Option **EMR Release emr-5.1.0** oder höher aus.

1.  Wählen Sie **Flink** als Anwendung zusammen mit anderen zu installierenden Anwendungen aus.

1.  Wählen Sie nach Bedarf weitere Optionen und anschließend **Create cluster (Cluster erstellen)** aus.

**Um einen Cluster mit Flink von der AWS CLI**
+ Erstellen Sie den Cluster mit dem folgenden Befehl:

  ```
  aws emr create-cluster --release-label emr-7.12.0 \
  --applications Name=Flink \
  --region us-east-1 \
  --log-uri s3://myLogUri \
  --instance-type m5.xlarge \
  --instance-count 2 \
  --service-role EMR_DefaultRole_V2 \ 
  --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \
  --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
  Args=flink-yarn-session,-d
  ```
**Anmerkung**  
Linux-Zeilenfortsetzungszeichen (\$1) sind aus Gründen der Lesbarkeit enthalten. Sie können entfernt oder in Linux-Befehlen verwendet werden. Entfernen Sie sie unter Windows oder ersetzen Sie sie durch ein Caret-Zeichen (^).

# Konfiguration von Flink in Amazon EMR
<a name="flink-configure"></a>

## Konfigurieren Sie Flink mit Hive Metastore und Glue Catalog
<a name="flink-configure-hive"></a>

Amazon EMR-Versionen 6.9.0 und höher unterstützen sowohl Hive Metastore als auch AWS Glue Catalog mit dem Apache Flink-Connector zu Hive. In diesem Abschnitt werden die Schritte beschrieben, die zur Konfiguration von [AWS Glue Catalog](#flink-configure-hive-glue) und [Hive Metastore](#flink-configure-hive-metastore) mit Flink erforderlich sind.

**Topics**
+ [Verwenden Sie den Hive Metastore](#flink-configure-hive-metastore)
+ [Verwenden Sie den AWS Glue-Datenkatalog](#flink-configure-hive-glue)

### Verwenden Sie den Hive Metastore
<a name="flink-configure-hive-metastore"></a>

1. Erstellen Sie einen EMR-Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: **Hive** und **Flink**. 

1. Verwenden Sie [Script Runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html), um das folgende Skript als Schrittfunktion auszuführen:

   `hive-metastore-setup.sh`

   ```
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/hive.png)

### Verwenden Sie den AWS Glue-Datenkatalog
<a name="flink-configure-hive-glue"></a>

1. Erstellen Sie einen EMR-Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: **Hive** und **Flink**. 

1. Wählen Sie in den Einstellungen des AWS Glue Data Catalogs die Option **Für Hive-Tabellenmetadaten verwenden** aus, um den Data Catalog im Cluster zu aktivieren.

1. Verwenden Sie [Script-Runner](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html), um das folgende Skript als Schrittfunktion auszuführen: [Befehle und Skripts auf einem Amazon-EMR-Cluster ausführen](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-commandrunner.html):

   glue-catalog-setup.sh 

   ```
   sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib 
   sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib 
   sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib 
   sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib
   sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar 
   sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar 
   sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar 
   sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar
   sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
   ```  
![\[Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/hive.png)

## Konfigurieren Sie Flink mit einer Konfigurationsdatei
<a name="flink-configure-config"></a>

Sie können die Amazon-EMR-Konfigurations-API verwenden, um Flink mit einer Konfigurationsdatei zu konfigurieren. Folgende Dateien sind in der API konfigurierbar:
+ `flink-conf.yaml`
+ `log4j.properties`
+ `flink-log4j-session`
+ `log4j-cli.properties`

Die Hauptkonfigurationsdatei für Flink heißt `flink-conf.yaml`. 

**Um die Anzahl der Task-Slots zu konfigurieren, die für Flink verwendet werden, von AWS CLI**

1. Erstellen Sie eine Datei, `configurations.json`, mit folgendem Inhalt:

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "taskmanager.numberOfTaskSlots":"2"
         }
       }
   ]
   ```

1. Im nächsten Schritt erstellen Sie einen Cluster mit der folgenden Konfiguration:

   ```
   aws emr create-cluster --release-label emr-7.12.0 \
   --applications Name=Flink \
   --configurations file://./configurations.json \
   --region us-east-1 \
   --log-uri s3://myLogUri \
   --instance-type m5.xlarge \
   --instance-count 2 \
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

**Anmerkung**  
Sie können einige Konfigurationen auch mit der Flink-API ändern. Weitere Informationen finden Sie unter [https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-release-1.12/concepts/index.html) in der Flink-Dokumentation.  
Ab Amazon-EMR-Version 5.21.0 können Sie Cluster-Konfigurationen überschreiben und zusätzliche Konfigurationsklassifikationen für jede Instance-Gruppe in einem ausgeführten Cluster angeben. Dazu verwenden Sie die Amazon EMR-Konsole, das AWS Command Line Interface (AWS CLI) oder das AWS SDK. Weitere Informationen finden Sie unter [Angeben einer Konfiguration für eine Instance-Gruppe in einem aktiven Cluster](https://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-configure-apps-running-cluster.html).

### Parallelitätsoptionen
<a name="flink-parallelism"></a>

Als Eigentümer Ihrer Anwendung wissen Sie am besten, welche Ressourcen Aufgaben innerhalb von Flink zugewiesen werden müssen. Für Beispiele in dieser Dokumentation verwenden Sie die gleiche Anzahl von Aufgaben wie die Aufgaben-Instances, die Sie für die Anwendung nutzen. Wir empfehlen diese Vorgehensweise generell für die anfängliche Parallelität. Sie können jedoch die Granularität der Parallelität mithilfe von Aufgaben-Slots erhöhen. Dabei sollte die Anzahl von [virtuellen Cores](https://aws.amazon.com/ec2/virtualcores/) pro Instance im Allgemeinen nicht überschritten werden. Weitere Informationen über die Architektur von Flink finden Sie unter [https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html](https://ci.apache.org/projects/flink/flink-docs-master/concepts/index.html) in der Flink-Dokumentation.

## Konfigurieren von Flink auf einem EMR-Cluster mit mehreren Primärknoten
<a name="flink-multi-master"></a>

Der JobManager von Flink bleibt während des Failover-Prozesses für den primären Knoten in einem Amazon EMR-Cluster mit mehreren Primärknoten verfügbar. Ab Amazon EMR 5.28.0 wird JobManager Hochverfügbarkeit auch automatisch aktiviert. Es ist keine manuelle Konfiguration erforderlich.

Bei Amazon EMR-Versionen 5.27.0 oder früher JobManager handelt es sich um einen einzigen Fehlerpunkt. Wenn der JobManager fehlschlägt, gehen alle Job-Status verloren und die laufenden Jobs werden nicht wieder aufgenommen. Sie können JobManager Hochverfügbarkeit aktivieren, indem Sie die Anzahl der Anwendungsversuche, das Checkpointing und die Aktivierung ZooKeeper als Statusspeicher für Flink konfigurieren, wie das folgende Beispiel zeigt:

```
[
  {
    "Classification": "yarn-site",
    "Properties": {
      "yarn.resourcemanager.am.max-attempts": "10"
    }
  },
  {
    "Classification": "flink-conf",
    "Properties": {
        "yarn.application-attempts": "10",
        "high-availability": "zookeeper",
        "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}",
        "high-availability.storageDir": "hdfs:///user/flink/recovery",
        "high-availability.zookeeper.path.root": "/flink"
    }
  }
]
```

Sie müssen sowohl die maximalen Anwendungsmasterversuche für YARN als auch die maximalen Anwendungsversuche für Flink konfigurieren. Weitere Informationen finden Sie unter [Konfiguration der Hochverfügbarkeit des YARN-Clusters](https://ci.apache.org/projects/flink/flink-docs-release-1.8/ops/jobmanager_high_availability.html#maximum-application-master-attempts-yarn-sitexml). Möglicherweise möchten Sie auch das Flink-Checkpointing so konfigurieren, dass neu gestartete Jobs von zuvor abgeschlossenen Checkpoints JobManager wiederhergestellt werden. Weitere Informationen finden Sie unter [Flink-Checkpointing](https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html).

## Konfiguration der Größe des Speicherprozesses
<a name="flink-process-memory"></a>

Für Amazon EMR-Versionen, die Flink 1.11.x verwenden, müssen Sie die Gesamtspeicherprozessgröße sowohl für () als auch für JobManager (`jobmanager.memory.process.size`) in konfigurieren. TaskManager `taskmanager.memory.process.size` `flink-conf.yaml` Sie können diese Werte festlegen, indem Sie entweder den Cluster mit der Konfigurations-API konfigurieren oder diese Felder manuell über SSH auskommentieren. Flink bietet die folgenden Standardwerte.
+ `jobmanager.memory.process.size`: 1 600 m
+ `taskmanager.memory.process.size`: 1 728 m

Um JVM-Metaspace und Overhead auszuschließen, verwenden Sie die gesamte Flink-Speichergröße (`taskmanager.memory.flink.size`) anstelle von `taskmanager.memory.process.size`. Der Standardwert von `taskmanager.memory.process.size` beträgt 1 280 m. Es wird nicht empfohlen, sowohl `taskmanager.memory.process.size` als auch `taskmanager.memory.process.size` zu setzen.

Alle Amazon-EMR-Versionen, die Flink 1.12.0 und höher verwenden, haben die im Open-Source-Set für Flink aufgeführten Standardwerte als Standardwerte für Amazon EMR, sodass Sie sie nicht selbst konfigurieren müssen.

## Größe der Protokollausgabedatei konfigurieren
<a name="flink-log-output"></a>

Flink-Anwendungscontainer erstellen drei Arten von Protokolldateien und schreiben in sie: `.out`-Dateien, `.log`-Dateien und `.err`-Dateien. Nur `.err`-Dateien werden komprimiert und aus dem Dateisystem entfernt, während `.log`- und `.out`-Protokolldateien im Dateisystem verbleiben. Um sicherzustellen, dass diese Ausgabedateien verwaltbar bleiben und der Cluster stabil bleibt, können Sie die Protokollrotation in `log4j.properties` so konfigurieren, dass eine maximale Anzahl von Dateien festgelegt und deren Größe begrenzt wird.

**Amazon EMR 5.30.0 und höher**

Ab Amazon EMR 5.30.0 verwendet Flink das Logging-Framework log4j2 mit dem Namen der Konfigurationsklassifikation `flink-log4j.`. Die folgende Beispielkonfiguration demonstriert das log4j2-Format.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "appender.main.name": "MainAppender",
      "appender.main.type": "RollingFile",
      "appender.main.append" : "false",
      "appender.main.fileName" : "${sys:log.file}",
      "appender.main.filePattern" : "${sys:log.file}.%i",
      "appender.main.layout.type" : "PatternLayout",
      "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n",
      "appender.main.policies.type" : "Policies",
      "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy",
      "appender.main.policies.size.size" : "100MB",
      "appender.main.strategy.type" : "DefaultRolloverStrategy",
      "appender.main.strategy.max" : "10"
    },
  }
]
```

**Amazon-EMR-Versionen 5.29.0 und früher**

Mit den Amazon-EMR-Versionen 5.29.0 und früher verwendet Flink das Logging-Framework log4j. Die folgende Beispielkonfiguration veranschaulicht das log4j Format.

```
[
  {
    "Classification": "flink-log4j",
    "Properties": {
      "log4j.appender.file": "org.apache.log4j.RollingFileAppender",
      "log4j.appender.file.append":"true",
      # keep up to 4 files and each file size is limited to 100MB
      "log4j.appender.file.MaxFileSize":"100MB",
      "log4j.appender.file.MaxBackupIndex":4,
      "log4j.appender.file.layout":"org.apache.log4j.PatternLayout",
      "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n"
    },
  }
]
```

## Flink für die Ausführung mit Java 11 konfigurieren
<a name="flink-configure-java11"></a>

Amazon-EMR-Versionen 6.12.0 und höher bieten Java-11-Laufzeitunterstützung für Flink. In den folgenden Abschnitten wird beschrieben, wie der Cluster so konfiguriert wird, dass er Java-11-Laufzeitunterstützung für Flink bereitstellt.

**Topics**
+ [Konfigurieren Sie Flink für Java 11, wenn Sie einen Cluster erstellen](#flink-configure-java11-create)
+ [Flink für Java 11 auf einem laufenden Cluster konfigurieren](#flink-configure-java11-update)
+ [Bestätigen Sie die Java-Laufzeit für Flink auf einem laufenden Cluster](#flink-configure-java11-confirm)

### Konfigurieren Sie Flink für Java 11, wenn Sie einen Cluster erstellen
<a name="flink-configure-java11-create"></a>

Führen Sie die folgenden Schritte aus, um einen EMR-Cluster mit Flink und Java-11-Laufzeit zu erstellen. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. `flink-conf.yaml`

------
#### [ Console ]

**Um einen Cluster mit Flink und Java 11-Runtime in der Konsole zu erstellen**

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon EMR-Konsole unter [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr).

1. Wählen Sie im Navigationsbereich unter **EMR in EC2** die Option **Cluster** und dann **Cluster erstellen** aus.

1. Wählen Sie Amazon-EMR-Version 6.12.0 oder höher und installieren Sie die Flink-Anwendung. Wählen Sie alle anderen Anwendungen aus, die Sie auf Ihrem Cluster installieren möchten.

1. Fahren Sie mit der Einrichtung Ihres Clusters fort. Verwenden Sie im Bereich optionale **Softwareeinstellungen** die Standardoption **Konfiguration eingeben** und geben Sie die folgende Konfiguration ein:

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. Fahren Sie mit der Einrichtung und dem Start Ihres Clusters fort.

------
#### [ AWS CLI ]

**Um einen Cluster mit Flink und Java-11-Laufzeit in der neuen Konsole zu erstellen**

1. Erstellen Sie eine Konfigurationsdatei `configurations.json`, die Flink für die Verwendung von Java 11 konfiguriert. 

   ```
   [
       {
         "Classification": "flink-conf",
         "Properties": {
           "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
           "env.java.home":"/usr/lib/jvm/jre-11"
         }
       }
   ]
   ```

1. Erstellen Sie aus dem AWS CLI einen neuen EMR-Cluster mit Amazon EMR Version 6.12.0 oder höher und installieren Sie die Flink-Anwendung, wie im folgenden Beispiel gezeigt:

   ```
   aws emr create-cluster --release-label emr-6.12.0 \ 
   --applications Name=Flink \ 
   --configurations file://./configurations.json \ 
   --region us-east-1 \ 
   --log-uri s3://myLogUri \ 
   --instance-type m5.xlarge \ 
   --instance-count 2 \ 
   --service-role EMR_DefaultRole_V2 \ 
   --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
   ```

------

### Flink für Java 11 auf einem laufenden Cluster konfigurieren
<a name="flink-configure-java11-update"></a>

Gehen Sie wie folgt vor, um einen laufenden EMR-Cluster mit Flink und Java 11-Laufzeit zu aktualisieren. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. `flink-conf.yaml`

------
#### [ Console ]

**Um einen laufenden Cluster mit Flink und Java 11-Runtime in der Konsole zu aktualisieren**

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon EMR-Konsole unter [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr).

1. Wählen Sie im Navigationsbereich unter **EMR in EC2** die Option **Cluster** und dann den Cluster aus, den Sie aktualisieren möchten.
**Anmerkung**  
Der Cluster muss Amazon-EMR-Version 6.12.0 oder höher verwenden, um Java 11 zu unterstützen.

1. Wählen Sie die Registerkarte **Konfigurationen** aus.

1. Wählen Sie im Abschnitt **Instance-Gruppenkonfigurationen** die Instance-Gruppe **Running** aus, die Sie aktualisieren möchten, und wählen Sie dann **Neukonfiguration** aus dem Menü mit den Listenaktionen aus.

1. Konfigurieren Sie die Instance-Gruppe mit der Option **Attribute bearbeiten** wie folgt neu. Wählen Sie nach jeder Konfiguration **Neue Konfiguration hinzufügen** aus.    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/flink-configure.html)

1. Wählen Sie **Änderungen speichern** aus um die Konfigurationen hinzuzufügen. .

------
#### [ AWS CLI ]

**Um einen laufenden Cluster für die Verwendung von Flink und Java-11-Laufzeit über die CLI zu aktualisieren**

Verwenden Sie den Befehl `modify-instance-groups`, um eine neue Konfiguration für eine Instance-Gruppe in einem laufenden Cluster anzugeben.

1. Erstellen Sie eine Konfigurationsdatei `configurations.json`, die Flink für die Verwendung von Java 11 konfiguriert. Ersetzen Sie es im folgenden Beispiel *ig-1xxxxxxx9* durch die ID für die Instanzgruppe, die Sie neu konfigurieren möchten. Speichern Sie die Datei im selben Verzeichnis, in dem Sie den `modify-instance-groups`-Befehl ausführen werden.

   ```
   [
      {
         "InstanceGroupId":"ig-1xxxxxxx9",
         "Configurations":[
            {
               "Classification":"flink-conf",
               "Properties":{
                 "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11",
                 "env.java.home":"/usr/lib/jvm/jre-11"
               },
               "Configurations":[]
            }
         ]
      }
   ]
   ```

1. Führen Sie in AWS CLI der den folgenden Befehl aus. Ersetzen Sie die ID für die Instance-Gruppe, die Sie neu konfigurieren möchten:

   ```
   aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \
   --instance-groups file://configurations.json
   ```

------

### Bestätigen Sie die Java-Laufzeit für Flink auf einem laufenden Cluster
<a name="flink-configure-java11-confirm"></a>

Um die Java-Laufzeit für einen laufenden Cluster zu ermitteln, melden Sie sich mit SSH beim Primärknoten an, wie unter Mit [SSH mit dem Primärknoten verbinden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) beschrieben. Führen Sie anschließend den folgenden Befehl aus:

```
ps -ef | grep flink
```

Der `ps`-Befehl mit der `-ef`-Option listet alle laufenden Prozesse auf dem System auf. Sie können diese Ausgabe mit `grep` filtern, um Erwähnungen der Zeichenfolge `flink` zu finden. Überprüfen Sie die Ausgabe für den Java-Laufzeitumgebung (JRE)-Wert, `jre-XX`. In der folgenden Ausgabe wird `jre-11` angegeben, dass Java 11 zur Laufzeit von Flink abgerufen wird.

```
flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.
```

Alternativ können Sie [sich mit SSH am Primärknoten anmelden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) und eine Flink YARN-Sitzung mit dem Befehl `flink-yarn-session -d` starten. Die Ausgabe zeigt die Java Virtual Machine (JVM) für Flink, `java-11-amazon-corretto` im folgenden Beispiel:

```
2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64
```

# Arbeiten mit Flink-Jobs in Amazon EMR
<a name="flink-jobs"></a>

Es gibt mehrere Möglichkeiten, mit Flink in Amazon EMR zu interagieren: über die Konsole, die Flink-Oberfläche auf der ResourceManager Tracking-Benutzeroberfläche und über die Befehlszeile. Mit jedem dieser Programme können Sie eine JAR-Datei an eine Flink-Anwendung senden. Sobald Sie eine JAR-Datei eingereicht haben, wird sie zu einem Job, der vom Flink verwaltet wird. JobManager Der JobManager befindet sich auf dem YARN-Knoten, der den Application Master-Daemon für die Flink-Sitzung hostet.

Sie können eine Flink-Anwendung als YARN-Auftrag auf einem Cluster mit langer Laufzeit oder auf einem vorübergehenden Cluster ausführen. In einem Cluster mit langer Laufzeit können Sie mehrere Flink-Aufträge an einen Flink-Cluster senden, der auf Amazon EMR ausgeführt wird. Wenn Sie einen Flink-Auftrag auf einem vorübergehenden Cluster ausführen, existiert Ihr Amazon-EMR-Cluster nur für die Zeit, die zum Ausführen der Flink-Anwendung benötigt wird, sodass Ihnen nur die verbrauchten Ressourcen und die Zeit in Rechnung gestellt werden. Sie können einen Flink-Job mit der Amazon `AddSteps` EMR-API-Operation als Schrittargument für die `RunJobFlow` Operation und über die Befehle AWS CLI `add-steps` oder `create-cluster` einreichen.

## Eine Flink-YARN-Anwendung als Schritt auf einem Cluster mit langer Laufzeit starten
<a name="flink-add-step"></a>

Um eine Flink-Anwendung zu starten, an die mehrere Clients über YARN-API-Operationen Arbeit einreichen können, müssen Sie entweder einen Cluster erstellen oder eine Flink-Anwendung zu einem vorhandenen Cluster hinzufügen. Eine Anleitung zur Erstellung eines neuen Clusters finden Sie unter [Erstellen eines Clusters mit Flink](flink-create-cluster.md). Um eine YARN-Sitzung auf einem vorhandenen Cluster zu starten, führen Sie die folgenden Schritte über die Konsole, AWS CLI oder das Java-SDK aus.

**Anmerkung**  
Der Befehl `flink-yarn-session` wurde in Amazon-EMR-Version 5.5.0 als Wrapper für das Skript `yarn-session.sh` zur Vereinfachung der Ausführung hinzugefügt. Wenn Sie eine frühere Version von Amazon EMR verwenden, ersetzen Sie `bash -c "/usr/lib/flink/bin/yarn-session.sh -d"` mit **Argumente** in der Konsole oder `Args` im AWS CLI -Befehl.

**So senden Sie einen Flink-Auftrag auf einem vorhandenen Cluster von der Konsole aus**

Senden Sie die Flink-Sitzung mit dem Befehl `flink-yarn-session` in einem vorhandenen Cluster.

1. Öffnen Sie die Amazon EMR-Konsole unter [https://console.aws.amazon.com/emr](https://console.aws.amazon.com/emr/).

1. Wählen Sie in der Cluster-Liste den Cluster aus, den Sie zuvor gestartet haben.

1. Wählen Sie auf der Cluster-Detailseite **Steps (Schritte)** und **Add Step (Schritt hinzufügen)** aus.

1. Befolgen Sie die folgenden Richtlinien, um die Parameter einzugeben, und wählen Sie dann **Hinzufügen** aus.  
****    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/flink-jobs.html)

**Um einen Flink-Job auf einem vorhandenen Cluster einzureichen mit AWS CLI**
+ Verwenden Sie den `add-steps`-Befehl, um einem Cluster mit langer Laufzeit einen Flink-Auftrag hinzuzufügen. Der folgende Beispielbefehl gibt `Args="flink-yarn-session", "-d"` an, sodass eine Flink-Sitzung innerhalb Ihres YARN-Clusters in einem getrennten Zustand (`-d`) gestartet werden soll. Weitere Informationen finden Sie unter [YARN-Einrichtung](https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/yarn_setup.html#flink-yarn-session) in der aktuellen Flink-Dokumentation für Argumentdetails.

  ```
  aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"
  ```

## Senden Sie Ihre Arbeit an eine bestehende Flink-Anwendung auf einem Cluster mit langer Laufzeit
<a name="flink-submit-work"></a>

Wenn Sie bereits eine bestehende Flink-Anwendung auf einem Cluster mit langer Laufzeit haben, können Sie die Flink-Anwendungs-ID des Clusters angeben, um Arbeit an diesen zu senden. Um die Anwendungs-ID zu erhalten, führen Sie `yarn application -list` den Vorgang AWS CLI oder über den [YarnClient](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/yarn/client/api/YarnClient.html)API-Vorgang aus:

```
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id    Application-Name    Application-Type    User    Queue    State    Final-State    Progress    Tracking-URL
application_1473169569237_0002    Flink session with 14 TaskManagers (detached)	        Apache Flink	    hadoop	   default	           RUNNING	         UNDEFINED	           100%	http://ip-10-136-154-194.ec2.internal:33089
```

Die Anwendungs-ID für diese Flink-Sitzung lautet`application_1473169569237_0002`, mit der Sie Arbeiten aus dem AWS CLI oder einem SDK an die Anwendung einreichen können.

**Example SDK für Java**  

```
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
  
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", 
      "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/");
  
StepConfig flinkRunWordCount = new StepConfig()
  .withName("Flink add a wordcount step")
  .withActionOnFailure("CONTINUE")
  .withHadoopJarStep(flinkWordCountConf);
  
stepConfigs.add(flinkRunWordCount); 
  
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
   .withJobFlowId("myClusterId")
   .withSteps(stepConfigs));
```

**Example AWS CLI**  

```
aws emr add-steps --cluster-id <j-XXXXXXXX> \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \
--region <region-code>
```

## Senden eines kurzlebigen Flink-Auftrags
<a name="flink-transient-job"></a>

Die folgenden Beispiele starten einen vorübergehenden Cluster, der einen Flink-Auftrag ausführt und dann nach Abschluss beendet wird.

**Example SDK für Java**  

```
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;

public class Main_test {

	public static void main(String[] args) {
		AWSCredentials credentials_profile = null;
		try {
			credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
		} catch (Exception e) {
			throw new AmazonClientException(
					"Cannot load credentials from .aws/credentials file. " +
							"Make sure that the credentials file exists and the profile name is specified within it.",
					e);
		}

		AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
				.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
				.withRegion(Regions.US_WEST_1)
				.build();

		List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
		HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
				.withJar("command-runner.jar")
				.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
						"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
						"s3://path/to/output/");

		StepConfig flinkRunWordCountStep = new StepConfig()
				.withName("Flink add a wordcount step and terminate")
				.withActionOnFailure("CONTINUE")
				.withHadoopJarStep(flinkWordCountConf);

		stepConfigs.add(flinkRunWordCountStep);

		Application flink = new Application().withName("Flink");

		RunJobFlowRequest request = new RunJobFlowRequest()
				.withName("flink-transient")
				.withReleaseLabel("emr-5.20.0")
				.withApplications(flink)
				.withServiceRole("EMR_DefaultRole")
				.withJobFlowRole("EMR_EC2_DefaultRole")
				.withLogUri("s3://path/to/my/logfiles")
				.withInstances(new JobFlowInstancesConfig()
						.withEc2KeyName("myEc2Key")
						.withEc2SubnetId("subnet-12ab3c45")
						.withInstanceCount(3)
						.withKeepJobFlowAliveWhenNoSteps(false)
						.withMasterInstanceType("m4.large")
						.withSlaveInstanceType("m4.large"))
				.withSteps(stepConfigs);

		RunJobFlowResult result = emr.runJobFlow(request);
		System.out.println("The cluster ID is " + result.toString());

	}

}
```

**Example AWS CLI**  
Verwenden Sie den Unterbefehl `create-cluster`, um einen kurzlebigen Cluster zu erstellen, der beendet wird, wenn der Flink-Auftrag abgeschlossen ist:  

```
aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \ 
--ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar
--input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""
```

# Verwenden der Scala-Shell
<a name="flink-scala"></a>

Die Flink-Scala-Shell für EMR-Cluster ist so konfiguriert, dass nur neue YARN Sitzungen starten. Sie können die Scala-Shell wie im Folgenden beschrieben verwenden.

**Den Flink Scala-Shell auf dem Primärknoten verwenden**

1. Melden Sie sich mit SSH beim Primärknoten an, wie unter Mit [SSH mit dem Primärknoten verbinden](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) beschrieben.

1. Geben Sie Folgendes ein, um eine Shell zu starten:

   In Amazon EMR Version 5.5.0 und höher können Sie den folgenden Befehl verwenden, um einen Yarn-Cluster für die Scala Shell mit einem zu starten. TaskManager

   ```
   % flink-scala-shell yarn 1
   ```

   In früheren Versionen von Amazon EMR verwenden Sie:

   ```
   % /usr/lib/flink/bin/start-scala-shell.sh yarn 1
   ```

   Damit wird die Flink Scala-Shell gestartet, sodass Sie Flink interaktiv nutzen können. Wie bei anderen Schnittstellen und Optionen können Sie im Beispiel verwendeten Optionswert `-n` basierend auf der Anzahl der Aufgaben, die Sie in der Shell ausführen möchten, skalieren.

   Weitere Informationen finden Sie unter [Scala REPL](https://ci.apache.org/projects/flink/flink-docs-release-1.10/ops/scala_shell.html) in der offiziellen Apache-Flink-Dokumentation.

# Suchen der Flink Webschnittstelle
<a name="flink-web-interface"></a>

Der Application Master, der zur Flink-Anwendung gehört, hostet das Flink-Webinterface. Dies ist eine alternative Möglichkeit, ein JAR als Auftrag einzureichen oder den aktuellen Status anderer Aufträge einzusehen. Die Flink-Webschnittstelle ist aktiv, solange eine Flink-Sitzung ausgeführt wird. Wenn Sie bereits einen YARN-Job mit langer Laufzeit aktiv haben, können Sie den Anweisungen im Thema [Connect to the primary node with SSH](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-ssh.html) im *Amazon EMR Management Guide* folgen, um eine Verbindung zum YARN herzustellen. ResourceManager Wenn Sie beispielsweise einen SSH-Tunnel eingerichtet und einen Proxy in Ihrem Browser aktiviert haben, wählen Sie die ResourceManager Verbindung unter **Verbindungen** auf Ihrer EMR-Cluster-Detailseite aus.

![\[Resource Manager link under Connections section in Cluster details page.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/resourcemanager.png)


Nachdem Sie die gefunden haben ResourceManager, wählen Sie die YARN-Anwendung aus, die eine Flink-Sitzung hostet. Wählen Sie den Link unter der Spalte **Tracking UI** aus. 

![\[Application details table showing a running Apache Flink session with ApplicationMaster link.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/resourcemanager2.png)


In der Flink-Webschnittstelle können Sie die Konfiguration ansehen, Ihre eigene benutzerdefinierte JAR-Datei als Auftrag senden oder in Bearbeitung befindliche Aufträge überwachen. 

![\[Apache Flink Dashboard overview showing task managers, slots, and job statistics.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/flink.png)


# Flink Autoscaler
<a name="flink-autoscaler"></a>

## -Übersicht
<a name="flink-autoscaler-overview"></a>

Amazon-EMR-Versionen 6.15.0 und höher unterstützen *Flink Autoscaler*. Die Auftrag-Autoscaler-Funktion sammelt Metriken von laufenden Flink-Streaming-Aufträgen und skaliert automatisch die einzelnen Scheitelpunkte der Aufträge. Dadurch wird der Gegendruck reduziert und das von Ihnen festgelegte Nutzungsziel wird erfüllt.

Weitere Informationen finden Sie im Abschnitt [Autoscaler](https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main/docs/custom-resource/autoscaler/) in der Dokumentation zu *Apache Flink Kubernetes Operator*.

## Überlegungen
<a name="flink-autoscaler-considerations"></a>
+ Flink Autoscaler wird mit Amazon EMR 6.15.0 und höher unterstützt.
+ Flink Autoscaler wird nur für Streaming-Aufträge unterstützt.
+ Nur der adaptive Scheduler wird unterstützt. Der Standard-Scheduler wird nicht unterstützt.
+ Wir empfehlen Ihnen, die Cluster-Skalierung zu aktivieren, um eine dynamische Ressourcenbereitstellung zu ermöglichen. Amazon EMR Managed Scaling wird bevorzugt, weil die Metrikauswertung alle 5 bis 10 Sekunden erfolgt. In diesem Intervall kann sich Ihr Cluster leichter an die Änderung der erforderlichen Cluster-Ressourcen anpassen.

## Autoscaler aktivieren
<a name="flink-autoscaler-start"></a>

Gehen Sie wie folgt vor, um den Flink Autoscaler zu aktivieren, wenn Sie einen Amazon EMR in einem EC2-Cluster erstellen.

1. Erstellen Sie in der Amazon-EMR-Konsole einen neuen EMR-Cluster:

   1. Wählen Sie Amazon EMR Version `emr-6.15.0` oder höher aus. Wählen Sie das **Flink**-Anwendungspaket und alle anderen Anwendungen aus, die Sie möglicherweise in Ihren Cluster aufnehmen möchten.  
![\[Application bundle options for Amazon EMRCluster, with Flink highlighted and selected.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-cluster-create.png)

   1. Wählen Sie unter Option **Cluster scaling and provisioning** (Cluster-Skalierung und -Bereitstellung) **Use EMR-managed scaling** (EMR-verwaltete Skalierung verwenden) aus.  
![\[Cluster scaling options: manual, EMR-managed (selected), or custom automatic scaling.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-cluster-managedscaling.png)

1. Geben Sie im Abschnitt **Software settings** (Softwareeinstellungen) die folgende Konfiguration ein, um Flink Autoscaler zu aktivieren. Stellen Sie für Testszenarien das Entscheidungsintervall, das Metrikfensterintervall und das Stabilisierungsintervall auf einen niedrigeren Wert ein, sodass der Auftrag sofort eine Skalierungsentscheidung trifft, um die Überprüfung zu vereinfachen.

   ```
   [
     {
       "Classification": "flink-conf",
       "Properties": {
         "job.autoscaler.enabled": "true",
         "jobmanager.scheduler": "adaptive",
         "job.autoscaler.stabilization.interval": "60s",
         "job.autoscaler.metrics.window": "60s",
         "job.autoscaler.decision.interval": "10s",
         "job.autoscaler.debug.logs.interval": "60s"
       }
     }
   ]
   ```

1. Wählen oder konfigurieren Sie alle anderen Einstellungen nach Ihren Wünschen und erstellen Sie den Flink-Autoscaler-fähigen Cluster.

## Autoscaler-Konfigurationen
<a name="flink-autoscaler-config"></a>

In diesem Abschnitt werden die meisten Konfigurationen behandelt, die Sie je nach Ihren spezifischen Anforderungen ändern können.

**Anmerkung**  
Bei zeitbasierten Konfigurationen wie `time`-, `interval`- und `window`-Einstellungen ist die Standardeinheit, wenn keine Einheit angegeben ist, Millisekunden. Ein Wert von `30` ohne Suffix entspricht also 30 Millisekunden. Geben Sie für andere Zeiteinheiten das entsprechende Suffix `s` für *Sekunden*, `m` für *Minuten* oder `h` für *Stunden* an.

**Topics**
+ [Loop-Konfigurationen](#flink-autoscaler-config-loop)
+ [Metriken und Verlaufskonfigurationen](#flink-autoscaler-config-metrics)
+ [Scheitelpunktkonfigurationen](#flink-autoscaler-config-vertex)
+ [Backlog-Konfigurationen](#flink-autoscaler-config-backlog)
+ [Vorgangskonfigurationen skalieren](#flink-autoscaler-config-scale)

### Autoscaler-Loop-Konfigurationen
<a name="flink-autoscaler-config-loop"></a>

Autoscaler ruft die Metriken auf Ebene des Auftragsscheitelpunkts alle paar konfigurierbare Zeitintervalle ab, wandelt sie in umsetzbare Maßeinheiten um, schätzt die Parallelität neuer Auftragsscheitelpunkte und empfiehlt sie dem Auftrags-Scheduler. Metriken werden erst nach dem Neustart des Auftrags und dem Cluster-Stabilisierungsintervall erfasst.


| Konfigurationsschlüssel | Standardwert | Description | Beispielwerte | 
| --- | --- | --- | --- | 
| job.autoscaler.enabled | false | Aktivieren Sie die automatische Skalierung auf Ihrem Flink-Cluster. | true, false | 
| job.autoscaler.decision.interval | 60s | Autoscaler-Entscheidungsintervall. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.restart.time | 3m | Die erwartete Neustartzeit wird verwendet, bis der Bediener sie anhand des Verlaufs zuverlässig ermitteln kann. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.stabilization.interval | 300s | Stabilisierungszeitraum, in dem keine neue Skalierung durchgeführt wird. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.debug.logs.interval | 300s | Intervall für Autoscaler-Debug-Protokolle. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 

### Aggregation von Metriken und Verlaufskonfigurationen
<a name="flink-autoscaler-config-metrics"></a>

Autoscaler ruft die Metriken ab, aggregiert sie über ein zeitbasiertes gleitendes Fenster und wertet sie zu Skalierungsentscheidungen aus. Der Verlauf der Skalierungsentscheidungen für jeden Auftragsscheitelpunkt wird verwendet, um die neue Parallelität abzuschätzen. Diese haben sowohl ein altersabhängiges Verfallsdatum als auch eine historische Größe (mindestens 1).


| Konfigurationsschlüssel | Standardwert | Description | Beispielwerte | 
| --- | --- | --- | --- | 
| job.autoscaler.metrics.window | 600s | Scaling metrics aggregation window size. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.history.max.count | 3 | Maximale Anzahl vergangener Skalierungsentscheidungen, die pro Scheitelpunkt beibehalten werden sollen. | 1 auf Integer.MAX\$1VALUE | 
| job.autoscaler.history.max.age | 24h | Mindestanzahl vergangener Skalierungsentscheidungen, die pro Scheitelpunkt beibehalten werden sollen. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 

### Konfigurationen auf Ebene des Auftragsscheitelpunkts
<a name="flink-autoscaler-config-vertex"></a>

Die Parallelität jedes Auftragsscheitelpunkts wird auf der Grundlage der Zielauslastung geändert und durch die Min./Max.-Parallelitätsgrenzen begrenzt. Es wird nicht empfohlen, die Zielauslastung auf nahezu 100 % festzulegen (d. h. den Wert 1), und die Nutzungsgrenze dient als Puffer, um die zwischenzeitlichen Lastschwankungen zu bewältigen.


| Konfigurationsschlüssel | Standardwert | Description | Beispielwerte | 
| --- | --- | --- | --- | 
| job.autoscaler.target.utilization | 0.7 | Nutzung des Zielscheitelpunkts. | 0 - 1 | 
| job.autoscaler.target.utilization.boundary | 0.4 | Nutzung des Zielscheitelpunkts. Die Skalierung wird nicht durchgeführt, wenn die aktuelle Verarbeitungsrate innerhalb von [target\$1rate / (target\$1utilization - boundary) und (target\$1rate / (target\$1utilization \$1 boundary)] liegt | 0 - 1 | 
| job.autoscaler.vertex.min-parallelism | 1 | Die Mindestparallelität, die der Autoscaler verwenden kann. | 0 - 200 | 
| job.autoscaler.vertex.max-parallelism | 200 | Die maximale Parallelität, die der Autoscaler verwenden kann. Der Autoscaler ignoriert dieses Limit, wenn es höher ist als die maximale Parallelität, die in der Flink-Konfiguration oder direkt für jeden Bediener konfiguriert wurde. | 0 - 200 | 

### Konfigurationen für die Backlog-Verarbeitung
<a name="flink-autoscaler-config-backlog"></a>

Der Auftragsscheitelpunkt benötigt zusätzliche Ressourcen, um die ausstehenden Ereignisse oder Rückstände zu bewältigen, die sich während des Skalierungsvorgangs ansammeln. Dies wird auch als die `catch-up`-Dauer bezeichnet. Wenn die Zeit für die Verarbeitung des Backlogs den konfigurierten `lag -threshold`-Wert überschreitet, steigt die Zielauslastung des Auftragsscheitelpunkts auf den Höchstwert. Auf diese Weise können unnötige Skalierungsvorgänge während der Backlog-Verarbeitung vermieden werden.


| Konfigurationsschlüssel | Standardwert | Description | Beispielwerte | 
| --- | --- | --- | --- | 
| job.autoscaler.backlog-processing.lag-threshold | 5m | Verzögerungsschwellenwert, der unnötige Skalierungen verhindert und gleichzeitig die ausstehenden Nachrichten entfernt, die für die Verzögerung verantwortlich sind. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.catch-up.duration | 15m | Die Zieldauer für die vollständige Backlog-Verarbeitung nach einem Skalierungsvorgang. Auf 0 setzen, um die Backlog-basierte Skalierung zu deaktivieren. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 

### Vorgangskonfigurationen skalieren
<a name="flink-autoscaler-config-scale"></a>

Autoscaler führt die Herunterskalierung innerhalb einer Kulanzzeit nicht unmittelbar nach einer Hochskalierung durch. Dies verhindert unnötige Zyklen von up-down-up-down Waagenoperationen, die durch vorübergehende Lastschwankungen verursacht werden. 

Wir können das Verhältnis der Herunterskalierung verwenden, um die Parallelität schrittweise zu verringern und Ressourcen freizusetzen, um vorübergehende Lastspitzen auszugleichen. Das trägt auch dazu bei, unnötige kleinere Hochskalierungsvorgänge nach einer umfangreichen Herunterskalierung zu vermeiden. 

Wir können einen ineffektiven Skalierungsvorgang auf der Grundlage vergangener Entscheidungen über die Skalierung von Auftragsscheitelpunkten erkennen, um weitere Änderungen der Parallelität zu verhindern.


| Konfigurationsschlüssel | Standardwert | Description | Beispielwerte | 
| --- | --- | --- | --- | 
| job.autoscaler.scale-up.grace-period | 1h | Dauer, in der ein Scheitelpunkt nicht herunterskaliert werden darf, nachdem er hochskaliert wurde. | 30 (Standardeinheit ist Millisekunden), 5m, 1h | 
| job.autoscaler.scale-down.max-factor | 0.6 | Maximaler Herunterskalierungsfaktor. Ein Wert von 1 bedeutet, dass es keine Begrenzung der Herunterskalierung gibt. 0.6 bedeutet, dass der Auftrag nur mit 60 % der ursprünglichen Parallelität herunterskaliert werden kann. | 0 - 1 | 
| job.autoscaler.scale-up.max-factor | 100000. | Maximales Verhältnis der Hochskalierung. Der Wert 2.0 bedeutet, dass der Auftrag nur mit 200 % der aktuellen Parallelität hochskaliert werden kann. | 0 - Integer.MAX\$1VALUE | 
| job.autoscaler.scaling.effectiveness.detection.enabled | false | Ob ineffektive Skalierungsvorgänge erkannt werden sollen und ob der Autoscaler weitere Hochskalierungen blockieren kann. | true, false | 

# Optimieren der Neustartzeiten von Aufträgen für die Aufgabenwiederherstellung und -skalierung
<a name="flink-restart"></a>

Wenn eine Aufgabe fehlschlägt oder wenn ein Skalierungsvorgang stattfindet, versucht Flink, die Aufgabe vom letzten abgeschlossenen Prüfpunkt aus erneut auszuführen. Die Ausführung des Neustartvorgangs kann eine Minute oder länger dauern, abhängig von der Größe des Prüfpunktzustands und der Anzahl der parallelen Aufgaben. Während des Neustarts können sich Backlog-Aufgaben für den Auftrag ansammeln. Es gibt jedoch einige Möglichkeiten, wie Flink die Geschwindigkeit der Wiederherstellung und des Neustarts von Ausführungsdiagrammen optimiert, um die Auftragsstabilität zu verbessern.

Auf dieser Seite werden einige der Möglichkeiten beschrieben, mit denen Amazon EMR Flink die Zeit für den Neustart des Auftrags während der Aufgabenwiederherstellung oder -skalierung verbessern kann.

**Topics**
+ [Aufgabenlokale Wiederherstellung](#flink-restart-task-local)
+ [Generischer protokollbasierter inkrementeller Prüfpunkt](#flink-restart-log-check)
+ [Differenzierte Wiederherstellung](#flink-restart-fine-grained)
+ [Kombinierter Neustartmechanismus im adaptiven Scheduler](#flink-restart-combined)

## Aufgabenlokale Wiederherstellung
<a name="flink-restart-task-local"></a>

**Anmerkung**  
Aufgabenlokale Wiederherstellung wird mit Amazon EMR 6.0.0 und höher unterstützt.

Mit Flink-Prüfpunkten erstellt jede Aufgabe einen Snapshot ihres Status, den Flink in verteilte Speicher wie Amazon S3 schreibt. Im Falle einer Wiederherstellung stellen die Aufgaben ihren Status aus dem verteilten Speicher wieder her. Der verteilte Speicher bietet Fehlertoleranz und kann den Status während der Neuskalierung neu verteilen, da er für alle Knoten zugänglich ist.

Ein verteilter Remote-Speicher hat jedoch auch einen Nachteil: Alle Aufgaben müssen ihren Status von einem entfernten Standort aus über das Netzwerk lesen. Dies kann bei der Aufgabenwiederherstellung oder bei Skalierungsvorgängen zu langen Wiederherstellungszeiten für große Zustände führen.

Dieses Problem der langen Wiederherstellungszeit wird durch eine *aufgabenlokale* Wiederherstellung gelöst. Aufgaben schreiben ihren Status am Prüfüunkt in einen sekundären Speicher, der sich lokal zur Aufgabe befindet, z. B. auf eine lokale Festplatte. Sie speichern ihren Status auch im Primärspeicher oder in unserem Fall in Amazon S3. Während der Wiederherstellung plant der Scheduler die Aufgaben in demselben Task-Manager, in dem die Aufgaben zuvor ausgeführt wurden, sodass sie aus dem lokalen Statusspeicher wiederhergestellt werden können, anstatt sie aus dem Remote-Statusspeicher zu lesen. Weitere Informationen finden Sie unter [Aufgabenlokale Wiederherstellung](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/large_state_tuning/#task-local-recovery) in der *Apache-Flink-Dokumentation*.

Unsere Benchmark-Tests mit Beispielaufträgen haben gezeigt, dass die Wiederherstellungszeit bei aktivierter aufgabenlokaler Wiederherstellung von Minuten auf wenige Sekunden reduziert wurde.

Um die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer `flink-conf.yaml`-Datei fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

```
    state.backend.local-recovery: true
    state.backend: hasmap or rocksdb
    state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Generischer protokollbasierter inkrementeller Prüfpunkt
<a name="flink-restart-log-check"></a>

**Anmerkung**  
Generische protokollbasierte inkrementelle Prüfpunkte werden mit Amazon EMR 6.10.0 und höher unterstützt.

Generische protokollbasierte inkrementelle Prüfpunkte wurden in Flink 1.16 hinzugefügt, um die Geschwindigkeit von Prüfpunkten zu verbessern. Ein schnelleres Prüfpunktintervall führt häufig zu einer Reduzierung des Wiederherstellungsaufwands, da weniger Ereignisse nach der Wiederherstellung erneut verarbeitet werden müssen. Weitere Informationen finden Sie im *Apache-Flink-Blog* unter [Verbesserung der Geschwindigkeit und Stabilität von Prüfpunkten mit generischen protokollbasierten inkrementellen Prüfpunkten](https://flink.apache.org/2022/05/30/improving-speed-and-stability-of-checkpointing-with-generic-log-based-incremental-checkpoints/).

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass sich die Prüfpunktzeit mit dem generischen protokollbasierten inkrementellen Prüfpunkt von Minuten auf wenige Sekunden reduziert hat.

Um generische protokollbasierte inkrementelle Prüfpunkte zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei `flink-conf.yaml` fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

```
    state.backend.changelog.enabled: true 
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://bucket-path/changelog
    state.backend.local-recovery: true
    state.backend: rocksdb
    state.checkpoints.dir: s3://bucket-path/checkpoint
    execution.checkpointing.interval: 15000
```

## Differenzierte Wiederherstellung
<a name="flink-restart-fine-grained"></a>

**Anmerkung**  
Eine differenzierte Wiederherstellungsunterstützung für den Standard-Scheduler ist mit Amazon EMR 6.0.0 und höher verfügbar. Eine differenzierte Wiederherstellungsunterstützung im adaptiven Scheduler ist mit Amazon EMR 6.15.0 und höher verfügbar.

Wenn eine Aufgabe während der Ausführung fehlschlägt, setzt Flink das gesamte Ausführungsdiagramm zurück und löst eine vollständige Neuausführung ab dem letzten abgeschlossenen Prüfpunkt aus. Das ist teurer, als nur die fehlgeschlagenen Aufgaben erneut auszuführen. Bei einer differenzierten Wiederherstellung wird nur die mit der Pipeline verbundene Komponente der fehlgeschlagenen Aufgabe neu gestartet. Im folgenden Beispiel hat das Auftragsdiagramm 5 Scheitelpunkte (`A` bis `E`). Alle Verbindungen zwischen den Scheitelpunkten werden punktweise in Pipelines verlegt, und der Wert `parallelism.default` für den Auftrag ist auf `2` eingestellt. 

```
A → B → C → D → E
```

In diesem Beispiel werden insgesamt 10 Aufgaben ausgeführt. Die erste Pipeline (`a1` bis `e1`) wird in einem TaskManager (`TM1`) ausgeführt, während die zweite Pipeline (`a2` bis `e2`) in einem anderen TaskManager (`TM2`) ausgeführt wird.

```
a1 → b1 → c1 → d1 → e1
a2 → b2 → c2 → d2 → e2
```

Es gibt zwei Komponenten, die über eine Pipeline miteinander verbunden sind: `a1 → e1` und `a2 → e2`. Wenn entweder `TM1` oder `TM2` fehlschlägt, wirkt sich der Fehler nur auf die 5 Aufgaben in der Pipeline aus, in denen der TaskManager ausgeführt wurde. Bei der Neustartstrategie wird nur die betroffene Pipeline-Komponente gestartet. 

Eine differenzierte Wiederherstellung funktioniert nur mit perfekt parallelen Flink-Aufträgen. Sie wird nicht mit `keyBy()`- oder `redistribute()`-Vorgängen unterstützt. Weitere Informationen finden Sie unter [FLIP-1: Fine Grained Recovery from Task Failures](https://cwiki.apache.org/confluence/display/FLINK/FLIP-1%3A+Fine+Grained+Recovery+from+Task+Failures) (FLIP-1: Differenzierte Wiederherstellung nach Aufgabenfehlern) im Jira-Projekt *Flink Improvement Proposal*.

Um die differenzierte Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer `flink-conf.yaml`-Datei fest.

```
jobmanager.execution.failover-strategy: region 
restart-strategy: exponential-delay or fixed-delay
```

## Kombinierter Neustartmechanismus im adaptiven Scheduler
<a name="flink-restart-combined"></a>

**Anmerkung**  
Der kombinierte Neustartmechanismus im adaptiven Scheduler wird mit Amazon EMR 6.15.0 und höher unterstützt.

Der adaptive Scheduler kann die Parallelität des Auftrags auf der Grundlage der verfügbaren Slots anpassen. Er reduziert automatisch die Parallelität, wenn nicht genügend Slots für die konfigurierte Auftragsparallelität verfügbar sind. Wenn neue Slots verfügbar werden, wird der Auftrag wieder auf die konfigurierte Auftragsparallelität hochskaliert. Ein adaptiver Scheduler vermeidet Ausfallzeiten beim Auftrag, wenn nicht genügend Ressourcen verfügbar sind. Dies ist der unterstützte Scheduler für Flink Autoscaler. Aus diesen Gründen empfehlen wir den adaptiven Scheduler mit Amazon EMR Flink. Adaptive Scheduler können jedoch innerhalb kurzer Zeit mehrere Neustarts durchführen, und zwar einen Neustart für jede neu hinzugefügte Ressource. Dies könnte zu einem Leistungsabfall des Auftrags führen.

Mit Amazon EMR 6.15.0 und höher verfügt Flink über einen kombinierten Neustartmechanismus im adaptiven Scheduler, der ein Neustartfenster öffnet, wenn die erste Ressource hinzugefügt wird, und dann bis zum konfigurierten Fensterintervall von 1 Minute wartet. Er führt einen einzigen Neustart durch, wenn genügend Ressourcen zur Verfügung stehen, um den Auftrag mit konfigurierter Parallelität auszuführen, oder wenn das Intervall abgelaufen ist.

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass dieses Feature 10 % mehr Datensätze verarbeitet als das Standardverhalten, wenn Sie den adaptiven Scheduler und Flink Autoscaler verwenden.

Um den kombinierten Neustartmechanismus zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei `flink-conf.yaml` fest.

```
jobmanager.adaptive-scheduler.combined-restart.enabled: true 
jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m
```

# Arbeiten mit Flink-Aufträgen von Zeppelin in Amazon EMR
<a name="flink-zeppelin"></a>

## Einführung
<a name="flink-zeppelin-intro"></a>

Amazon-EMR-Versionen 6.10.0 und höher unterstützen die [Apache Zeppelin](emr-zeppelin.md)-Integration mit Apache Flink. Sie können Flink-Aufträge interaktiv über Zeppelin-Notebooks einreichen. Mit dem Flink-Interpreter können Sie Flink-Abfragen ausführen, Flink-Streaming- und Batch-Aufträge definieren und die Ausgabe in Zeppelin-Notebooks visualisieren. Der Flink-Interpreter basiert auf der Flink-REST-API. Auf diese Weise können Sie von der Zeppelin-Umgebung aus auf Flink-Aufträge zugreifen und diese bearbeiten, um eine Datenverarbeitung und -analyse in Echtzeit durchzuführen.

In Flink Interpreter gibt es vier Unterinterpreter. Sie dienen unterschiedlichen Zwecken, befinden sich aber alle in der JVM und teilen sich dieselben vorkonfigurierten Einstiegspunkte zu Flink (`ExecutionEnviroment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment`). Die Interpreter sind wie folgt:
+ `%flink` – Erzeugt `ExecutionEnvironment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment` und stellt eine Scala-Umgebung bereit
+ `%flink.pyflink` – Stellt eine Python-Umgebung bereit
+ `%flink.ssql` – Stellt eine Streaming-SQL-Umgebung bereit
+ `%flink.bsql` – Stellt eine Batch-SQL-Umgebung bereit

## Voraussetzungen
<a name="flink-zeppelin-prerequisites"></a>
+ Die Zeppelin-Integration mit Flink wird für Cluster unterstützt, die mit Amazon EMR 6.10.0 und höher erstellt wurden.
+ Um Webschnittstellen, die auf EMR-Clustern gehostet werden, wie für diese Schritte erforderlich, anzuzeigen, müssen Sie einen SSH-Tunnel konfigurieren, der eingehenden Zugriff ermöglicht. Weitere Informationen finden Sie unter [Konfigurieren von Proxy-Einstellungen, um auf dem Primärknoten gehostete Websites anzeigen zu lassen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-proxy.html).

## Zeppelin-Flink auf einem EMR-Cluster konfigurieren
<a name="flink-zeppelin-configure"></a>

Gehen Sie wie folgt vor, um Apache Flink auf Apache Zeppelin für die Ausführung auf einem EMR-Cluster zu konfigurieren:

1. Erstellen Sie einen neuen Cluster von der Amazon-EMR-Konsole aus. Wählen Sie emr-6.10.0 oder höher für die Amazon-EMR-Version aus. Wählen Sie dann, ob Sie Ihr Anwendungspaket mit der Option Benutzerdefiniert anpassen möchten. Nehmen Sie mindestens Flink, Hadoop und Zeppelin in Ihr Paket auf.  
![\[Passen Sie in der Amazon-EMR-Konsole Ihr Anwendungspaket mit der Option Benutzerdefiniert an. Mindestens Flink, Hadoop und Zeppelin in Ihr Paket aufnehmen\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-console.png)

1. Erstellen Sie den Rest Ihres Clusters mit den Einstellungen, die Sie bevorzugen.

1. Sobald Ihr Cluster läuft, wählen Sie den Cluster in der Konsole aus, um seine Details anzuzeigen, und öffnen Sie die Registerkarte Anwendungen. Wählen Sie Zeppelin im Bereich Benutzeroberflächen für Anwendungen aus, um die Zeppelin-Weboberfläche zu öffnen. Stellen Sie sicher, dass Sie den Zugriff auf die Zeppelin-Weboberfläche mit einem SSH-Tunnel zum Primärknoten und einer Proxyverbindung eingerichtet haben, wie in [Voraussetzungen](#flink-zeppelin-prerequisites) beschrieben.  
![\[Auf der Zeppelin-Weboberfläche können Sie neue Notebooks importieren und erstellen.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/welcome-to-zeppelin.png)

1. Jetzt können Sie eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.  
![\[Sie können eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-create-notebook.png)

1. In den folgenden Codebeispielen wird veranschaulicht, wie Flink-Jobs von einem Zeppelin-Notebook aus ausgeführt werden.

## Führen Sie Flink-Aufträge mit Zeppelin-Flink auf einem EMR-Cluster aus
<a name="flink-zeppelin-run-jobs"></a>
+ Beispiel 1, Flink Scala

  a) WordCount Batch-Beispiel (SCALA)

  ```
  %flink
  
  val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
               .map(w => (w, 1))
               .groupBy(0)
               .sum(1)
               .print()
  ```

  b) WordCount Streaming-Beispiel (SCALA)

  ```
  %flink
  
  val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
    .map(w => (w, 1))
    .keyBy(0)
    .sum(1)
    .print
  
  senv.execute()
  ```  
![\[Sie können beispielsweise Batch WordCount - und WordCount Streaming-Jobs von einem Zeppelin-Notebook aus ausführen.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/streaming-wordcount-example.png)
+ Beispiel 2, Flink Streaming SQL

  ```
  %flink.ssql
  SET 'sql-client.execution.result-mode' = 'tableau';
  SET 'table.dml-sync' = 'true';
  SET 'execution.runtime-mode' = 'streaming';
  
  create table dummy_table (
    id int,
    data string
  ) with (
    'connector' = 'filesystem',
    'path' = 's3://s3-bucket/dummy_table',
    'format' = 'csv'
  );
  
  INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
  
  SELECT * FROM dummy_table;
  ```  
![\[Dieses Beispiel zeigt, wie ein Flink-Streaming-SQL-Auftrag ausgeführt wird.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/flink-streaming-sql.png)
+ Beispiel 3, Pyflink. Beachten Sie, dass Sie Ihre eigene Beispieltextdatei mit dem Namen `word.txt` in Ihren S3-Bucket hochladen müssen.

  ```
  %flink.pyflink
  
  import argparse
  import logging
  import sys
  
  from pyflink.common import Row
  from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                             DataTypes, FormatDescriptor)
  from pyflink.table.expressions import lit, col
  from pyflink.table.udf import udtf
  
  def word_count(input_path, output_path):
      t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
      # write all the data to one file
      t_env.get_config().set("parallelism.default", "1")
  
      # define the source
      if input_path is not None:
          t_env.create_temporary_table(
              'source',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .build())
                             .option('path', input_path)
                             .format('csv')
                             .build())
          tab = t_env.from_path('source')
      else:
          print("Executing word_count example with default input data set.")
          print("Use --input to specify file input.")
          tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                    DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
  
      # define the sink
      if output_path is not None:
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .option('path', output_path)
                             .format(FormatDescriptor.for_format('canal-json')
                                     .build())
                             .build())
      else:
          print("Printing result to stdout. Use --output to specify output path.")
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('print')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .build())
  
      @udtf(result_types=[DataTypes.STRING()])
      def split(line: Row):
          for s in line[0].split():
              yield Row(s)
  
      # compute word count
      tab.flat_map(split).alias('word') \
         .group_by(col('word')) \
         .select(col('word'), lit(1).count) \
         .execute_insert('sink') \
         .wait()
  
  
  logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  
  
  word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  ```

1. Wählen Sie **FLINK JOB** in der Zeppelin-Benutzeroberfläche, um auf die Flink-Web-UI zuzugreifen und diese anzusehen.  
![\[Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/batch-wordcount-example.png)

1. Wenn Sie **FLINK JOB** wählen, gelangen Sie zur Flink Web Console in einer anderen Registerkarte Ihres Browsers.  
![\[Wenn Sie FLINK JOB wählen, wird die Flink Web Console in einer anderen Registerkarte Ihres Browsers geöffnet.\]](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/flink-web-console.png)

# Versionsverlauf von Flink
<a name="Flink-release-history"></a>

In der folgenden Tabelle sind die Version von Flink aufgeführt, die in jeder Release-Version von Amazon EMR enthalten ist, zusammen mit den Komponenten, die mit der Anwendung installiert wurden. Informationen zu den Komponentenversionen in den einzelnen Versionen finden Sie im Abschnitt Komponentenversion für Ihre Version in [Amazon-EMR-7.x-Versionen](emr-release-7x.md), [Amazon-EMR-6.x-Versionen](emr-release-6x.md) oder [Amazon-EMR-5.x-Versionen](emr-release-5x.md).


**Die Flink-Versionsinformationen**  

| Amazon-EMR-Versionsbezeichnung | Flink-Version | Mit Flink installierte Komponenten | 
| --- | --- | --- | 
| emr-7.12.0 | 1,20,0-amzn-6 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.11.0 | 1,20,0-amzn-5 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-hdfs-zkfc, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.10.0 | 1,20,0-amzn-4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.9.0 | 1,20,0-amzn-3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.8.0 | 1,20,0-amzn-2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.7.0 | 1,20,0-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.6.0 | 1,20,0-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.5.0 | 1.19.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.4.0 | 1.19.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.3.0 | 1.18.1-amzn-2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.2.0 | 1.18.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-5.36,2 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-7.1.0 | 1.18.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta | 
| emr-7.0.0 | 1.18.0-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.15.0 | 1.17.1-amzn-1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.14.0 | 1.17.1-amzn-0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.13.0 | 1.17.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.12.0 | 1.17.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.11.1 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.11.0 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi, delta-standalone-connectors | 
| emr-6.10.1 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.10.0 | 1.16.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.9.1 | 1.15.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.9.0 | 1.15.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.8.1 | 1.15.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.8.0 | 1.15.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.7.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-5.36.1 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.36.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.6.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-5.35.0 | 1.14.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.5.0 | 1.14.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.4.0 | 1.13.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config, hudi | 
| emr-6.3.1 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.3.0 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.2.1 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.2.0 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-6.1.1 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-6.1.0 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.34.0 | 1.13.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.33.1 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.33.0 | 1.12.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.32.1 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.32.0 | 1.11.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.31.1 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.31.0 | 1.11.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client, flink-jobmanager-config | 
| emr-5.30.2 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.30.1 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.30.0 | 1.10.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.29.0 | 1.9.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.28.1 | 1.9.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.28.0 | 1.9.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.27.1 | 1.8.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.27.0 | 1.8.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.26.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.25.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.24.1 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.24.0 | 1.8.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.23.1 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.23.0 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.22.0 | 1.7.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.2 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.1 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.21.0 | 1.7.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.20.1 | 1.6.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.20.0 | 1.6.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.19.1 | 1.6.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.19.0 | 1.6.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.18.1 | 1.6.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.18.0 | 1.6.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.2 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.1 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.17.0 | 1.5.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.16.1 | 1.5.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.16.0 | 1.5.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.15.1 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.15.0 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.2 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.1 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.14.0 | 1.4.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.13.1 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.13.0 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.3 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.2 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.1 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.12.0 | 1.4.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.4 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.3 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.2 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.11.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.10.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.10.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.9.1 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.9.0 | 1.3.2 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.3 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.2 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.1 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.8.0 | 1.3.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.7.1 | 1.3.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.7.0 | 1.3.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.6.1 | 1.2.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.6.0 | 1.2.1 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client | 
| emr-5.5.4 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.3 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.2 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.1 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.5.0 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.4.1 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.4.0 | 1.2.0 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.2 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.1 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.3.0 | 1.1.4 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.3 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.2 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.1 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.2.0 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.1.1 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 
| emr-5.1.0 | 1.1.3 | emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, flink-client | 

# Versionshinweise zu Flink nach Version
<a name="Flink-release-history-versions"></a>

Die vollständigen Versionshinweise finden Sie in den folgenden Abschnitten.

# Amazon EMR 7.10.0 — Versionshinweise zu Flink
<a name="Flink-release-history-7100"></a>

**Amazon EMR 7.10.0 — Änderungen bei Flink**


| Typ | Description | 
| --- | --- | 
|  Neues Feature  |  Ab Amazon EMR Version 7.10.0 können Sie Kafka- und Kinesis Flink-Konnektoren mithilfe der Konfigurationseinstellungen einfacher aktivieren. Fügen Sie der `flink-conf` Klassifizierung während der `kinesis.enabled: true` Clustererstellung entweder `kafka.enabled: true` oder hinzu, um den entsprechenden Konnektor automatisch zu konfigurieren. Dieser optimierte Ansatz macht die manuellen Konfigurationsschritte überflüssig, die zuvor erforderlich waren.  | 

# Amazon EMR 7.9.0 — Versionshinweise zu Flink
<a name="Flink-release-history-790"></a>

**Amazon EMR 7.9.0 — Änderungen bei Flink**


| Typ | Description | 
| --- | --- | 
|  Neues Feature  |  Ab Amazon EMR 7.9.0 out-of-the-box unterstützt Apache Flink die Dateiformate Avro, Parquet und ORC. Sie können diese Formate direkt mit jeder Flink-API (DataStream, Tabelle oder SQL) verwenden, ohne dass eine zusätzliche Konfiguration erforderlich ist.  | 
|  Neues Feature  |  Ab Amazon EMR Version 7.9.0 können Sie Hive Metastore- oder AWS Glue-Datenkataloge mithilfe der Konfigurationseinstellungen einfacher aktivieren. Fügen Sie der `flink-conf` Klassifizierung während der Cluster-Erstellung entweder `hive.enabled: true` oder `glue.enabled: true` hinzu, um den jeweiligen Datenkatalog automatisch zu konfigurieren. Dieser optimierte Ansatz macht die manuellen Konfigurationsschritte überflüssig, die zuvor erforderlich waren.  | 

# Amazon EMR 7.8.0 — Versionshinweise zu Flink
<a name="Flink-release-history-780"></a>

**Konfiguration** — EMR Flink funktioniert sofort mit S3A in allen Regionen/Partitionen. AWS 

# Amazon EMR 7.7.0 — Versionshinweise zu Flink
<a name="Flink-release-history-770"></a>
+ Die Flink SQL-Shell kann einfach mit dem Befehl aufgerufen werden, der mit einem Symlink verknüpft ist `flink-sql-client` `/usr/lib/flink/bin/sql-client.sh`

# Amazon EMR 7.6.0 — Versionshinweise zu Flink
<a name="Flink-release-history-760"></a>

## Amazon EMR 7.6.0 — Funktionen von Flink
<a name="Flink-release-history-760-features"></a>
+ Keine Änderungen für die Veröffentlichung.

# Amazon EMR 7.5.0 — Versionshinweise zu Flink
<a name="Flink-release-history-750"></a>


| Typ | Description | 
| --- | --- | 
|  Feature  |  Unterstützung für die Ausführung von Flink-Jobs mit einem Remote-JAR wurde hinzugefügt.  | 
|  Verbesserung  |  Machen Sie den Ausschluss und die Inklusion von Vertex-Threads sicher.  | 

## Amazon EMR 7.5.0 — Funktionen von Flink
<a name="Flink-release-history-750-features"></a>
+ Ab Amazon EMR 7.5.0 können Sie einen Amazon S3 S3-Speicherort als JAR-Pfad angeben, wenn Sie die Befehle `run` und `run-application` Apache Flink CLI verwenden. Wenn Sie einen S3-Pfad angeben, lädt EMR die JAR-Datei automatisch von Amazon S3 in den EBS-Speicher des Clusters herunter. Jedes Mal, wenn Sie dieselbe JAR-Datei angeben, lädt EMR die neueste Version von Amazon S3 herunter, anstatt die bestehende JAR-Datei auf dem Cluster wiederzuverwenden. 
+ Ab Amazon EMR 7.5.0 können Kunden den Remote-Pfad (einen S3-Standort) als JAR-Pfad mit `run` und `run-application` Flink-CLI-Befehlen übergeben. Die JAR wird dann automatisch vom S3-Speicher in den EBS-Speicher des Clusters abgerufen. Wenn dieselbe JAR erneut bereitgestellt wird, wird die neueste Version von S3 heruntergeladen und die vorhandene JAR auf dem Cluster nicht wiederverwendet.

# Amazon EMR 7.4.0 — Versionshinweise zu Flink
<a name="Flink-release-history-740"></a>


| Typ | Description | 
| --- | --- | 
|  Upgrade  |  Die Flink-Version wurde auf 1.19.1 aktualisiert.  | 

# Amazon EMR 7.3.0 — Versionshinweise zu Flink
<a name="Flink-release-history-730"></a>
+ Standardmäßig verwenden Cluster, bei denen die Verschlüsselung während der Übertragung durch die Sicherheitskonfiguration aktiviert ist, TLS 1.3 für die interne Kommunikation zwischen Flink-Prozessen, dem Job Manager-REST-Endpunkt und dem Flink Job History Server.

# Amazon EMR 7.2.0 — Versionshinweise zu Flink
<a name="Flink-release-history-720"></a>


| Typ | Description | 
| --- | --- | 
|  Verbesserung  |  Support das Hinzufügen von benutzerdefinierten Labels zum Kubernetes-Dienst pro Flink-Job über die Konfiguration. `kubernetes.service.labels`  | 