Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
Studio ノートブックでデータを分析するため、クエリの例を表示する
以下のクエリ例は、Studio ノートブックでウィンドウクエリを使用してデータを分析する方法を示しています。
Apache Flink SQL クエリ設定の情報については、「インタラクティブなデータ分析のための Zeppelin ノートブック上の Flink
Apache Flink ダッシュボードでアプリケーションを表示するには、アプリケーションの「Zeppelin Note」ページで「FLINK JOB」を選択します。
ウィンドウクエリの詳細については、「Apache Flink ドキュメント
Apache Flink Streaming SQL クエリの他の例については、「Apache Flink ドキュメント
Amazon MSK/Apache Kafka でテーブルを作成する
Apache Flink Studio 用 Managed Service を搭載した Amazon MSK Flink コネクタを使用して、Plaintext、SSL または IAM 認証で接続を認証できます。要件に応じて特定のプロパティを使用してテーブルを作成します。
-- Plaintext connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- SSL connection CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SSL', 'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts', 'properties.ssl.truststore.password' = 'changeit', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' ); -- IAM connection (or for MSK Serverless) CREATE TABLE your_table ( `column1` STRING, `column2` BIGINT ) WITH ( 'connector' = 'kafka', 'topic' = 'your_topic', 'properties.bootstrap.servers' = '<bootstrap servers>', 'properties.security.protocol' = 'SASL_SSL', 'properties.sasl.mechanism' = 'AWS_MSK_IAM', 'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;', 'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler', 'properties.group.id' = 'myGroup', 'scan.startup.mode' = 'earliest-offset', 'format' = 'json' );
これらのプロパティは、「Apache Kafka SQL Connector
Kinesis でテーブルを作成する
次の例では、Kinesis を使用してテーブルを作成します。
CREATE TABLE KinesisTable ( `column1` BIGINT, `column2` BIGINT, `column3` BIGINT, `column4` STRING, `ts` TIMESTAMP(3) ) PARTITIONED BY (column1, column2) WITH ( 'connector' = 'kinesis', 'stream' = 'test_stream', 'aws.region' = '<region>', 'scan.stream.initpos' = 'LATEST', 'format' = 'csv' );
使用可能な他のプロパティの詳細については、「Amazon Kinesis Data Streams SQL Connector
タンブリングウィンドウをクエリする
次の Flink Streaming SQL クエリは、 ZeppelinTopic テーブルから 5 秒ごとのタンブリングウィンドウの最大値を選択します。
%flink.ssql(type=update) SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker FROM ZeppelinTopic GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
スライディングウィンドウをクエリする
次の Apache Flink Streaming SQL クエリは、 ZeppelinTopic テーブルから 5 秒ごとのスライディングウィンドウの最大値を選択します。
%flink.ssql(type=update) SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max FROM ZeppelinTopic//or your table name in AWS Glue GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
インタラクティブ SQL を使用する
この例では、イベント時間と処理時間の最大値、キー値テーブルの値の合計を出力します。Scala を使用してサンプルデータを生成する のサンプル・データ生成スクリプトが実行されていることを確認します。Studio ノートブックでフィルタリングや結合などの他の SQL クエリを試すには、Apache Flink ドキュメントのApache Flink ドキュメント:「クエリ
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time. SELECT MAX(`et`) as `et`, MAX(`pt`) as `pt`, SUM(`value`) as `sum` FROM `key-values`
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive tumbling window query that displays the number of records observed per (event time) second. -- Browse through the chart views to see different visualizations of the streaming result. SELECT TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`, `key`, SUM(`value`) as `sum` FROM `key-values` GROUP BY TUMBLE(`et`, INTERVAL '1' SECONDS), `key`;
BlackHole SQL コネクタを使用する
BlackHole SQL コネクタでは、クエリをテストするために Kinesis データストリームや Amazon MSK クラスターを作成する必要はありません。BlackHole SQL コネクタの詳細については、Apache Flink ドキュメントの「BlackHole SQL Connector
%flink.ssql CREATE TABLE default_catalog.default_database.blackhole_table ( `key` BIGINT, `value` BIGINT, `et` TIMESTAMP(3) ) WITH ( 'connector' = 'blackhole' )
%flink.ssql(parallelism=1) INSERT INTO `test-target` SELECT `key`, `value`, `et` FROM `test-source` WHERE `key` > 3
%flink.ssql(parallelism=2) INSERT INTO `default_catalog`.`default_database`.`blackhole_table` SELECT `key`, `value`, `et` FROM `test-target` WHERE `key` > 7
Scala を使用してサンプルデータを生成する
この例では Scala を使用してサンプルデータを生成します。このサンプルデータを使用して、さまざまなクエリをテストできます。テーブル作成ステートメントを使用して key-values テーブルを作成します。
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator import org.apache.flink.streaming.api.scala.DataStream import java.sql.Timestamp // ad-hoc convenience methods to be defined on Table implicit class TableOps[T](table: DataStream[T]) { def asView(name: String): DataStream[T] = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView("`" + name + "`") } stenv.createTemporaryView("`" + name + "`", table) return table; } }
%flink(parallelism=4) val stream = senv .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000)) .map(key => (key, 1, new Timestamp(System.currentTimeMillis))) .asView("key-values-data-generator")
%flink.ssql(parallelism=4) -- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)") -- in this case the INSERT query will inherit the parallelism of the of the above paragraph INSERT INTO `key-values` SELECT `_1` as `key`, `_2` as `value`, `_3` as `et` FROM `key-values-data-generator`
インタラクティブ Scala を使用する
これは インタラクティブ SQL を使用する の Scala 翻訳です。Scala の他の例については、Apache Flink ドキュメントの「Table API
%flink import org.apache.flink.api.scala._ import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=4) // A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time. val query01 = stenv .from("`key-values`") .select( $"et".max().as("et"), $"pt".max().as("pt"), $"value".sum().as("sum") ).asView("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink(parallelism=4) // An tumbling window view that displays the number of records observed per (event time) second. val query02 = stenv .from("`key-values`") .window(Tumble over 1.seconds on $"et" as $"w") .groupBy($"w", $"key") .select( $"w".start.as("window"), $"key", $"value".sum().as("sum") ).asView("query02")
%flink.ssql(type=update, parallelism=4, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`
インタラクティブ Python を使用する
これは インタラクティブ SQL を使用する の Python 翻訳です。Python の他のサンプルについては、Apache Flink ドキュメントの「Table API
%flink.pyflink from pyflink.table.table import Table def as_view(table, name): if (name in st_env.list_temporary_views()): st_env.drop_temporary_view(name) st_env.create_temporary_view(name, table) return table Table.as_view = as_view
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`keyvalues`") \ .select(", ".join([ "max(et) as et", "max(pt) as pt", "sum(value) as sum" ])) \ .as_view("query01")
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>) -- An interactive query prints the query01 output. SELECT * FROM query01
%flink.pyflink(parallelism=16) # A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time st_env \ .from_path("`key-values`") \ .window(Tumble.over("1.seconds").on("et").alias("w")) \ .group_by("w, key") \ .select(", ".join([ "w.start as window", "key", "sum(value) as sum" ])) \ .as_view("query02")
%flink.ssql(type=update, parallelism=16, refreshInterval=1000) -- An interactive query prints the query02 output. -- Browse through the chart views to see different visualizations of the streaming result. SELECT * FROM `query02`
インタラクティブ Python、SQL、Scala の組み合わせを使用する
ノートブックでは、SQL、Python、Scala を自由に組み合わせてインタラクティブな分析を行うことができます。永続的な状態を持つアプリケーションとしてデプロイする予定の Studio ノートブックでは、SQL と Scala を組み合わせて使用できます。この例では、無視されるセクションと、永続的な状態でアプリケーションにデプロイされるセクションを示しています。
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-source` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-source-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink.ssql CREATE TABLE `default_catalog`.`default_database`.`my-test-target` ( `key` BIGINT NOT NULL, `value` BIGINT NOT NULL, `et` TIMESTAMP(3) NOT NULL, `pt` AS PROCTIME(), WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kinesis', 'stream' = 'kda-notebook-example-test-target-stream', 'aws.region' = 'eu-west-1', 'scan.stream.initpos' = 'LATEST', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )
%flink() // ad-hoc convenience methods to be defined on Table implicit class TableOps(table: Table) { def asView(name: String): Table = { if (stenv.listTemporaryViews.contains(name)) { stenv.dropTemporaryView(name) } stenv.createTemporaryView(name, table) return table; } }
%flink(parallelism=1) val table = stenv .from("`default_catalog`.`default_database`.`my-test-source`") .select($"key", $"value", $"et") .filter($"key" > 10) .asView("query01")
%flink.ssql(parallelism=1) -- forward data INSERT INTO `default_catalog`.`default_database`.`my-test-target` SELECT * FROM `query01`
%flink.ssql(type=update, parallelism=1, refreshInterval=1000) -- forward data to local stream (ignored when deployed as application) SELECT * FROM `query01`
%flink // tell me the meaning of life (ignored when deployed as application!) print("42!")
クロスアカウント Kinesis データストリームを使用する
Studio ノートブックを所有するアカウント以外のアカウントにおける Kinesis データ・ストリームを使用するには、Studio ノートブックが実行されているアカウントにサービス実行ロールを作成し、データストリームを所有するアカウントにロール信頼ポリシーを作成します。Create table DDL ステートメントの Kinesis コネクタで aws.credentials.provider、aws.credentials.role.arn、aws.credentials.role.sessionName を使用して、データストリームに対してテーブルを作成します。
Studio ノートブックアカウントには、次のサービス実行ロールを使用します。
{ "Sid": "AllowNotebookToAssumeRole", "Effect": "Allow", "Action": "sts:AssumeRole" "Resource": "*" }
データストリームアカウントには、 AmazonKinesisFullAccess ポリシーと以下のロール信頼ポリシーを使用してください。
create table ステートメントには以下の段落を使用してます。
%flink.ssql CREATE TABLE test1 ( name VARCHAR, age BIGINT ) WITH ( 'connector' = 'kinesis', 'stream' = 'stream-assume-role-test', 'aws.region' = 'us-east-1', 'aws.credentials.provider' = 'ASSUME_ROLE', 'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role', 'aws.credentials.role.sessionName' = 'stream-assume-role-test-session', 'scan.stream.initpos' = 'TRIM_HORIZON', 'format' = 'json' )