Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
開始方法 (Scala)
注記
バージョン 1.15 以降、Flink は Scala フリーになりました。アプリケーションが Scala の任意のバージョンから Java API を使用できるようになっています。Flink はまだ内部的にいくつかの主要コンポーネントで Scala を使用していますが、Scala をユーザーコードのクラスローダーに公開していません。そのため、Scala の依存関係を JAR アーカイブに追加する必要があります。
Flink 1.15 での Scala の変更についての詳しい情報は、Scala Free in One Fifteen
この演習では、Kinesis ストリームをソースおよびシンクとして使用して、Scala 用の Managed Service for Apache Flink アプリケーションを作成します。
このトピックには、次のセクションが含まれています。
依存リソースを作成する
この練習用の Managed Service for Apache Flink を作成する前に、以下の依存リソースを作成します。
入力用と出力用の 2 つの Kinesis ストリーム。
アプリケーションのコードを保存するためのAmazon S3バケット (
ka-app-code-)<username>
Kinesis ストリームと Amazon S3 バケットは、コンソールを使用して作成できます。これらのリソースの作成手順については、次の各トピックを参照してください。
「Amazon Kinesis Data Streams デベロッパーガイド」の「Creating and Updating Data Streams」。データストリーム
ExampleInputStreamとExampleOutputStreamに名前を付けます。データストリームを作成するには (AWS CLI)
次の Amazon Kinesis create-stream AWS CLI コマンドを使用して、1 つ目のストリーム (
ExampleInputStream) を作成します。aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuserアプリケーションが出力の書き込みに使用する 2 つめのストリームを作成するには、ストリーム名を
ExampleOutputStreamに変更して同じコマンドを実行します。aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
「Amazon Simple Storage Service ユーザーガイド」の「How Do I Create an S3 Bucket?」。ログイン名 (
ka-app-code-など) を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。<username>
その他のリソース
アプリケーションを作成すると、Apache Flink 用 Managed Service によって次の Amazon CloudWatch リソースが作成されます(これらのリソースがまだ存在しない場合)。
/AWS/KinesisAnalytics-java/MyApplicationという名前のロググループ。kinesis-analytics-log-streamというログストリーム
入力ストリームにサンプルレコードを書き込む
このセクションでは、Python スクリプトを使用して、アプリケーションが処理するサンプルレコードをストリームに書き込みます。
注記
このセクションでは AWS SDK for Python (Boto)
注記
このセクションの Python スクリプトでは、AWS CLI を使用しています。アカウント認証情報とデフォルトリージョンを使用するように AWS CLI を設定する必要があります。AWS CLI を設定するには、次の内容を入力します。
aws configure
-
次の内容で、
stock.pyという名前のファイルを作成します。import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2')) -
stock.pyスクリプトを実行します。$ python stock.pyチュートリアルの残りの部分を完了する間、スクリプトを実行し続けてください。
アプリケーションコードをダウンロードして調べる
この例の Python アプリケーションコードは GitHub から入手できます。アプリケーションコードをダウンロードするには、次の操作を行います。
Git クライアントをまだインストールしていない場合は、インストールします。詳細については、「Git のインストール
」をご参照ください。 次のコマンドを使用してリモートリポジトリのクローンを作成します。
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.gitamazon-kinesis-data-analytics-java-examples/scala/GettingStartedディレクトリに移動します。
アプリケーションコードに関して、以下の点に注意してください。
build.sbtファイルには、Managed Service for Apache Flink ライブラリなど、アプリケーションの設定と依存関係に関する情報が含まれています。この
BasicStreamingJob.scalaファイルには、アプリケーションの機能を定義するメインメソッドが含まれています。アプリケーションは Kinesis ソースを使用して、ソースストリームから読み取りを行います。次のスニペットでは、Kinesis ソースが作成されます。
private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }また、アプリケーションは Kinesis シンクを使用して結果ストリームに書き込みます。次のスニペットでは、Kinesis シンクが作成されます。
private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }アプリケーションでは、ソースおよびシンクコネクタを作成し、 オブジェクトを使用して外部リソースにアクセスします。
アプリケーションは、動的アプリケーションプロパティを使用してソースコネクタとシンクコネクタを作成します。アプリケーションのプロパティを読み取ってコネクタを設定します。ランタイムプロパティの詳細については、ランタイムプロパティを参照してください。
アプリケーション・コードをコンパイルしてアップロードするには
このセクションでは、アプリケーションコードをコンパイルし、依存リソースを作成する セクションで作成したAmazon S3バケットにアップロードします。
アプリケーションコードのコンパイル
このセクションでは、「SBT
アプリケーションコードを使用するには、コードをコンパイルして JAR ファイルにパッケージ化します。SBT を使用してコードをコンパイルしてパッケージ化できます。
sbt assembly-
アプリケーションのコンパイルに成功すると、次のファイルが作成されます。
target/scala-3.2.0/getting-started-scala-1.0.jar
Apache Flink Streaming Scala Code のアップロード
このセクションでは、Amazon S3 バケットを作成し、アプリケーションコードをアップロードします。
Amazon S3 コンソール (https://console.aws.amazon.com/s3/
) を開きます。 [バケットを作成] を選択します。
[Bucket name (バケット名)] フィールドに
ka-app-code-<username>と入力します。バケット名にユーザー名などのサフィックスを追加して、グローバルに一意にします。[次へ] を選択します。設定オプションのステップでは、設定をそのままにし、[次へ] を選択します。
アクセス許可の設定のステップでは、設定をそのままにし、[次へ] を選択します。
[バケットを作成] を選択します。
ka-app-code-<username>バケットを選択し、アップロード を選択します。-
ファイルの選択のステップで、[ファイルを追加] を選択します。前のステップで作成した
getting-started-scala-1.0.jarファイルに移動します。 オブジェクトの設定を変更する必要はないので、[アップロード] を選択してください。
アプリケーションコードが Amazon S3 バケットに保存され、アプリケーションからアクセスできるようになります。