Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Beam を使用してアプリケーションを作成する
この課題では、「Apache Beam
注記
この演習に必要な前提条件を設定するには、まずチュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要演習を完了してください。
このトピックには、次のセクションが含まれています。
依存リソースを作成する
この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
2 つの Kinesis Data Streams (
ExampleInputStreamとExampleOutputStream)アプリケーションのコードを保存するためのAmazon S3バケット (
ka-app-code-)<username>
Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
「Amazon Kinesis Data Streams デベロッパーガイド」の「Creating and Updating Data Streams」。データストリーム
ExampleInputStreamとExampleOutputStreamに名前を付けます。「Amazon Simple Storage Service ユーザーガイド」の「How Do I Create an S3 Bucket?」。ログイン名 (
ka-app-code-など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。<username>
入力ストリームにサンプルレコードを書き込む
このセクションでは、Python スクリプトを使用して、アプリケーションが処理するランダムな文字列をストリームに書き込みます。
注記
このセクションでは AWS SDK for Python (Boto)
-
次の内容で、
ping.pyという名前のファイルを作成します。import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey") -
ping.pyスクリプトを実行します。$ python ping.pyチュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。
アプリケーションコードをダウンロードして調べる
この例の Java アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。
Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール
」をご参照ください。 次のコマンドを使用してリモートリポジトリのクローンを作成します。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.gitamazon-kinesis-data-analytics-java-examples/Beamディレクトリに移動します。
アプリケーションコードはBasicBeamStreamingJob.javaファイルに含まれています。アプリケーションコードに関して、以下の点に注意してください。
アプリケーションは Apache Beam 「ParDo
」を使用して、 PingPongFnというカスタム変換関数を呼び出して受信レコードを処理します。PingPongFn関数を呼び出すコードは次のとおりです。.apply("Pong transform", ParDo.of(new PingPongFn())Apache Beam を使用する Apache Flink アプリケーション用 Managed Serviceには、以下のコンポーネントが必要です。これらのコンポーネントとバージョンを
pom.xmlに含めないと、アプリケーションは環境の依存関係から誤ったバージョンをロードし、バージョンが一致しないため、実行時にアプリケーションがクラッシュします。<jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>PingPongFn変換関数は、入力データが ping でない限り、入力データを出力ストリームに渡します。「ping」である場合は、文字列「pong\n」を出力ストリームに出力します。変換関数のコードは以下のとおりです。
private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }
アプリケーションコードのコンパイル
アプリケーションをコンパイルするには、次の操作を行います。
Java と Maven がまだインストールされていない場合は、インストールします。詳細については、チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要チュートリアルの「必要な前提条件を完了する」を参照してください。
次のコマンドを使用して、アプリケーションをコンパイルします。
mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8注記
提供されているソースコードは Java 11 のライブラリーに依存しています。
アプリケーションをコンパイルすると、アプリケーション JAR ファイル (target/basic-beam-app-1.0.jar) が作成されます。
Apache Flink Streaming Java Code のアップロードしてください
このセクションでは、依存リソースを作成する のセクションで作成した Amazon S3 バケットにアプリケーションコードをアップロードします。
-
Amazon S3 コンソールで ka-app-code-
<username>バケットを選択し、[アップロード] を選択します。 -
ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した
basic-beam-app-1.0.jarファイルに移動します。 オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。
アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。
Managed Service for Apache Flink アプリケーションを作成して実行する
以下の手順を実行し、コンソールを使用してアプリケーションを作成、設定、更新、および実行します。
アプリケーションの作成
にサインインしAWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。
-
Managed Service for Apache Flinkのダッシュボードで、「分析アプリケーションの作成」 を選択します。
-
「Managed Service for Apache Flink-アプリケーションの作成」ページで、次のようにアプリケーションの詳細を入力します。
-
[アプリケーション名] には
MyApplicationと入力します。 -
[ランタイム] には、[Apache Flink] を選択します。
注記
Apache Beam は現在、Apache Flink バージョン 1.19 以降と互換性がありません。
バージョンプルダウンで [Apache Flink バージョン 1.15] を選択します。
-
-
[アクセス許可] には、[IAM ロールの作成 / 更新
kinesis-analytics-MyApplication-us-west-2] を選択します。 -
[アプリケーションを作成] を選択します。
注記
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。
-
ポリシー:
kinesis-analytics-service-MyApplication-us-west-2 -
ロール:
kinesis-analytics-MyApplication-us-west-2
IAM ポリシーを編集する
IAM ポリシーを編集し、Kinesis Data Streamsにアクセスするための許可を追加します。
IAM コンソール (https://console.aws.amazon.com/iam/
) を開きます。 -
[ポリシー] を選択します。前のセクションでコンソールによって作成された
kinesis-analytics-service-MyApplication-us-west-2ポリシーを選択します。 -
[概要] ページで、[ポリシーの編集] を選択します。JSON タブを選択します。
-
次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルのアカウント ID (
012345678901) を自分のアカウント ID に置き換えます。
アプリケーションを設定する
-
[MyApplication] ページで、[Congirue] を選択します。
-
[Configure application] ページで、[Code location] を次のように指定します。
-
[Amazon S3 バケット] で、
ka-app-code-と入力します。<username> -
[Amazon S3 オブジェクトへのパス] で、
basic-beam-app-1.0.jarと入力します。
-
-
[Access to application resources] の [Access permissions] では、[Create / update IAM role
kinesis-analytics-MyApplication-us-west-2] を選択します。 -
次のように入力します。
グループ ID キー 値 BeamApplicationPropertiesInputStreamNameExampleInputStreamBeamApplicationPropertiesOutputStreamNameExampleOutputStreamBeamApplicationPropertiesAwsRegionus-west-2 -
[Monitoring] の [Monitoring metrics level] が [Application] に設定されていることを確認します。
-
[CloudWatch logging] では、[Enable] チェックボックスをオンにします。
-
[更新] を選択します。
注記
CloudWatch ログ記録の有効化を選択すると、Managed Service for Apache Flink がユーザーに代わってロググループとログストリームを作成します。これらのリソースの名前は次のとおりです。
-
ロググループ:
/aws/kinesis-analytics/MyApplication -
ログストリーム:
kinesis-analytics-log-stream
このログストリームはアプリケーションのモニターに使用されます。このログストリームは、アプリケーションの結果の送信に使用されたログストリームとは異なります。
アプリケーションを実行する
Flink ジョブグラフは、アプリケーションを実行し、Apache Flink ダッシュボードを開き、目的の Flink ジョブを選択すると表示できます。
CloudWatch コンソールで Managed Service for Apache Flink メトリクスをチェックして、アプリケーションが機能していることを確認できます。
AWSリソースをクリーンアップする
このセクションでは、「タンブリングウィンドウ」チュートリアルで作成したAWSリソースをクリーンアップする手順について説明します。
このトピックには、次のセクションが含まれています。
Managed Service for Apache Flink アプリケーションを削除する
にサインインしAWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。
Apache Flink 用 Managed Serviceパネルで、MyApplication を選択します。
アプリケーションのページで[削除]を選択し、削除を確認します。
Kinesis Data Streams を削除する
「https://console.aws.amazon.com/kinesis
」で Kinesis コンソールを開きます。 Kinesis Data Streams パネルで、「ExampleInputStream」を選択します。
「ExampleInputStream」ページで、「Kinesis ストリームを削除」を選択し、削除を確定します。
[Kinesis ストリーム] ページで [ExampleOutputStream] を選択し、[アクション] を選択してから [削除] を選択して、削除を確定します。
Amazon S3 オブジェクトとバケットを削除する
Amazon S3 コンソール (https://console.aws.amazon.com/s3/
) を開きます。 ka-app-code-
<username>バケットを選択します。[削除] を選択し、バケット名を入力して削除を確認します。
IAM リソースを削除する
IAM コンソール (https://console.aws.amazon.com/iam/
) を開きます。 ナビゲーションバーで、[ポリシー] を選択します。
フィルターコントロールに「kinesis」と入力します。
「kinesis-analytics-service-MyApplication-us-west-2」ポリシーを選択します。
[ポリシーアクション]、[削除] の順に選択します。
ナビゲーションバーで [ロール]を選択します。
「kinesis-analytics-MyApplication-us-west-2」ロールを選択します。
[ロールの削除] を選択し、削除を確定します。
CloudWatch リソースを削除する
CloudWatch コンソールの https://console.aws.amazon.com/cloudwatch/
を開いてください。 ナビゲーションバーで [ログ] を選択します。
「/aws/kinesis-analytics/MyApplication」ロググループを選択してください。
[ロググループの削除]を選択し、削除を確認してください。
次の手順
Apache Beam を使用してデータを変換する基本的なApache Flink アプリケーション用 Managed Service を作成し、実行しますた。次に、より高度なApache Flink ソリューション用 Managed Service の例として次のアプリケーションをご覧ください。
「Apache Flink Streaming Workshop 用 Managed Service上のビーム
」:このワークショップでは、バッチとストリーミングを1つの Apache Beam パイプラインに統合したエンド・ツー・エンドの例について説明します。