

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink アプリケーションに Apache Beam を使用する
<a name="how-creating-apps-beam"></a>

**注記**  
**Flink 1.20 と互換性のある Apache Flink Runner はありません。詳細については、「Apache Beam ドキュメント」の「[Flink Version Compatibility](https://beam.apache.org/documentation/runners/flink/#flink-version-compatibility)」を参照してください。**>

「[Apache Beam](https://beam.apache.org/)」フレームワークを Apache Flink アプリケーション用 Managed Serviceで使用して、ストリーミングデータを処理できます。Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceでは、「[Apache Flink ランナー](https://beam.apache.org/documentation/runners/flink/)」を使用して Beam パイプラインを実行します。

Apache Flink アプリケーション用 Managed Serviceで Apache Beam を使用する方法に関するチュートリアルについては、 [CloudFormation を使用するApache Beam を使用してアプリケーションを作成する](examples-beam.md) を参照してください。

**Topics**
+ [

## Managed Service for Apache Flink を使用した Apache Flink ランナーの制限事項
](#how-creating-apps-beam-using)
+ [

## Managed Service for Apache Flink での Apache Beam 機能
](#how-creating-apps-beam-capabilities)
+ [

# Apache Beam を使用してアプリケーションを作成する
](examples-beam.md)

## Managed Service for Apache Flink を使用した Apache Flink ランナーの制限事項
<a name="how-creating-apps-beam-using"></a>

Apache Flink 用 Managed Serviceで Apache Flink ランナーを使用する際には、次の点に注意してください。
+ Apache Beam メトリクスは Apache Flink 用 Managed Serviceコンソールでは表示できません。
+ 「**Apache Beam は Apache Flink バージョン 1.8 以降を使用する Apache Flink アプリケーション用 Managed Serviceでのみサポートされています。」 「Apache Beam は、Apache Flink バージョン 1.6 を使用する Apache Flink アプリケーション用 Managed Serviceではサポートされていません。**」

## Managed Service for Apache Flink での Apache Beam 機能
<a name="how-creating-apps-beam-capabilities"></a>

Apache Flink のマネージドサービスは、Apache Flink ランナーと同じ Apache Beam 能力をサポートしています。Apache Flink ランナーでサポートされている機能については、「[ビーム互換性マトリックス](https://beam.apache.org/documentation/runners/capability-matrix/)」を参照してください。

Apache Flink 用 Managed Serviceで Apache Flink アプリケーションをテストして、アプリケーションに必要なすべての機能がサポートされていることを確認することをお勧めします。

# Apache Beam を使用してアプリケーションを作成する
<a name="examples-beam"></a>

この課題では、「[Apache Beam](https://beam.apache.org/)」を使用してデータを変換する Apache Flink アプリケーション用 Managed Serviceを作成します。Apache Beam はストリーミングデータを処理するためのプログラミングモデルです。Apache Flink 用 Managed Service での Apache Beam の使用ついては、 [Managed Service for Apache Flink アプリケーションに Apache Beam を使用する](how-creating-apps-beam.md) を参照してください。

**注記**  
この演習に必要な前提条件を設定するには、まず[チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md)演習を完了してください。

**Topics**
+ [

## 依存リソースを作成する
](#examples-beam-resources)
+ [

## 入力ストリームにサンプルレコードを書き込む
](#examples-beam-write)
+ [

## アプリケーションコードをダウンロードして調べる
](#examples-beam-download)
+ [

## アプリケーションコードのコンパイル
](#examples-beam-compile)
+ [

## Apache Flink Streaming Java Code のアップロードしてください
](#examples-beam-upload)
+ [

## Managed Service for Apache Flink アプリケーションを作成して実行する
](#examples-beam-create-run)
+ [

## AWS リソースをクリーンアップする
](#examples-beam-cleanup)
+ [

## 次の手順
](#examples-beam-nextsteps)

## 依存リソースを作成する
<a name="examples-beam-resources"></a>

この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
+ 2 つの Kinesis Data Streams (`ExampleInputStream`と`ExampleOutputStream`)
+ アプリケーションのコードを保存するためのAmazon S3バケット (`ka-app-code-<username>`) 

Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
+ 「*Amazon Kinesis Data Streams デベロッパーガイド*」の「[Creating and Updating Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」。データストリーム**ExampleInputStream**と**ExampleOutputStream**に名前を付けます。
+ 「*Amazon Simple Storage Service ユーザーガイド*」の「[How Do I Create an S3 Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)」。ログイン名 (**ka-app-code-*<username>*** など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。

## 入力ストリームにサンプルレコードを書き込む
<a name="examples-beam-write"></a>

このセクションでは、Python スクリプトを使用して、アプリケーションが処理するランダムな文字列をストリームに書き込みます。

**注記**  
このセクションでは [AWS SDK for Python (Boto)](https://aws.amazon.com/developers/getting-started/python/) が必要です。

1. 次の内容で、`ping.py` という名前のファイルを作成します。

   ```
   import json
   import boto3
   import random
   
   kinesis = boto3.client('kinesis')
   
   while True:
           data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat'])
           print(data)
           kinesis.put_record(
                   StreamName="ExampleInputStream",
                   Data=data,
                   PartitionKey="partitionkey")
   ```

1. `ping.py` スクリプトを実行します。

   ```
   $ python ping.py
   ```

   チュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。

## アプリケーションコードをダウンロードして調べる
<a name="examples-beam-download"></a>

この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。

1. Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「[Git のインストール](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)」をご参照ください。

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
   ```

1. `amazon-kinesis-data-analytics-java-examples/Beam` ディレクトリに移動します。

アプリケーションコードは`BasicBeamStreamingJob.java`ファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
+ アプリケーションは Apache Beam 「[ParDo](https://beam.apache.org/releases/javadoc/2.0.0/org/apache/beam/sdk/transforms/ParDo.html)」を使用して、 `PingPongFn` というカスタム変換関数を呼び出して受信レコードを処理します。

  `PingPongFn` 関数を呼び出すコードは次のとおりです。

  ```
  .apply("Pong transform",
      ParDo.of(new PingPongFn())
  ```
+ Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceには、以下のコンポーネントが必要です。これらのコンポーネントとバージョンを `pom.xml` に含めないと、アプリケーションは環境の依存関係から誤ったバージョンをロードし、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。

  ```
  <jackson.version>2.10.2</jackson.version>
  ...
  <dependency>
      <groupId>com.fasterxml.jackson.module</groupId>
      <artifactId>jackson-module-jaxb-annotations</artifactId>
      <version>2.10.2</version>
  </dependency>
  ```
+ `PingPongFn` 変換関数は、入力データが ping でない限り、入力データを出力ストリームに渡します。「**ping**」である場合は、文字列「**pong\$1n**」を出力ストリームに出力します。

  変換関数のコードは以下のとおりです。

  ```
      private static class PingPongFn extends DoFn<KinesisRecord, byte[]> {
      private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class);
      
      @ProcessElement
      public void processElement(ProcessContext c) {
          String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8);
          if (content.trim().equalsIgnoreCase("ping")) {
              LOG.info("Ponged!");
              c.output("pong\n".getBytes(StandardCharsets.UTF_8));
          } else {
              LOG.info("No action for: " + content);
              c.output(c.element().getDataAsBytes());
          }
      }
  }
  ```

## アプリケーションコードのコンパイル
<a name="examples-beam-compile"></a>

アプリケーションをコンパイルするには、次の操作を行います。

1. Java と Maven がまだインストールされていない場合は、インストールします。詳細については、[チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md)チュートリアルの「[必要な前提条件を完了する](getting-started.md#setting-up-prerequisites)」を参照してください。

1. 次のコマンドを使用して、アプリケーションをコンパイルします。

   ```
   mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
   ```
**注記**  
提供されているソースコードは Java 11 のライブラリーに依存しています。

アプリケーションをコンパイルすると、アプリケーション JAR ファイル (`target/basic-beam-app-1.0.jar`) が作成されます。

## Apache Flink Streaming Java Code のアップロードしてください
<a name="examples-beam-upload"></a>

このセクションでは、[依存リソースを作成する](#examples-beam-resources) のセクションで作成した Amazon S3 バケットにアプリケーションコードをアップロードします。

1. Amazon S3 コンソールで **ka-app-code-*<username>*** バケットを選択し、[**アップロード**] を選択します。

1. **ファイルの選択**のステップで、[**ファイルを追加**] を選択します。前のステップで作成した `basic-beam-app-1.0.jar` ファイルに移動します。

1. オブジェクトの設定を変更する必要はないので、[**アップロード**] を選択してください。

アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。

## Managed Service for Apache Flink アプリケーションを作成して実行する
<a name="examples-beam-create-run"></a>

以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。

### アプリケーションの作成
<a name="examples-beam-create"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. Managed Service for Apache Flinkのダッシュボードで、「**分析アプリケーションの作成**」 を選択します。

1. 「**Managed Service for Apache Flink-アプリケーションの作成**」ページで、次のようにアプリケーションの詳細を入力します。
   + [**アプリケーション名**] には **MyApplication** と入力します。
   + [**ランタイム**] には、[**Apache Flink**] を選択します。
**注記**  
Apache Beam は現在、Apache Flink バージョン 1.19 以降と互換性がありません。
   + バージョンプルダウンで **[Apache Flink バージョン 1.15]** を選択します。

1. [**アクセス許可**] には、[**IAM ロールの作成 / 更新`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. [**アプリケーションを作成**] を選択します。

**注記**  
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-MyApplication-us-west-2`
ロール: `kinesis-analytics-MyApplication-us-west-2`

### IAM ポリシーを編集する
<a name="get-started-exercise-7-console-iam"></a>

IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-west-2`** ポリシーを選択します。

1. [**概要**] ページで、[**ポリシーの編集**] を選択します。**JSON** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (*012345678901*) を自分のアカウント ID に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "logs:DescribeLogGroups",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*",
                   "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar"
               ]
           },
           {
               "Sid": "DescribeLogStreams",
               "Effect": "Allow",
               "Action": "logs:DescribeLogStreams",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
           },
           {
               "Sid": "PutLogEvents",
               "Effect": "Allow",
               "Action": "logs:PutLogEvents",
               "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:us-west-2:012345678901:log-group:*"
               ]
           },
           {
               "Sid": "ReadInputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream"
           },
           {
               "Sid": "WriteOutputStream",
               "Effect": "Allow",
               "Action": "kinesis:*",
               "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream"
           }
       ]
   }
   ```

------

### アプリケーションを設定する
<a name="examples-beam-configure"></a>

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. [**Configure application**] ページで、[**Code location**] を次のように指定します。
   + [**Amazon S3 バケット**] で、**ka-app-code-*<username>***と入力します。
   + [**Amazon S3 オブジェクトへのパス**] で、**basic-beam-app-1.0.jar**と入力します。

1. [**Access to application resources**] の [**Access permissions**] では、[**Create / update IAM role`kinesis-analytics-MyApplication-us-west-2`**] を選択します。

1. 次のように入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/examples-beam.html)

1. [**Monitoring**] の [**Monitoring metrics level**] が [**Application**] に設定されていることを確認します。

1. [**CloudWatch logging**] では、[**Enable**] チェックボックスをオンにします。

1. **[更新]** を選択します。

**注記**  
CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`
このログストリームはアプリケーションのモニターに使用されます。このログストリームは、アプリケーションの結果の送信に使用されたログストリームとは異なります。

### アプリケーションを実行する
<a name="examples-beam-run"></a>

Flink ジョブグラフは、アプリケーションを実行し、Apache Flink ダッシュボードを開き、目的の Flink ジョブを選択すると表示できます。

CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。

## AWS リソースをクリーンアップする
<a name="examples-beam-cleanup"></a>

このセクションでは、タンブリングウィンドウチュートリアルで作成した AWS リソースをクリーンアップする手順について説明します。

**Topics**
+ [

### Managed Service for Apache Flink アプリケーションを削除する
](#examples-beam-cleanup-app)
+ [

### Kinesis Data Streams を削除する
](#examples-beam-cleanup-stream)
+ [

### Amazon S3 オブジェクトとバケットを削除する
](#examples-beam-cleanup-s3)
+ [

### IAM リソースを削除する
](#examples-beam-cleanup-iam)
+ [

### CloudWatch リソースを削除する
](#examples-beam-cleanup-cw)

### Managed Service for Apache Flink アプリケーションを削除する
<a name="examples-beam-cleanup-app"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. Apache Flink 用 Managed Serviceパネルで、**MyApplication** を選択します。

1. アプリケーションのページで[**削除**]を選択し、削除を確認します。

### Kinesis Data Streams を削除する
<a name="examples-beam-cleanup-stream"></a>

1. 「[https://console.aws.amazon.com/kinesis](https://console.aws.amazon.com/kinesis)」で Kinesis コンソールを開きます。

1. Kinesis Data Streams パネルで、「**ExampleInputStream**」を選択します。

1. 「**ExampleInputStream**」ページで、「**Kinesis ストリームを削除**」を選択し、削除を確定します。

1. **[Kinesis ストリーム]** ページで **[ExampleOutputStream]** を選択し、**[アクション]** を選択してから **[削除]** を選択して、削除を確定します。

### Amazon S3 オブジェクトとバケットを削除する
<a name="examples-beam-cleanup-s3"></a>

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. **ka-app-code-*<username＞*バケット**を選択します。

1. **[削除]** を選択し、バケット名を入力して削除を確認します。

### IAM リソースを削除する
<a name="examples-beam-cleanup-iam"></a>

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. ナビゲーションバーで、[**ポリシー**] を選択します。

1. フィルターコントロールに「**kinesis**」と入力します。

1. 「**kinesis-analytics-service-MyApplication-us-west-2**」ポリシーを選択します。

1. [**ポリシーアクション**]、[**削除**] の順に選択します。

1. ナビゲーションバーで ［**ロール**］を選択します。

1. 「**kinesis-analytics-MyApplication-us-west-2**」ロールを選択します。

1. **[ロールの削除]** を選択し、削除を確定します。

### CloudWatch リソースを削除する
<a name="examples-beam-cleanup-cw"></a>

1. CloudWatch コンソールの [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) を開いてください。

1. ナビゲーションバーで [**ログ**] を選択します。

1. 「**/aws/kinesis-analytics/MyApplication**」ロググループを選択してください。

1. [**ロググループの削除**]を選択し、削除を確認してください。

## 次の手順
<a name="examples-beam-nextsteps"></a>

Apache Beam を使用してデータを変換する基本的なApache Flink アプリケーション用 Managed Service を作成し、実行しますた。次に、より高度なApache Flink ソリューション用 Managed Service の例として次のアプリケーションをご覧ください。
+ 「**[Apache Flink Streaming Workshop 用 Managed Service上のビーム](https://streaming-analytics.workshop.aws/beam-on-kda/)**」:このワークショップでは、バッチとストリーミングを1つの Apache Beam パイプラインに統合したエンド・ツー・エンドの例について説明します。