

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Erstellen Sie eine Managed Service for Apache Flink-Anwendung und führen Sie sie aus
<a name="gs-table-create"></a>

In dieser Übung erstellen Sie eine Managed Service für Apache Flink-Anwendung mit Kinesis-Datenströmen als Quelle und Senke.

**Topics**
+ [Erstellen Sie abhängige Ressourcen](#gs-table-resources)
+ [Einrichten der lokalen Entwicklungsumgebung](#gs-table-2)
+ [Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn](#gs-table-5)
+ [Führen Sie Ihre Anwendung lokal aus](#gs-table-run-locally)
+ [Beobachten Sie, wie die Anwendung Daten in einen S3-Bucket schreibt](#gs-table-input-output)
+ [Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird](#gs-table-stop)
+ [Kompilieren und verpacken Sie Ihren Anwendungscode](#gs-table-5.5)
+ [Laden Sie die JAR-Datei mit dem Anwendungscode hoch](#gs-table-6)
+ [Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink](#gs-table-7)

## Erstellen Sie abhängige Ressourcen
<a name="gs-table-resources"></a>

Bevor Sie für diese Übung einen Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen: 
+ Ein Amazon S3 S3-Bucket zum Speichern des Anwendungscodes und zum Schreiben der Anwendungsausgabe.
**Anmerkung**  
In diesem Tutorial wird davon ausgegangen, dass Sie Ihre Anwendung in der Region us-east-1 bereitstellen. Wenn Sie eine andere Region verwenden, müssen Sie alle Schritte entsprechend anpassen.

### Erstellen Sie einen Amazon S3 S3-Bucket
<a name="gs-table-resources-s3"></a>

Sie können ein Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressource finden Sie in den folgenden Themen:
+ [Wie erstelle ich einen S3-Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) im *Amazon Simple Storage Service-Benutzerhandbuch*. Geben Sie dem Amazon S3 S3-Bucket einen weltweit eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen.
**Anmerkung**  
Stellen Sie sicher, dass Sie den Bucket in der Region erstellen, die Sie für dieses Tutorial verwenden. Die Standardeinstellung für das Tutorial ist us-east-1.

### Sonstige Ressourcen
<a name="gs-table-resources-cw"></a>

Wenn Sie Ihre Anwendung erstellen, erstellt Managed Service for Apache Flink die folgenden CloudWatch Amazon-Ressourcen, sofern sie noch nicht vorhanden sind:
+ Eine Protokollgruppe namens `/AWS/KinesisAnalytics-java/<my-application>`.
+ Einen Protokollstream mit dem Namen `kinesis-analytics-log-stream`.

## Einrichten der lokalen Entwicklungsumgebung
<a name="gs-table-2"></a>

Für die Entwicklung und das Debuggen können Sie die Apache Flink-Anwendung auf Ihrem Computer direkt von der IDE Ihrer Wahl aus ausführen. Alle Apache Flink-Abhängigkeiten werden mit Maven als normale Java-Abhängigkeiten behandelt.

**Anmerkung**  
Auf Ihrem Entwicklungscomputer müssen Sie Java JDK 11, Maven und Git installiert haben. Wir empfehlen Ihnen, eine Entwicklungsumgebung wie [Eclipse, Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) oder [IntelliJ](https://www.jetbrains.com/idea/) IDEA zu verwenden. Um zu überprüfen, ob Sie alle Voraussetzungen erfüllen, finden Sie unter. [Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen](getting-started.md#setting-up-prerequisites) Sie müssen **keinen** Apache Flink-Cluster auf Ihrem Computer installieren. 

### Authentifizieren Sie Ihre Sitzung AWS
<a name="get-started-exercise-table-authenticate"></a>

Die Anwendung verwendet Kinesis-Datenströme, um Daten zu veröffentlichen. Bei der lokalen Ausführung benötigen Sie eine gültige AWS authentifizierte Sitzung mit Schreibberechtigungen in den Kinesis-Datenstrom. Verwenden Sie die folgenden Schritte, um Ihre Sitzung zu authentifizieren:

1. Wenn Sie das Profil AWS CLI und ein benanntes Profil mit gültigen Anmeldeinformationen nicht konfiguriert haben, finden Sie weitere Informationen unter. [Richten Sie das AWS Command Line Interface () ein AWS CLI](setup-awscli.md)

1. Wenn Ihre IDE über ein Plug-in zur Integration verfügt AWS, können Sie dieses verwenden, um die Anmeldeinformationen an die Anwendung zu übergeben, die in der IDE ausgeführt wird. Weitere Informationen finden Sie unter [AWS Toolkit für IntelliJ IDEA](https://aws.amazon.com/intellij/) und [AWS Toolkit zum Kompilieren der Anwendung oder zum Ausführen](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html) von Eclipse.

## Laden Sie den Apache Flink-Streaming-Java-Code herunter und untersuchen Sie ihn
<a name="gs-table-5"></a>

Der Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. 

**So laden Sie den Java-Anwendungscode herunter**

1. Klonen Sie das Remote-Repository, indem Sie den folgenden Befehl verwenden:

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. Navigieren Sie zum `./java/GettingStartedTable` Verzeichnis .

### Überprüfen Sie die Anwendungskomponenten
<a name="get-started-exercise-table-components"></a>

Die Anwendung ist vollständig in der `com.amazonaws.services.msf.BasicTableJob` Klasse implementiert. Die `main()` Methode definiert Quellen, Transformationen und Senken. Die Ausführung wird durch eine Ausführungsanweisung am Ende dieser Methode eingeleitet.

**Anmerkung**  
Für ein optimales Entwicklererlebnis ist die Anwendung so konzipiert, dass sie ohne Codeänderungen sowohl auf Amazon Managed Service für Apache Flink als auch lokal für die Entwicklung in Ihrer IDE ausgeführt werden kann.
+ Um die Laufzeitkonfiguration so zu lesen, dass sie funktioniert, wenn sie in Amazon Managed Service for Apache Flink und in Ihrer IDE ausgeführt wird, erkennt die Anwendung automatisch, ob sie lokal in der IDE eigenständig ausgeführt wird. In diesem Fall lädt die Anwendung die Laufzeitkonfiguration anders:

  1. Wenn die Anwendung feststellt, dass sie in Ihrer IDE im Standalone-Modus ausgeführt wird, erstellen Sie die `application_properties.json` Datei, die im **Ressourcenordner** des Projekts enthalten ist. Der Inhalt der Datei folgt.

  1. Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, lädt das Standardverhalten die Anwendungskonfiguration aus den Laufzeiteigenschaften, die Sie in der Anwendung Amazon Managed Service für Apache Flink definieren. Siehe [Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink](get-started-exercise.md#get-started-exercise-7).

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ Die `main()` Methode definiert den Anwendungsdatenfluss und führt ihn aus. 
  + Initialisiert die Standard-Streaming-Umgebungen. In diesem Beispiel zeigen wir, wie sowohl die `StreamExecutionEnvironment` zur Verwendung mit der DataStream API als auch die `StreamTableEnvironment` zur Verwendung mit SQL und der Tabellen-API erstellt werden. Die beiden Umgebungsobjekte sind zwei separate Verweise auf dieselbe Laufzeitumgebung, um unterschiedliche APIs zu verwenden.

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + Laden Sie die Konfigurationsparameter der Anwendung. Dadurch werden sie automatisch von der richtigen Stelle geladen, je nachdem, wo die Anwendung ausgeführt wird:

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + Der [FileSystem Sink-Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink), den die Anwendung verwendet, um Ergebnisse in Amazon S3 S3-Ausgabedateien zu schreiben, wenn Flink einen [Checkpoint](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing) abschließt. Sie müssen Checkpoints aktivieren, um Dateien in das Ziel zu schreiben. Wenn die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird, steuert die Anwendungskonfiguration den Checkpoint und aktiviert ihn standardmäßig. Umgekehrt sind Checkpoints bei lokaler Ausführung standardmäßig deaktiviert. Die Anwendung erkennt, dass sie lokal ausgeführt wird, und konfiguriert Checkpoints alle 5.000 ms. 

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + Diese Anwendung empfängt keine Daten von einer tatsächlichen externen Quelle. Sie generiert zufällige Daten, die über den [DataGen Konnektor](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/) verarbeitet werden. Dieser Konnektor ist für DataStream API, SQL und Tabellen-API verfügbar. Um die Integration zwischen APIs zu demonstrieren, verwendet die Anwendung die DataStram API-Version, da sie mehr Flexibilität bietet. Jeder Datensatz wird durch eine Generatorfunktion generiert, die `StockPriceGeneratorFunction` in diesem Fall aufgerufen wird und in der Sie benutzerdefinierte Logik einfügen können. 

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + In der DataStream API können Datensätze benutzerdefinierte Klassen haben. Klassen müssen bestimmten Regeln folgen, damit Flink sie als Datensatz verwenden kann. Weitere Informationen finden Sie unter [Unterstützte Datentypen](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types). In diesem Beispiel ist die `StockPrice` Klasse ein [POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos). 
  + Die Quelle wird dann an die Ausführungsumgebung angehängt, wodurch ein `DataStream` of `StockPrice` generiert wird. Diese Anwendung verwendet keine [Semantik zur Ereigniszeit](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time) und generiert kein Wasserzeichen. Führen Sie die DataGenerator Quelle unabhängig von der Parallelität der restlichen Anwendung mit einer Parallelität von 1 aus.

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + Was im Datenverarbeitungsablauf folgt, wird mithilfe der Tabellen-API und SQL definiert. Dazu konvertieren wir den Wert DataStream von StockPrices in eine Tabelle. Das Schema der Tabelle wird automatisch aus der `StockPrice` Klasse abgeleitet. 

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + Der folgende Codeausschnitt zeigt, wie eine Ansicht und eine Abfrage mithilfe der programmatischen Tabellen-API definiert werden:

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + Eine Sink-Tabelle ist so definiert, dass sie die Ergebnisse als JSON-Dateien in einen Amazon S3 S3-Bucket schreibt. Um den Unterschied bei der programmgesteuerten Definition einer Ansicht zu verdeutlichen, wird die Sink-Tabelle mit der Tabellen-API mithilfe von SQL definiert.

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + Der letzte Schritt von besteht darin`executeInsert()`, die gefilterte Aktienkursansicht in die Sink-Tabelle einzufügen. Diese Methode initiiert die Ausführung des Datenflusses, den wir bisher definiert haben. 

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### Verwenden Sie die Datei pom.xml
<a name="get-started-exercise-5-2"></a>

Die Datei pom.xml definiert alle Abhängigkeiten, die von der Anwendung benötigt werden, und richtet das Maven Shade-Plugin ein, um das Fat-Jar zu erstellen, das alle von Flink benötigten Abhängigkeiten enthält. 
+ Einige Abhängigkeiten haben einen Gültigkeitsbereich. `provided` Diese Abhängigkeiten sind automatisch verfügbar, wenn die Anwendung in Amazon Managed Service for Apache Flink ausgeführt wird. Sie sind für die Anwendung oder für die Anwendung lokal in Ihrer IDE erforderlich. Weitere Informationen finden Sie unter (Update auf TableAPI)[Führen Sie Ihre Anwendung lokal aus](get-started-exercise.md#get-started-exercise-5-run). Stellen Sie sicher, dass Sie dieselbe Flink-Version wie die Runtime verwenden, die Sie in Amazon Managed Service for Apache Flink verwenden werden. Um die TableAPI und SQL zu verwenden, müssen Sie das `flink-table-planner-loader` und angeben`flink-table-runtime-dependencies`, beide mit Gültigkeitsbereich. `provided` 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Sie müssen dem POM mit dem Standardbereich zusätzliche Apache Flink-Abhängigkeiten hinzufügen. Zum Beispiel der [DataGen Konnektor](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/), der [FileSystem SQL-Konnektor](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/) und das [JSON-Format](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/). 

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ Um in Amazon S3 zu schreiben, wenn es lokal ausgeführt wird, ist das S3 Hadoop File System ebenfalls in `provided` Scope enthalten.

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Das Maven Java Compiler-Plugin stellt sicher, dass der Code mit Java 11 kompiliert wird, der JDK-Version, die derzeit von Apache Flink unterstützt wird. 
+ Das Maven Shade-Plugin packt das Fat-Jar, mit Ausnahme einiger Bibliotheken, die von der Runtime bereitgestellt werden. Es spezifiziert auch zwei Transformatoren: und. `ServicesResourceTransformer` `ManifestResourceTransformer` Letzteres konfiguriert die Klasse, die die `main` Methode zum Starten der Anwendung enthält. Wenn Sie die Hauptklasse umbenennen, vergessen Sie nicht, diesen Transformator zu aktualisieren.
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## Führen Sie Ihre Anwendung lokal aus
<a name="gs-table-run-locally"></a>

Sie können Ihre Flink-Anwendung lokal in Ihrer IDE ausführen und debuggen.

**Anmerkung**  
Bevor Sie fortfahren, stellen Sie sicher, dass die Eingabe- und Ausgabestreams verfügbar sind. Siehe [Erstellen Sie zwei Amazon Kinesis Kinesis-Datenstreams](get-started-exercise.md#get-started-exercise-1). Stellen Sie außerdem sicher, dass Sie über Lese- und Schreibberechtigungen für beide Streams verfügen. Siehe [Authentifizieren Sie Ihre Sitzung AWS](get-started-exercise.md#get-started-exercise-2-5).   
Für die Einrichtung der lokalen Entwicklungsumgebung sind Java 11 JDK, Apache Maven und eine IDE für die Java-Entwicklung erforderlich. Stellen Sie sicher, dass Sie die erforderlichen Voraussetzungen erfüllen. Siehe [Erfüllen Sie die Voraussetzungen für das Abschließen der Übungen](getting-started.md#setting-up-prerequisites).

### Importieren Sie das Java-Projekt in Ihre IDE
<a name="gs-table-import"></a>

Um mit der Arbeit an der Anwendung in Ihrer IDE zu beginnen, müssen Sie sie als Java-Projekt importieren. 

Das von Ihnen geklonte Repository enthält mehrere Beispiele. Jedes Beispiel ist ein separates Projekt. Importieren Sie für dieses Tutorial den Inhalt im `./jave/GettingStartedTable` Unterverzeichnis in Ihre IDE. 

Fügen Sie den Code mithilfe von Maven als vorhandenes Java-Projekt ein.

**Anmerkung**  
Der genaue Vorgang zum Importieren eines neuen Java-Projekts hängt von der verwendeten IDE ab.

### Ändern Sie die lokale Anwendungskonfiguration
<a name="gs-table-modify-2"></a>

Bei der lokalen Ausführung verwendet die Anwendung die Konfiguration in der `application_properties.json` Datei im Ressourcenordner des Projekts unter`./src/main/resources`. Für diese Tutorial-Anwendung sind die Konfigurationsparameter der Name des Buckets und der Pfad, in den die Daten geschrieben werden.

Bearbeiten Sie die Konfiguration und ändern Sie den Namen des Amazon S3 S3-Buckets so, dass er mit dem Bucket übereinstimmt, den Sie zu Beginn dieses Tutorials erstellt haben.

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**Anmerkung**  
Die Konfigurationseigenschaft `name` darf beispielsweise nur den Bucket-Namen enthalten`my-bucket-name`. Geben Sie kein Präfix wie `s3://` oder einen abschließenden Schrägstrich ein.  
Wenn Sie den Pfad ändern, lassen Sie alle führenden oder nachfolgenden Schrägstriche weg.

### Richten Sie Ihre IDE-Run-Konfiguration ein
<a name="gs-table-setupIDE"></a>

Sie können die Flink-Anwendung direkt von Ihrer IDE aus ausführen und debuggen, indem Sie die Hauptklasse ausführen`com.amazonaws.services.msf.BasicTableJob`, wie Sie jede Java-Anwendung ausführen würden. Bevor Sie die Anwendung ausführen, müssen Sie die Run-Konfiguration einrichten. Das Setup hängt von der IDE ab, die Sie verwenden. Sehen Sie sich beispielsweise [Run/debug Konfigurationen](https://www.jetbrains.com/help/idea/run-debug-configuration.html) in der IntelliJ IDEA-Dokumentation an. Insbesondere müssen Sie Folgendes einrichten:

1. **Fügen Sie die `provided` Abhängigkeiten zum Klassenpfad** hinzu. Dies ist erforderlich, um sicherzustellen, dass die Abhängigkeiten mit `provided` Gültigkeitsbereich an die Anwendung übergeben werden, wenn sie lokal ausgeführt wird. Ohne diese Einrichtung zeigt die Anwendung sofort einen `class not found` Fehler an. 

1. **Übergeben Sie die AWS Anmeldeinformationen für den Zugriff auf die Kinesis-Streams an die Anwendung**. Am schnellsten ist es, das [AWS Toolkit für IntelliJ](https://aws.amazon.com/intellij/) IDEA zu verwenden. Mit diesem IDE-Plugin in der Run-Konfiguration können Sie ein bestimmtes Profil auswählen. AWS AWS Die Authentifizierung erfolgt mit diesem Profil. Sie müssen die AWS Anmeldeinformationen nicht direkt weitergeben. 

1. Stellen Sie sicher, dass die IDE die Anwendung mit **JDK 11** ausführt.

### Führen Sie die Anwendung in Ihrer IDE aus
<a name="gs-table-runIDE"></a>

Nachdem Sie die Run-Konfiguration für eingerichtet haben`BasicTableJob`, können Sie sie wie eine normale Java-Anwendung ausführen oder debuggen. 

**Anmerkung**  
Sie können das von Maven generierte Fat-Jar nicht direkt über die `java -jar ...` Befehlszeile ausführen. Dieses JAR enthält nicht die Kernabhängigkeiten von Flink, die für die eigenständige Ausführung der Anwendung erforderlich sind.

Wenn die Anwendung erfolgreich gestartet wird, protokolliert sie einige Informationen über den eigenständigen Minicluster und die Initialisierung der Konnektoren. Darauf folgen eine Reihe von INFO- und einige WARN-Logs, die Flink normalerweise beim Start der Anwendung ausgibt.

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

Nach Abschluss der Initialisierung gibt die Anwendung keine weiteren Protokolleinträge aus. **Während des Datenflusses wird kein Protokoll ausgegeben.**

Um zu überprüfen, ob die Anwendung Daten korrekt verarbeitet, können Sie den Inhalt des Ausgabe-Buckets überprüfen, wie im folgenden Abschnitt beschrieben.

**Anmerkung**  
 Es ist das normale Verhalten einer Flink-Anwendung, keine Protokolle über fließende Daten auszugeben. Das Ausgeben von Protokollen für jeden Datensatz mag für das Debuggen praktisch sein, kann aber bei der Ausführung in der Produktion zu erheblichem Mehraufwand führen. 

## Beobachten Sie, wie die Anwendung Daten in einen S3-Bucket schreibt
<a name="gs-table-input-output"></a>

Diese Beispielanwendung generiert intern zufällige Daten und schreibt diese Daten in den von Ihnen konfigurierten Ziel-S3-Bucket. Sofern Sie den Standardkonfigurationspfad nicht geändert haben, werden die Daten im folgenden Format `./output/<yyyy-MM-dd>/<HH>` in den `output` Pfad geschrieben, gefolgt von der Daten- und Stundenpartitionierung.

Der [ FileSystem Sink-Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink) erstellt neue Dateien auf dem Flink-Checkpoint. Bei der lokalen Ausführung führt die Anwendung alle 5 Sekunden (5.000 Millisekunden) einen Checkpoint aus, wie im Code angegeben. 

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**Um den S3-Bucket zu durchsuchen und die von der Anwendung geschriebene Datei zu beobachten**

1. 

   1. Öffnen Sie die Amazon S3 S3-Konsole unter [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Wählen Sie den Bucket aus, den Sie zuvor erstellt haben. 

1. Navigieren Sie zum `output` Pfad und dann zu den Datums- und Stundenordnern, die der aktuellen Uhrzeit in der UTC-Zeitzone entsprechen. 

1. Aktualisieren Sie regelmäßig, um zu beobachten, dass alle 5 Sekunden neue Dateien angezeigt werden.

1. Wählen Sie eine Datei aus und laden Sie sie herunter, um den Inhalt zu beobachten.
**Anmerkung**  
Standardmäßig haben die Dateien keine Erweiterungen. Der Inhalt ist als JSON formatiert. Sie können die Dateien mit einem beliebigen Texteditor öffnen, um den Inhalt zu überprüfen.

## Stoppen Sie, dass Ihre Anwendung lokal ausgeführt wird
<a name="gs-table-stop"></a>

Stoppen Sie die Anwendung, die in Ihrer IDE ausgeführt wird. Die IDE bietet normalerweise eine „Stopp“ -Option. Der genaue Standort und die Methode hängen von der IDE ab. 

## Kompilieren und verpacken Sie Ihren Anwendungscode
<a name="gs-table-5.5"></a>

In diesem Abschnitt verwenden Sie Apache Maven, um den Java-Code zu kompilieren und in eine JAR-Datei zu packen. Sie können Ihren Code mit dem Maven-Befehlszeilentool oder Ihrer IDE kompilieren und verpacken.

**Um mit der Maven-Befehlszeile zu kompilieren und zu paketieren**

Gehen Sie in das Verzeichnis, das das GettingStarted Jave-Projekt enthält, und führen Sie den folgenden Befehl aus:

```
$ mvn package
```

**Um mit Ihrer IDE zu kompilieren und zu paketieren**

Führen Sie es `mvn package` von Ihrer IDE-Maven-Integration aus aus.

In beiden Fällen `target/amazon-msf-java-table-app-1.0.jar` wird die JAR-Datei erstellt.

**Anmerkung**  
Wenn *Sie ein Build-Projekt* von Ihrer IDE aus ausführen, wird die JAR-Datei möglicherweise nicht erstellt.

## Laden Sie die JAR-Datei mit dem Anwendungscode hoch
<a name="gs-table-6"></a>

In diesem Abschnitt laden Sie die JAR-Datei, die Sie im vorherigen Abschnitt erstellt haben, in den Amazon S3 S3-Bucket hoch, den Sie zu Beginn dieses Tutorials erstellt haben. Wenn Sie es schon getan haben, schließen Sie es ab[Erstellen Sie einen Amazon S3 S3-Bucket](#gs-table-resources-s3).

**So laden Sie den Anwendungscode hoch**

1. Öffnen Sie die Amazon S3 S3-Konsole unter [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Wählen Sie den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben.

1. Wählen Sie Feld **hochladen** aus. 

1. Klicken Sie auf **Add files**.

1. Navigieren Sie zu der im vorherigen Abschnitt generierten JAR-Datei:`target/amazon-msf-java-table-app-1.0.jar`.

1. Wählen Sie **Hochladen**, ohne andere Einstellungen zu ändern.
**Warnung**  
 Stellen Sie sicher, dass Sie die richtige JAR-Datei in auswählen`<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar`.   
Das Zielverzeichnis enthält auch andere JAR-Dateien, die Sie nicht hochladen müssen. 

## Erstellen und konfigurieren Sie die Anwendung Managed Service für Apache Flink
<a name="gs-table-7"></a>

Sie können eine Managed Service for Apache Flink-Anwendung entweder mit der Konsole oder der erstellen und konfigurieren. AWS CLI Für dieses Tutorial verwenden Sie die Konsole.

**Anmerkung**  
Wenn Sie die Anwendung mithilfe der Konsole erstellen, werden Ihre AWS Identity and Access Management (IAM) und Amazon CloudWatch Logs-Ressourcen für Sie erstellt. Wenn Sie die Anwendung mithilfe von erstellen AWS CLI, müssen Sie diese Ressourcen separat erstellen.

### Erstellen der Anwendung
<a name="gs-table-7-console-create"></a>

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon MSF-Konsole unter https://console.aws.amazon.com/flink.

1. Stellen Sie sicher, dass die richtige Region ausgewählt ist: USA Ost (Nord-Virginia) us-east-1.

1. Wählen Sie im rechten Menü **Apache Flink-Anwendungen** und dann Streaming-Anwendung **erstellen**. Wählen Sie alternativ im Bereich **Erste Schritte** auf der Startseite die Option **Streaming-Anwendung erstellen** aus. 

1. Gehen Sie auf der Seite **Streaming-Anwendung erstellen** wie folgt vor:
   + **Wählen Sie für Wählen Sie eine Methode zum Einrichten der Streamverarbeitungsanwendung** die Option **Von Grund auf neu erstellen aus**.
   + Wählen Sie für die **Apache Flink-Konfiguration und die Version von Application Flink** die Option **Apache Flink** 1.19.
   + Gehen Sie im Abschnitt **Anwendungskonfiguration** wie folgt vor:
     + Geben Sie als **Anwendungsname** ein **MyApplication**.
     + Geben Sie für **Beschreibung** den Text **My Java Table API test app** ein.
     + Wählen Sie **unter Zugriff auf Anwendungsressourcen** die Option **Create/update IAM role kinesis-analytics- MyApplication-us-east-1 ** with required policies aus.
   + Gehen Sie unter **Vorlage für Anwendungseinstellungen** wie folgt vor:
     + Wählen Sie für **Vorlagen** die Option **Entwicklung aus.**

1. Wählen Sie **Streaming-Anwendung erstellen** aus.

**Anmerkung**  
Beim Erstellen einer Anwendung von Managed Service für Apache Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:  
Richtlinie: `kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
Rolle: `kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### Bearbeiten Sie die IAM-Richtlinie
<a name="gs-table-7-console-iam"></a>

Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf den Amazon S3-Bucket.

**Um die IAM-Richtlinie zu bearbeiten, um S3-Bucket-Berechtigungen hinzuzufügen**

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie **Policies (Richtlinien)**. Wählen Sie die **`kinesis-analytics-service-MyApplication-us-east-1`**-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. 

1. Wählen Sie **Bearbeiten** und dann die Registerkarte **JSON**.

1. Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie die Beispielkonto-ID ({{012345678901}}) durch Ihre Konto-ID und {{<bucket-name>}} den Namen des S3-Buckets, den Sie erstellt haben.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  Wählen Sie **Weiter** und dann **Änderungen speichern** aus.

### Konfigurieren Sie die Anwendung
<a name="gs-table-7-console-configure"></a>

Bearbeiten Sie die Anwendung, um das Anwendungscode-Artefakt festzulegen.

**Konfigurieren der Anwendung**

1. Wählen Sie auf der **MyApplication**Seite **Configure** aus.

1. Wählen Sie im Abschnitt **Speicherort des Anwendungscodes** die Option **Konfigurieren** aus.
   + Wählen Sie für **Amazon S3 S3-Bucket** den Bucket aus, den Sie zuvor für den Anwendungscode erstellt haben. Wählen Sie **Durchsuchen** und wählen Sie den richtigen Bucket aus. Wählen Sie dann **Wählen aus**. Klicken Sie nicht auf den Bucket-Namen. 
   + Geben Sie als **Pfad zum Amazon-S3-Objekt** den Wert **amazon-msf-java-table-app-1.0.jar** ein.

1. Wählen Sie für **Zugriffsberechtigungen** die Option **Erstellen / Aktualisieren Sie IAM-Rolle `kinesis-analytics-MyApplication-us-east-1`** aus.

1. Fügen Sie im Abschnitt **Runtime-Eigenschaften** die folgenden Eigenschaften hinzu. 

1. Wählen **Sie Neues Element** hinzufügen und fügen Sie jeden der folgenden Parameter hinzu:    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/gs-table-create.html)

1. Ändern Sie keine anderen Einstellungen.

1. Wählen Sie **Änderungen speichern ** aus.

**Anmerkung**  
Wenn Sie sich dafür entscheiden, die CloudWatch Amazon-Protokollierung zu aktivieren, erstellt Managed Service für Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:   
Protokollgruppe: `/aws/kinesis-analytics/MyApplication`
Protokollstream: `kinesis-analytics-log-stream`

### Führen Sie die Anwendung aus.
<a name="gs-table-7-console-run"></a>

Die Anwendung ist jetzt konfiguriert und kann ausgeführt werden.

**Ausführen der Anwendung**

1. Kehren Sie zur Konsolenseite in Amazon Managed Service für Apache Flink zurück und wählen Sie **MyApplication**.

1. Wählen Sie **Ausführen**, um die Anwendung zu starten.

1. Wählen Sie in der **Konfiguration zur Anwendungswiederherstellung** die Option **Mit neuestem Snapshot ausführen** aus.

1. Klicken Sie auf **Ausführen**.

1. Der **Status** in den **Anwendungsdetails** wechselt von `Ready` zu `Starting` und dann zu „`Running`Nach dem Start der Anwendung“. 

Wenn sich die Anwendung im `Running` Status befindet, können Sie das Flink-Dashboard öffnen. 

**Um das Dashboard zu öffnen und den Job anzusehen**

1. Wählen Sie **Apache Flink-Dashboard öffnen**. Das Dashboard wird auf einer neuen Seite geöffnet.

1. Wählen Sie in der Liste „**Laufende Jobs**“ den einzelnen Job aus, den Sie sehen können.
**Anmerkung**  
Wenn Sie die Runtime-Eigenschaften festgelegt oder die IAM-Richtlinien falsch bearbeitet haben, ändert sich der Anwendungsstatus möglicherweise auf, aber das Flink-Dashboard zeigt an`Running`, dass der Job kontinuierlich neu gestartet wird. Dies ist ein häufiges Fehlerszenario, wenn die Anwendung falsch konfiguriert ist oder nicht über die erforderlichen Berechtigungen für den Zugriff auf die externen Ressourcen verfügt.   
Überprüfen Sie in diesem Fall die Registerkarte **Ausnahmen** im Flink-Dashboard, um die Ursache des Problems zu untersuchen.

### Beobachten Sie die Metriken der laufenden Anwendung
<a name="gs-observe-metrics"></a>

Auf der **MyApplication**Seite, im Abschnitt ** CloudWatch Amazon-Metriken**, können Sie einige der grundlegenden Metriken der laufenden Anwendung sehen. 

**Um die Metriken einzusehen**

1. Wählen Sie neben der Schaltfläche „**Aktualisieren**“ in der Dropdownliste die Option **10 Sekunden** aus.

1. Wenn die Anwendung läuft und fehlerfrei ist, können Sie sehen, dass die **Uptime-Metrik** kontinuierlich zunimmt.

1. Die Metrik für **vollständige Neustarts** sollte Null sein. Wenn sie zunimmt, kann es bei der Konfiguration zu Problemen kommen. Überprüfen Sie die Registerkarte **Ausnahmen** im Flink-Dashboard, um das Problem zu untersuchen.

1. Die Metrik „**Anzahl fehlgeschlagener Checkpoints**“ sollte in einer fehlerfreien Anwendung Null sein. 
**Anmerkung**  
Dieses Dashboard zeigt einen festen Satz von Metriken mit einer Granularität von 5 Minuten. Sie können ein benutzerdefiniertes Anwendungs-Dashboard mit beliebigen Metriken im CloudWatch Dashboard erstellen.

### Beobachten Sie, wie die Anwendung Daten in den Ziel-Bucket schreibt
<a name="gs-observe-output"></a>

Sie können jetzt beobachten, wie die Anwendung in Amazon Managed Service für Apache Flink ausgeführt wird und Dateien auf Amazon S3 schreibt.

Um die Dateien zu beobachten, gehen Sie genauso vor, wie Sie die Dateien überprüft haben, die geschrieben wurden, als die Anwendung lokal ausgeführt wurde. Siehe [Beobachten Sie, wie die Anwendung Daten in einen S3-Bucket schreibt](#gs-table-input-output). 

Denken Sie daran, dass die Anwendung neue Dateien auf den Flink-Checkpoint schreibt. Bei der Ausführung auf Amazon Managed Service für Apache Flink sind Checkpoints standardmäßig aktiviert und werden alle 60 Sekunden ausgeführt. Die Anwendung erstellt ungefähr alle 1 Minute neue Dateien.

### Stoppen Sie die Anwendung
<a name="gs-table-7-console-stop"></a>

Um die Anwendung zu beenden, rufen Sie die Konsolenseite der Anwendung Managed Service for Apache Flink mit dem Namen auf. `MyApplication`

**So stoppen Sie die Anwendung**

1. **Wählen Sie in der Dropdownliste **Aktion** die Option Stopp aus.**

1. Der **Status** in den **Anwendungsdetails** wechselt von `Running` zu und dann zu dem `Ready` Zeitpunkt`Stopping`, an dem die Anwendung vollständig gestoppt wurde. 
**Anmerkung**  
Vergessen Sie nicht, auch das Senden von Daten aus dem Python-Skript oder dem Kinesis Data Generator an den Eingabestream zu beenden.