

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden Sie Apache Beam mit Managed Service für Apache Flink-Anwendungen
<a name="how-creating-apps-beam"></a>

**Anmerkung**  
**Es gibt keinen kompatiblen Apache Flink Runner für Flink 1.20. Weitere Informationen finden Sie unter [Flink-Versionskompatibilität](https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility) in der Apache Beam-Dokumentation.** >

Sie können das [Apache Beam](https://beam.apache.org/)-Framework mit Ihrer Managed Service for Apache Flink-Anwendung verwenden, um Streaming-Daten zu verarbeiten. Managed Service für Apache Flink-Anwendungen, die Apache Beam verwenden, verwendet [Apache Flink Runner](https://beam.apache.org/documentation/runners/flink/) zur Ausführung von Beam-Pipelines.

Ein Tutorial zur Verwendung von Apache Beam in einer Managed Service for Apache Flink-Anwendung finden Sie unter[Verwenden CloudFormationErstellen einer Anwendung mit Apache Beam](examples-beam.md).

**Topics**
+ [

## Einschränkungen von Apache Flink Runner mit Managed Service für Apache Flink
](#how-creating-apps-beam-using)
+ [

## Apache Beam-Funktionen mit Managed Service für Apache Flink
](#how-creating-apps-beam-capabilities)
+ [

# Erstellen Sie eine Anwendung mit Apache Beam
](examples-beam.md)

## Einschränkungen von Apache Flink Runner mit Managed Service für Apache Flink
<a name="how-creating-apps-beam-using"></a>

Beachten Sie Folgendes zur Verwendung des Apache Flink Runners mit Managed Service für Apache Flink:
+ Apache Beam-Metriken sind in der Managed Service for Apache Flink-Konsole nicht sichtbar.
+ **Apache Beam wird nur mit Managed Service für Apache Flink-Anwendungen unterstützt, die Apache Flink Version 1.8 und höher verwenden. Apache Beam wird mit Managed Service für Apache Flink-Anwendungen, die Apache Flink Version 1.6 verwenden, nicht unterstützt.**

## Apache Beam-Funktionen mit Managed Service für Apache Flink
<a name="how-creating-apps-beam-capabilities"></a>

Managed Service für Apache Flink unterstützt dieselben Apache Beam-Funktionen wie der Apache Flink Runner. Informationen darüber, welche Feature vom Apache Flink Runner unterstützt werden, finden Sie in der [Beam-Kompatibilitätsmatrix.](https://beam.apache.org/documentation/runners/capability-matrix/) 

Wir empfehlen Ihnen, Ihre Apache Flink-Anwendung im Managed Service for Apache Flink-Dienst zu testen, um sicherzustellen, dass wir alle Feature unterstützen, die Ihre Anwendung benötigt.

# Erstellen Sie eine Anwendung mit Apache Beam
<a name="examples-beam"></a>

In dieser Übung erstellen Sie eine Managed Service for Apache Flink-Anwendung, die Daten mithilfe von [Apache Beam](https://beam.apache.org/) transformiert. Apache Beam ist ein Programmiermodell für die Verarbeitung von Streaming-Daten. Informationen zur Verwendung von Apache Beam mit Managed Service für Apache Flink finden Sie unter[Verwenden Sie Apache Beam mit Managed Service für Apache Flink-Anwendungen](how-creating-apps-beam.md).

**Anmerkung**  
Um die erforderlichen Voraussetzungen für diese Übung festzulegen, schließen Sie zunächst die [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md) Übung ab.

**Topics**
+ [

## Erstellen Sie abhängige Ressourcen
](#examples-beam-resources)
+ [

## Schreiben Sie Beispieldatensätze in den Eingabestream
](#examples-beam-write)
+ [

## Laden Sie den Anwendungscode herunter und untersuchen Sie ihn
](#examples-beam-download)
+ [

## Kompilieren Sie den Anwendungscode
](#examples-beam-compile)
+ [

## Laden Sie den Apache Flink-Streaming-Java-Code hoch
](#examples-beam-upload)
+ [

## Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus
](#examples-beam-create-run)
+ [

## Ressourcen bereinigen AWS
](#examples-beam-cleanup)
+ [

## Nächste Schritte
](#examples-beam-nextsteps)

## Erstellen Sie abhängige Ressourcen
<a name="examples-beam-resources"></a>

Bevor Sie für diese Übung eine Anwendung von Managed Service für Apache Flink erstellen, erstellen Sie die folgenden abhängigen Ressourcen: 
+ Zwei Kinesis Data Streams (`ExampleInputStream` und `ExampleOutputStream`)
+ Einen Amazon S3-Bucket zum Speichern des Codes der Anwendung (`ka-app-code-<username>`) 

Sie können die Kinesis Streams und den Amazon-S3-Bucket mithilfe der Konsole erstellen. Anweisungen zum Erstellen dieser Ressourcen finden Sie in den folgenden Themen:
+ [Data Streams erstellen und aktualisieren](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html) im *Amazon Kinesis Data Streams Entwicklerleitfaden*. Benennen Sie Ihre Data Streams **ExampleInputStream** und **ExampleOutputStream**.
+ [Wie erstelle ich einen S3-Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html) im *Amazon Simple Storage Service Benutzerhandbuch*. Geben Sie dem Amazon S3-Bucket einen global eindeutigen Namen, indem Sie Ihren Anmeldenamen anhängen, z. B. **ka-app-code-*<username>***.

## Schreiben Sie Beispieldatensätze in den Eingabestream
<a name="examples-beam-write"></a>

In diesem Abschnitt verwenden Sie ein Python-Skript zum Schreiben von Datensätzen in den Stream für die zu verarbeitende Anwendung.

**Anmerkung**  
Dieser Abschnitt erfordert [AWS SDK für Python (Boto)](https://aws.amazon.com/developers/getting-started/python/).

1. Erstellen Sie eine Datei `ping.py` mit dem folgenden Inhalt:

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. Führen Sie das `ping.py`Skript aus: 

   ```
   $ python ping.py
   ```

   Lassen Sie das Skript laufen, während Sie den Rest des Tutorials abschließen.

## Laden Sie den Anwendungscode herunter und untersuchen Sie ihn
<a name="examples-beam-download"></a>

Der Java-Anwendungscode für dieses Beispiel ist verfügbar unter GitHub. Zum Herunterladen des Anwendungscodes gehen Sie wie folgt vor:

1. Installieren Sie den Git-Client, wenn Sie dies noch nicht getan haben. Weitere Informationen finden Sie unter [Git installieren](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git). 

1. Klonen Sie das Remote-Repository mit dem folgenden Befehl:

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. Navigieren Sie zum `amazon-kinesis-data-analytics-java-examples/Beam` Verzeichnis .

Der Anwendungscode befindet sich in der `BasicBeamStreamingJob.java`-Datei. Beachten Sie Folgendes zum Anwendungscode:
+ Die Anwendung verwendet den Apache Beam [ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html), um eingehende Datensätze zu verarbeiten, indem sie eine benutzerdefinierte Transformationsfunktion namens `PingPongFn` aufruft.

  Der Code zum Aufrufen der `PingPongFn` Funktion lautet wie folgt:

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ Managed Service für Apache Flink-Anwendungen, die Apache Beam verwenden, erfordert die folgenden Komponenten. Wenn Sie diese Komponenten und Versionen nicht in Ihre `pom.xml` aufnehmen, lädt Ihre Anwendung die falschen Versionen aus den Umgebungsabhängigkeiten, und da die Versionen nicht übereinstimmen, stürzt Ihre Anwendung zur Laufzeit ab.

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ Die `PingPongFn` Transformationsfunktion übergibt die Eingabedaten an den Ausgabestrom, sofern es sich bei den Eingabedaten nicht um einen **Ping-Wert** handelt. In diesem Fall gibt sie die Zeichenfolge **pong\$1n**an den Ausgabestrom aus. 

  Der Code der Transformationsfunktion lautet wie folgt:

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## Kompilieren Sie den Anwendungscode
<a name="examples-beam-compile"></a>

Zum Kompilieren der Anwendung gehen Sie wie folgt vor:

1. Installieren Sie Java und Maven, wenn das noch nicht geschehen ist. Weitere Informationen finden Sie unter [Erfüllen Sie die erforderlichen Voraussetzungen](getting-started.md#setting-up-prerequisites) im [Tutorial: Erste Schritte mit der DataStream API in Managed Service für Apache Flink](getting-started.md) Tutorial.

1. Kompilieren Sie die Anwendung mit dem folgenden Befehl: 

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**Anmerkung**  
Der bereitgestellte Quellcode basiert auf Bibliotheken von Java 11. 

Beim Kompilieren der Anwendung wird die JAR-Datei der Anwendung (`target/basic-beam-app-1.0.jar`) erstellt.

## Laden Sie den Apache Flink-Streaming-Java-Code hoch
<a name="examples-beam-upload"></a>

In diesem Abschnitt laden Sie Ihren Anwendungscode in das Amazon S3-Bucket hoch, das Sie im [Erstellen Sie abhängige Ressourcen](#examples-beam-resources) Abschnitt erstellt haben.

1. Wählen Sie in der Amazon S3 S3-Konsole den *<username>* Bucket **ka-app-code-** und wählen Sie **Upload** aus.

1. Klicken Sie im Schritt **Auswählen von Dateien** auf **Hinzufügen von Dateien**. Navigieren Sie zu der `basic-beam-app-1.0.jar`Datei, die Sie im vorherigen Schritt erstellt haben. 

1. Sie müssen keine der Einstellungen für das Objekt ändern. Wählen Sie daher **Hochladen**.

Ihr Anwendungscode ist jetzt in einem Amazon-S3-Bucket gespeichert, in dem Ihre Anwendung darauf zugreifen kann.

## Erstellen Sie die Anwendung Managed Service for Apache Flink und führen Sie sie aus
<a name="examples-beam-create-run"></a>

Befolgen Sie diese Schritte, um die Anwendung über die Konsole zu erstellen, zu konfigurieren, zu aktualisieren und auszuführen.

### Erstellen Sie die Anwendung
<a name="examples-beam-create"></a>

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon MSF-Konsole unter https://console.aws.amazon.com /flink.

1. Wählen Sie im Dashboard Managed Service für Apache Flink die Option **Analyseanwendung erstellen** aus.

1. Geben Sie auf der Seite **Managed Service für Apache Flink – Anwendung erstellen** die Anwendungsdetails wie folgt ein:
   + Geben Sie als **Anwendungsname** ein **MyApplication**.
   + Wählen Sie für **Laufzeit** die Option **Apache Flink** aus.
**Anmerkung**  
Apache Beam ist derzeit nicht mit Apache Flink Version 1.19 oder höher kompatibel.
   + Wählen Sie **Apache Flink Version 1.15 aus dem Versions-Pulldown** aus.

1. Wählen Sie für **Zugriffsberechtigungen** die Option **Erstellen / Aktualisieren Sie IAM-Rolle `kinesis-analytics-MyApplication-us-west-2`** aus.

1. Wählen Sie **Create application** aus.

**Anmerkung**  
Beim Erstellen einer Anwendung von Managed Service für Apache Flink mit der Konsole haben Sie die Möglichkeit, eine IAM-Rolle und -Richtlinie für Ihre Anwendung erstellen zu lassen. Ihre Anwendung verwendet diese Rolle und Richtlinie für den Zugriff auf ihre abhängigen Ressourcen. Diese IAM-Ressourcen werden unter Verwendung Ihres Anwendungsnamens und der Region wie folgt benannt:  
Richtlinie: `kinesis-analytics-service-MyApplication-us-west-2`
Rolle: `kinesis-analytics-MyApplication-us-west-2`

### Bearbeiten Sie die IAM-Richtlinie
<a name="get-started-exercise-7-console-iam"></a>

Bearbeiten Sie die IAM-Richtlinie zum Hinzufügen von Berechtigungen für den Zugriff auf die Kinesis-Datenströme.

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie **Policies (Richtlinien)**. Wählen Sie die **`kinesis-analytics-service-MyApplication-us-west-2`**-Richtlinie aus, die die Konsole im vorherigen Abschnitt für Sie erstellt hat. 

1. Wählen Sie auf der Seite **Summary (Übersicht)** die Option **Edit policy (Richtlinie bearbeiten)** aus. Wählen Sie den Tab **JSON**.

1. Fügen Sie den markierten Abschnitt der folgenden Beispielrichtlinie der Richtlinie hinzu. Ersetzen Sie das Beispielkonto IDs (*012345678901*) durch Ihre Konto-ID.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### Konfigurieren Sie die Anwendung
<a name="examples-beam-configure"></a>

1. Wählen Sie auf der **MyApplication**Seite **Configure** aus.

1. Klicken Sie auf der Seite **Configure application (Anwendung konfigurieren)** auf die Option **Code location (Codespeicherort)**:
   + Geben Sie für **Amazon-S3-Bucket** **ka-app-code-*<username>*** ein.
   + Geben Sie als **Pfad zum Amazon-S3-Objekt** den Wert **basic-beam-app-1.0.jar** ein.

1. Wählen Sie unter **Zugriff auf Anwendungsressourcen** für **Zugriffsberechtigungen** die Option **IAM-Rolle `kinesis-analytics-MyApplication-us-west-2` erstellen/aktualisieren** aus.

1. Geben Sie Folgendes ein:    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/de_de/managed-flink/latest/java/examples-beam.html)

1. Stellen Sie unter **Überwachung** sicher, dass die **Ebene der Überwachungsmetriken** auf **Anwendung** eingestellt ist.

1. Wählen Sie für die **CloudWatch Protokollierung** das Kontrollkästchen **Aktivieren** aus.

1. Wählen Sie **Aktualisieren** aus.

**Anmerkung**  
Wenn Sie die CloudWatch Protokollierung aktivieren möchten, erstellt Managed Service for Apache Flink eine Protokollgruppe und einen Protokollstream für Sie. Die Namen dieser Ressourcen lauten wie folgt:   
Protokollgruppe: `/aws/kinesis-analytics/MyApplication`
Protokollstream: `kinesis-analytics-log-stream`
Dieser Protokollstream wird zur Überwachung der Anwendung verwendet. Dies ist nicht derselbe Protokollstream, den die Anwendung zum Senden von Ergebnissen verwendet.

### Führen Sie die Anwendung aus.
<a name="examples-beam-run"></a>

Das Flink-Jobdiagramm kann angezeigt werden, indem Sie die Anwendung ausführen, das Apache Flink-Dashboard öffnen und den gewünschten Flink-Job auswählen.

Sie können die Messwerte von Managed Service for Apache Flink auf der CloudWatch Konsole überprüfen, um sicherzustellen, dass die Anwendung funktioniert. 

## Ressourcen bereinigen AWS
<a name="examples-beam-cleanup"></a>

Dieser Abschnitt enthält Verfahren zum Bereinigen von AWS Ressourcen, die im Tumbling Window-Tutorial erstellt wurden.

**Topics**
+ [

### Löschen Sie Ihre Managed Service for Apache Flink-Anwendung
](#examples-beam-cleanup-app)
+ [

### Löschen Sie Ihre Kinesis-Datenstreams
](#examples-beam-cleanup-stream)
+ [

### Löschen Sie Ihr Amazon S3 S3-Objekt und Ihren Bucket
](#examples-beam-cleanup-s3)
+ [

### Löschen Sie Ihre IAM-Ressourcen
](#examples-beam-cleanup-iam)
+ [

### CloudWatch Löschen Sie Ihre Ressourcen
](#examples-beam-cleanup-cw)

### Löschen Sie Ihre Managed Service for Apache Flink-Anwendung
<a name="examples-beam-cleanup-app"></a>

1. Melden Sie sich bei der AWS-Managementkonsole an und öffnen Sie die Amazon MSF-Konsole unter https://console.aws.amazon.com /flink.

1. wählen Sie im Bereich Managed Service for Apache Flink die Option. **MyApplication**

1. Wählen Sie auf der Seite der Anwendung die Option **Löschen** aus und bestätigen Sie dann den Löschvorgang.

### Löschen Sie Ihre Kinesis-Datenstreams
<a name="examples-beam-cleanup-stream"></a>

1. Öffnen Sie die Kinesis-Konsole unter [https://console.aws.amazon.com/kinesis.](https://console.aws.amazon.com/kinesis)

1. Wählen Sie im Bereich Kinesis Data Streams die Option **ExampleInputStream**.

1. Wählen Sie auf der **ExampleInputStream**Seite **Delete Kinesis Stream** aus und bestätigen Sie dann den Löschvorgang.

1. Wählen Sie auf der **Kinesis-Streams-Seite** die **ExampleOutputStream**, wählen Sie **Aktionen**, wählen Sie **Löschen** und bestätigen Sie dann den Löschvorgang.

### Löschen Sie Ihr Amazon S3 S3-Objekt und Ihren Bucket
<a name="examples-beam-cleanup-s3"></a>

1. Öffnen Sie die Amazon S3 S3-Konsole unter [https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/).

1. Wählen Sie den ***<username>*Bucket ka-app-code -.**

1. Wählen Sie **Löschen** und geben Sie dann den Bucketnamen ein, um das Löschen zu bestätigen.

### Löschen Sie Ihre IAM-Ressourcen
<a name="examples-beam-cleanup-iam"></a>

1. Öffnen Sie unter [https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/) die IAM-Konsole.

1. Wählen Sie in der Navigationsleiste **Policies** aus.

1. Geben Sie in der Filtersteuerung **Kinesis** ein.

1. Wählen Sie die Richtlinie **kinesis-analytics-service- MyApplication -us-west-2**.

1. Klicken Sie auf **Richtlinienaktionen** und anschließend auf **Löschen**.

1. Wählen Sie in der Navigationsleiste **Roles (Rollen)** aus.

1. Wählen Sie die Rolle **kinesis-analytics- MyApplication** -us-west-2.

1. Wählen Sie dann **Rolle löschen** und bestätigen Sie das Löschen.

### CloudWatch Löschen Sie Ihre Ressourcen
<a name="examples-beam-cleanup-cw"></a>

1. Öffnen Sie die CloudWatch Konsole unter [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/).

1. Wählen Sie in der Navigationsleiste **Protokolle** aus.

1. Wählen Sie die Gruppe**/aws/kinesis-analytics/MyApplication**log aus.

1. Wählen Sie dann **Protokollgruppe löschen** und bestätigen Sie das Löschen.

## Nächste Schritte
<a name="examples-beam-nextsteps"></a>

Nachdem Sie nun eine grundlegende Managed Service for Apache Flink-Anwendung erstellt und ausgeführt haben, die Daten mithilfe von Apache Beam transformiert, finden Sie in der folgenden Anwendung ein Beispiel für eine erweiterte Managed Service für Apache Flink-Lösung.
+ **[Workshop „Beam on Managed Service for Apache Flink Streaming](https://streaming-analytics.workshop.aws/beam-on-kda/)**“: In diesem Workshop untersuchen wir ein durchgängiges Beispiel, das Batch- und Streaming-Aspekte in einer einheitlichen Apache Beam-Pipeline kombiniert. 