

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Arbeiten mit Flink-Aufträgen von Zeppelin in Amazon EMR
<a name="flink-zeppelin"></a>

## Einführung
<a name="flink-zeppelin-intro"></a>

Amazon-EMR-Versionen 6.10.0 und höher unterstützen die [Apache Zeppelin](emr-zeppelin.md)-Integration mit Apache Flink. Sie können Flink-Aufträge interaktiv über Zeppelin-Notebooks einreichen. Mit dem Flink-Interpreter können Sie Flink-Abfragen ausführen, Flink-Streaming- und Batch-Aufträge definieren und die Ausgabe in Zeppelin-Notebooks visualisieren. Der Flink-Interpreter basiert auf der Flink-REST-API. Auf diese Weise können Sie von der Zeppelin-Umgebung aus auf Flink-Aufträge zugreifen und diese bearbeiten, um eine Datenverarbeitung und -analyse in Echtzeit durchzuführen.

In Flink Interpreter gibt es vier Unterinterpreter. Sie dienen unterschiedlichen Zwecken, befinden sich aber alle in der JVM und teilen sich dieselben vorkonfigurierten Einstiegspunkte zu Flink (`ExecutionEnviroment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment`). Die Interpreter sind wie folgt:
+ `%flink` – Erzeugt `ExecutionEnvironment`, `StreamExecutionEnvironment`, `BatchTableEnvironment`, `StreamTableEnvironment` und stellt eine Scala-Umgebung bereit
+ `%flink.pyflink` – Stellt eine Python-Umgebung bereit
+ `%flink.ssql` – Stellt eine Streaming-SQL-Umgebung bereit
+ `%flink.bsql` – Stellt eine Batch-SQL-Umgebung bereit

## Voraussetzungen
<a name="flink-zeppelin-prerequisites"></a>
+ Die Zeppelin-Integration mit Flink wird für Cluster unterstützt, die mit Amazon EMR 6.10.0 und höher erstellt wurden.
+ Um Webschnittstellen, die auf EMR-Clustern gehostet werden, wie für diese Schritte erforderlich, anzuzeigen, müssen Sie einen SSH-Tunnel konfigurieren, der eingehenden Zugriff ermöglicht. Weitere Informationen finden Sie unter [Konfigurieren von Proxy-Einstellungen, um auf dem Primärknoten gehostete Websites anzeigen zu lassen](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-connect-master-node-proxy.html).

## Zeppelin-Flink auf einem EMR-Cluster konfigurieren
<a name="flink-zeppelin-configure"></a>

Gehen Sie wie folgt vor, um Apache Flink auf Apache Zeppelin für die Ausführung auf einem EMR-Cluster zu konfigurieren:

1. Erstellen Sie einen neuen Cluster von der Amazon-EMR-Konsole aus. Wählen Sie emr-6.10.0 oder höher für die Amazon-EMR-Version aus. Wählen Sie dann, ob Sie Ihr Anwendungspaket mit der Option Benutzerdefiniert anpassen möchten. Nehmen Sie mindestens Flink, Hadoop und Zeppelin in Ihr Paket auf.  
![Passen Sie in der Amazon-EMR-Konsole Ihr Anwendungspaket mit der Option Benutzerdefiniert an. Mindestens Flink, Hadoop und Zeppelin in Ihr Paket aufnehmen](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-console.png)

1. Erstellen Sie den Rest Ihres Clusters mit den Einstellungen, die Sie bevorzugen.

1. Sobald Ihr Cluster läuft, wählen Sie den Cluster in der Konsole aus, um seine Details anzuzeigen, und öffnen Sie die Registerkarte Anwendungen. Wählen Sie Zeppelin im Bereich Benutzeroberflächen für Anwendungen aus, um die Zeppelin-Weboberfläche zu öffnen. Stellen Sie sicher, dass Sie den Zugriff auf die Zeppelin-Weboberfläche mit einem SSH-Tunnel zum Primärknoten und einer Proxyverbindung eingerichtet haben, wie in [Voraussetzungen](#flink-zeppelin-prerequisites) beschrieben.  
![Auf der Zeppelin-Weboberfläche können Sie neue Notebooks importieren und erstellen.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/welcome-to-zeppelin.png)

1. Jetzt können Sie eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.  
![Sie können eine neue Notiz in einem Zeppelin-Notebook mit Flink als Standardinterpreter erstellen.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/emr-flink-zeppelin-create-notebook.png)

1. In den folgenden Codebeispielen wird veranschaulicht, wie Flink-Jobs von einem Zeppelin-Notebook aus ausgeführt werden.

## Führen Sie Flink-Aufträge mit Zeppelin-Flink auf einem EMR-Cluster aus
<a name="flink-zeppelin-run-jobs"></a>
+ Beispiel 1, Flink Scala

  a) WordCount Batch-Beispiel (SCALA)

  ```
  %flink
  
  val data = benv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
               .map(w => (w, 1))
               .groupBy(0)
               .sum(1)
               .print()
  ```

  b) WordCount Streaming-Beispiel (SCALA)

  ```
  %flink
  
  val data = senv.fromElements("hello world", "hello flink", "hello hadoop")
  data.flatMap(line => line.split("\\s"))
    .map(w => (w, 1))
    .keyBy(0)
    .sum(1)
    .print
  
  senv.execute()
  ```  
