Kinesis Data Streams を使用した Studio ノートブックの作成 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Kinesis Data Streams を使用した Studio ノートブックの作成

このチュートリアルでは、Kinesis Data Stream をソースとして使用する Studio ノートブックを作成する方法について説明します。

前提条件を満たす

Studio ノートブックを作成する前に、Kinesis データストリーム (ExampleInputStream) を作成します。アプリケーションはこのストリームをアプリケーションソースとして使用します。

このストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」の「Creating and Updating Data Streams」を参照してください。ストリーム ExampleInputStream に名前を付け、[オープンシャード数] を 1 に設定します。

AWS CLI を使用してストリーム (ExampleInputStream) を作成するには、次の Amazon Kinesis create-streamAWS CLI コマンドを使用してします。

$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

AWS Glue テーブルの作成

Studio ノートブックは、Kinesis Data Streams データソースに関するメタデータに「AWS Glue」データベースを使用します。

注記

データベースは最初に手動で作成することも、ノートブックの作成時に Apache Flink 用 Managed Service に自動的に作成させることもできます。同様に、このセクションで説明されているように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックで Apache Flink 用 Managed Service のテーブル作成コネクタコードで DDL ステートメントを使用してテーブルを作成することもできます。次に、AWS Glue にチェックインして、テーブルが正しく作成されたことを確認できます。

テーブルを作成する
  1. AWS マネジメントコンソールにサインインし、AWS Glue コンソール (https://console.aws.amazon.com/glue/) を開きます。

  2. 「AWS Glue」データベースをまだお持ちでない場合は、左側のナビゲーションバーから「データベース」を選択します。[データベースの追加] を選択します。[データベースの追加] ウィンドウで、[データベース名] に default を入力します。[作成] を選択します。

  3. 左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。

  4. テーブルのプロパティの設定」ページで、「テーブル名」に stock を入力します。以前に作成したデータベースを選択していることを確認してください。[次へ] を選択します。

  5. [データストアの追加] ページで、[Kinesis] を選択します。[Stream name](ストリーム名)に ExampleInputStream を入力します。[Kinesis ソース URL] には、 https://kinesis.us-east-1.amazonaws.com の入力を選択します。[Kinesis ソース URL] をコピーして貼り付ける場合は、先頭または末尾のスペースを必ず削除してください。[次へ] を選択します。

  6. 分類」ページで「JSON」を選択します。[次へ] を選択します。

  7. スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。

    列名 データ型
    ticker 文字列
    料金 double

    [次へ] を選択します。

  8. 次のページで設定を確認し、「終了」を選択します。

  9. テーブルの一覧で、新しく作成したテーブルを選択します。

  10. テーブル編集」を選択し、キー managed-flink.proctime と値 proctime を含むプロパティを追加します。

  11. [Apply] (適用) を選択します。

Kinesis Data Streams を使用した Studio ノートブックの作成

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

アプリケーションを作成するには、 AWS マネジメントコンソール または AWS CLI を使用することができます。

AWS マネジメントコンソール を使用して Studio ノートブックを作成します

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink コンソール用 Managed Service を開きます。

  2. Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。[Studio ノートブックの作成] を選択します。

    注記

    入力する Amazon MSK クラスターまたは Kinesis Data Streams を選択して [リアルタイムでデータを処理] を選択することで、Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成することもできます。

  3. [Studio ノートブックの作成] ページで、次の情報を入力します。

    • ノートブックの名前に「MyNotebook」を入力します。

    • AWSGlue データベース」の「デフォルト」を選択します。

    [Studio ノートブックの作成] を選択します。

  4. MyNotebook」 ページで、[実行] を選択します。「ステータス」に「実行中」が表示されるまで待ちます。ノートブックの実行中は料金が発生します。

AWS CLI を使用して Studio ノートブックを作成します

AWS CLI を使用して Studio ノートブックを作成するには、次の手順に従います。

  1. アカウント ID を確認します。アプリケーションを作成する際にこの値が必要になります。

  2. ロール arn:aws:iam::AccountID:role/ZeppelinRole を作成し、コンソールで自動作成されたロールに以下の権限を追加します。

    "kinesis:GetShardIterator",

    "kinesis:GetRecords",

    "kinesis:ListShards"

  3. create.json というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  4. アプリケーションを作成するには、次のコマンドを実行します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  5. コマンドが完了すると、新しい Studio ノートブックの詳細を示す出力結果が表示されます。次は出力の例です。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  6. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Kinesis データストリームへのデータ送信

Kinesis データストリームにテストデータを送信するには、次の手順に従います。

  1. Kinesis Data Generator を開きます。

  2. [CloudFormation を使用して CognitoUser を作成] を選択します。

  3. [CloudFormation] コンソールが開き、 Kinesis Data Generator テンプレートが表示されます。[次へ] を選択します。

  4. [コンポーネントの詳細の指定] ページで、Cognito ユーザーのユーザー名とパスワードを入力します。[次へ] を選択します。

  5. [スタックオプションの設定] ページで、[次へ] を選択します。

  6. [Review Kinesis-Data-Generator-Cognito-User] ページで、[AWSCloudFormation が IAM リソースを作成する可能性があることを認識しています] チェックボックスを選択します。[Create Stack] (スタックの作成) を選択します。

  7. CloudFormation スタックの作成が完了するまで待ちます。スタックが完了したら、 CloudFormation コンソールで [Kinesis-Data-Generator-Cognito-User] スタックを開き、[Outputs] タブを選択します。KinesisDataGeneratorUrlの出力値としてリストされている URL を開きます。

  8. [Amazon Kinesis Data Generator] ページで、ステップ 4 で作成した認証情報を使用してログインします。

  9. 次のページで、次の値を入力します。

    リージョン us-east-1
    ストリーム/Firehose ストリーム ExampleInputStream
    1 秒あたりのレコード数 1

    [記録テンプレート] に、次の内容を貼り付けます。

    { "ticker": "{{random.arrayElement( ["AMZN","MSFT","GOOG"] )}}", "price": {{random.number( { "min":10, "max":150 } )}} }
  10. [データ送信] を選択します。

  11. ジェネレータは、Kinesis データストリームにデータを送信します。

    次のセクションを完了する間、ジェネレータを作動させたままにしておきます。

Studio ノートブックをテストします。

このセクションでは、Studio ノートブックを使用して Kinesis データストリームのデータをクエリします。

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink コンソール用 Managed Service を開きます。

  2. [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。「MyNotebook」を選択します。

  3. MyNotebook」ページで、[Apache Zeppelin で開く] を選択します。

    新しいタブで Apache Zeppelin インターフェイスが開きます。

  4. [Zeppelinへようこそ!]ページで [Zeppelin Note] を選択します。

  5. Zeppelin Note」ページで、新しいノートに次のクエリを入力します。

    %flink.ssql(type=update) select * from stock

    実行アイコンを選択します。

    しばらくすると、ノートには Kinesis データストリームのデータが表示されます。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「FLINK JOB」を選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「Apache Flink ドキュメント」の「クエリ」を参照してください。