

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Apache Flink アプリケーション用 Managed Serviceを作成して実行する
<a name="gs-table-create"></a>

この演習では、Kinesis Data Streams をソースおよびシンクとして使用して、Managed Service for Apache Flink アプリケーションを作成します。

**Topics**
+ [依存リソースを作成する](#gs-table-resources)
+ [ローカルの開発環境のセットアップ](#gs-table-2)
+ [Apache Flink Streaming Java Code のダウンロードと検証](#gs-table-5)
+ [アプリケーションをローカルで実行する](#gs-table-run-locally)
+ [アプリケーションが S3 バケットに書き込むデータを観察する](#gs-table-input-output)
+ [ローカルで実行されているアプリケーションを停止する](#gs-table-stop)
+ [アプリケーションコードをコンパイルしてパッケージ化する](#gs-table-5.5)
+ [アプリケーションコードの JAR ファイルをアップロードする](#gs-table-6)
+ [Managed Service for Apache Flink アプリケーションを作成して設定する](#gs-table-7)

## 依存リソースを作成する
<a name="gs-table-resources"></a>

このエクササイズで Apache Flink 用 Managed Service を作成する前に、以下の依存リソースを作成します。
+ アプリケーションのコードを保存してアプリケーション出力を書き込む Amazon S3 バケット。
**注記**  
このチュートリアルでは、アプリケーションを us-east-1 リージョンにデプロイすることが前提とされます。別のリージョンを使用する場合、必要に応じてすべてのステップを調整する必要があります。

### Amazon S3 バケットを作成する
<a name="gs-table-resources-s3"></a>

Amazon S3 バケットは、コンソールを使用して作成できます。このリソースの作成手順については、次のトピックを参照してください。
+ 「*Amazon Simple Storage Service ユーザーガイド*」の「[How Do I Create an S3 Bucket?](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket.html)」。ログイン名を追加して、Amazon S3 バケットにグローバルに一意の名前を付けます。
**注記**  
このチュートリアルで使用するリージョンでバケットを作成してください。チュートリアルのデフォルトは us-east-1 です。

### その他のリソース
<a name="gs-table-resources-cw"></a>

アプリケーションを作成すると、Apache Flink 用 Managed Service によって次の Amazon CloudWatch リソースが作成されます（これらのリソースがまだ存在しない場合）。
+ `/AWS/KinesisAnalytics-java/<my-application>`という名前のロググループ。
+ `kinesis-analytics-log-stream` というログストリーム。

## ローカルの開発環境のセットアップ
<a name="gs-table-2"></a>

開発およびデバッグの場合、選択した IDE から直接マシンで Apache Flink アプリケーションを実行できます。すべての Apache Flink の依存関係は、Maven を使用して通常の Java の依存関係として処理されます。

**注記**  
開発マシンには、Java JDK 11、Maven、Git がインストールされている必要があります。[Eclipse Java Neon](https://www.eclipse.org/downloads/packages/release/neon/3) や [IntelliJ IDEA](https://www.jetbrains.com/idea/) などの開発環境を使用することをお勧めします。すべての前提条件を満たしていることを確認するには、「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。マシンに Apache Flink クラスターをインストールする必要は**ありません**。

### AWS セッションを認証する
<a name="get-started-exercise-table-authenticate"></a>

アプリケーションは Kinesis Data Streams を使用してデータを発行します。ローカルで実行する場合は、Kinesis データストリームに書き込むアクセス許可を持つ有効な AWS 認証済みセッションが必要です。次のステップに従って、セッションを認証します。

1.  AWS CLI と、有効な認証情報が設定された名前付きプロファイルがない場合は、「」を参照してください[AWS Command Line Interface (AWS CLI) のセットアップ](setup-awscli.md)。

1. IDE に統合するプラグインがある場合は AWS、それを使用して IDE で実行されているアプリケーションに認証情報を渡すことができます。詳細については、「[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/)」および「[AWS Toolkit for compiling the application or running Eclipse](https://docs.aws.amazon.com/toolkit-for-eclipse/v1/user-guide/welcome.html)」を参照してください。

## Apache Flink Streaming Java Code のダウンロードと検証
<a name="gs-table-5"></a>

この例のアプリケーションコードは GitHub で入手できます。

**Java アプリケーションコードのダウンロード**

1. 次のコマンドを使用してリモートリポジトリのクローンを作成します。

   ```
   git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
   ```

1. `./java/GettingStartedTable` ディレクトリに移動します。

### アプリケーションコンポーネントを確認する
<a name="get-started-exercise-table-components"></a>

アプリケーションは `com.amazonaws.services.msf.BasicTableJob` クラスで完全に実装されています。`main()` メソッドはソース、変換、シンクを定義します。実行は、このメソッドの最後にある実行ステートメントによって開始されます。

**注記**  
最適な開発者エクスペリエンスを IDE で開発するため、アプリケーションは Amazon Managed Service for Apache Flink とローカルの両方でコードを変更せずに実行されるように設計されています。
+ Amazon Managed Service for Apache Flink および IDE で実行するときに動作するようにランタイム設定を読み取るため、アプリケーションは IDE でローカルにスタンドアロンとして実行されているかどうか自動的に検出します。この場合、アプリケーションはランタイム設定を異なる方法で読み込みます。

  1. アプリケーションが IDE でスタンドアロンモードで実行されていることが検出されたら、プロジェクトの **[リソース]** フォルダに含まれている `application_properties.json` ファイルを作成します。ファイルの内容は次のようになります。

  1. アプリケーションが Amazon Managed Service for Apache Flink で実行されると、デフォルトの動作により、Amazon Managed Service for Apache Flink アプリケーションで定義するランタイムプロパティからアプリケーション設定が読み込まれます。「[Managed Service for Apache Flink アプリケーションを作成して設定する](get-started-exercise.md#get-started-exercise-7)」を参照してください。

     ```
     private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException {
         if (env instanceof LocalStreamEnvironment) {
             LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE);
             return KinesisAnalyticsRuntime.getApplicationProperties(
                     BasicStreamingJob.class.getClassLoader()
                             .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath());
         } else {
             LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink");
             return KinesisAnalyticsRuntime.getApplicationProperties();
         }
     }
     ```
+ `main()` メソッドにより、アプリケーションのデータフローが定義されて実行されます。
  + デフォルトのストリーミング環境を初期化します。この例では、DataStream API で使用する `StreamExecutionEnvironment`、ならびに SQL および Table API で使用する `StreamTableEnvironment` の両方を作成する方法について紹介します。2 つの環境オブジェクトは、異なる API を使用するための、同じランタイム環境への 2 つの別々のリファレンスです。

    ```
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    ```
  + アプリケーション設定パラメータを読み込みます。アプリケーションが実行されている場所に応じて、正しい場所から自動的にロードします。

    ```
    Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    ```
  + Flink が[チェックポイント](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/stateful-stream-processing/#checkpointing)の完了時に、Amazon S3 出力ファイルに結果を書き込むためにアプリケーションが使用する [FileSystem シンクコネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)。送信先にファイルを書き込むには、チェックポイントを有効にする必要があります。アプリケーションが Amazon Managed Service for Apache Flink で実行されているとき、アプリケーション設定はチェックポイントを制御してデフォルトで有効にします。逆に、ローカルで実行すると、チェックポイントはデフォルトで無効になります。アプリケーションはローカルで実行されていることを検出し、5,000 ミリ秒ごとにチェックポイントを設定します。

    ```
     if (env instanceof LocalStreamEnvironment) {
        env.enableCheckpointing(5000);
     }
    ```
  + このアプリケーションは、実際の外部ソースからデータを受信しません。[DataGen コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/) を介して処理するランダムデータを生成します。このコネクタは、DataStream API、SQL、Table API で利用できます。API 間の統合を実証するため、アプリケーションはより柔軟性が高い DataStram API バージョンを使用します。この場合、各レコードは `StockPriceGeneratorFunction` と呼ばれるジェネレーター関数によって生成され、カスタムロジックを配置することができます。

    ```
    DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>(
            new StockPriceGeneratorFunction(),
            Long.MAX_VALUE,
            RateLimiterStrategy.perSecond(recordPerSecond),
            TypeInformation.of(StockPrice.class));
    ```
  + DataStream API では、レコードにカスタムクラスを含めることができます。Flink がレコードとして使用できるように、クラスは特定のルールに従う必要があります。詳細については、「[Supported Data Types](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#supported-data-types)」を参照してください。この例では、`StockPrice` クラスは「[POJO](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#pojos)」です。
  + その後、ソースは実行環境にアタッチされ、`StockPrice` の `DataStream` が生成されます。このアプリケーションは[イベントタイムセマンティクス](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/#notions-of-time-event-time-and-processing-time)を使用せず、ウォーターマークを生成しません。DataGenerator ソースは、アプリケーションの残りの並列処理とは独立した並列処理 1 で実行します。

    ```
    DataStream<StockPrice> stockPrices = env.fromSource(
            source, 
            WatermarkStrategy.noWatermarks(),
            "data-generator"
        ).setParallelism(1);
    ```
  + データ処理フローの次の内容は、Table API および SQL を使用して定義されます。これを行うには、StockPrices の DataStream をテーブルに変換します。テーブルのスキーマは `StockPrice` クラスから自動的に推測されます。

    ```
    Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    ```
  + 次のコードのスニペットでは、プログラムによる Table API を使用してビューおよびクエリを定義する方法が示されます。

    ```
    Table filteredStockPricesTable = stockPricesTable.
            select(
                    $("eventTime").as("event_time"),
                    $("ticker"),
                    $("price"),
                    dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"),
                    dateFormat($("eventTime"), "HH").as("hr")
            ).where($("price").isGreater(50));
            
    tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    ```
  + シンクテーブルは、結果を JSON ファイルとして Amazon S3 バケットに書き込むように定義されます。プログラムでビューを定義する場合との違いを説明するために、Table API では SQL を使用してシンクテーブルが定義されます。

    ```
    tableEnv.executeSql("CREATE TABLE s3_sink (" +
                "eventTime TIMESTAMP(3)," +
                "ticker STRING," +
                "price DOUBLE," +
                "dt STRING," +
                "hr STRING" +
            ") PARTITIONED BY ( dt, hr ) WITH (" +
            "'connector' = 'filesystem'," +
            "'fmat' = 'json'," +
            "'path' = 's3a://"  + s3Path + "'" +
            ")");
    ```
  + 最後のステップは、フィルタリングされた株価ビューをシンクテーブルに挿入する `executeInsert()` です。このメソッドでは、これまでに定義したデータフローの実行が開始されます。

    ```
    filteredStockPricesTable.executeInsert("s3_sink");
    ```

### pom.xml ファイルを使用する
<a name="get-started-exercise-5-2"></a>

pom.xml ファイルによってアプリケーションに必要なすべての依存関係が定義され、Flink に必要なすべての依存関係を含む fat-jar を構築するため、Maven Shade プラグインが設定されます。
+ 一部の依存関係には `provided` スコープがあります。これらの依存関係は、アプリケーションが Amazon Managed Service for Apache Flink で実行されると自動的に利用可能となります。アプリケーションまたは IDE でアプリケーションをローカルに実行するために必要です。詳細については、「(TableAPI への更新)」[アプリケーションをローカルで実行する](get-started-exercise.md#get-started-exercise-5-run) を参照してください。Amazon Managed Service for Apache Flink で使用するランタイムと同じ Flink バージョンを使用していることを確認してください。TableAPI および SQL を使用するには、`flink-table-planner-loader` および `flink-table-runtime-dependencies` の両方に `provided` スコープがある状態で含める必要があります。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-planner-loader</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-table-runtime</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ デフォルトのスコープで pom にその他の Apache Flink 依存関係を追加する必要があります。例えば、[DataGen コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/datagen/)、[FileSystem SQL コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/)、[JSON 形式](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/formats/json/)などです。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-datagen</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-files</artifactId>
      <version>${flink.version}</version>
  </dependency>
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-json</artifactId>
      <version>${flink.version}</version>
  </dependency>
  ```
+ ローカルで実行するときに Amazon S3 に書き込むには、S3 Hadoop ファイルシステムも `provided` スコープがある状態で含まれています。

  ```
  <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-s3-fs-hadoop</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
  </dependency>
  ```
+ Maven Java Compiler プラグインにより、Java 11 (Apache Flink で現在サポートされている JDK バージョン) に対してコードがコンパイルされていることが確認されます。
+ ランタイムによって提供される一部のライブラリを除き、Maven Shade プラグインによって fat-jar がパッケージ化されます。`ServicesResourceTransformer` および `ManifestResourceTransformer` という 2 つのトランスフォーマーも指定されます。後者は `main` メソッドを含むクラスを設定して、アプリケーションを起動します。メインクラスの名前を変更する場合、このトランスフォーマーを必ず更新してください。
+ 

  ```
  <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-shade-plugin</artifactId>
      ...
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
              <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass>
          </transformer>
      ...
  </plugin>
  ```

## アプリケーションをローカルで実行する
<a name="gs-table-run-locally"></a>

Flink アプリケーションを IDE でローカルで実行およびデバッグできます。

**注記**  
続行する前に、入力ストリームと出力ストリームが利用できることを確認してください。「[2 つの Amazon Kinesis Data Streams を作成する](get-started-exercise.md#get-started-exercise-1)」を参照してください。また、両方のストリームから読み書きするアクセス許可があることを確認してください。「[AWS セッションを認証する](get-started-exercise.md#get-started-exercise-2-5)」を参照してください。  
ローカル開発環境をセットアップするには、Java 開発用に Java 11 JDK、Apache Maven、IDE が必要です。必要な前提条件を満たしていることを確認してください。「[演習を完了するための前提条件を満たす](getting-started.md#setting-up-prerequisites)」を参照してください。

### Java プロジェクトを IDE にインポートする
<a name="gs-table-import"></a>

IDE でアプリケーションの使用を開始するには、Java プロジェクトとしてインポートする必要があります。

クローンしたリポジトリには、複数の例が含まれています。各例は個別のプロジェクトです。このチュートリアルでは、`./jave/GettingStartedTable` サブディレクトリのコンテンツを IDE にインポートします。

Maven を使用して、コードを既存の Java プロジェクトとして挿入します。

**注記**  
新しい Java プロジェクトをインポートする正確なプロセスは、使用している IDE によって異なります。

### ローカルアプリケーション設定を変更する
<a name="gs-table-modify-2"></a>

ローカルで実行すると、アプリケーションでは `./src/main/resources` のプロジェクトのリソースフォルダにある `application_properties.json` ファイルの設定が使用されます。このチュートリアルアプリケーションでは、設定パラメータはバケットの名前およびデータが書き込まれるパスです。

設定を編集し、このチュートリアルの冒頭で作成したバケットと一致するように Amazon S3 バケットの名前を変更します。

```
[
 {
 "PropertyGroupId": "bucket",
 "PropertyMap": {
 "name": "{{<bucket-name>}}",
 "path": "output"
 }
 }
]
```

**注記**  
設定プロパティ `name` には、`my-bucket-name` などのバケット名のみを含める必要があります。`s3://` や末尾のスラッシュなどのプレフィックスは含めないでください。  
パスを変更する場合、先頭または末尾のスラッシュを省略します。

### IDE の実行設定を設定する
<a name="gs-table-setupIDE"></a>

任意の Java アプリケーションを実行する場合と同様に、メインクラス `com.amazonaws.services.msf.BasicTableJob` を実行することで、IDE から直接 Flink アプリケーションを実行およびデバッグできます。アプリケーションを実行する前に、実行設定を設定する必要があります。セットアップは使用している IDE によって異なります。例えば、「IntelliJ IDEA ドキュメント」の「[Run/debug configurations](https://www.jetbrains.com/help/idea/run-debug-configuration.html)」を参照してください。特に、次の内容を設定する必要があります。

1. **クラスパスに `provided` 依存関係を追加します**。ローカルで実行するとき、`provided` スコープを持つ依存関係がアプリケーションに渡されることを確認するために必要です。この設定を行わないと、アプリケーションに `class not found` エラーが直ちに表示されます。

1. **Kinesis ストリームにアクセスするための AWS 認証情報をアプリケーションに渡します**。最も手軽な方法は、[AWS Toolkit for IntelliJ IDEA](https://aws.amazon.com/intellij/) を使用することです。実行設定でこの IDE プラグインを使用すると、特定の AWS プロファイルを選択できます。 AWS 認証は、このプロファイルを使用して行われます。 AWS 認証情報を直接渡す必要はありません。

1. IDE が **JDK 11** を使用してアプリケーションを実行することを確認してください。

### IDE でアプリケーションを実行する
<a name="gs-table-runIDE"></a>

`BasicTableJob` の実行設定を設定したら、通常の Java アプリケーションのように実行またはデバッグできます。

**注記**  
コマンドラインから `java -jar ...` を使用して、Maven によって生成される fat-jar を直接実行することはできません。この jar には、アプリケーションをスタンドアロンで実行するために必要な Flink コア依存関係は含まれていません。

アプリケーションが正常に起動すると、スタンドアロンのミニクラスターおよびコネクタの初期化に関する情報の一部がログ記録されます。この後、アプリケーションの起動時に Flink が通常出力する多数の INFO およびいくつかの WARN ログが続きます。

```
21:28:34,982 INFO  com.amazonaws.services.msf.BasicTableJob                     
                [] - Loading application properties from 'flink-application-properties-dev.json'
21:28:35,149 INFO  com.amazonaws.services.msf.BasicTableJob                     
[] - s3Path is ExampleBucket/my-output-bucket
...
```

初期化が完了したら、アプリケーションはそれ以上のログエントリを出力しません。**データが流れている間、ログは出力されません。**

アプリケーションがデータを正しく処理しているかどうか確認するには、次のセクションで記述されているとおり、出力バケットの内容を確認することができます。

**注記**  
 フローデータに関するログが出力されないことは、Flink アプリケーションの通常の動作です。すべてのレコードに関するログが出力されるとデバッグに便利ですが、実稼働での実行時にかなりのオーバーヘッドが発生する可能性があります。

## アプリケーションが S3 バケットに書き込むデータを観察する
<a name="gs-table-input-output"></a>

このサンプルアプリケーションによって内部的にランダムデータが生成され、このデータが設定した送信先 S3 バケットに書き込まれます。デフォルトの設定パスを変更しない限り、データは `output` パスに書き込まれ、その後にデータと時間のパーティショニングが「`./output/<yyyy-MM-dd>/<HH>`」形式で実行されます。

[FileSystem シンクコネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/filesystem/#streaming-sink)は、Flink チェックポイントに新しいファイルを作成します。ローカルで実行すると、コードで指定されているとおり、アプリケーションは 5 秒 (5,000 ミリ秒) ごとにチェックポイントを実行します。

```
 if (env instanceof LocalStreamEnvironment) {
    env.enableCheckpointing(5000);
 }
```

**S3 バケットを参照して、アプリケーションによって書き込まれたファイルを観察する方法**

1. 

   1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. 以前に作成したバケットを選択します。

1. `output` パスに移動したら、UTC タイムゾーンで現在の時間に対応する日付および時刻のフォルダに移動します。

1. 定期的に更新して、5 秒ごとに新しいファイルが表示されることを観察します。

1. 1 つのファイルを選択してダウンロードし、コンテンツを確認してください。
**注記**  
デフォルトでは、ファイルには拡張子はありません。コンテンツは JSON 形式です。任意のテキストエディタでファイルを開いて、コンテンツを確認することができます。

## ローカルで実行されているアプリケーションを停止する
<a name="gs-table-stop"></a>

IDE で実行されているアプリケーションを停止します。　 通常、IDE には「停止」オプションがあります。正確な場所および方法は IDE によって異なります。

## アプリケーションコードをコンパイルしてパッケージ化する
<a name="gs-table-5.5"></a>

このセクションでは、Apache Maven を使用して Java コードをコンパイルし、JAR ファイルにパッケージ化します。Maven コマンドラインツールまたは IDE を使用して、コードをコンパイルおよびパッケージ化できます。

**Maven コマンドラインを使用してコンパイルおよびパッケージ化する方法**

Java 入門プロジェクトを含むディレクトリに移動して、次のコマンドを実行します。

```
$ mvn package
```

**IDE を使用してコンパイルおよびパッケージ化する方法**

IDE Maven 統合から `mvn package` を実行します。

どちらの場合も、JAR ファイル `target/amazon-msf-java-table-app-1.0.jar` が作成されます。

**注記**  
IDE から *ビルドプロジェクト* を実行しても、JAR ファイルが作成されない場合があります。

## アプリケーションコードの JAR ファイルをアップロードする
<a name="gs-table-6"></a>

このセクションでは、以前のセクションで作成した JAR ファイルを、このチュートリアルの冒頭で作成した Amazon S3 バケットにアップロードします。まだ完了していない場合、[Amazon S3 バケットを作成する](#gs-table-resources-s3) を完了してください。

**アプリケーションコードをアップロードするには**

1. Amazon S3 コンソール ([https://console.aws.amazon.com/s3/](https://console.aws.amazon.com/s3/)) を開きます。

1. アプリケーションコード用に以前作成したバケットを選択します。

1. **[アップロード]** フィールドを選択します。

1. **ファイルの追加**を選択します。

1. 以前のセクションで生成された JAR ファイル (`target/amazon-msf-java-table-app-1.0.jar`) に移動します。

1. 他の設定を変更せずに **[アップロード]** を選択します。
**警告**  
 `<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar` で正しい JAR ファイルを選択していることを確認してください。  
ターゲットディレクトリには、アップロードする必要のない他の JAR ファイルも含まれています。

## Managed Service for Apache Flink アプリケーションを作成して設定する
<a name="gs-table-7"></a>

コンソールまたは AWS CLIのいずれかを使用して Managed Service for Apache Flink アプリケーションを作成および設定することができます。このチュートリアルでは、コンソールを使用します。

**注記**  
コンソールを使用してアプリケーションを作成すると、 AWS Identity and Access Management (IAM) リソースと Amazon CloudWatch Logs リソースが自動的に作成されます。 AWS CLIを使用してアプリケーションを作成する場合、これらのリソースを個別に作成する必要があります。

### アプリケーションの作成
<a name="gs-table-7-console-create"></a>

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. 正しいリージョンが選択されていることを確認してください: 米国東部 (バージニア北部) us-east-1。

1. 右側のメニューで **[Apache Flink アプリケーション]** を選択したら、**[ストリーミングアプリケーションの作成]** を選択します。または、最初のページの **[開始方法]** セクションで **[ストリーミングアプリケーションの作成]** を選択します。

1. **[ストリーミングアプリケーションの作成]** ページで、次の操作を完了します。
   + **[ストリーム処理アプリケーションの設定方法の選択]** には、**[最初から作成]** を選択します。
   + **[Apache Flink の設定、Application Flink バージョン]** には、**[Apache Flink 1.19]** を選択します。
   + **[アプリケーション設定]** セクションで、次の操作を完了します。
     + [**アプリケーション名**] には **MyApplication** と入力します。
     + [**Description (説明)**] に **My Java Table API test app** と入力します。
     + **[アプリケーションリソースへのアクセス]** には、**[必要なポリシーを使用して IAM ロール kinesis-analytics-MyApplication-us-east-1 を作成/更新]** を選択します。
   + **[アプリケーション設定のテンプレート]** で、次の操作を完了します。
     + **[テンプレート]** で **[開発]** を選択します。

1. **[ストリーミングアプリケーションの作成]** を選択します。

**注記**  
コンソールを使用して Apache Flink アプリケーション用 Managed Service を作成する場合は、IAM ロールとポリシーをアプリケーションが自動的に作成するオプションを選択できます。アプリケーションではこのロールとポリシーを使用して、依存リソースにアクセスします。これらの IAM リソースは、次のようにアプリケーション名とリージョンを使用して命名されます。  
ポリシー: `kinesis-analytics-service-{{MyApplication}}-{{us-east-1}}`
ロール: `kinesisanalytics-{{MyApplication}}-{{us-east-1}}`

### IAM ポリシーを編集する
<a name="gs-table-7-console-iam"></a>

Amazon S3 バケットにアクセスする許可を追加するように IAM ポリシーを編集します。

**IAM ポリシーを編集して S3 バケット権限を追加するには**

1. IAM コンソール ([https://console.aws.amazon.com/iam/](https://console.aws.amazon.com/iam/)) を開きます。

1. **[ポリシー]** を選択します。前のセクションでコンソールによって作成された **`kinesis-analytics-service-MyApplication-us-east-1`** ポリシーを選択します。

1. **[編集]** を選択して、**[JSON]** タブを選択します。

1. 次のポリシー例で強調表示されているセクションをポリシーに追加します。サンプルアカウント ID ({{012345678901}}) をアカウント ID に置き換え、{{<bucket-name>}} を作成した S3 バケットの名前に置き換えます。

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement": [
           {
               "Sid": "ReadCode",
               "Effect": "Allow",
               "Action": [
                   "s3:GetObject",
                   "s3:GetObjectVersion"
               ],
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket}}/kinesis-analytics-placeholder-s3-object"
               ]
           },
           {
               "Sid": "ListCloudwatchLogGroups",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogGroups"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:*"
               ]
           },
           {
               "Sid": "ListCloudwatchLogStreams",
               "Effect": "Allow",
               "Action": [
                   "logs:DescribeLogStreams"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*"
               ]
           },
           {
               "Sid": "PutCloudwatchLogs",
               "Effect": "Allow",
               "Action": [
                   "logs:PutLogEvents"
               ],
               "Resource": [
                   "arn:aws:logs:{{us-east-1}}:{{123456789012}}:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream"
               ]
           }{{, 
           {
               "Sid": "WriteOutputBucket", 
               "Effect": "Allow", 
               "Action": "s3:*", 
               "Resource": [
                   "arn:aws:s3:::{{amzn-s3-demo-bucket2}}"
               ]
          }}}
       ]
   }
   ```

------

1.  [**次へ**]、[**変更を保存**] の順に選択します。

### アプリケーションを設定する
<a name="gs-table-7-console-configure"></a>

アプリケーションを編集して、アプリケーションコードアーティファクトを設定します。

**アプリケーションを構成するには**

1. [**MyApplication**] ページで、[**Congirue**] を選択します。

1. **[アプリケーションコードの場所]** セクションで、**[設定]** を選択します。
   + **[Amazon S3 バケット]** には、アプリケーションコード用に以前作成したバケットを選択します。**[参照]** を選択して正しいバケットを選択したら、**[選択]** を選択します。バケット名はクリックしないでください。
   + [**Amazon S3 オブジェクトへのパス**] で、**amazon-msf-java-table-app-1.0.jar**と入力します。

1. [**アクセス許可**] には、[**IAM ロールの作成 / 更新`kinesis-analytics-MyApplication-us-east-1`**] を選択します。

1. **[ランタイムプロパティ]** セクションで、次のプロパティを追加します。

1. **[新しい項目の追加]** を選択して、次のパラメータをすべて追加します。    
[See the AWS documentation website for more details](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/gs-table-create.html)

1. 他の設定を変更しないでください。

1. **[Save changes]** (変更の保存) をクリックします。

**注記**  
Amazon CloudWatch ログ記録を有効にすることを選択すると、ロググループとログストリームが Kinesis Data Analytics によって作成されます。これらのリソースの名前は次のとおりです。  
ロググループ: `/aws/kinesis-analytics/MyApplication`
ログストリーム: `kinesis-analytics-log-stream`

### アプリケーションを実行する
<a name="gs-table-7-console-run"></a>

これでアプリケーションが設定され、実行する準備が整いました。

**アプリケーションを実行するには**

1. Amazon Managed Service for Apache Flink のコンソールページに戻り、**[MyApplication]** を選択します。

1. **[Run]** を選択してアプリケーションを起動します。

1. **[アプリケーションの復元設定]** で、**[最新のスナップショットで実行]** を選択します。

1. **[Run]** (実行) を選択します。

1. **[アプリケーション詳細]** の **[ステータス]** は「`Ready`」から「`Starting`」に移行し、アプリケーションが起動された後に「`Running`」に移行します。

アプリケーションが「`Running`」ステータスのとき、Flink ダッシュボードを開くことができます。

**ダッシュボードを開いてジョブを表示する方法**

1. **[Apache Flink ダッシュボードを開く]** を選択します。ダッシュボードは新しいページで開かれます。

1. **[実行中のジョブ]** リストで、確認できる 1 つのジョブを選択します。
**注記**  
ランタイムプロパティを設定したり、IAM ポリシーを誤って編集したりすると、アプリケーションのステータスが「`Running`」に変更されることがありますが、Flink ダッシュボードではジョブが継続的に再起動されていることが表示されます。ほれは、アプリケーションの設定が間違っているか、外部リソースへのアクセス許可がない場合の一般的な障害シナリオです。  
これが発生した場合、Flink ダッシュボードの **[例外]** タブを見て、問題の原因を調査します。

### 実行中のアプリケーションのメトリクスを観察する
<a name="gs-observe-metrics"></a>

**[MyApplication]** ページの **[Amazon CloudWatch メトリクス]** セクションで、実行中のアプリケーションの基本的なメトリクスの一部を確認できます。

**メトリクスを表示する方法**

1. **[更新]** ボタンの横にあるドロップダウンリストから **[10 秒]** を選択します。

1. アプリケーションが実行中で正常なとき、**[アップタイム]** メトリクスが継続的に増加していることを確認できます。

1. **[完全再起動]** メトリクスはゼロである必要があります。増加している場合、設定に問題がある可能性があります。Flink ダッシュボードの **[例外]** タブを確認して、問題を調査します。

1. 正常なアプリケーションでは、**[失敗したチェックポイント数]** メトリクスは 0 です。
**注記**  
このダッシュボードには、5 分の粒度で一定の一連のメトリクスが表示されます。CloudWatch ダッシュボードで任意のメトリクスを使用してカスタムアプリケーションのダッシュボードを作成できます。

### アプリケーションが送信先バケットにデータを書き込むことを観察する
<a name="gs-observe-output"></a>

Amazon Managed Service for Apache Flink で実行されているアプリケーションが Amazon S3 にファイルを書き込んでいることを観察できるようになりました。

ファイルを観察するには、アプリケーションがローカルで実行されていたときに書き込まれたファイルの確認に使用したのと同じプロセスに従います。「[アプリケーションが S3 バケットに書き込むデータを観察する](#gs-table-input-output)」を参照してください。

アプリケーションが Flink チェックポイントに新しいファイルを書き込むことに注意してください。Amazon Managed Service for Apache Flink で実行すると、チェックポイントはデフォルトで有効になり、60 秒ごとに実行されます。アプリケーションは、約 1 分ごとに新しいファイルを作成します。

### アプリケーションを停止する
<a name="gs-table-7-console-stop"></a>

アプリケーションを停止するには、`MyApplication` という名前の Managed Service for Apache Flink アプリケーションのコンソールページに移動します。

**アプリケーションを停止するには**

1. **[アクション]** ドロップダウンリストで、**[停止]** を選択します。

1. **[アプリケーション詳細]** の **[ステータス]** は「`Running`」から「`Stopping`」に移行し、アプリケーションが完全に停止すると「`Ready`」に移行します。
**注記**  
Python スクリプトまたは Kinesis Data Generator から入力ストリームへのデータ送信も必ず停止してください。