![Sie können beispielsweise Batch WordCount - und WordCount Streaming-Jobs von einem Zeppelin-Notebook aus ausführen.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/streaming-wordcount-example.png)
+ Beispiel 2, Flink Streaming SQL

  ```
  %flink.ssql
  SET 'sql-client.execution.result-mode' = 'tableau';
  SET 'table.dml-sync' = 'true';
  SET 'execution.runtime-mode' = 'streaming';
  
  create table dummy_table (
    id int,
    data string
  ) with (
    'connector' = 'filesystem',
    'path' = 's3://{{s3-bucket}}/dummy_table',
    'format' = 'csv'
  );
  
  INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE'));
  
  SELECT * FROM dummy_table;
  ```  
![Dieses Beispiel zeigt, wie ein Flink-Streaming-SQL-Auftrag ausgeführt wird.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/flink-streaming-sql.png)
+ Beispiel 3, Pyflink. Beachten Sie, dass Sie Ihre eigene Beispieltextdatei mit dem Namen `word.txt` in Ihren S3-Bucket hochladen müssen.

  ```
  %flink.pyflink
  
  import argparse
  import logging
  import sys
  
  from pyflink.common import Row
  from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                             DataTypes, FormatDescriptor)
  from pyflink.table.expressions import lit, col
  from pyflink.table.udf import udtf
  
  def word_count(input_path, output_path):
      t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
      # write all the data to one file
      t_env.get_config().set("parallelism.default", "1")
  
      # define the source
      if input_path is not None:
          t_env.create_temporary_table(
              'source',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .build())
                             .option('path', input_path)
                             .format('csv')
                             .build())
          tab = t_env.from_path('source')
      else:
          print("Executing word_count example with default input data set.")
          print("Use --input to specify file input.")
          tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
                                    DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
  
      # define the sink
      if output_path is not None:
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('filesystem')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .option('path', output_path)
                             .format(FormatDescriptor.for_format('canal-json')
                                     .build())
                             .build())
      else:
          print("Printing result to stdout. Use --output to specify output path.")
          t_env.create_temporary_table(
              'sink',
              TableDescriptor.for_connector('print')
                             .schema(Schema.new_builder()
                                     .column('word', DataTypes.STRING())
                                     .column('count', DataTypes.BIGINT())
                                     .build())
                             .build())
  
      @udtf(result_types=[DataTypes.STRING()])
      def split(line: Row):
          for s in line[0].split():
              yield Row(s)
  
      # compute word count
      tab.flat_map(split).alias('word') \
         .group_by(col('word')) \
         .select(col('word'), lit(1).count) \
         .execute_insert('sink') \
         .wait()
  
  
  logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  
  
  word_count("s3://{{s3_bucket}}/word.txt", "s3://{{s3_bucket}}/demo_output.txt")
  ```

1. Wählen Sie **FLINK JOB** in der Zeppelin-Benutzeroberfläche, um auf die Flink-Web-UI zuzugreifen und diese anzusehen.  
![Die Schaltfläche FLINK JOB ist in der Werkzeugleiste der Zeppelin-Notebook-Oberfläche hervorgehoben.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/batch-wordcount-example.png)

1. Wenn Sie **FLINK JOB** wählen, gelangen Sie zur Flink Web Console in einer anderen Registerkarte Ihres Browsers.  
![Wenn Sie FLINK JOB wählen, wird die Flink Web Console in einer anderen Registerkarte Ihres Browsers geöffnet.](http://docs.aws.amazon.com/de_de/emr/latest/ReleaseGuide/images/flink-web-console.png)