

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink の Studio ノートブックの例とチュートリアル
<a name="how-zeppelin-examples"></a>

**Topics**
+ [チュートリアル: Managed Service for Apache Flink で Studio ノートブックを作成](example-notebook.md)
+ [チュートリアル: 永続的な状態で、Managed Service for Apache Flink アプリケーションとして Studio ノートブックをデプロイする](example-notebook-deploy.md)
+ [Studio ノートブックでデータを分析するため、クエリの例を表示する](how-zeppelin-sql-examples.md)

# チュートリアル: Managed Service for Apache Flink で Studio ノートブックを作成
<a name="example-notebook"></a>

次のチュートリアルでは、Kinesis データストリームまたは Amazon MSK クラスターからデータを読み取る Studio ノートブックを作成する方法を示しています。

**Topics**
+ [の前提条件を満たす](#example-notebook-setup)
+ [AWS Glue データベースを作成する](#example-notebook-glue)
+ [次のステップ: Kinesis Data Streams または Amazon MSK を使用して Studio ノートブックを作成する](#examples-notebook-nextsteps)
+ [Kinesis Data Streams を使用した Studio ノートブックの作成](example-notebook-streams.md)
+ [Amazon MSK による Studio ノートブックの作成](example-notebook-msk.md)
+ [アプリケーションおよび依存関係リソースをクリーンアップする](example-notebook-cleanup.md)

## の前提条件を満たす
<a name="example-notebook-setup"></a>

 AWS CLI がバージョン 2 以降であることを確認します。最新の をインストールするには AWS CLI、[「 AWS CLI バージョン 2 のインストール、更新、アンインストール](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html)」を参照してください。

## AWS Glue データベースを作成する
<a name="example-notebook-glue"></a>

Studio ノートブックは、Amazon MSK データソースに関するメタデータ用の「[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)」データベースを使用します。

**AWS Glue データベースを作成する**

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) で AWS Glue コンソールを開きます。

1. **[Add database]** (データベースの追加) を選択します。[**データベースの追加**] ウィンドウで、[**データベース名**] に **default** を入力します。**[作成]** を選択します。

## 次のステップ: Kinesis Data Streams または Amazon MSK を使用して Studio ノートブックを作成する
<a name="examples-notebook-nextsteps"></a>

このチュートリアルでは、Kinesis Data Streams または Amazon MSK のいずれかを使用する Studio ノートブックを作成できます。
+ [Kinesis Data Streams を使用した Studio ノートブックの作成](example-notebook-streams.md) : Kinesis Data Streams を使用すると、ソースとして Kinesis データストリーム を使用するアプリケーションをすばやく作成できます。依存リソースとして Kinesis データストリームを作成するだけで済みます。
+ [Amazon MSK による Studio ノートブックの作成](example-notebook-msk.md) : Amazon MSK で、Amazon MSK クラスターをソースとして使用するアプリケーションを作成します。依存リソースとして Amazon VPC、Amazon EC2 クライアントインスタンス、および Amazon MSK クラスターを作成する必要があります。

# Kinesis Data Streams を使用した Studio ノートブックの作成
<a name="example-notebook-streams"></a>

このチュートリアルでは、Kinesis Data Stream をソースとして使用する Studio ノートブックを作成する方法について説明します。

**Topics**
+ [の前提条件を満たす](#example-notebook-streams-setup)
+ [AWS Glue テーブルを作成する](#example-notebook-streams-glue)
+ [Kinesis Data Streams を使用した Studio ノートブックの作成](#example-notebook-streams-create)
+ [Kinesis データストリームへのデータ送信](#example-notebook-streams-send)
+ [Studio ノートブックをテストします。](#example-notebook-streams-test)

## の前提条件を満たす
<a name="example-notebook-streams-setup"></a>

Studio ノートブックを作成する前に、Kinesis データストリーム (`ExampleInputStream`) を作成します。アプリケーションはこのストリームをアプリケーションソースとして使用します。

このストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールでの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」 の 「[データストリームの作成および更新](https://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-streams.html)」 を参照してください。ストリーム **ExampleInputStream** に名前を付け、[**オープンシャード数**] を **1** に設定します。

を使用してストリーム (`ExampleInputStream`) を作成するには AWS CLI、次の Amazon Kinesis `create-stream` AWS CLI コマンドを使用します。

```
$ aws kinesis create-stream \
--stream-name ExampleInputStream \
--shard-count 1 \
--region us-east-1 \
--profile adminuser
```

## AWS Glue テーブルを作成する
<a name="example-notebook-streams-glue"></a>

Studio ノートブックは、Kinesis Data Streams データソースに関するメタデータに「[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)」データベースを使用します。

**注記**  
データベースは最初に手動で作成することも、ノートブックの作成時に Apache Flink 用 Managed Service に自動的に作成させることもできます。同様に、このセクションで説明されているように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックで Apache Flink 用 Managed Service のテーブル作成コネクタコードで DDL ステートメントを使用してテーブルを作成することもできます。その後、チェックイン AWS Glue して、テーブルが正しく作成されたことを確認できます。

**テーブルを作成する**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) で AWS Glue コンソールを開きます。

1.  AWS Glue データベースがまだない場合は、左側のナビゲーションバーから**データベース**を選択します。**[データベースの追加]** を選択します。[**データベースの追加**] ウィンドウで、[**データベース名**] に **default** を入力します。[**Create**] (作成) を選択します。

1. 左のナビゲーションバーで、[**テーブル**] を選択します。「**テーブル**」ページで、「**テーブルを追加**」、「**テーブルを手動で追加**」を選択します。

1. 「**テーブルのプロパティの設定**」ページで、「**テーブル名**」に **stock** を入力します。以前に作成したデータベースを選択していることを確認してください。[**Next (次へ)**] を選択します。

1. [**データストアの追加**] ページで、[**Kinesis**] を選択します。**[Stream name]**（ストリーム名）に **ExampleInputStream** を入力します。[**Kinesis ソース URL**] には、 **https://kinesis.us-east-1.amazonaws.com** の入力を選択します。[**Kinesis ソース URL**] をコピーして貼り付ける場合は、先頭または末尾のスペースを必ず削除してください。[**Next (次へ)**] を選択します。

1. 「**分類**」ページで「**JSON**」を選択します。[**Next (次へ)**] を選択します。

1. **スキーマを定義する**で、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-streams.html)

   [**Next (次へ)**] を選択します。

1. 次のページで設定を確認し、「**終了**」を選択します。

1. テーブルの一覧で、新しく作成したテーブルを選択します。

1. 「**テーブル編集**」を選択し、キー `managed-flink.proctime` と値 `proctime` を含むプロパティを追加します。

1. [**Apply**] を選択します。

## Kinesis Data Streams を使用した Studio ノートブックの作成
<a name="example-notebook-streams-create"></a>

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

**Topics**
+ [を使用して Studio ノートブックを作成する AWS マネジメントコンソール](#example-notebook-create-streams-console)
+ [を使用して Studio ノートブックを作成する AWS CLI](#example-notebook-msk-create-api)

### を使用して Studio ノートブックを作成する AWS マネジメントコンソール
<a name="example-notebook-create-streams-console"></a>

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink コンソール用 Managed Service を開きます。

1. 「**Apache Flink アプリケーション用 Managed Service**」ページで、「**Studio**」タブを選択します。[**Studio ノートブックの作成**] を選択します。
**注記**  
入力する Amazon MSK クラスターまたは Kinesis Data Streams を選択して [**リアルタイムでデータを処理**] を選択することで、Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成することもできます。

1. [**Studio ノートブックの作成**] ページで、次の情報を入力します。
   + ノートブックの名前に「**MyNotebook**」を入力します。
   + 「**AWS Glue データベース**」の「**デフォルト**」を選択します。

   [**Studio ノートブックの作成**] を選択します。

1. 「**MyNotebook**」 ページで、[**実行**] を選択します。「**ステータス**」に「**実行中**」が表示されるまで待ちます。ノートブックの実行中は料金が発生します。

### を使用して Studio ノートブックを作成する AWS CLI
<a name="example-notebook-msk-create-api"></a>

を使用して Studio ノートブックを作成するには AWS CLI、次の手順を実行します。

1. アカウント ID を確認します。アプリケーションを作成する際にこの値が必要になります。

1. ロール `arn:aws:iam::AccountID:role/ZeppelinRole` を作成し、コンソールで自動作成されたロールに以下の権限を追加します。

   `"kinesis:GetShardIterator",`

   `"kinesis:GetRecords",`

   `"kinesis:ListShards"`

1. `create.json` というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. アプリケーションを作成するには、次のコマンドを実行します。

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. コマンドが完了すると、新しい Studio ノートブックの詳細を示す出力結果が表示されます。次は出力の例です。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Kinesis データストリームへのデータ送信
<a name="example-notebook-streams-send"></a>

Kinesis データストリームにテストデータを送信するには、次の手順に従います。

1. [Kinesis Data Generator](https://awslabs.github.io/amazon-kinesis-data-generator/web/help.html) を開きます。

1. [**CloudFormation を使用して CognitoUser を作成**] を選択します。

1.  CloudFormation コンソールが開き、Kinesis Data Generator テンプレートが表示されます。[**Next (次へ)**] を選択します。

1. **[コンポーネントの詳細の指定]** ページで、Cognito ユーザーのユーザー名とパスワードを入力します。[**Next (次へ)**] を選択します。

1. **[スタックオプションの設定]** ページで、**[次へ]** を選択します。

1. **Kinesis-Data-Generator-Cognito-User の確認**」ページで、** AWS CloudFormation が IAM リソースを作成する場合があることを承認します**」チェックボックスを選択します。**[Create Stack]** (スタックの作成) を選択します。

1.  CloudFormation スタックの作成が完了するまで待ちます。スタックが完了したら、 CloudFormation コンソールで **Kinesis-Data-Generator-Cognito-User** スタックを開き、**出力**タブを選択します。**KinesisDataGeneratorUrl**の出力値としてリストされている URL を開きます。

1. [**Amazon Kinesis Data Generator**] ページで、ステップ 4 で作成した認証情報を使用してログインします。

1. 次のページで、次の値を入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-streams.html)

   [**記録テンプレート**] に、次の内容を貼り付けます。

   ```
   {
       "ticker": "{{random.arrayElement(
           ["AMZN","MSFT","GOOG"]
       )}}",
       "price": {{random.number(
           {
               "min":10,
               "max":150
           }
       )}}
   }
   ```

1. [**データ送信**] を選択します。

1. ジェネレータは、Kinesis データストリームにデータを送信します。

   次のセクションを完了する間、ジェネレータを作動させたままにしておきます。

## Studio ノートブックをテストします。
<a name="example-notebook-streams-test"></a>

このセクションでは、Studio ノートブックを使用して Kinesis データストリームのデータをクエリします。

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink コンソール用 Managed Service を開きます。

1. [**Apache Flink アプリケーション用 Managed Service**] ページで、[**Studio ノートブック**] タブを選択します。「**MyNotebook**」を選択します。

1. 「**MyNotebook**」ページで、[**Apache Zeppelin で開く**] を選択します。

   新しいタブで Apache Zeppelin インターフェイスが開きます。

1. [**Zeppelinへようこそ！]**ページで [**Zeppelin Note**] を選択します。

1. 「**Zeppelin Note**」ページで、新しいノートに次のクエリを入力します。

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   実行アイコンを選択します。

   しばらくすると、ノートには Kinesis データストリームのデータが表示されます。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「**FLINK JOB**」を選択します。Flink Dashboard の詳細については、[「Managed Service for Apache Flink デベロッパーガイド」](https://docs.aws.amazon.com/)の「[Apache Flink ダッシュボード](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

# Amazon MSK による Studio ノートブックの作成
<a name="example-notebook-msk"></a>

このチュートリアルでは、Amazon MSK クラスターをソースとして使用する Studio ノートブックを作成する方法について説明します。

**Topics**
+ [Amazon MSK クラスターのセットアップ](#example-notebook-msk-setup)
+ [VPC に NAT ゲートウェイを追加する](#example-notebook-msk-nat)
+ [AWS Glue 接続とテーブルを作成する](#example-notebook-msk-glue)
+ [Amazon MSK による Studio ノートブックの作成](#example-notebook-msk-create)
+ [Amazon MSK クラスターにデータを送信します。](#example-notebook-msk-send)
+ [Studio ノートブックをテストします。](#example-notebook-msk-test)

## Amazon MSK クラスターのセットアップ
<a name="example-notebook-msk-setup"></a>

このチュートリアルでは、プレーンテキストでアクセスできる Amazon MSK クラスターが必要です。Amazon MSK クラスターをまだセットアップしていない場合は、「[Amazon MSK の使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」チュートリアルに従って、Amazon VPC、Amazon MSK クラスター、トピック、および Amazon EC2 クライアントインスタンスを作成してください。

チュートリアルを実行するときは、以下の手順を実行します。
+ 「[ステップ 3: Amazon MSK クラスターを作成する](https://docs.aws.amazon.com/msk/latest/developerguide/create-cluster.html)」のステップ 4 で、 `ClientBroker` 値を `TLS` から **PLAINTEXT** に変更します。

## VPC に NAT ゲートウェイを追加する
<a name="example-notebook-msk-nat"></a>

「[Amazon MSK の使用入門](https://docs.aws.amazon.com/msk/latest/developerguide/getting-started.html)」チュートリアルに従って Amazon MSK クラスターを作成した場合、または既存の Amazon VPC にプライベートサブネット用の NAT ゲートウェイがまだない場合は、Amazon VPC に NAT ゲートウェイを追加する必要があります。アーキテクチャを次の図に示します。

![\[AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/vpc_05.png)


Amazon VPC の NAT ゲートウェイを作成するには、次の手順を実行します。

1. Amazon VPC コンソール ([https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/)) を開きます。

1. 左のナビゲーションバーから、[**NAT ゲートウェイ**] を選択します。

1. 「**NAT ゲートウェイ**」ページで「**NAT ゲートウェイの作成**」を選択します。

1. [**NAT ゲートウェイの作成**] ページで、以下の値を入力します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-msk.html)

   **[Create NAT Gateway]** (NAT ゲートウェイの作成) を選択します。

1. 左のナビゲーションバーで、**[ルートテーブル ]** を選択します。

1. [**ルートテーブルの作成**] を選択します。

1. [**ルートテーブルの作成**] ページで、以下の情報を指定します。
   + **名前タグ:** **ZeppelinRouteTable**
   + 「**VPC**」: 自分の VPC (例:「**AWS KafkaTutorialVPC**」)を選択します。

   [**Create**] (作成) を選択します。

1. ルートテーブルのリストから「**ZeppelinRouteTable**」を選択します。[**ルート**] タブを選択し、[**ルート編集**] を選択します。

1. **[ルートの編集]** ページで、**[ルートの追加]** を選択します。

1. ******[送信先]** に「**0.0.0.0/0**」と入力します。「**Target**」には「**NAT ゲートウェイ**」、「**ZeppelinGateway**」。[**ルートの保存**] を選択します。[**閉じる**] を選択します。

1. 「ルートテーブル」ページで「**ZeppelinRouteTable**」を選択した状態で、「**サブネット関連付け**」タブを選択します。「**サブネット関連付けの編集**」を選択します。

1. 「**サブネット関連付けの編集**」ページで、「**AWS KafkaTutorialSubnet2**」と「**AWS KafkaTutorialSubnet3**」を選択します。[**Save**] を選択します。

## AWS Glue 接続とテーブルを作成する
<a name="example-notebook-msk-glue"></a>

Studio ノートブックは、Amazon MSK データソースに関するメタデータ用の「[AWS Glue](https://docs.aws.amazon.com/glue/latest/dg/what-is-glue.html)」データベースを使用します。このセクションでは、Amazon MSK クラスターにアクセスする方法を説明する AWS Glue 接続と、Studio ノートブックなどのクライアントにデータソース内のデータを表示する方法を説明する AWS Glue テーブルを作成します。

**接続を作成する**

1. にサインイン AWS マネジメントコンソール し、[https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) で AWS Glue コンソールを開きます。

1.  AWS Glue データベースがまだない場合は、左側のナビゲーションバーから**データベース**を選択します。**[データベースの追加]** を選択します。[**データベースの追加**] ウィンドウで、[**データベース名**] に **default** を入力します。[**Create**] (作成) を選択します。

1. 左のナビゲーションバーから、[**接続**]を選択します。[**接続の追加**] を選択します。

1. 「**接続を追加**」ウィンドウで、次の値を入力します。
   + **[接続名]** に、**ZeppelinConnection** と入力します。
   + [**接続タイプ**] で、[**Kafka**] を選択します。
   + 「**Kafka ブートストラップサーバー URL**」には、クラスターのブートストラップブローカーの文字列を指定します。ブートストラップブローカーは、MSK コンソールから、または次の CLI コマンドを入力して取得できます。

     ```
     aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
     ```
   + 「**SSL 接続が必要**」チェックボックスをオフにします。

   [**Next (次へ)**] を選択します。

1. **[VPC]** ページで、次の値を入力します。
   + **VPC** の場合は、VPC の名前 (** AWS KafkaTutorialVPC** など) を選択します。
   + 「**サブネット**」には、「**AWS KafkaTutorialSubnet2**」を選択します。
   + 「**セキュリティグループ**」では、使用可能なすべてのグループを選択します。

   [**Next (次へ)**] を選択します。

1. 「**接続プロパティ**」/「**接続アクセス**」ページで 「**完了**」を選択します。

**テーブルを作成する**
**注記**  
次の手順で説明するように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックにある Apache Flink 用 Managed Service のテーブル作成コネクタコードを使用して DDL ステートメントでテーブルを作成することもできます。その後、チェックイン AWS Glue して、テーブルが正しく作成されたことを確認できます。

1. 左のナビゲーションバーで、[**テーブル**] を選択します。「**テーブル**」ページで、「**テーブルを追加**」、「**テーブルを手動で追加**」を選択します。

1. 「**テーブルのプロパティの設定**」ページで、「**テーブル名**」に **stock** を入力します。以前に作成したデータベースを選択していることを確認してください。[**Next (次へ)**] を選択します。

1. 「**データストアの追加**」ページで「**Kafka**」を選択します。トピック名には、「**トピック名**」 (「**AWS KafkaTutorialTopic**」など) を入力します。「**接続**」には「**ZeppelinConnection**」を選択します。

1. 「**分類**」ページで「**JSON**」を選択します。[**Next (次へ)**] を選択します。

1. **スキーマを定義する**で、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。    
[\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/example-notebook-msk.html)

   [**Next (次へ)**] を選択します。

1. 次のページで設定を確認し、「**終了**」を選択します。

1. テーブルの一覧で、新しく作成したテーブルを選択します。

1. **テーブルの編集** を選択し、次のプロパティを追加します。
   + キー: `managed-flink.proctime`、値: `proctime`
   + キー: `flink.properties.group.id`、値: `test-consumer-group`
   + キー: `flink.properties.auto.offset.reset`、値: `latest`
   + キー: `classification`、値: `json`

   これらのキーと値のペアがないと、Flink ノートブックはエラーになります。

1. [**Apply**] を選択します。

## Amazon MSK による Studio ノートブックの作成
<a name="example-notebook-msk-create"></a>

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

**Topics**
+ [を使用して Studio ノートブックを作成する AWS マネジメントコンソール](#example-notebook-create-msk-console)
+ [を使用して Studio ノートブックを作成する AWS CLI](#example-notebook-msk-create-api)

**注記**  
Amazon MSK コンソールから既存のクラスターを選択し、「**データをリアルタイムで処理**」を選択することで Studio ノートブックを作成することもできます。

### を使用して Studio ノートブックを作成する AWS マネジメントコンソール
<a name="example-notebook-create-msk-console"></a>

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink コンソール用 Managed Service を開きます。

1. 「**Apache Flink アプリケーション用 Managed Service**」ページで、「**Studio**」タブを選択します。「**Studio ノートブックの作成**」を選択します。
**注記**  
Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成するには、入力の Amazon MSK クラスターまたは Kinesis データストリームを選択し、「**データをリアルタイムで処理**」を選択します。

1. [**Studio ノートブックの作成**] ページで、次の情報を入力します。
   + 「**Studio ノートブック名**」に **MyNotebook** を入力します。
   + 「**AWS Glue データベース**」の「**デフォルト**」を選択します。

   「**Studio ノートブックの作成**」を選択します。

1. 「**MyNotebook**」ページで、「**構成**」タブを選択します。「**Networking**」セクションで、「**編集**」を選択します。

1. 「**MyNotebook のネットワークの編集**」ページで、「**Amazon MSK クラスターに基づく VPC 設定**」を選択します。「**Amazon MSK クラスター**」には Amazon MSK クラスターを選択します。[**Save changes**] (変更の保存) をクリックします。

1. 「**MyNotebook**」ページで、「**実行**」を選択します。「**ステータス**」に「**実行中**」が表示されるまで待ちます。

### を使用して Studio ノートブックを作成する AWS CLI
<a name="example-notebook-msk-create-api"></a>

を使用して Studio ノートブックを作成するには AWS CLI、次の手順を実行します。

1. 次の情報があることを確認します。アプリケーションを作成するにはこれらの値が必要です。
   +  アカウント ID。
   + Amazon MSK クラスターを含む Amazon VPC 用のサブネット ID やセキュリティグループ ID。

1. `create.json` というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

   ```
   {
       "ApplicationName": "MyNotebook",
       "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
       "ApplicationMode": "INTERACTIVE",
       "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole",
       "ApplicationConfiguration": {
           "ApplicationSnapshotConfiguration": {
               "SnapshotsEnabled": false
           },
           "VpcConfigurations": [
               {
                   "SubnetIds": [
                       "SubnetID 1",
                       "SubnetID 2",
                       "SubnetID 3"
                   ],
                   "SecurityGroupIds": [
                       "VPC Security Group ID"
                   ]
               }
           ],
           "ZeppelinApplicationConfiguration": {
               "CatalogConfiguration": {
                   "GlueDataCatalogConfiguration": {
                       "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default"
                   }
               }
           }
       }
   }
   ```

1. アプリケーションを作成するには、次のコマンドを実行します。

   ```
   aws kinesisanalyticsv2 create-application --cli-input-json file://create.json 
   ```

1. コマンドが完了すると、次のような出力が表示され、新しい Studio ノートブックの詳細が表示されます。

   ```
   {
       "ApplicationDetail": {
           "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook",
           "ApplicationName": "MyNotebook",
           "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0",
           "ApplicationMode": "INTERACTIVE",
           "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole",
   ...
   ```

1. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

   ```
   aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\
   ```

## Amazon MSK クラスターにデータを送信します。
<a name="example-notebook-msk-send"></a>

このセクションでは、Amazon EC2 クライアントで Python スクリプトを実行して Amazon MSK データソースにデータを送信します。

1. Amazon EC2 クライアントに接続します。

1. 以下のコマンドを実行して Python バージョン 3、Pip、および Kafka for Python パッケージをインストールし、アクションを確認します。

   ```
   sudo yum install python37
   curl -O https://bootstrap.pypa.io/get-pip.py
   python3 get-pip.py --user
   pip install kafka-python
   ```

1. 次のコマンドを入力して、クライアントマシン AWS CLI で を設定します。

   ```
   aws configure
   ```

   アカウントの認証情報と **us-east-1** を `region` に入力します。

1. `stock.py` というファイルを次の内容で作成します。サンプル値を Amazon MSK クラスターのブートストラップブローカー文字列に置き換え、トピックが「**AWS KafkaTutorialTopic**」でない場合はトピック名を更新します。

   ```
   from kafka import KafkaProducer
   import json
   import random
   from datetime import datetime
   
   BROKERS = "<<Bootstrap Broker List>>"
   producer = KafkaProducer(
       bootstrap_servers=BROKERS,
       value_serializer=lambda v: json.dumps(v).encode('utf-8'),
       retry_backoff_ms=500,
       request_timeout_ms=20000,
       security_protocol='PLAINTEXT')
   
   
   def getStock():
       data = {}
       now = datetime.now()
       str_now = now.strftime("%Y-%m-%d %H:%M:%S")
       data['event_time'] = str_now
       data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV'])
       price = random.random() * 100
       data['price'] = round(price, 2)
       return data
   
   
   while True:
       data =getStock()
       # print(data)
       try:
           future = producer.send("AWSKafkaTutorialTopic", value=data)
           producer.flush()
           record_metadata = future.get(timeout=10)
           print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset))
       except Exception as e:
           print(e.with_traceback())
   ```

1. 次のコマンドを使用してスクリプトを実行します。

   ```
   $ python3 stock.py
   ```

1. 以下のセクションを実行している間は、スクリプトを実行したままにしておきます。

## Studio ノートブックをテストします。
<a name="example-notebook-msk-test"></a>

このセクションでは、Studio ノートブックを使用して Amazon MSK クラスターのデータをクエリします。

1. 「[https://console.aws.amazon.com/managed-flink/home?region=us-east-1\$1/applications/dashboard](https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard)」にある Apache Flink 用 Managed Serviceコンソールを開きます。

1. [**Apache Flink アプリケーション用 Managed Service**] ページで、[**Studio ノートブック**] タブを選択します。「**MyNotebook**」を選択します。

1. 「**MyNotebook**」ページで、[**Apache Zeppelin で開く**] を選択します。

   新しいタブで Apache Zeppelin インターフェイスが開きます。

1. 「**Zeppelinへようこそ！**」でページで「**Zeppelinの新ノート**」を選択します。

1. 「**Zeppelin Note**」ページで、新しいノートに次のクエリを入力します。

   ```
   %flink.ssql(type=update)
   select * from stock
   ```

   実行アイコンを選択します。

   アプリケーションは Amazon MSK クラスターのデータを表示します。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「**FLINK JOB**」を選択します。Flink Dashboard の詳細については、[「Managed Service for Apache Flink デベロッパーガイド」](https://docs.aws.amazon.com/)の「[Apache Flink ダッシュボード](https://docs.aws.amazon.com/managed-flink/latest/java/how-dashboard.html)」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

# アプリケーションおよび依存関係リソースをクリーンアップする
<a name="example-notebook-cleanup"></a>

## Studio ノートブックの削除
<a name="example-notebook-cleanup-app"></a>

1. Apache Flink コンソール用 Managed Service を開きます。

1. 「**MyNotebook**」を選択します。

1. [**Actions (アクション)**] を選択してから [**Delete (削除)**] を選択します。

## AWS Glue データベースと接続を削除する
<a name="example-notebook-cleanup-glue"></a>

1. [https://console.aws.amazon.com/glue/](https://console.aws.amazon.com/glue/) で AWS Glue コンソールを開きます。

1. 左のナビゲーションペインで、[**Databases**] (データベース) を選択します。[**Default**]（デフォルト）の横にあるチェックボックスをチェックして選択します。[**Action**]（アクション）、[**Delete Database**]（データベースの削除）を選択します。[Confirm] (確認) をクリックして、選択内容を確認します。

1. 左のナビゲーションバーから、[**接続**]を選択します。[**ZeppelinConnection**] の横にあるチェックボックスをチェックして選択してください。[**Action**]（アクション）、[**Delete Connection**]（接続の削除）を選択します。[Confirm] (確認) をクリックして、選択内容を確認します。

## ポリシーと IAM ロールを削除するには
<a name="example-notebook-msk-cleanup-iam"></a>

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. 左のナビゲーションメニューから **[ロール]** を選択します。

1. 検索バーを使用して「**ZeppelinRole**」ロールを検索します。

1. 「**ZeppelinRole**」ロールを選択します。**ロールの削除** を選択します。削除を確定します。

## CloudWatch ロググループを削除します
<a name="example-notebook-cleanup-cw"></a>

コンソールを使用してアプリケーションを作成すると、コンソールは CloudWatch Logs グループとログストリームを自動的に作成します。 AWS CLIを使用してアプリケーションを作成した場合、ロググループとストリームはありません。

1. CloudWatch コンソールの [https://console.aws.amazon.com/cloudwatch/](https://console.aws.amazon.com/cloudwatch/) を開いてください。

1. 左側のナビゲーションバーで、[**Log groups**]（ロググループ）を選択します。

1. 「**/AWS/Kinesis Analytics/MyNotebook ロググループ**」を選択します。

1. [**アクション**]、[**ロググループの削除**] の順にクリックします。削除を確定します。

## Kinesis Data Streams リソースをクリーンアップする
<a name="example-notebook-cleanup-streams"></a>

Kinesis ストリームを削除するには、Kinesis Data Streams コンソールを開き、Kinesis ストリームを選択して、[**Actions**]、[**Delete**] を選択します。

## MSK リソースをクリーンアップする
<a name="example-notebook-cleanup-msk"></a>

このチュートリアル用の Amazon MSK クラスターを作成した場合は、このセクションのステップに従います。このセクションでは、Amazon EC2 クライアントインスタンス、Amazon VPC、および Amazon MSK クラスターをクリーンアップする方法について説明します。

### Amazon MSK クラスターを削除する
<a name="example-notebook-msk-cleanup-msk"></a>

このチュートリアル用に Amazon MSK クラスターを作成した場合は、以下の手順に従います。

1. [https://console.aws.amazon.com/msk/home?region=us-east-1\$1/home/](https://console.aws.amazon.com/msk/home?region=us-east-1#/home/) で Amazon MSK コンソールを開きます。

1. [**AWS KafkaTutorialCluster**] を選択します。**[削除]** を選択します。表示されるウィンドウに **delete** を入力し、選択を確定します。

### クライアントインスタンスの終了
<a name="example-notebook-msk-cleanup-client"></a>

このチュートリアル用に Amazon EC2 クライアントインスタンスを作成した場合は、次の手順に従います。

1. Amazon EC2 コンソールの [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) を開いてください。

1. 左側のナビゲーションペインから、[**Instances**]（インスタンス）を選択します。

1. [**ZeppelinClient**] の横にあるチェックボックスをチェックして選択します。

1. [**インスタンスの状態**] 、[**インスタンスの終了**] の順に選択します。

### Amazon VPC の削除
<a name="example-notebook-msk-cleanup-vpc"></a>

このチュートリアル用に Amazon VPC を作成した場合は、次の手順に従います。

1. Amazon EC2 コンソールの [https://console.aws.amazon.com/ec2/](https://console.aws.amazon.com/ec2/) を開いてください。

1. 左のナビゲーションバーから [**Network Interfaces**]（ネットワークインターフェース）を選択します。

1. 検索ボックスに VPC ID を入力し、Enter キーを押します。

1. テーブルヘッダーのチェックボックスを選択して、表示されているすべてのネットワークインターフェースを選択します。

1. [**アクション**]、[**デタッチ**] の順にクリックしてください。表示されるウィンドウで、[**Force detachment**]（強制デタッチメント）の [**Enable**]（有効化）を選択します。[**Detach**]（デタッチ）を選択し、すべてのネットワークインターフェースが [**Available**]（利用可能）ステータスになるまで待ちます。

1. テーブルヘッダーのチェックボックスを選択して、表示されているすべてのネットワークインターフェースを再び選択します。

1. **[アクション]**、**[削除]** の順に選択します。アクションを確認します。

1. Amazon VPC コンソールの [https://console.aws.amazon.com/vpc/](https://console.aws.amazon.com/vpc/) を開いてください。

1. [**AWS KafkaTutorialVPC**] を選択します。**[アクション]** 、**[VPC を削除]** の順に選択します。**delete** を入力して、削除を確定します。

# チュートリアル: 永続的な状態で、Managed Service for Apache Flink アプリケーションとして Studio ノートブックをデプロイする
<a name="example-notebook-deploy"></a>

以下のチュートリアルでは、Apache Flink アプリケーション用 Managed Service として、 Studio ノートブックを永続的な状態でデプロイする方法を示します。

**Topics**
+ [前提条件を満たす](#example-notebook-durable-setup)
+ [を使用して永続的な状態のアプリケーションをデプロイする AWS マネジメントコンソール](#example-notebook-deploy-console)
+ [を使用して永続的な状態のアプリケーションをデプロイする AWS CLI](#example-notebook-deploy-cli)

## 前提条件を満たす
<a name="example-notebook-durable-setup"></a>

Kinesis Data Streams または Amazon MSK のいずれかを使用して、 [チュートリアル: Managed Service for Apache Flink で Studio ノートブックを作成](example-notebook.md) に従って新しい Studio ノートブックを作成します。Studio ノートブック `ExampleTestDeploy` に名前を付けます。

## を使用して永続的な状態のアプリケーションをデプロイする AWS マネジメントコンソール
<a name="example-notebook-deploy-console"></a>

1. コンソールの **[アプリケーションコードの場所 - *オプショナル*]** にパッケージ化されたコードを保存する S3 バケットの場所を追加します。これにより、ノートブックから直接アプリケーションをデプロイして実行できるようになります。

1. アプリケーションの役割に必要な権限を追加して、Amazon S3 バケットの読み取りと書き込みに使用している役割を有効にし、 Apache Flink アプリケーションのホスティングサービスを起動します。
   + AmazonS3FullAccess
   + Amazonmanaged-flinkFullAccess
   + ソース、宛先、VPC へのアクセス (該当する場合)。詳細については、 [Studio ノートブックの IAM アクセス許可を確認する](how-zeppelin-iam.md) を参照してください。

1. 次のサンプルコードを使用してください。

   ```
   %flink.ssql(type=update) 
   CREATE TABLE exampleoutput (
     'ticket' VARCHAR,
     'price' DOUBLE
   )
   WITH (
     'connector' = 'kinesis',
     'stream' = 'ExampleOutputStream',
     'aws.region' = 'us-east-1',
     'scan.stream.initpos' = 'LATEST',
     'format' = 'json'
   );
   
   INSERT INTO exampleoutput SELECT ticker, price FROM exampleinputstream
   ```

1. この機能を有効にすると、ノートブック内の各ノートの右上隅に、ノートブックの名前が記載された新しいドロップダウンが表示されます。以下を行うことができます。
   + Studio ノートブックの設定は、 AWS マネジメントコンソールで確認できます。
   + Zeppelin Note を作成して、Amazon S3 にエクスポートします。この時点で、アプリケーションの名前を入力し、「**Build および Export**」を選択します。エクスポートが完了すると、通知が届きます。
   + 必要に応じて、Amazon S3 で実行可能ファイルの追加テストを表示して実行することができます。
   + 構築が完了すると、永続的な状態と自動スケーリング機能を備えた Kinesis ストリーミングアプリケーションとしてコードをデプロイできるようになります。
   + ドロップダウンを使用して、「**Zeppelin Note を Kinesis ストリーミングアプリケーションとしてデプロイ**」を選択します。アプリケーション名を確認し、** AWS コンソール経由でデプロイ**を選択します。
   + これにより、Managed Service for Apache Flink アプリケーションを作成するための AWS マネジメントコンソール ページが表示されます。アプリケーション名、並列処理、コードの場所、デフォルトの Glue DB、VPC (該当する場合)、IAM ロールが事前に入力されていることに注意してください。IAM ロールがソースと宛先に対して必要な権限を持っていることを確認します。永続的なアプリケーション状態管理のため、スナップショットはデフォルトで有効になっています。
   + [**Create application**] を選択します。
   + 「**コンフィグ**」を選択して任意の設定を変更し、「**Run**」を選択してストリーミング・アプリケーションを起動することができます。

## を使用して永続的な状態のアプリケーションをデプロイする AWS CLI
<a name="example-notebook-deploy-cli"></a>

を使用してアプリケーションをデプロイするには AWS CLI、ベータ 2 情報で提供されるサービスモデルを使用する AWS CLI ように を更新する必要があります。更新されたサービスモデルの使用方法については、 [の前提条件を満たす前提条件を満たす](example-notebook.md#example-notebook-setup) を参照してください。

次のコード例で、新規の Studio ノートブックを作成します。

```
aws kinesisanalyticsv2 create-application \
     --application-name <app-name> \
     --runtime-environment ZEPPELIN-FLINK-3_0 \
     --application-mode INTERACTIVE \
     --service-execution-role <iam-role>
     --application-configuration '{ 
       "ZeppelinApplicationConfiguration": { 
         "CatalogConfiguration": { 
           "GlueDataCatalogConfiguration": { 
             "DatabaseARN": "arn:aws:glue:us-east-1:<account>:database/<glue-database-name>" 
           } 
         } 
       },
       "FlinkApplicationConfiguration": {
         "ParallelismConfiguration": {
           "ConfigurationType": "CUSTOM",
           "Parallelism": 4,
           "ParallelismPerKPU": 4
         }
       },
       "DeployAsApplicationConfiguration": {
            "S3ContentLocation": { 
               "BucketARN": "arn:aws:s3:::<s3bucket>",
               "BasePath": "/something/"
            }
        },
       "VpcConfigurations": [
         {
           "SecurityGroupIds": [
             "<security-group>"
           ],
           "SubnetIds": [
             "<subnet-1>",
             "<subnet-2>"
           ]
         }
       ]
     }' \
     --region us-east-1
```

次のコード例で、Studio ノートブックを起動します。

```
aws kinesisanalyticsv2 start-application \
    --application-name <app-name> \
    --region us-east-1 \
    --no-verify-ssl
```

次のコードは、アプリケーションの Apache Zeppelin ノートブックページの URL を返します。

```
aws kinesisanalyticsv2 create-application-presigned-url \
    --application-name <app-name> \
    --url-type ZEPPELIN_UI_URL \

    --region us-east-1 \
    --no-verify-ssl
```

# Studio ノートブックでデータを分析するため、クエリの例を表示する
<a name="how-zeppelin-sql-examples"></a>

**Topics**
+ [Amazon MSK/Apache Kafka でテーブルを作成する](#how-zeppelin-examples-creating-tables)
+ [Kinesis でテーブルを作成する](#how-zeppelin-examples-creating-tables-with-kinesis)
+ [タンブリングウィンドウをクエリする](#how-zeppelin-examples-tumbling)
+ [スライディングウィンドウをクエリする](#how-zeppelin-examples-sliding)
+ [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql)
+ [BlackHole SQL コネクタを使用する](#how-zeppelin-examples-blackhole-connector-sql)
+ [Scala を使用してサンプルデータを生成する](#notebook-example-data-generator)
+ [インタラクティブ Scala を使用する](#notebook-example-interactive-scala)
+ [インタラクティブ Python を使用する](#notebook-example-interactive-python)
+ [インタラクティブ Python、SQL、Scala の組み合わせを使用する](#notebook-example-interactive-pythonsqlscala)
+ [クロスアカウント Kinesis データストリームを使用する](#notebook-example-crossaccount-kds)

Apache Flink SQL クエリ設定の情報については、「[インタラクティブなデータ分析のための Zeppelin ノートブック上の Flink](https://flink.apache.org/ecosystem/2020/06/23/flink-on-zeppelin-part2.html)」を参照してください。

Apache Flink ダッシュボードでアプリケーションを表示するには、アプリケーションの「**Zeppelin Note**」ページで「**FLINK JOB**」を選択します。

ウィンドウクエリの詳細については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[Windows](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/stream/operators/windows.html)」を参照してください。

Apache Flink Streaming SQL クエリの他の例については、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

## Amazon MSK/Apache Kafka でテーブルを作成する
<a name="how-zeppelin-examples-creating-tables"></a>

Apache Flink Studio 用 Managed Service を搭載した Amazon MSK Flink コネクタを使用して、Plaintext、SSL または IAM 認証で接続を認証できます。要件に応じて特定のプロパティを使用してテーブルを作成します。

```
-- Plaintext connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- SSL connection

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
   'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SSL',
  'properties.ssl.truststore.location' = '/usr/lib/jvm/java-11-amazon-corretto/lib/security/cacerts',
  'properties.ssl.truststore.password' = 'changeit',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);

-- IAM connection (or for MSK Serverless)

CREATE TABLE your_table (
  `column1` STRING,
  `column2` BIGINT
) WITH (
  'connector' = 'kafka',
  'topic' = 'your_topic',
  'properties.bootstrap.servers' = '<bootstrap servers>',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'AWS_MSK_IAM',
  'properties.sasl.jaas.config' = 'software.amazon.msk.auth.iam.IAMLoginModule required;',
  'properties.sasl.client.callback.handler.class' = 'software.amazon.msk.auth.iam.IAMClientCallbackHandler',
  'properties.group.id' = 'myGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

これらのプロパティは、「[Apache Kafka SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/)」の他のプロパティと組み合わせることができます。

## Kinesis でテーブルを作成する
<a name="how-zeppelin-examples-creating-tables-with-kinesis"></a>

次の例では、Kinesis を使用してテーブルを作成します。

```
CREATE TABLE KinesisTable (
  `column1` BIGINT,
  `column2` BIGINT,
  `column3` BIGINT,
  `column4` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (column1, column2)
WITH (
  'connector' = 'kinesis',
  'stream' = 'test_stream',
  'aws.region' = '<region>',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```

使用可能な他のプロパティの詳細については、「[Amazon Kinesis Data Streams SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kinesis/)」を参照してください。

## タンブリングウィンドウをクエリする
<a name="how-zeppelin-examples-tumbling"></a>

次の Flink Streaming SQL クエリは、 `ZeppelinTopic` テーブルから 5 秒ごとのタンブリングウィンドウの最大値を選択します。

```
%flink.ssql(type=update)
SELECT TUMBLE_END(event_time, INTERVAL '5' SECOND) as winend, MAX(price) as five_second_high, ticker
FROM ZeppelinTopic
GROUP BY ticker, TUMBLE(event_time, INTERVAL '5' SECOND)
```

## スライディングウィンドウをクエリする
<a name="how-zeppelin-examples-sliding"></a>

次の Apache Flink Streaming SQL クエリは、 `ZeppelinTopic` テーブルから 5 秒ごとのスライディングウィンドウの最大値を選択します。

```
%flink.ssql(type=update)
SELECT HOP_END(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND) AS winend, MAX(price) AS sliding_five_second_max
FROM ZeppelinTopic//or your table name in AWS Glue
GROUP BY HOP(event_time, INTERVAL '3' SECOND, INTERVAL '5' SECOND)
```

## インタラクティブ SQL を使用する
<a name="how-zeppelin-examples-interactive-sql"></a>

この例では、イベント時間と処理時間の最大値、キー値テーブルの値の合計を出力します。[Scala を使用してサンプルデータを生成する](#notebook-example-data-generator) のサンプル・データ生成スクリプトが実行されていることを確認します。Studio ノートブックでフィルタリングや結合などの他の SQL クエリを試すには、Apache Flink ドキュメントのApache Flink ドキュメント:「[クエリ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/sql/queries.html)」を参照してください。

```
%flink.ssql(type=single, parallelism=4, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints how many records from the `key-value-stream` we have seen so far, along with the current processing and event time.
SELECT
  MAX(`et`) as `et`,
  MAX(`pt`) as `pt`,
  SUM(`value`) as `sum`
FROM
  `key-values`
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive tumbling window query that displays the number of records observed per (event time) second.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT
  TUMBLE_START(`et`, INTERVAL '1' SECONDS) as `window`,
  `key`,
  SUM(`value`) as `sum`
FROM
  `key-values`
GROUP BY
  TUMBLE(`et`, INTERVAL '1' SECONDS),
  `key`;
```

## BlackHole SQL コネクタを使用する
<a name="how-zeppelin-examples-blackhole-connector-sql"></a>

BlackHole SQL コネクタでは、クエリをテストするために Kinesis データストリームや Amazon MSK クラスターを作成する必要はありません。BlackHole SQL コネクタの詳細については、Apache Flink ドキュメントの「[BlackHole SQL Connector](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/connectors/blackhole.html)」を参照してください。この例では、デフォルトカタログはインメモリカタログです。

```
%flink.ssql

CREATE TABLE default_catalog.default_database.blackhole_table (
 `key` BIGINT,
 `value` BIGINT,
 `et` TIMESTAMP(3)
) WITH (
 'connector' = 'blackhole'
)
```

```
%flink.ssql(parallelism=1)

INSERT INTO `test-target`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-source`
WHERE
  `key` > 3
```

```
%flink.ssql(parallelism=2)

INSERT INTO `default_catalog`.`default_database`.`blackhole_table`
SELECT
  `key`,
  `value`,
  `et`
FROM
  `test-target`
WHERE
  `key` > 7
```

## Scala を使用してサンプルデータを生成する
<a name="notebook-example-data-generator"></a>

この例では Scala を使用してサンプルデータを生成します。このサンプルデータを使用して、さまざまなクエリをテストできます。テーブル作成ステートメントを使用して key-values テーブルを作成します。

```
import org.apache.flink.streaming.api.functions.source.datagen.DataGeneratorSource
import org.apache.flink.streaming.api.functions.source.datagen.RandomGenerator
import org.apache.flink.streaming.api.scala.DataStream

import java.sql.Timestamp

// ad-hoc convenience methods to be defined on Table 
implicit class TableOps[T](table: DataStream[T]) {
    def asView(name: String): DataStream[T] = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView("`" + name + "`")
      }
      stenv.createTemporaryView("`" + name + "`", table)
      return table;
    }
}
```

```
%flink(parallelism=4)
val stream = senv
 .addSource(new DataGeneratorSource(RandomGenerator.intGenerator(1, 10), 1000))
 .map(key => (key, 1, new Timestamp(System.currentTimeMillis)))
 .asView("key-values-data-generator")
```

```
%flink.ssql(parallelism=4)
-- no need to define the paragraph type with explicit parallelism (such as "%flink.ssql(parallelism=2)")
-- in this case the INSERT query will inherit the parallelism of the of the above paragraph
INSERT INTO `key-values`
SELECT
 `_1` as `key`,
 `_2` as `value`,
 `_3` as `et`
FROM
 `key-values-data-generator`
```

## インタラクティブ Scala を使用する
<a name="notebook-example-interactive-scala"></a>

これは [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql) の Scala 翻訳です。Scala の他の例については、Apache Flink ドキュメントの「[Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)」を参照してください。

```
%flink
import org.apache.flink.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
    def asView(name: String): Table = {
      if (stenv.listTemporaryViews.contains(name)) {
        stenv.dropTemporaryView(name)
      }
      stenv.createTemporaryView(name, table)
      return table;
    }
}
```

```
%flink(parallelism=4)

// A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time.
val query01 = stenv
  .from("`key-values`")
  .select(
    $"et".max().as("et"),
    $"pt".max().as("pt"),
    $"value".sum().as("sum")
  ).asView("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink(parallelism=4)

// An tumbling window view that displays the number of records observed per (event time) second.
val query02 = stenv
  .from("`key-values`")
  .window(Tumble over 1.seconds on $"et" as $"w")
  .groupBy($"w", $"key")
  .select(
    $"w".start.as("window"),
    $"key",
    $"value".sum().as("sum")
  ).asView("query02")
```

```
%flink.ssql(type=update, parallelism=4, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## インタラクティブ Python を使用する
<a name="notebook-example-interactive-python"></a>

これは [インタラクティブ SQL を使用する](#how-zeppelin-examples-interactive-sql) の Python 翻訳です。Python の他のサンプルについては、Apache Flink ドキュメントの「[Table API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/tableApi.html)」を参照してください。

```
%flink.pyflink
from pyflink.table.table import Table

def as_view(table, name):
  if (name in st_env.list_temporary_views()):
    st_env.drop_temporary_view(name)
  st_env.create_temporary_view(name, table)
  return table

Table.as_view = as_view
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`keyvalues`") \
  .select(", ".join([
    "max(et) as et",
    "max(pt) as pt",
    "sum(value) as sum"
  ])) \
  .as_view("query01")
```

```
%flink.ssql(type=single, parallelism=16, refreshInterval=1000, template=<h1>{2}</h1> records seen until <h1>Processing Time: {1}</h1> and <h1>Event Time: {0}</h1>)

-- An interactive query prints the query01 output.
SELECT * FROM query01
```

```
%flink.pyflink(parallelism=16)

# A view that computes many records from the `key-values` we have seen so far, along with the current processing and event time
st_env \
  .from_path("`key-values`") \
  .window(Tumble.over("1.seconds").on("et").alias("w")) \
  .group_by("w, key") \
  .select(", ".join([
    "w.start as window",
    "key",
    "sum(value) as sum"
  ])) \
  .as_view("query02")
```

```
%flink.ssql(type=update, parallelism=16, refreshInterval=1000)

-- An interactive query prints the query02 output.
-- Browse through the chart views to see different visualizations of the streaming result.
SELECT * FROM `query02`
```

## インタラクティブ Python、SQL、Scala の組み合わせを使用する
<a name="notebook-example-interactive-pythonsqlscala"></a>

ノートブックでは、SQL、Python、Scala を自由に組み合わせてインタラクティブな分析を行うことができます。永続的な状態を持つアプリケーションとしてデプロイする予定の Studio ノートブックでは、SQL と Scala を組み合わせて使用できます。この例では、無視されるセクションと、永続的な状態でアプリケーションにデプロイされるセクションを示しています。

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-source` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-source-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink.ssql
CREATE TABLE `default_catalog`.`default_database`.`my-test-target` (
  `key` BIGINT NOT NULL,
  `value` BIGINT NOT NULL,
  `et` TIMESTAMP(3) NOT NULL,
  `pt` AS PROCTIME(),
  WATERMARK FOR `et` AS `et` - INTERVAL '5' SECOND
)
WITH (
  'connector' = 'kinesis',
  'stream' = 'kda-notebook-example-test-target-stream',
  'aws.region' = 'eu-west-1',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'json',
  'json.timestamp-format.standard' = 'ISO-8601'
)
```

```
%flink()

// ad-hoc convenience methods to be defined on Table
implicit class TableOps(table: Table) {
  def asView(name: String): Table = {
    if (stenv.listTemporaryViews.contains(name)) {
      stenv.dropTemporaryView(name)
    }
    stenv.createTemporaryView(name, table)
    return table;
  }
}
```

```
%flink(parallelism=1)
val table = stenv
  .from("`default_catalog`.`default_database`.`my-test-source`")
  .select($"key", $"value", $"et")
  .filter($"key" > 10)
  .asView("query01")
```

```
%flink.ssql(parallelism=1)

-- forward data
INSERT INTO `default_catalog`.`default_database`.`my-test-target`
SELECT * FROM `query01`
```

```
%flink.ssql(type=update, parallelism=1, refreshInterval=1000)

-- forward data to local stream (ignored when deployed as application)
SELECT * FROM `query01`
```

```
%flink

// tell me the meaning of life (ignored when deployed as application!)
print("42!")
```

## クロスアカウント Kinesis データストリームを使用する
<a name="notebook-example-crossaccount-kds"></a>

Studio ノートブックを所有するアカウント以外のアカウントにおける Kinesis データ・ストリームを使用するには、Studio ノートブックが実行されているアカウントにサービス実行ロールを作成し、データストリームを所有するアカウントにロール信頼ポリシーを作成します。Create table DDL ステートメントの Kinesis コネクタで `aws.credentials.provider`、`aws.credentials.role.arn`、`aws.credentials.role.sessionName` を使用して、データストリームに対してテーブルを作成します。

Studio ノートブックアカウントには、次のサービス実行ロールを使用します。

```
{
 "Sid": "AllowNotebookToAssumeRole",
 "Effect": "Allow",
 "Action": "sts:AssumeRole"
 "Resource": "*"
}
```

データストリームアカウントには、 `AmazonKinesisFullAccess` ポリシーと以下のロール信頼ポリシーを使用してください。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": "arn:aws:iam::123456789012:root"
            },
            "Action": "sts:AssumeRole",
            "Condition": {}
        }
    ]
}
```

------

create table ステートメントには以下の段落を使用してます。

```
%flink.ssql
CREATE TABLE test1 (
name VARCHAR,
age BIGINT
) WITH (
'connector' = 'kinesis',
'stream' = 'stream-assume-role-test',
'aws.region' = 'us-east-1',
'aws.credentials.provider' = 'ASSUME_ROLE',
'aws.credentials.role.arn' = 'arn:aws:iam::<accountID>:role/stream-assume-role-test-role',
'aws.credentials.role.sessionName' = 'stream-assume-role-test-session',
'scan.stream.initpos' = 'TRIM_HORIZON',
'format' = 'json'
)
```