

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Managed Service for Apache Flink: 仕組み
<a name="how-it-works"></a>

Managed Service for Apache Flink は、ストリーミングデータを処理するために Apache Flink アプリケーションを使用できるフルマネージド Amazon サービスです。まず、Apache Flink アプリケーションをプログラミングし、次に Managed Service for Apache Flink アプリケーションを作成します。

## Apache Flink アプリケーションのプログラミング
<a name="how-it-works-programming"></a>

Apache Flink アプリケーションは、Apache Flink フレームワークを使用して作成された Java または Scala アプリケーションです。Apache Flink アプリケーションはローカルで作成してビルドします。

アプリケーションは主に「[DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html)」 または「[テーブル API](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/table/)」を使用します。他の Apache Flink API も使用できますが、ストリーミングアプリケーションの構築にはあまり使用されません。

2 つの API の特徴は、次のとおりです。

### Datastream API
<a name="how-it-works-prog-datastream"></a>

Apache Flink データストリーム API プログラミングモデルは次の 2 つのコンポーネントに基づいています。
+ 「**データストリーム:**」データレコードの連続フローを構造化して表現したものです。
+ 「**変換演算子:**」1 つ以上のデータストリームを入力として受け取り、1 つ以上のデータストリームを出力として生成します。

DataStream API で作成されたアプリケーションは次のことを行います。
+ データソース (Kinesis ストリームや Amazon MSK トピックなど) からデータを読み取ります。
+ フィルタリング、集約、エンリッチメントなどの変換をデータに適用します。
+ 変換したデータをデータシンクに書き込みます。

DataStream API を使用するアプリケーションは Java または Scala で記述でき、Kinesis データストリーム、Amazon MSK トピック、またはカスタムソースから読み取ることができます。

アプリケーションは「コネクタ」を使用してデータを処理します。Apache Flink は、次のタイプのコネクタを使用しています。
+ 「**ソース**」:外部データの読み取りに使用されるコネクター。
+ 「**シンク**」:外部への書き込みに使用されるコネクター。
+ 「**オペレータ**」:アプリケーション内のデータを処理するために使用されるコネクタ。

一般的なアプリケーションは、ソース付きの少なくとも 1 つのデータストリーム、1 つ以上のオペレータを含むデータストリーム、および少なくとも 1 つのデータシンクで構成されます。

DataStream API の使用の詳細については、「[DataStream API コンポーネントを確認する](how-datastream.md)」 を参照してください。

### Table API
<a name="how-it-works-prog-table"></a>

Apache Flink Table API プログラミングモデルは、以下のコンポーネントに基づいています。
+ 「**テーブル環境:**」1 つ以上のテーブルを作成およびホストするために使用する基礎データへのインターフェースです。
+ 「**テーブル:**」SQL テーブルまたはビューへのアクセスを提供するオブジェクト。
+ 「**テーブルソース:**」Amazon MSK トピックなどの外部ソースからデータを読み取るために使用されます。
+ 「**テーブル関数:**」データ変換に使用される SQL クエリまたは API 呼び出し。
+ 「**テーブルシンク:**」Amazon S3 バケットなどの外部の場所にデータを書き込むために使用されます。

Table API で作成されたアプリケーションは次のことを行います。
+ `Table Source` に接続して `TableEnvironment` を作成します。
+ SQL クエリまたはテーブル API 関数を使用して、`TableEnvironment` にテーブルを作成します。
+ テーブル API または SQL を使用してテーブルに対してクエリを実行します。
+ テーブルファンクションまたは SQL クエリを使用して、クエリの結果に変換を適用します。
+ クエリまたは関数の結果を `Table Sink` に書き込みます。

Table API を使用するアプリケーションは Java または Scala で作成でき、API 呼び出しまたは SQL クエリを使用してデータをクエリできます。

テーブル API の使用方法の詳細については、「[Table API コンポーネントを確認する](how-table.md)」 を参照してください。

## Managed Service for Apache Flink アプリケーションを作成する
<a name="how-it-works-app"></a>

Managed Service for Apache Flink は、Apache Flink アプリケーションをホストするための環境を作成し、次の設定を提供する AWS サービスです。
+ 「**[ランタイムプロパティを使用する](how-properties.md): **」 アプリケーションに提供できるパラメータ。これらのパラメータは、アプリケーションコードを再コンパイルしなくても変更できます。
+ 「**[耐障害性を実装する](how-fault.md)**」: アプリケーションが中断や再起動から回復する方法。
+ 「**[Amazon Managed Service for Apache Flink でのロギングとモニタリング](monitoring-overview.md)**」: アプリケーションが CloudWatch Logs にイベントを記録する方法。
+ 「**[アプリケーションスケーリングを実装する](how-scaling.md)**」: アプリケーションがコンピューティングリソースをプロビジョニングする方法。

Apache Flink アプリケーション用 Managed Serviceは、コンソールまたは AWS CLIを使用して作成します。Apache Flink 用 Managed Serviceの作成を開始するには、 [チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) を参照してください。

# Managed Service for Apache Flink アプリケーションを作成する
<a name="how-creating-apps"></a>

このトピックには、Managed Service for Apache Flink アプリケーションの作成に関する情報が含まれています。

**Topics**
+ [Managed Service for Apache Flink アプリケーションコードをビルドする](#how-creating-apps-building)
+ [Managed Service for Apache Flink アプリケーションを作成する](#how-creating-apps-creating)
+ [カスタマーマネージドキーを使用する](#how-creating-apps-use-cmk)
+ [Managed Service for Apache Flink アプリケーションを起動する](#how-creating-apps-starting)
+ [Managed Service for Apache Flink アプリケーションを検証する](#how-creating-apps-verifying)
+ [Managed Service for Apache Flink アプリケーションのシステムロールバックを有効にする](how-system-rollbacks.md)

## Managed Service for Apache Flink アプリケーションコードをビルドする
<a name="how-creating-apps-building"></a>

このセクションでは、Managed Service for Apache Flink アプリケーションのアプリケーションコードをビルドするために使用するコンポーネントについて説明します。

アプリケーションコードに対してサポートされている最新バージョンの Apache Flink を使用することをお勧めします。Apache Flink アプリケーション用 Managed Service のアップグレードについては、[Apache Flink のインプレースバージョンアップグレードを使用する](how-in-place-version-upgrades.md) を参照してください。

アプリケーションコードは「[Apache Maven](https://maven.apache.org/)」を使用してビルドします。Apache Maven プロジェクトは「`pom.xml`」ファイルを使用して、使用するコンポーネントのバージョンを指定します。

**注記**  
Apache Flink 用 Managed Service は、最大 512 MB の JAR ファイルをサポートします。これより大きい JAR ファイルを使用すると、アプリケーションは起動に失敗します。

アプリケーションが Scala の任意のバージョンから Java API を使用できるようになっています。選択した Scala 標準ライブラリーを Scala アプリケーションにバンドルする必要があります。

「**Apache Beam**」を使用する Apache Flink アプリケーション用 Managed Service の作成については、 [Managed Service for Apache Flink アプリケーションに Apache Beam を使用する](how-creating-apps-beam.md) を参照してください。

### アプリケーションの Apache Flink バージョンを指定する
<a name="how-creating-apps-building-flink"></a>

Apache Flink Runtime バージョン 1.1.0 以降の Managed Service を使用する場合は、アプリケーションをコンパイルするときにアプリケーションが使用する Apache Flink のバージョンを指定します。`-Dflink.version` パラメータを使用して Apache Flink のバージョンを指定します。たとえば、Apache Flink 2.2.0 を使用している場合は、以下を指定します。

```
mvn package -Dflink.version=2.2.0
```

以前のバージョンの Apache Flink でアプリケーションをビルドする方法については、「[以前のバージョン](earlier.md)」を参照してください。

## Managed Service for Apache Flink アプリケーションを作成する
<a name="how-creating-apps-creating"></a>

アプリケーションコードをビルドしたら、次の手順を実行して Managed Service for Apache Flink (Amazon MSF) アプリケーションを作成します。
+ 「**アプリケーションコードのアップロード**:」アプリケーションコードを Amazon S3 バケットにアップロードします。アプリケーションを作成する際は、アプリケーションコードの S3 バケット名とオブジェクト名を指定します。アプリケーションコードのアップロード方法を示すチュートリアルについては、「[チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) チュートリアル」を参照してください。
+ **Managed Service for Apache Flink アプリケーションを作成する**: 次のいずれかの方法を使用して Amazon MSF アプリケーションを作成します。
**注記**  
Amazon MSF は、デフォルトで を使用してアプリケーションを暗号化します AWS 所有のキー。 AWS KMS カスタマーマネージドキー (CMKs) を使用して新しいアプリケーションを作成し、キーを自分で作成、所有、管理することもできます。CMK の詳細については、「[Amazon Managed Service for Apache Flink のキー管理](key-management-flink.md)」を参照してください。
  + ** AWS コンソールを使用して Amazon MSF アプリケーションを作成する:** AWS コンソールを使用してアプリケーションを作成および設定できます。

    コンソールを使用してアプリケーションを作成する場合は、アプリケーションの依存リソース (CloudWatch Logs ストリーム、IAM ロール、IAM ポリシーなど) が作成されます。

    コンソールを使用してアプリケーションを作成する場合、「**Apache Flink 用 Managed Service - アプリケーションの作成**」ページのプルダウンから選択して、アプリケーションが使用する Apache Flink のバージョンを指定します。

    コンソールを使用してアプリケーションを作成する方法に関するチュートリアルについては、「[チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) チュートリアル」を参照してください。
  + **CLI を使用して Amazon MSF AWS アプリケーションを作成する:** CLI AWS を使用してアプリケーションを作成および設定できます。

    CLI を使用してアプリケーションを作成する場合、アプリケーションの依存リソース (CloudWatch Logs ストリーム、IAM ロール、IAM ポリシーなど) も手動で作成する必要があります。

    CLI を使用してアプリケーションを作成する場合、 `CreateApplication` アクションの `RuntimeEnvironment` パラメータを使用して、アプリケーションが使用する Apache Flink のバージョンを指定します。
**注記**  
既存のアプリケーションの `RuntimeEnvironment` を変更できます。この方法の詳細は、「[Apache Flink のインプレースバージョンアップグレードを使用する](how-in-place-version-upgrades.md)」を参照してください。

## カスタマーマネージドキーを使用する
<a name="how-creating-apps-use-cmk"></a>

Amazon MSF では、カスタマーマネージドキー (CMKs) は、 AWS Key Management Service () で作成、所有、管理するキーを使用してアプリケーションのデータを暗号化できる機能ですAWS KMS。Amazon MSF アプリケーションの場合、Flink [チェックポイント](how-fault.md)または[スナップショット](how-snapshots.md)の対象となるすべてのデータが、そのアプリケーション用に定義した CMK で暗号化されます。

アプリケーションで CMK を使用するには、まず[新しいアプリケーションを作成](#how-creating-apps-creating)してから、CMK を適用する必要があります。CMK の使用方法の詳細については、「[Amazon Managed Service for Apache Flink のキー管理](key-management-flink.md)」を参照してください。

## Managed Service for Apache Flink アプリケーションを起動する
<a name="how-creating-apps-starting"></a>

アプリケーションコードを作成し、S3 にアップロードし、Apache Flink アプリケーション用 Managed Service を作成したら、アプリケーションを起動します。Apache Flink 用 Managed Service アプリケーションの起動には、通常数分かかります。

アプリケーションを起動するには、以下のいずれかの方法を使用します。
+ ** AWS コンソールを使用して Managed Service for Apache Flink アプリケーションを起動する:** コンソールのアプリケーションのページで**実行**を選択して、アプリケーションを実行できます AWS 。
+ ** AWS API を使用して Managed Service for Apache Flink アプリケーションを起動する:** [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) アクションを使用してアプリケーションを実行できます。

## Managed Service for Apache Flink アプリケーションを検証する
<a name="how-creating-apps-verifying"></a>

アプリケーションが動作していることを確認するには、次の方法があります。
+ 「**CloudWatch Logs の使用:**」CloudWatch Logs と CloudWatch Logs インサイトを使用して、アプリケーションが正しく実行されていることを確認できます。Apache Flink アプリケーション用 Managed Service で CloudWatch Logs を使用する方法については、「[Amazon Managed Service for Apache Flink でのロギングとモニタリング](monitoring-overview.md)」を参照してください。
+ **CloudWatch メトリクスを使用する**: CloudWatch メトリクスを使用して、アプリケーションのアクティビティ、またはアプリケーションが入力または出力に使用するリソース (Kinesis ストリーム、Firehose ストリーム、Amazon S3 バケットなど) のアクティビティを監視できます。CloudWatch メトリクスの保持の詳細については、Amazon CloudWatch ユーザーガイドの[「メトリクスの保持」](https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/working_with_metrics.html)を参照してください。
+ 「**出力ロケーションのモニタリング:**」アプリケーションが出力を特定のロケーション (Amazon S3 バケットやデータベースなど) に書き込む場合、そのロケーションに書き込まれたデータを監視できます。

# Managed Service for Apache Flink アプリケーションのシステムロールバックを有効にする
<a name="how-system-rollbacks"></a>

システムロールバック機能を使用すると、Amazon Managed Service for Apache Flink で実行中の Apache Flink アプリケーションの高可用性を実現できます。この設定をオプトインすると、`UpdateApplication` や `autoscaling` などのアクションでコードや設定のバグが発生したときに、サービスによってアプリケーションを、以前に実行されていたバージョンに自動的に戻すことができます。

**注記**  
システムロールバック機能を使用するには、アプリケーションを更新してオプトインする必要があります。既存のアプリケーションは、デフォルトではシステムロールバックを自動的に使用しません。

## 仕組み
<a name="how-rollback-works"></a>

更新やアクションのスケールなど、アプリケーション操作を開始すると、Amazon Managed Service for Apache Flink は、最初にその操作を実行しようとします。コードのバグやアクセス許可の不足など、問題が検出されて操作が成功しない場合、サービスは自動的に `RollbackApplication` 操作を開始します。

ロールバックでは、アプリケーションを、関連するアプリケーションの状態とともに、正常に実行されていた以前のバージョンにリストアしようとします。ロールバックが成功すると、アプリケーションは以前のバージョンを使用して、最小限のダウンタイムでデータを処理し続けます。自動ロールバックも失敗した場合、Amazon Managed Service for Apache Flink は、アプリケーションを `READY` ステータスに移行させるため、ユーザーがエラーを修正して操作を再試行するなどのアクションを実行できます。

自動システムロールバックを使用するには、オプトインする必要があります。コンソールまたは API を使用して、アプリケーションに対するこの時点以降のすべての操作で有効にすることができます。

`UpdateApplication` アクションの以下のリクエスト例は、アプリケーションのシステムロールバックを有効にします。

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSystemRollbackConfigurationUpdate": { 
         "RollbackEnabledUpdate": "true"
       }
    }
}
```

## 自動システムロールバックの一般的なシナリオを確認する
<a name="common-scenarios"></a>

次のシナリオは、自動システムロールバックが役立つケースを示しています。
+ **アプリケーションの更新:** メインメソッドを使用して Flink ジョブを初期化するときにバグがある新しいコードでアプリケーションを更新した場合、自動ロールバックで、以前に動作していたバージョンに復元できます。システムロールバックが役立つその他の更新シナリオは次のとおりです。
  + アプリケーションが [maxParallelism](https://docs.aws.amazon.com/managed-flink/latest/java/how-scaling.html#how-scaling-auto) よりも多い並列処理数で実行されるように更新された場合。
  + VPC アプリケーションのサブネットが正しくない状態で実行されるようにアプリケーションが更新され、Flink ジョブの起動中に失敗する場合。
+ **Flink バージョンのアップグレード:** 新しい Apache Flink バージョンにアップグレードし、アップグレードしたアプリケーションでスナップショットの互換性の問題が発生した場合、システムロールバックで以前の Flink バージョンに自動的に戻すことができます。
+ **AutoScaling:** スナップショットと Flink ジョブグラフの間でオペレーターの不一致があるため、アプリケーションがスケールアップしてもセーブポイントから復元する際に問題が発生する場合。

## システムロールバックにオペレーション API を使用する
<a name="operation-apis"></a>

可視性を高めるために、Amazon Managed Service for Apache Flink には、障害や関連するシステムロールバックの追跡に役立つアプリケーションオペレーションに関連する 2 つの API があります。

`ListApplicationOperations`

この API は、`UpdateApplication`、`Maintenance`、`RollbackApplication` など、アプリケーションで実行されたすべてのオペレーションを逆の時系列で一覧表示します。`ListApplicationOperations` アクションの以下のリクエスト例では、アプリケーションに対する最初の 10 件のアプリケーションオペレーションが一覧表示されます。

```
{
   "ApplicationName": "MyApplication",
   "Limit": 10
}
```

次の `ListApplicationOperations` のリクエスト例は、リストをフィルタリングしてアプリケーションの以前の更新のみ表示するのに役立ちます。

```
{
   "ApplicationName": "MyApplication",
   "operation": "UpdateApplication"
}
```

`DescribeApplicationOperation`

この API は、障害の理由 (該当する場合) など、`ListApplicationOperations` に一覧表示されている特定のオペレーションに関する詳細情報を提供します。`DescribeApplicationOperation` アクションの以下のリクエスト例では、特定のアプリケーションオペレーションの詳細を一覧表示します。

```
{
   "ApplicationName": "MyApplication",
   "OperationId": "xyzoperation"
}
```

トラブルシューティング情報については、「[システムロールバックのベストプラクティス](troubleshooting-system-rollback.md)」を参照してください。

# Managed Service for Apache Flink アプリケーションを実行する
<a name="how-running-apps"></a>

このトピックには、Apache Flink 用 Managed Serviceの実行に関する情報が含まれています。

Apache Flink アプリケーション用 Managed Serviceを実行すると、サービスによって Apache Flink ジョブが作成されます。Apache Flink ジョブは、Apache Flink アプリケーション用 Managed Serviceの実行ライフサイクルです。Job の実行とそれが使用するリソースは、ジョブマネージャーによって管理されます。ジョブマネージャは、アプリケーションの実行をタスクに分割します。各タスクはタスクマネージャーによって管理されます。アプリケーションのパフォーマンスを監視する場合、各タスクマネージャーまたはジョブマネージャー全体のパフォーマンスを調べることができます。

Apache Flink ジョブの詳細については、Apache Flink ドキュメントの「[Jobs and Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)」を参照してください。

## アプリケーションとジョブのステータスを特定する
<a name="how-running-job-status"></a>

アプリケーションとアプリケーションのジョブの両方に現在の実行ステータスがあります。
+ 「**アプリケーションステータス:**」アプリケーションには、実行フェーズを説明する現在のステータスがあります。アプリケーション状態には以下のものがあります。
  + 「**安定したアプリケーションステータス:**」通常、ステータスを変更するまで、アプリケーションは次のステータスのままになります。
    + 「**READY:**」新規または停止中のアプリケーションは、実行するまで準備完了状態です。
    + 「**RUNNING**」正常に起動したアプリケーションは RUNNING ステータスになります。
  + 「**一時的なアプリケーションステータス:**」これらのステータスのアプリケーションは、通常、別のステータスへの移行中です。アプリケーションが一定時間一時的な状態のままである場合は、 `Force` パラメータを `true` に設定して 「[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」 アクションを使用してアプリケーションを停止できます。これらのステータスには以下のものが含まれる：
    + `STARTING:` は「[StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)」アクションの後に発生します。アプリケーションは `READY` から `RUNNING` ステータスに移行中です。
    + `STOPPING:` は 「[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」 アクションの後に発生します。アプリケーションは `RUNNING` から `READY` ステータスに移行中です。
    + `DELETING:` は 「[DeleteApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplication.html)」 アクションの後に発生します。アプリケーション は削除中です。
    + `UPDATING:` は 「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」 アクションの後に発生します。アプリケーションは更新中で、 `RUNNING` または `READY` ステータスに戻ります。
    + `AUTOSCALING:` アプリケーションでは 「[ParallelismConfiguration ](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ParallelismConfiguration.html)」 の `AutoScalingEnabled` プロパティが `true` に設定されており、サービスはアプリケーションの並列度を増やしています。「[アプリケーション](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」 がこの状態の場合、使用できる有効な API アクションは、 `Force` パラメータを `true` に設定した StopApplication アクションだけです。自動スケーリングの詳細については、 「[Managed Service for Apache Flink で自動スケーリングを使用する](how-scaling-auto.md)」 を参照してください。
    + `FORCE_STOPPING:` は `Force` パラメータを `true` に設定して「[StopApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StopApplication.html)」アクションが呼び出された後に発生します。アプリケーションは強制停止中です。アプリケーションは`STARTING` 、`UPDATING` 、`STOPPING` または `AUTOSCALING` ステータスから `READY` ステータスに移行中です。
    + `ROLLING_BACK:` は 「[rollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html)」 アクションが呼び出された後に発生します。アプリケーションを以前のバージョンにロールバックしています。アプリケーションは `UPDATING` または `AUTOSCALING` ステータスから `RUNNING` ステータスに移行します。
    + `MAINTENANCE:` は Managed Service for Apache Flinkがアプリケーションにパッチを適用しているときに発生します。詳細については、「[Managed Service for Apache Flink のメンテナンスタスクを管理する](maintenance.md)」を参照してください。

  アプリケーションのステータスは、コンソールを使用するか、 「[DescribeApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplication.html)」 アクションを使用して確認できます。
+ 「**Job ステータス:**」 アプリケーションが `RUNNING` ステータスになると、ジョブには現在の実行フェーズを説明するステータスが表示されます。ジョブは `CREATED` ステータスで開始し、開始後 `RUNNING` ステータスに進みます。エラー状態が発生すると、アプリケーションは次のステータスになります。
  + Apache Flink 1.11 以降を使用するアプリケーションでは、アプリケーションが `RESTARTING` ステータスになります。
  + Apache Flink 1.8 以前を使用するアプリケーションでは、アプリケーションが `FAILING` ステータスに入ります。

  その後、アプリケーションは、ジョブを再開できるかどうかに応じて、 `RESTARTING` または `FAILED` のステータスに進みます。

  ジョブのステータスは、アプリケーションの CloudWatch ログでステータスの変化を確認することで確認できます。

## バッチワークロードを実行する
<a name="batch-workloads"></a>

Apache Flink 用 Managed ServiceApache Flink 用 Managed Serviceは、バッチワークロードの実行をサポートしています。バッチジョブでは、Apache Flink ジョブが 「**FINISHED**」 ステータスになると、Apache Flink アプリケーションス用 Managed Service のテータスは 「**READY**」 に設定されます。Flink ジョブのステータスについて詳しくは、「[ジョブとスケジューリング](https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/internals/job_scheduling/)」 を参照してください。

# Managed Service for Apache Flink アプリケーションリソースを確認する
<a name="how-resources"></a>

このセクションでは、アプリケーションが使用するシステムリソースについて説明します。Apache Flink 用 Managed Serviceがどのようにリソースをプロビジョニングして使用するかを理解しておくと、パフォーマンスが高く安定した Apache Flink アプリケーション用 Managed Serviceの設計、作成、維持に役立ちます。

## Managed Service for Apache Flink アプリケーションリソース
<a name="how-resources-kda"></a>

Managed Service for Apache Flink は、Apache Flink アプリケーションをホストするための環境を作成する AWS サービスです。Apache Flink 用 Managed Serviceは、「**Kinesis プロセッシングユニット (KPU)**」と呼ばれるユニットを使用してリソースを提供します。

1 つの KPU は次のシステムリソースを表します。
+ 1つのCPUコア
+ 4 GBのメモリ(1 GBがネイティブメモリ、3 GBがヒープメモリ)
+ 50 GB のディスクスペース

KPU は、「**タスク**」と「**サブタスク**」と呼ばれる別々の実行単位でアプリケーションを実行します。サブタスクはスレッドと同等と考えることができます。

アプリケーションで使用できる KPU の数は、アプリケーションの `Parallelism` 設定をアプリケーションの `ParallelismPerKPU` 設定で割った数です。

アプリケーションの並列処理については、 [アプリケーションスケーリングを実装する](how-scaling.md) をご参照ください。

## Apache Flink アプリケーションリソース
<a name="how-resources-flink"></a>

Apache Flink 環境は、「**タスクスロット**」と呼ばれる単位を使用してアプリケーションのリソースを割り当てます。Apache Flink 用 Managed Serviceがアプリケーションにリソースを割り当てると、1 つ以上の Apache Flink タスクスロットが 1 つの KPU に割り当てられます。1 つの KPU に割り当てられるスロット数は、アプリケーションの `ParallelismPerKPU` 設定と同じです。タスクスロットの詳細については、Apache Flink ドキュメントの「[Job Scheduling](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/internals/job_scheduling/)」を参照してください。

### オペレーターの並列処理
<a name="how-resources-flink-operatorparallelism"></a>

オペレータが使用できるサブタスクの最大数を設定できます。この値は「**オペレータ並列度**」と呼ばれます。デフォルトでは、アプリケーション内の各オペレータの並列度はアプリケーションの並列度と同じです。つまり、デフォルトでは、アプリケーション内の各オペレータは、必要に応じてアプリケーションで使用可能なすべてのサブタスクを使用できます。

`setParallelism` メソッドを使用して、アプリケーション内のオペレータの並列度を設定できます。この方法を使用すると、各オペレータが一度に使用できるサブタスクの数を制御できます。

オペレーターの詳細については、Apache Flink ドキュメントの「[Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)」を参照してください。

### オペレーターの連鎖
<a name="how-resources-flink-operatorchaining"></a>

通常、各オペレータは別々のサブタスクを使用して実行しますが、複数のオペレータが常に順番に実行する場合、ランタイムはそれらすべてを同じタスクに割り当てることができます。このプロセスは「**オペレータチェイニング**」と呼ばれます。

複数のシーケンシャルオペレータがすべて同じデータを操作する場合、それらを 1 つのタスクにまとめることができます。そのために必要ないくつかの基準を以下に挙げます。
+ オペレータは 1 対 1 の単純な転送を行います。
+ オペレータの並列度はすべて同じです。

アプリケーションがオペレータを 1 つのサブタスクにチェーンすると、サービスがネットワーク操作を実行したり、オペレータごとにサブタスクを割り当てたりする必要がなくなるため、システムリソースを節約できます。アプリケーションがオペレータチェイニングを使用しているかどうかを確認するには、Apache Flink 用 Managed Serviceコンソールのジョブグラフを見てください。アプリケーションの各頂点は 1 つ以上のオペレータを表します。グラフには、1 つの頂点として連結されたオペレータが表示されます。

# Managed Service for Apache Flink での 1 秒単位の請求
<a name="how-pricing"></a>

Managed Service for Apache Flink は 1 秒単位で請求されるようになりました。アプリケーションごとに 10 分の最低料金がかかります。1 秒あたりの請求は、新しく起動したアプリケーションまたは既に実行されているアプリケーションに適用されます。このセクションでは、Managed Service for Apache Flink が使用量を計測して請求する仕組みについて説明します。Amazon Managed Service for Apache Flink の料金の詳細については、「[Amazon Managed Service for Apache Flink の料金](https://aws.amazon.com/managed-service-apache-flink/pricing/)」を参照してください。

## 仕組み
<a name="how-resources-kda"></a>

Managed Service for Apache Flink は、サポートされている AWS リージョン内の **Kinesis 処理ユニット (KPU)** の期間と数に対して課金が発生し、KPU ごとに 1 秒単位で請求されます。1 つの KPU は、1vCPU のコンピューティングと 4 GB のメモリで構成されています。アプリケーションの実行に使用された KPU の数に基づいて時間あたりの料金が請求されます。

例えば、20 分 10 秒間実行されているアプリケーションは、20 分 10 秒間に対して課金が発生し、使用したリソースが乗算されます。5 分間実行されているアプリケーションでは、10 分の最低期間に、使用したリソースを掛けた料金が請求されます。

Managed Service for Apache Flink は使用量を時間単位で示します。例えば、15 分間は 0.25 時間に相当します。

Apache Flink アプリケーションの場合、オーケストレーションに使用された KPU がアプリケーションあたり 1 単位追加される方式で課金されます。アプリケーションは、稼動中のストレージや永続性バックアップに対しても課金されます。稼動中のアプリケーションストレージは、Managed Service for Apache Flink のステートフル処理機能に使用され、GB 単位で月ごとに課金されます。永続性バックアップはオプションであり、GB 単位で月ごとに課金されるアプリケーションのポイントインリカバリ機能を提供します。

ストリーミングモードでは、Managed Service for Apache Flink は、メモリやコンピューティングの需要の変動に応じて、ストリーム処理アプリケーションが必要とする KPU の数を自動的にスケーリングします。ユーザーは、必要な数の KPU を使用してアプリケーションをプロビジョニングすることを選択できます。

## AWS リージョン 可用性
<a name="how-pricing-regions"></a>

**注記**  
現時点では、 AWS GovCloud (米国東部)、および AWS GovCloud (米国西部)、中国 (北京)、中国 (寧夏) リージョンでは 1 秒単位の請求を利用できません。

1 秒単位の請求は、次の AWS リージョンで利用できます。
+ 米国東部 (バージニア北部) - us-east-1
+ 米国東部 (オハイオ) - us-east-2
+ 米国西部 (北カリフォルニア) us-west-1
+ 米国東部 (オレゴン) - us-west-2
+ アフリカ (ケープタウン) – af-south-1
+ アジアパシフィック (香港) – ap–east–1
+ アジアパシフィック (ハイデラバード) - ap-south-1
+ アジアパシフィック (ジャカルタ): ap-southeast-3
+ アジアパシフィック (メルボルン) - ap-southeast-4
+ アジアパシフィック (ムンバイ) ap-south-1
+ アジアパシフィック (大阪) ap-northeast-3
+ アジアパシフィック (ソウル) ap-northeast-2
+ アジアパシフィック (シンガポール) ap-southeast-1
+ アジアパシフィック (シドニー) - ap-southeast-2
+ アジアパシフィック (東京) - ap-northeast-1
+ カナダ (中部) ca-central-1
+ カナダ西部 (カルガリー) - ca-west-1
+ 欧州 (フランクフルト) eu-central-1
+ ヨーロッパ (アイルランド) eu-west-1
+ 欧州 (ロンドン) eu-west-2
+ 欧州 (ミラノ): eu-south-1
+ 欧州 (パリ) eu-west-3
+ 欧州 (スペイン): eu-south-2
+ 欧州 (ストックホルム) eu-north-1
+ 欧州 (チューリッヒ): eu-central-2
+ イスラエル (テルアビブ) - il-central-1
+ 中東 (バーレーン) – me-south-1
+ 中東 (UAE): me-central-1
+ 南米 (サンパウロ) sa-east-1

## 料金の例
<a name="how-pricing-examples"></a>

Managed Service for Apache Flink の料金ページに料金の例があります。詳細については、「[Amazon Managed Service for Apache Flink の料金](https://aws.amazon.com/managed-service-apache-flink/pricing/)」を参照してください 以下に、その他の例とそれぞれのコスト使用状況レポートの図を示します。

### 長時間実行される、負荷の高いワークロード
<a name="pricing-example-1"></a>

大規模なビデオストリーミングサービスを提供していて、ユーザーのインタラクションに基づいてリアルタイムのビデオレコメンデーションを構築したいと考えているとします。Managed Service for Apache Flink の Apache Flink アプリケーションを使用して、複数の Kinesis Data Streams からユーザーインタラクションイベントを継続的に取り込み、イベントをリアルタイムで処理してダウンストリームシステムに出力します。ユーザーインタラクションイベントは、複数のオペレーターを使用して変換されます。これには、イベントタイプによるデータのパーティショニング、追加のメタデータによるデータのエンリッチメント、タイムスタンプによるデータのソート、配信前の 5 分間のデータのバッファリングが含まれます。アプリケーションには、並列化可能で大量の演算を行う変換ステップが多数あります。Flink アプリケーションは、このワークロードに対応するために 20 KPU を使用して実行されるように設定されています。アプリケーションは、毎日 1 GB の永続性アプリケーションバックアップを使用します。Managed Service for Apache Flink の月額料金は、次のように計算されます。

**月額料金**

米国東部 (バージニア北部) リージョンでの料金は、KPU 1 時間あたり 0.11 USD です。Managed Service for Apache Flink は、稼動中アプリケーションストレージを KPU ごとに 50 GB 割り当てて、1 か月あたり 1 GB 単位で 0.10 USD を課金します。
+ 1 か月あたりの KPU 料金: 24 時間 \$1 30 日 \$1 (20 KPU \$1 ストリーミングアプリケーション用に追加された 1 KPU) \$1 0.11 USD/時間 = 1,584.00 USD
+ 1 か月あたりの稼動中アプリケーションストレージ料金: 30 日 \$1 20 KPU \$1 50 GB/KPU \$1 0.10 USD/GB/月 = 100.00 USD
+ 1 か月あたりの永続性アプリケーションストレージ料金: 30 日 \$1 1 GB \$1 0.023/GB/月 = 0.03 USD
+ 合計料金: 1,584.00 USD \$1 100 USD \$1 0.03 USD = **1,684.03 USD**

**請求およびコストマネジメントコンソールに表示される、この月の Managed Service for Apache Flink のコスト使用状況レポート**

Kinesis Analytics
+ 1,684.03 USD - 米国東部 (バージニア北部)
+ Amazon Kinesis Analytics CreateSnapshot
  + 永続性アプリケーションバックアップ: GB/月あたり 0.023 USD
    + 1 GB/月 - 0.03 USD
+ Amazon Kinesis Analytics StartApplication
  + 稼動中アプリケーションストレージ: GB/月あたり 0.10 USD
    + 1,000 GB/月 - 100 USD
  + Apache Flink アプリケーション: Kinesis 処理ユニット時間あたり 0.11 USD
    + 15,120 KPU 時間 - 1,584 USD

### 毎日、約 15 分間実行されるバッチワークロード
<a name="pricing-example-2"></a>

Managed Service for Apache Flink の Apache Flink アプリケーションを使用して、Amazon Simple Storage Service (Amazon S3) のログデータをバッチモードで変換します。ログデータは、複数のオペレーターを使用して変換されます。これには、さまざまなログイベントへのスキーマの適用、イベントタイプによるデータのパーティショニング、タイムスタンプによるデータのソートが含まれます。アプリケーションには多数の変換ステップがありますが、演算負荷の高いものはありません。このアプリケーションは、30 日の月に毎日 15 分間、1 秒あたり 2,000 レコードの速度でデータを取り込みます。永続性アプリケーションバックアップは作成しません。Managed Service for Apache Flink の月額料金は、次のように計算されます。

**月額料金**

米国東部 (バージニア北部) リージョンでの料金は、KPU 1 時間あたり 0.11 USD です。Managed Service for Apache Flink は、稼動中アプリケーションストレージを KPU ごとに 50 GB 割り当てて、1 か月あたり 1 GB 単位で 0.10 USD を課金します。
+ バッチワークロード: Managed Service for Apache Flink アプリケーションは 1 日あたり 15 分間、1 秒あたり 2,000 レコードを処理します。これには 2KPU が必要であるため、30 日/月 \$1 15 分/日 = 450 分/月 になります。
+ 1 か月あたりの KPU 料金: 450 分/月 \$1 (2 KPU \$1 ストリーミングアプリケーション用に追加された 1 KPU) \$1 0.11 USD/時間 = 2.48 USD
+ 1 か月あたりの稼動中アプリケーションストレージ料金: 450 分/月 \$1 2 KPU \$1 50 GB/KPU \$1 0.10 USD/GB/月 = 0.11 USD
+ 合計料金: 2.48 USD \$1 0.11 = **2.59 USD**

**請求およびコストマネジメントコンソールに表示される、この月の Managed Service for Apache Flink のコスト使用状況レポート**

Kinesis Analytics
+ 2.59 USD - 米国東部 (バージニア北部)
+ Amazon Kinesis Analytics StartApplication
  + 稼動中アプリケーションバックアップ: GB/月あたり 0.10 USD
    + 1.042 GB/月 - 0.11 USD
  + Apache Flink アプリケーション: Kinesis 処理ユニット時間あたり 0.11 USD
    + 22.5 KPU 時間 - 2.48 USD

### 継続的に同じ時間に停止して開始し、複数の最低料金が適用されるテストアプリケーション
<a name="pricing-example-3"></a>

毎日何百万件ものトランザクションを処理する大規模な E コマースプラットフォームだとします。リアルタイムの不正検出機能を開発したいと考えています。Managed Service for Apache Flink の Apache Flink アプリケーションを使用して、Kinesis Data Streams からトランザクションイベントを取り込み、さまざまな変換ステップでイベントをリアルタイムで処理します。これには、スライディングウィンドウを使用してイベントを集約したり、イベントタイプ別にイベントをパーティショニングしたり、さまざまなイベントタイプに特定の検出ルールを適用したりすることが含まれます。開発中は、動作をテストしてデバッグするために、アプリケーションの起動と停止を複数回実行します。アプリケーションが数分しか実行されない場合もあります。1 時間かけて 4 KPU でアプリケーションをテストし、アプリケーションでは永続性アプリケーションバックアップは使用しません。
+ 午前 10 時 5 分にアプリケーションを起動します。アプリケーションは 30 分間実行されて、午前 10 時 35 分に停止します。
+ 午前 10 時 40 分にアプリケーションを再起動します。アプリケーションは 5 分間実行されて、午前 10 時 45 分に停止されます。
+ 午前 10 時 50 分にアプリケーションを再起動します。アプリケーションは 2 分間実行されて、午前 10 時 52 分に停止されます。

Managed Service for Apache Flink では、アプリケーションの実行が開始されるたびに最低 10 分間の使用料金が課金されます。アプリケーションの Managed Service for Apache Flink の月額使用料金は、次のように計算されます。
+ アプリケーションの初回起動と停止: 30 分間の使用
+ アプリケーションの 2 回目の起動と停止: 10 分間の使用 (アプリケーションは 5 分間実行され、10 分間の最低料金に切り上げられます)
+ アプリケーションの 3 回目の起動と停止: 10 分間の使用 (アプリケーションは 2 分間実行され、10 分間の最低料金に切り上げられます)

合計で、アプリケーションには 50 分間の使用料金が課金されます。この月の他の時間にこのアプリケーションを実行していない場合、Managed Service for Apache Flink の月額料金は次のように計算されます。

**月額料金**

米国東部 (バージニア北部) リージョンでの料金は、KPU 1 時間あたり 0.11 USD です。Managed Service for Apache Flink は、稼動中アプリケーションストレージを KPU ごとに 50 GB 割り当てて、1 か月あたり 1 GB 単位で 0.10 USD を課金します。
+ 1 か月あたりの KPU 料金: 50 分 \$1 (4 KPU \$1 ストリーミングアプリケーション用に追加された 1 KPU) \$1 0.11 USD/時間 = 0.46 USD (ペニー単位で四捨五入)
+ 1 か月あたりの稼動中アプリケーションストレージ料金: 50 分 \$1 4 KPU \$1 50 GB/KPU \$1 0.10 USD/GB/月 = 0.03 USD (ペニー単位で四捨五入)
+ 合計料金: 0.46 USD \$1 0.03 = **0.49 USD**

**請求およびコストマネジメントコンソールに表示される、この月の Managed Service for Apache Flink のコスト使用状況レポート**

Kinesis Analytics
+ 0.49 USD - 米国東部 (バージニア北部)
+ Amazon Kinesis Analytics StartApplication
  + 稼動中アプリケーションストレージ: GB/月あたり 0.10 USD
    + 0.232 GB/月 - 0.03 USD
  + Apache Flink アプリケーション: Kinesis 処理ユニット時間あたり 0.11 USD
    + 4.167 KPU 時間 - 0.46 USD

# DataStream API コンポーネントを確認する
<a name="how-datastream"></a>

Apache Flink アプリケーションは「[Apache Flink データストリーム API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/)」を使用してデータストリーム内のデータを変換します。

このセクションでは、データを移動、変換、追跡するさまざまなコンポーネントについて説明します。
+ [コネクタと DataStream API を使用して、Managed Service for Apache Flink でデータを移動する](how-connectors.md): これらのコンポーネントは、アプリケーションと外部データソースおよび宛先との間でデータを移動します。
+ [DataStream API と Managed Service for Apache Flink のオペレーターを使用してデータを変換する](how-operators.md): これらのコンポーネントは、アプリケーション内のデータ要素を変換またはグループ化します。
+ [DataStream API を使用して Managed Service for Apache Flink のイベントをトラッキングする](how-time.md): このトピックでは、Apache Flink 用 Managed Service が DataStream API を使用する際にイベントをトラッキングする方法について説明します。

# コネクタと DataStream API を使用して、Managed Service for Apache Flink でデータを移動する
<a name="how-connectors"></a>

Amazon Managed Service for Apache Flink DataStream API では、「コネクタ」とはApache Flink 用 Managed Serviceアプリケーションとの間でデータをやり取りするソフトウェアコンポーネントです。コネクタは、ファイルやディレクトリからの読み取りが可能になる柔軟な統合です。コネクタは、Amazon のサービスやサードパーティのシステムとやり取りするための完全なモジュールで構成されています。

コネクターの種類には、次のものがあります。
+ [ストリーミングデータソースを追加する](how-sources.md): Kinesis データストリーム、ファイル、またはその他のデータソースからアプリケーションにデータを提供します。
+ [シンクを使用してデータを書き込む](how-sinks.md): アプリケーションから、Kinesis データストリーム、Firehose ストリーム、またはその他のデータ送信先にデータを送信します。
+ [非同期 I/O を使用する](how-async.md): データソース (データベースなど) への非同期アクセスを提供し、ストリームイベントを充実させます。

## 使用可能なコネクタ
<a name="how-connectors-list"></a>

Apache Flink フレームワークには、さまざまなソースのデータにアクセスするためのコネクターが含まれています。Apache Flink フレームワークで使用できるコネクタについては、「[Apache Flink ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-1.15/)」の「[コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/)」を参照してください。

**警告**  
Flink 1.6、1.8、1.11、または 1.13 で実行中のアプリケーションがあり、中東 (UAE)、アジアパシフィック (ハイデラバード)、イスラエル (テルアビブ)、欧州 (チューリッヒ)、アジアパシフィック (メルボルン)、またはアジアパシフィック (ジャカルタ) リージョンで実行したい場合は、更新されたコネクタを使用してアプリケーションアーカイブを再構築するか、Flink 1.18 にアップグレードすることが必要になる可能性があります。  
Apache Flink コネクタは、独自のオープンソースリポジトリに保存されます。バージョン 1.18 以降にアップグレードする場合は、依存関係を更新する必要があります。Apache Flink AWS コネクタのリポジトリにアクセスするには、[flink-connector-aws](https://github.com/apache/flink-connector-aws)」を参照してください。  
以前の Kinesis ソース `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` は廃止されており、Flink の今後のリリースでは削除される可能性があります。代わりに [Kinesis ソース](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source)を使用してください。  
`FlinkKinesisConsumer` と `KinesisStreamsSource` の間に状態の互換性はありません。詳細については、Apache Flink ドキュメントの「[Migrating existing jobs to new Kinesis Streams Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer)」を参照してください。  
 以下は推奨されるガイドラインです。  


**コネクタのアップグレード**  

| Flink バージョン | 使用されるコネクタ | 解決策 | 
| --- | --- | --- | 
| 1.19、1.20 | Kinesis ソース |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Kinesis Data Streams ソースコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)」を参照してください。  | 
| 1.19、1.20 | Kinesis シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Kinesis Data Streams シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink)」を参照してください。  | 
| 1.19、1.20 | DynamoDB Streams ソース |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の DynamoDB Streams ソースコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)」を参照してください。  | 
| 1.19、1.20 | DynamoDB シンク | Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の DynamoDB シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)」を参照してください。 | 
| 1.19、1.20 | Amazon SQS シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Amazon SQS シンクコネクタを使用していることを確認してください。5.0.0 以降のバージョンである必要があります。詳細については、「[Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)」を参照してください。  | 
| 1.19、1.20 | Amazon Managed Service for Prometheus シンク |  Managed Service for Apache Flink バージョン 1.19 および 1.20 にアップグレードする場合は、最新の Amazon Managed Service for Prometheus シンクコネクタを使用していることを確認してください。1.0.0 以降のバージョンである必要があります。詳細については、「[Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/)」を参照してください。  | 

# Managed Service for Apache Flink にストリーミングデータソースを追加する
<a name="how-sources"></a>

Apache Flink には、ファイル、ソケット、コレクション、カスタムソースから読み取るためのコネクタが用意されています。アプリケーションコードでは、「[Apache Flink ソース](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources)」を使用してストリームからデータを受信します。このセクションでは、Amazon サービスで利用できるソースについて説明します。

## Kinesis Data Streams を使用する
<a name="input-streams"></a>

`KinesisStreamsSource` は Amazon Kinesis データストリームからアプリケーションにストリーミングデータを提供します。

### `KinesisStreamsSource` を作成する
<a name="input-streams-create"></a>

次のコード例は、`KinesisStreamsSource` の作成を示しています。

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

`KinesisStreamsSource` の使用の詳細については、Apache Flink ドキュメントの「[Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)」および [Github で が公開している KinesisConnectors の例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

### EFO コンシューマーを使用する `KinesisStreamsSource` を作成する
<a name="input-streams-efo"></a>

`KinesisStreamsSource` で[拡張ファンアウト (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/) がサポートされるようになりました。

Kinesis コンシューマーが EFO を使用する場合、Kinesis Data Streams サービスは、コンシューマーがストリームの固定帯域幅を、ストリームから読み取る他のコンシューマーと共有するのではなく、独自の専用帯域幅を提供します。

Kinesis コンシューマーで EFO を使用する方法の詳細については、[FLIP-128: Kinesis AWS コンシューマーの拡張ファンアウト](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers)」を参照してください。

EFO コンシューマーを有効にするには、Kinesis コンシューマーで次のパラメータを設定します。
+ **READER\$1TYPE:** アプリケーションが EFO コンシューマーを使用して Kinesis Data Streams データにアクセスできるようにするには、このパラメータを **EFO** に設定します。
+ **EFO\$1CONSUMER\$1NAME: **このパラメータを、このストリームのコンシューマー間で一意の文字列値に設定します。同じ Kinesis Data Stream でコンシューマー名を再利用すると、その名前を使用していた以前のコンシューマーは終了します。

EFO を使用するように `KinesisStreamsSource` を設定するには、コンシューマーに以下のパラメータを追加します。

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

EFO コンシューマーを使用する Managed Service for Apache Flink アプリケーションの例については、[Github で が公開している Kinesis コネクタの例](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors)を参照してください。

## Amazon MSK を使用する
<a name="input-msk"></a>

`KafkaSource` ソースは Amazon MSK トピックからアプリケーションにストリーミングデータを提供します。

### `KafkaSource` を作成する
<a name="input-msk-create"></a>

次のコード例は、`KafkaSource` の作成を示しています。

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

`KafkaSource` の使用方法の詳細については、「[MSK レプリケーション](earlier.md#example-msk)」を参照してください。

# Managed Service for Apache Flink でシンクを使用してデータを書き込む
<a name="how-sinks"></a>

アプリケーションコードでは、任意の [Apache Flink シンク](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) コネクタを使用して、Kinesis Data Streams や DynamoDB などの AWS サービスを含む外部システムに書き込むことができます。

Apache Flink には、ファイル用やソケット用のシンクも用意されており、カスタムシンクを実行することもできます。さまざまなシンクがサポートされていますが、その中でも、次のものがよく使用されます。

## Kinesis Data Streams を使用する
<a name="sinks-streams"></a>

Apache Flink では、「[Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/)」に関する情報が Apache Flink ドキュメントに記載されています。

Kinesis データストリームを入力と出力に使用するアプリケーションの例については、 [チュートリアル: Managed Service for Apache Flink で DataStream API 使用の概要](getting-started.md) を参照してください。

## Apache Kafka と Amazon Managed Streaming for Apache Kafka (MSK) を使用する
<a name="sinks-MSK"></a>

[Apache Flink Kafka コネクタ](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink)は、Apache Kafka と Amazon MSK にデータを公開するための広範なサポートを提供します。これには、1 回限りの保証が含まれます。Kafka に書き込む方法については、Apache Flink ドキュメントの「[Kafka Connectors examples](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors)」を参照してください。

## Amazon S3 を使用する
<a name="sinks-s3"></a>

Amazon S3 バケットにオブジェクトを書き込むには、Apache Flink `StreamingFileSink` を使用できます。

S3 にオブジェクトを書き込む方法の例については、 [「例: Amazon S3 バケットに書き込む」](earlier.md#examples-s3) を参照してください。

## Firehose を使用する
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer` は、[Firehose](https://docs.aws.amazon.com/firehose/latest/dev/) サービスを使用してアプリケーション出力を保存するための、信頼性が高くスケーラブルな Apache Flink シンクです。このセクションでは、Maven プロジェクトをセットアップして `FlinkKinesisFirehoseProducer` を作成・使用するための設定方法について説明します。

**Topics**
+ [`FlinkKinesisFirehoseProducer` を作成する](#sinks-firehose-create)
+ [`FlinkKinesisFirehoseProducer` コード例](#sinks-firehose-sample)

### `FlinkKinesisFirehoseProducer` を作成する
<a name="sinks-firehose-create"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` の作成を示しています。

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### `FlinkKinesisFirehoseProducer` コード例
<a name="sinks-firehose-sample"></a>

次のコード例は、`FlinkKinesisFirehoseProducer` を作成して設定し、Apache Flink データストリームから Firehose サービスにデータを送信する方法を示しています。

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Firehose シンクの使用方法に関する詳細なチュートリアルについては、「[「例: Firehose に書き込む」](earlier.md#get-started-exercise-fh)」を参照してください。

# Managed Service for Apache Flink で非同期 I/O を使用する
<a name="how-async"></a>

非同期 I/O オペレータは、データベースなどの外部データソースを使用してストリームデータを強化します。Apache Flink 用 Managed Serviceはストリームイベントを非同期的に強化するため、リクエストを一括処理して効率を高めることができます。

詳細については、Apache Flink ドキュメントの「[Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/)」を参照してください。

# DataStream API と Managed Service for Apache Flink のオペレーターを使用してデータを変換する
<a name="how-operators"></a>

Apache Flink 用 Managed Service の受信データを変換するには、Apache Flink「オペレータ」を使用します。Apache Flink オペレータは 1 つ以上のデータストリームを新しいデータストリームに変換します。新しいデータストリームには、元のデータストリームから変更されたデータが含まれます。Apache Flink には 25 種類以上のストリーム処理オペレータがあらかじめ組み込まれています。詳細については、Apache Flink ドキュメントの「[オペレーター](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/)」を参照してください。

**Topics**
+ [変換オペレーターを使用する](#how-operators-transform)
+ [集約オペレーターを使用する](#how-operators-agg)

## 変換オペレーターを使用する
<a name="how-operators-transform"></a>

JSON データストリームのいずれか１つのフィールドの簡単なテキスト変換の例を次に示します。

このコードは変換されたデータストリームを作成します。新しいデータストリームは、元のストリームと同じデータを持ち、 `TICKER` フィールドの内容に文字列 ` Company` が追加されます。

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## 集約オペレーターを使用する
<a name="how-operators-agg"></a>

次は集約オペレータの例です。このコードは集約されたデータストリームを作成します。このオペレータは 5 秒間のタンブリングウィンドウを作成し、ウィンドウ内の同じ `TICKER` 値を持つレコードの `PRICE` 値の合計を返します。

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

コード例については、「[Managed Service for Apache Flink アプリケーションの作成と操作の例](examples-collapsibles.md)」を参照してください。

# DataStream API を使用して Managed Service for Apache Flink のイベントをトラッキングする
<a name="how-time"></a>

Apache Flink 用 Managed Service は、次のタイムスタンプを使用してイベントを追跡します。
+ 「**Processing Time:**」それぞれの操作を実行しているマシンのシステム時間を指します。
+ 「**イベント時間:**」各イベントが発生デバイスで発生した時刻を指します。
+ 「**取り込み時間:**」Apache Flink サービス用 Managed Service にイベントが入るまでの時間を指します。

`setStreamTimeCharacteristic` を使用してストリーミング環境が使用する時間を設定します。

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

詳細については、「Apache Flink ドキュメント」の「[Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/)」を参照してください。

# Table API コンポーネントを確認する
<a name="how-table"></a>

Apache Flink アプリケーションは、「[Apache Flink テーブル API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/tableapi/)」を使用して、リレーショナルモデルを使用してストリーム内のデータを操作します。Table API を使用してテーブルソースを使用してデータにアクセスし、次にテーブル関数を使用してテーブルデータを変換およびフィルタリングします。API 関数または SQL コマンドを使用して、表形式のデータを変換およびフィルタリングできます。

このセクションは、以下のトピックで構成されます。
+ [Table API connectors](how-table-connectors.md): これらのコンポーネントは、アプリケーションと外部データソースおよび宛先との間でデータを移動します。
+ [Table API 時間属性](how-table-timeattributes.md): このトピックでは、Managed Service for Apache Flinkが Table API を使用する際にイベントをトラッキングする方法について説明します。

# Table API connectors
<a name="how-table-connectors"></a>

Apache Flink プログラミングモデルでは、コネクタは、アプリケーションが他の AWS サービスなどの外部ソースからデータを読み書きするために使用するコンポーネントです。

Apache Flink テーブル API では、以下のタイプのコネクタを使用できます。
+ [Table API ソース](#how-table-connectors-source): テーブルAPIソースコネクタを使用して、APIコールまたはSQLクエリを使用して `TableEnvironment` 内にテーブルを作成します。
+ [Table API シンク](#how-table-connectors-sink): SQL コマンドを使用して、Amazon MSK トピックや Amazon S3 バケットなどの外部ソースにテーブルデータを書き込みます。

## Table API ソース
<a name="how-table-connectors-source"></a>

データストリームからテーブルソースを作成します。次のコードは Amazon MSK トピックからテーブルを作成します。

```
//create the table
    final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties);
    consumer.setStartFromEarliest();
    //Obtain stream
    DataStream<StockRecord> events = env.addSource(consumer);

    Table table = streamTableEnvironment.fromDataStream(events);
```

テーブルソースの詳細については、「Apache Flink ドキュメント」の「[Table & SQL Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)」を参照してください。

## Table API シンク
<a name="how-table-connectors-sink"></a>

テーブルデータをシンクに書き込むには、SQL でシンクを作成し、その `StreamTableEnvironment` オブジェクトで SQL ベースのシンクを実行します。

次のコードの例は、テーブルのデータを Amazon S3 シンクに書き込む方法を示しています。

```
final String s3Sink = "CREATE TABLE sink_table (" +
    "event_time TIMESTAMP," +
    "ticker STRING," +
    "price DOUBLE," +
    "dt STRING," +
    "hr STRING" +
    ")" +
    " PARTITIONED BY (ticker,dt,hr)" +
    " WITH" +
    "(" +
    " 'connector' = 'filesystem'," +
    " 'path' = '" + s3Path + "'," +
    " 'format' = 'json'" +
    ") ";

    //send to s3
    streamTableEnvironment.executeSql(s3Sink);
    filteredTable.executeInsert("sink_table");
```

 この `format` パラメータを使用して、Apache Flink 用 Managed Serviceが出力をシンクに書き込む際に使用するフォーマットを制御できます。フォーマットについて詳しくは、「Apache Flink ドキュメント」の「[Supported Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/table/overview/)」を参照してください。

## ユーザー定義のソースとシンク
<a name="how-table-connectors-userdef"></a>

既存の Apache Kafka コネクタを使用して、Amazon MSK や Amazon S3 などの他の AWS サービスとの間でデータを送受信できます。他のデータソースや送信先とやり取りする場合は、独自のソースとシンクを定義できます。詳細については、「Apache Flink ドキュメント」の「[User-defined Sources and Sinks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/sourcessinks/)」を参照してください。

# Table API 時間属性
<a name="how-table-timeattributes"></a>

データストリーム内の各レコードには、そのレコードに関連するイベントがいつ発生したかを定義する複数のタイムスタンプがあります。
+ 「**イベント時間**」:レコードを作成したイベントがいつ発生したかを定義するユーザー定義のタイムスタンプ。
+ 「**取り込み時間**」:アプリケーションがデータストリームからレコードを取得した時刻。
+ 「**処理時間**」:アプリケーションがレコードを処理した時間。

Apache Flink Table API がレコード時間に基づいてウィンドウを作成する場合、`setStreamTimeCharacteristic` メソッドを使用して、どのタイムスタンプを使用するかを定義します。

Table API でのタイムスタンプの使用について詳しくは、「Apache Flink ドキュメント」の「[Time Attributes](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/table/concepts/time_attributes/)」と「[Timely Stream Processing](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/concepts/time/)」を参照してください。

# Managed Service for Apache Flink で Python を使用する
<a name="how-python"></a>

**注記**  
Apple シリコンチップを搭載した新しい Mac で Python Flink アプリケーションを開発している場合、PyFlink 1.15 の Python 依存関係に関する 「[既知の問題](https://issues.apache.org/jira/browse/FLINK-26981)」 が発生する可能性があります。この場合、Docker で Python インタープリターを実行することをお勧めします。ステップバイステップの手順については、「[Apple Silicon Mac での PyFlink 1.15 開発](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/python/LocalDevelopmentOnAppleSilicon)」を参照してください。

Apache Flink バージョン 2.2 には、Python バージョン 3.12 を使用してアプリケーションを作成するためのサポートが含まれています。Python バージョン 3.8 のサポートは削除されました。詳細については、[Flink Python のドキュメント](https://nightlies.apache.org/flink/flink-docs-release-2.2/api/python/)を参照してください。Python を使用して Apache Flink アプリケーション用マネージドサービスを作成するには、次の手順を実行します。
+ Python `main` アプリケーションコードをメソッドを含むテキストファイルとして作成します。
+ アプリケーションコードファイルおよび Python または Java の依存関係を zip ファイルにバンドルし、Amazon S3 バケットにアップロードします。
+ Amazon S3 コードの場所、アプリケーションプロパティ、およびアプリケーション設定を指定して、Apache Flink アプリケーション用 Managed Service を作成します。

大まかに言うと、Python テーブル API は Java テーブル API のラッパーのようなものです。Python Table API の詳細については、Apache Flink ドキュメントの「[Table API Tutorial](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/table_api_tutorial/)」を参照してください。

# Managed Service for Apache Flink の Python アプリケーションをプログラムする
<a name="how-python-programming"></a>

Python アプリケーション向けのApache Flink 用 Managed Serviceは、Apache Flink Python テーブル API を使用してコーディングします。Apache Flink エンジンは Python テーブル API ステートメント (Python VM で実行されている) を Java テーブル API ステートメント (Java VM で実行されている) に変換します。

Python テーブル API を使用するには、次の操作を行います。
+ `StreamTableEnvironment` へのリファレンスを作成します。
+ `StreamTableEnvironment` リファレンスに対してクエリを実行して、`table` ソースストリーミングデータからオブジェクトを作成します。
+ `table` オブジェクトに対してクエリを実行して出力テーブルを作成します。
+ `StatementSet` を使用して出力テーブルを宛先に書き込みます。

Apache Flink 用 Managed Serviceで Python テーブル API を使い始めるには、[Python 用 Amazon Managed Service for Apache Flink 入門](gs-python.md) を参照してください。

## ストリーミングデータの読み取りと書き込み
<a name="how-python-programming-readwrite"></a>

ストリーミングデータを読み書きするには、テーブル環境で SQL クエリを実行します。

### テーブルを作成する
<a name="how-python-programming-readwrite-createtable"></a>

次のコード例は、SQL クエリを作成するユーザー定義関数を示しています。SQL クエリは Kinesis ストリームと相互作用するテーブルを作成します。

```
def create_table(table_name, stream_name, region, stream_initpos):
   return """ CREATE TABLE {0} (
                `record_id` VARCHAR(64) NOT NULL,
                `event_time` BIGINT NOT NULL,
                `record_number` BIGINT NOT NULL,
                `num_retries` BIGINT NOT NULL,
                `verified` BOOLEAN NOT NULL
              )
              PARTITIONED BY (record_id)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'scan.stream.initpos' = '{3}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.producer.collection-max-count' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(table_name, stream_name, region, stream_initpos)
```

### ストリーミングデータの読み取り
<a name="how-python-programming-readwrite-read"></a>

次のコード例は、前述の `CreateTable` SQL クエリをテーブル環境参照に対して使用してデータを読み取る方法を示しています。

```
   table_env.execute_sql(create_table(input_table, input_stream, input_region, stream_initpos))
```

### ストリーミングデータの書き込み
<a name="how-python-programming-readwrite-write"></a>

次のコード例は、`CreateTable` 例の SQL クエリを使用して出力テーブル参照を作成する方法と、`StatementSet`を使用してテーブルを操作して宛先の Kinesis ストリームにデータを書き込む方法を示しています。

```
   table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                       .format(output_table_name, input_table_name))
```

## ランタイムプロパティの読み取り
<a name="how-python-programming-properties"></a>

ランタイムプロパティを使用すると、アプリケーションコードを変更せずにアプリケーションを設定できます。

アプリケーションのアプリケーションプロパティは、Java アプリケーション向けの Apache Flink 用 Managed Service と同じ方法で指定します。ランタイムプロパティは次の方法で指定できます。
+ 「[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)」アクションを使用。
+ 「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションを使用。
+ コンソールを使ってアプリケーションを設定します。

コード内でアプリケーション・プロパティを取得するには、Managed Service for Apache Flinkランタイムが作成する `application_properties.json` と呼ばれるjsonファイルを読み込みます。

以下のサンプルコードは、`application_properties.json` ファイルからのアプリケーションプロパティの読み取りの例です。

```
file_path = '/etc/flink/application_properties.json'
   if os.path.isfile(file_path):
       with open(file_path, 'r') as file:
           contents = file.read()
           properties = json.loads(contents)
```

次のユーザー定義関数のコード例は、アプリケーションプロパティオブジェクト:retrieves からプロパティグループを読み取る方法を示しています。

```
def property_map(properties, property_group_id):
   for prop in props:
       if prop["PropertyGroupId"] == property_group_id:
           return prop["PropertyMap"]
```

次のコード例は、前の例で返されたプロパティグループから INPUT\$1STREAM\$1KEY というプロパティを読み取る方法を示しています。

```
input_stream = input_property_map[INPUT_STREAM_KEY]
```

## アプリケーションのコードパッケージの作成
<a name="how-python-programming-package"></a>

Python アプリケーションを作成したら、コードファイルと依存関係を zip ファイルにバンドルします。

zip ファイルには `main` メソッドを含む Python スクリプトが含まれている必要があり、オプションで以下を含めることができます。
+ その他の Python コードファイル
+ JAR ファイル内のユーザー定義 Java コード
+ JAR ファイル内の Java ライブラリ

**注記**  
アプリケーションの ZIP ファイルには、アプリケーションの依存関係がすべて含まれている必要があります。アプリケーションの他のソースからのライブラリを参照することはできません。

# Managed Service for Apache Flink の Python アプリケーションを作成する
<a name="how-python-creating"></a>

## コードファイルを指定する
<a name="how-python-creating-code"></a>

アプリケーションのコードパッケージを作成したら、Amazon S3 バケットにアップロードします。次に、コンソールまたは「[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)」アクションを使用してアプリケーションを作成します。

「[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html)」アクションを使用してアプリケーションを作成する場合、`kinesis.analytics.flink.run.options` という特別なアプリケーションプロパティグループを使用して ZIP ファイル内のコードファイルとアーカイブを指定します。以下のタイプファイルを定義できます。
+ 「**python**」: Python のメインメソッドを含むテキストファイル。
+ 「**jarfile**」:Java ユーザー定義関数を含む Java JAR ファイル。
+ 「**pyFiles**」: アプリケーションが使用するリソースを含む Python リソースファイル。
+ 「**pyArchives**:」 アプリケーションのリソースファイルを含む zip ファイル。

Apache Flink Python コードファイルタイプの詳細については、Apache Flink ドキュメントの「[Command-Line Interface](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/cli/)」を参照してください。

**注記**  
Apache Flink のマネージドサービスは、`pyModule`、`pyExecutable` または `pyRequirements` ファイルタイプをサポートしていません。コード、要件、依存関係はすべて zip ファイルに含まれている必要があります。pip を使用してインストールする依存関係を指定することはできません。

次の JSON スニペットの例は、アプリケーションの zip ファイル内のファイルの場所を指定する方法を示しています。

```
"ApplicationConfiguration": {
    "EnvironmentProperties": {
      "PropertyGroups": [
        {
          "PropertyGroupId": "kinesis.analytics.flink.run.options",
          "PropertyMap": {
            "python": "MyApplication/main.py",
            "jarfile": "MyApplication/lib/myJarFile.jar",
            "pyFiles": "MyApplication/lib/myDependentFile.py",
            "pyArchives": "MyApplication/lib/myArchive.zip"
          }
        },
```

# Managed Service for Apache Flink の Python アプリケーションをモニタリングする
<a name="how-python-monitoring"></a>

アプリケーションの CloudWatch ログを使用して、Apache Flink Python アプリケーション用 Managed Service をモニタリングします。

Apache Flink 用 Managed Service は Python アプリケーションの以下のメッセージをログに記録します。
+ アプリケーションの `main` メソッドで `print()` を使用してコンソールに書き込まれるメッセージ。
+ `logging` パッケージを使用してユーザー定義関数で送信されるメッセージ。次のコード例は、ユーザー定義関数からアプリケーションログへの書き込みを示しています。

  ```
  import logging
  
  @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
  def doNothingUdf(i):
      logging.info("Got {} in the doNothingUdf".format(str(i)))
      return i
  ```
+ アプリケーションから返されるエラーメッセージ。

  アプリケーションが `main` 関数内で例外を投げると、その例外はアプリケーションのログに記録されます。

  次の例は、Python コードから発生した例外のログエントリを示しています。

  ```
  2021-03-15 16:21:20.000   --------------------------- Python Process Started --------------------------
  2021-03-15 16:21:21.000   Traceback (most recent call last):
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 101, in <module>"
  2021-03-15 16:21:21.000       main()
  2021-03-15 16:21:21.000   "  File ""/tmp/flink-web-6118109b-1cd2-439c-9dcd-218874197fa9/flink-web-upload/4390b233-75cb-4205-a532-441a2de83db3_code/PythonKinesisSink/PythonUdfUndeclared.py"", line 54, in main"
  2021-03-15 16:21:21.000   "    table_env.register_function(""doNothingUdf"", doNothingUdf)"
  2021-03-15 16:21:21.000   NameError: name 'doNothingUdf' is not defined
  2021-03-15 16:21:21.000   --------------------------- Python Process Exited ---------------------------
  2021-03-15 16:21:21.000   Run python process failed
  2021-03-15 16:21:21.000   Error occurred when trying to start the job
  ```

**注記**  
パフォーマンス上の問題から、アプリケーション開発中はカスタムログメッセージのみを使用することをおすすめします。

## CloudWatch Insights でログをクエリする
<a name="how-python-monitoring-insights"></a>

次の CloudWatch Insights クエリは、アプリケーションのメイン機能の実行中に Python エントリポイントによって作成されたログを検索します。

```
fields @timestamp, message
| sort @timestamp asc
| filter logger like /PythonDriver/
| limit 1000
```

# Managed Service for Apache Flink のランタイムプロパティを使用する
<a name="how-properties"></a>

「*ランタイムプロパティ*」を使用すると、アプリケーションコードを再コンパイルせずにアプリケーションを設定できます。

**Topics**
+ [コンソールを使用してランタイムプロパティを管理する](#how-properties-console)
+ [CLI を使用してランタイムプロパティを管理する](#how-properties-cli)
+ [Managed Service for Apache Flink アプリケーションのランタイムプロパティにアクセスする](#how-properties-access)

## コンソールを使用してランタイムプロパティを管理する
<a name="how-properties-console"></a>

 AWS マネジメントコンソールを使用して Managed Service for Apache Flink アプリケーションのランタイムプロパティを追加、更新、または削除できます。

**注記**  
過去にサポートされていたバージョンの Apache Flink を使用していて、既存のアプリケーションを Apache Flink 1.19.1 にアップグレードする場合は、インプレースの Apache Flink バージョンアップグレードを使用してアップグレードできます。インプレースのバージョンアップグレードでは、スナップショット、ログ、メトリクス、タグ、Flink 設定など、Apache Flink バージョン全体で、単一の ARN に対するアプリケーションのトレーサビリティを保持します。この機能は、`RUNNING` および `READY` 状態で使用できます。詳細については、「[Apache Flink のインプレースバージョンアップグレードを使用する](how-in-place-version-upgrades.md)」を参照してください。

**Apache Flink アプリケーション用 Managed Serviceのランタイムプロパティの更新**

1. にサインインし AWS マネジメントコンソール、https://console.aws.amazon.com/flink で Amazon MSF コンソールを開きます。

1. Apache Flink アプリケーション用 Managed Serviceを選択します。[**Application details (アプリケーションの詳細)**] を選択します。

1. アプリケーションのページで、**構成** をクリックします。

1. 「**プロパティ**」セクションを展開します。

1. 「**プロパティ‭**」セクションのコントロールを使用して、キーと値のペアを含むプロパティグループを定義します。これらのコントロールを使用して、プロパティグループとランタイムプロパティを追加、更新、削除します。

1. **[更新]** を選択します。

## CLI を使用してランタイムプロパティを管理する
<a name="how-properties-cli"></a>

「[AWS CLI](https://docs.aws.amazon.com/cli)」を使用してランタイムプロパティを追加、更新、削除できます。

このセクションには、アプリケーションのランタイムプロパティを設定するための API アクションのリクエスト例が含まれています。JSON ファイルを API アクションの入力に使用する方法の詳細については、[Managed Service for Apache Flink API コードの例](api-examples.md)　を参照してください。

**注記**  
次の例のサンプルのアカウント ID (*`012345678901`*) をアカウント ID に置き換えます。

### アプリケーション作成時にランタイムプロパティを追加する
<a name="how-properties-create"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)」アクションリクエスト例では、アプリケーションの作成時に 2 つのランタイムプロパティグループ (`ProducerConfigProperties` と `ConsumerConfigProperties`) を追加します。

```
{
    "ApplicationName": "MyApplication",
    "ApplicationDescription": "my java test app",
    "RuntimeEnvironment": "FLINK-1_19",
    "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role",
    "ApplicationConfiguration": {
        "ApplicationCodeConfiguration": {
            "CodeContent": {
                "S3ContentLocation": {
                    "BucketARN": "arn:aws:s3:::ka-app-code-username",
                    "FileKey": "java-getting-started-1.0.jar"
                }
            },
            "CodeContentType": "ZIPFILE"
        },
        "EnvironmentProperties":  { 
         "PropertyGroups": [ 
            { 
               "PropertyGroupId": "ProducerConfigProperties",
               "PropertyMap" : {
                    "flink.stream.initpos" : "LATEST",
                    "aws.region" : "us-west-2",
                    "AggregationEnabled" : "false"
               }
            },
            { 
               "PropertyGroupId": "ConsumerConfigProperties",
               "PropertyMap" : {
                    "aws.region" : "us-west-2"
               }
            }
         ]
      }
    }
}
```

### 既存のアプリケーションのランタイムプロパティを追加および更新する
<a name="how-properties-update"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」アクションリクエスト例は、既存のアプリケーションのランタイムプロパティを追加または更新します。

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 2,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": [ 
        { 
          "PropertyGroupId": "ProducerConfigProperties",
          "PropertyMap" : {
            "flink.stream.initpos" : "LATEST",
            "aws.region" : "us-west-2",
            "AggregationEnabled" : "false"
          }
        },
        { 
          "PropertyGroupId": "ConsumerConfigProperties",
          "PropertyMap" : {
            "aws.region" : "us-west-2"
          }
        }
      ]
    }
  }
}
```

**注記**  
プロパティグループに対応するランタイムプロパティがないキーを使用すると、Apache Flink 用 Managed Service はそのキーと値のペアを新しいプロパティとして追加します。プロパティグループ内の既存のランタイムプロパティのキーを使用すると、Apache Flink 用 Managed Service はそのプロパティ値を更新します。

### ランタイムプロパティを削除する
<a name="how-properties-remove"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」アクションリクエスト例では、既存のアプリケーションからすべてのランタイムプロパティとプロパティグループを削除します。

```
{
  "ApplicationName": "MyApplication",
  "CurrentApplicationVersionId": 3,
  "ApplicationConfigurationUpdate": {
    "EnvironmentPropertyUpdates": {
      "PropertyGroups": []
    }
  }
}
```

**重要**  
既存のプロパティグループまたはプロパティグループ内の既存のプロパティキーを省略すると、そのプロパティグループまたはプロパティは削除されます。

## Managed Service for Apache Flink アプリケーションのランタイムプロパティにアクセスする
<a name="how-properties-access"></a>

Java アプリケーションコード内のランタイムプロパティは、`Map<String, Properties>` オブジェクトを返す静的 `KinesisAnalyticsRuntime.getApplicationProperties()` メソッドを使用して取得します。

次の Java コードの例では、アプリケーションのランタイムプロパティを取得します。

```
 Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
```

プロパティグループを (`Java.Util.Properties` オブジェクトとして) 次のように取得します。

```
Properties consumerProperties = applicationProperties.get("ConsumerConfigProperties");
```

Apache Flink のソースまたはシンクは、通常、個々のプロパティを取得せずに `Properties` オブジェクトを渡すことで設定します。以下のコード例は、ランタイムプロパティから取得した `Properties` オブジェクトを渡して Flink ソースを作成する方法を示しています。

```
private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException {
  Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
  FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<String>(new SimpleStringSchema(),
    applicationProperties.get("ProducerConfigProperties"));

  sink.setDefaultStream(outputStreamName);
  sink.setDefaultPartition("0");
  return sink;
}
```

コード例については、[Managed Service for Apache Flink アプリケーションの作成と操作の例](examples-collapsibles.md) を参照してください。

# Managed Service for Apache Flink で Apache Flink コネクタを使用する
<a name="how-flink-connectors"></a>

Apache Flink コネクタとは、Amazon Managed Service for Apache Flink アプリケーションとの間でデータを移動するソフトウェアコンポーネントです。コネクタは、ファイルやディレクトリからの読み取りが可能になる柔軟な統合です。コネクタは、Amazon のサービスやサードパーティのシステムとやり取りするための完全なモジュールで構成されています。

コネクターの種類には、次のものがあります。
+ **ソース:** Kinesis データストリーム、ファイル、Apache Kafka トピック、ファイル、またはその他のデータソースからアプリケーションにデータを提供します。
+ **シンク:** アプリケーションから、Kinesis データストリーム、Firehose ストリーム、Apache Kafka トピック、またはその他のデータ送信先にデータを送信します。
+ **非同期 I/O:** データソース (データベースなど) への非同期アクセスを提供し、ストリームをエンリッチ化します。

Apache Flink コネクタは、独自のソースリポジトリに保存されます。Apache Flink コネクタのバージョンとアーティファクトは、使用している Apache Flink のバージョンと、DataStream API、Table API、SQL API のどれを使用しているかによって異なります。

Amazon Managed Service for Apache Flink は、40 を超える構築済みの Apache Flink ソースコネクタとシンクコネクタをサポートしています。次の表は、最も一般的なコネクタとその関連バージョンの概要を示します。非同期シンクフレームワークを使用してカスタムシンクを構築することもできます。詳細については、Apache Flink ドキュメントの「[The Generic Asynchronous Base Sink](https://flink.apache.org/2022/03/16/the-generic-asynchronous-base-sink/)」を参照してください。

 Apache Flink AWS コネクタのリポジトリにアクセスするには、[flink-connector-aws](https://github.com/apache/flink-connector-aws)」を参照してください。

## Flink 2.2 用コネクタ
<a name="connectors-flink-2-2"></a>

Flink 2.2 にアップグレードする場合、コネクタの依存関係を Flink 2.x ランタイムと互換性のあるバージョンに更新する必要があります。Flink コネクタは Flink ランタイムとは独立してリリースされ、すべてのコネクタにまだ Flink 2.x 互換リリースがあるわけではありません。次の表は、この執筆時点で Amazon Managed Service for Apache Flink で一般的に使用されているコネクタの可用性をまとめたものです。


**Flink 2.2 用コネクタ**  

| コネクタ | Flink 2.0\$1 バージョン | 注意事項 | 
| --- | --- | --- | 
| Apache Kafka | flink-connector-kafka 4.0.0-2.0 | Flink 2.2 に推奨 | 
| Kinesis Data Streams (ソース) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Flink 2.2 に推奨 | 
| Kinesis Data Streams (シンク) | flink-connector-aws-kinesis-streams 6.0.0-2.0 | Flink 2.2 に推奨 | 
| FileSystem (S3、HDFS) | Flink にバンドル | Flink ディストリビューションに組み込まれている — 常に利用可能 | 
| JDBC | 2.x 用にまだリリースされていない | Flink 2.x 互換リリースは利用できません | 
| OpenSearch | 2.x 用にまだリリースされていない | Flink 2.x 互換リリースは利用できません | 
| Elasticsearch | 2.x 用にまだリリースされていない | OpenSearch コネクタへの移行を検討する | 
| Amazon Managed Service for Prometheus | 2.x 用にまだリリースされていない | 書き込み時に Flink 2.x 互換リリースがない | 

アプリケーションが Flink 2.2 リリースをまだ持っていないコネクタに依存している場合は、コネクタが互換性のあるバージョンをリリースするのを待つか、代替バージョンに置き換えることができるかどうかを評価する (JDBC カタログやカスタムシンクを使用するなど) という 2 つのオプションがあります。

**既知の問題**
+ コネクタ v5.0.0 および v6.0.0 で導入された EFO (拡張ファンアウト/SubscribeToShard) パス`KinesisStreamsSource`で を使用するアプリケーションは、Kinesis ストリームがリシャーディングされると失敗することがあります。これはコミュニティの既知の問題です。詳細については、「[FLINK-37648](https://issues.apache.org/jira/browse/FLINK-37648)」を参照してください。
+ コネクタ v5.0.0 および v6.0.0 で導入された EFO (拡張ファンアウト/SubscribeToShard) パス`KinesisStreamsSource`で を使用するアプリケーションは、Flink アプリケーションにバックプレッシャーがかかっている場合にデッドロックが発生し、1 つ以上の TaskManagers でのデータ処理が完全に停止`KinesisStreamsSink`する可能性があります。アプリを復旧するには、強制停止オペレーションとアプリの開始オペレーションが必要です。これは、コミュニティの既知の問題のサブケースである [FLINK-34071](https://issues.apache.org/jira/browse/FLINK-34071) です。

## 古い Flink バージョンのコネクタ
<a name="connectors-older-versions"></a>


**古い Flink バージョンのコネクタ**  

| コネクタ | Flink バージョン 1.15 | Flink バージョン 1.18 | Flink バージョン 1.19 | Flink バージョン 1.20 | 
| --- | --- | --- | --- | --- | 
| Kinesis Data Stream - ソース - DataStream API と Table API | flink-connector-kinesis、1.15.4 | flink-connector-kinesis、4.3.0-1.18 | flink-connector-kinesis、5.0.0-1.19 | flink-connector-kinesis、5.0.0-1.20 | 
| Kinesis Data Stream - シンク - DataStream API と Table API | flink-connector-aws-kinesis-streams、1.15.4 | flink-connector-aws-kinesis-streams、4.3.0-1.18 | flink-connector-aws-kinesis-streams、5.0.0-1.19 | flink-connector-aws-kinesis-streams、5.0.0-1.20 | 
| Kinesis Data Streams - ソース/シンク - SQL | flink-sql-connector-kinesis、1.15.4 | flink-sql-connector-kinesis、4.3.0-1.18 | flink-sql-connector-kinesis、5.0.0-1.19 | flink-sql-connector-kinesis-streams、5.0.0-1.20 | 
| Kafka - DataStream API と Table API | flink-connector-kafka、1.15.4 | flink-connector-kafka、3.2.0-1.18 | flink-connector-kafka、3.3.0-1.19 | flink-connector-kafka、3.3.0-1.20 | 
| Kafka - SQL | flink-sql-connector-kafka、1.15.4 | flink-sql-connector-kafka、3.2.0-1.18 | flink-sql-connector-kafka、3.3.0-1.19 | flink-sql-connector-kafka、3.3.0-1.20 | 
| Firehose - DataStream API と Table API | flink-connector-aws-kinesis-firehose、1.15.4 | flink-connector-aws-firehose、4.3.0-1.18 | flink-connector-aws-firehose、5.0.0-1.19 | flink-connector-aws-firehose、5.0.0-1.20 | 
| Firehose - SQL | flink-sql-connector-aws-kinesis-firehose、1.15.4 | flink-sql-connector-aws-firehose、4.3.0-1.18 | flink-sql-connector-aws-firehose、5.0.0-1.19 | flink-sql-connector-aws-firehose、5.0.0-1.20 | 
| DynamoDB - DataStream API と Table API | flink-connector-dynamodb、3.0.0-1.15 | flink-connector-dynamodb、4.3.0-1.18 | flink-connector-dynamodb、5.0.0-1.19 | flink-connector-dynamodb、5.0.0-1.20 | 
| DynamoDB - SQL | flink-sql-connector-dynamodb、3.0.0-1.15 | flink-sql-connector-dynamodb、4.3.0-1.18 | flink-sql-connector-dynamodb、5.0.0-1.19 | flink-sql-connector-dynamodb、5.0.0-1.20 | 
| OpenSearch - DataStream API と Table API | - | flink-connector-opensearch、1.2.0-1.18 | flink-connector-opensearch、1.2.0-1.19 | flink-connector-opensearch、1.2.0-1.19 | 
| OpenSearch - SQL | - | flink-sql-connector-opensearch、1.2.0-1.18 | flink-sql-connector-opensearch、1.2.0-1.19 | flink-sql-connector-opensearch、1.2.0-1.19 | 
| Amazon Managed Service for Prometheus DataStream | - | flink-sql-connector-opensearch、1.2.0-1.18 | flink-connector-prometheus、1.0.0-1.19 | flink-connector-prometheus、1.0.0-1.20 | 
| Amazon SQS DataStream API と Table API | - | flink-sql-connector-opensearch、1.2.0-1.18 | flink-connector-sqs、5.0.0-1.19 | flink-connector-sqs、5.0.0-1.20 | 

Amazon Managed Service for Apache Flink のコネクタの詳細については、以下を参照してください。
+ [DataStream API connectors](https://docs.aws.amazon.com/managed-flink/latest/java/how-connectors.html)
+ [Table API connectors](https://docs.aws.amazon.com/managed-flink/latest/java/how-table-connectors.html)

### 既知の問題
<a name="connectors-known-issues"></a>

Apache Flink 1.15 の Apache Kafka コネクタには、オープンソース Apache Flink の既知の問題があります。この問題は、それ以降のバージョンの Apache Flink で解決されています。

詳細については、「[既知の問題](flink-1-15-2.md#flink-1-15-known-issues)」を参照してください。

# Managed Service for Apache Flink で耐障害性を実装する
<a name="how-fault"></a>

チェックポインティングは、Amazon Managed Service for Apache Flink でフォールトトレランスを実装するために使用される方法です。「チェックポイント」とは、実行中のアプリケーションの最新のバックアップのことで、予期せぬアプリケーションの中断やフェイルオーバーから即座に回復するために使用されます。

Apache Flink アプリケーションのチェックポイントについて詳しくは、Apache Flink ドキュメントの「[Checkpoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/checkpoints/)」を参照してください。

*スナップショット*は、アプリケーションの状態のバックアップを手動で作成して管理するものです。スナップショットを使うと、「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」の呼び出しによってアプリケーションを以前の状態に復元できます。詳細については、「[スナップショットを使用してアプリケーションバックアップを管理する](how-snapshots.md)」を参照してください。

アプリケーションのチェックポイント機能が有効になっていると、アプリケーションが予期せず再起動した場合にアプリケーションデータのバックアップを作成して読み込むことができるため、耐障害性が確保されます。このような予期しないアプリケーションの再起動は、ジョブの予期しない再起動、インスタンスの障害などが原因である可能性があります。これにより、アプリケーションは、これらの再起動時に無障害実行と同じセマンティクスを持つことになります。

アプリケーションのスナップショットが有効になっていて、アプリケーションの「[ApplicationRestoreConfiguration](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)」を使用して設定されている場合、サービスはアプリケーションの更新中、またはサービス関連のスケーリングやメンテナンスの際に 1 回限りの処理セマンティクスを提供します。

## Managed Service for Apache Flink でチェックポインティングを設定する
<a name="how-fault-configure"></a>

アプリケーションのチェックポインティング動作を構成できます。チェックポイントの状態を持続させるかどうか、チェックポイントに状態を保存する頻度、1 つのチェックポイント操作の終了から別のチェックポイント操作の開始までの最小間隔を定義できます。

「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」API 操作を使用して次の設定を行います。
+ 「`CheckpointingEnabled`」— アプリケーションでチェックポイントが有効になっているかどうかを示します。
+ 「`CheckpointInterval`」— チェックポイント (パーシスタンス) 操作の間隔をミリ秒単位で含みます。
+ 「`ConfigurationType`」— デフォルトのチェックポイント動作を使用するには、この値を「`DEFAULT`」に設定します。この値を「`CUSTOM`」に設定すると、他の値を設定できます。
**注記**  
デフォルトのチェックポイント動作は次のとおりです。  
**CheckpointingEnabled:** true
**CheckpointInterval:** 60000
**MinPauseBetweenCheckpoints:** 5000
**ConfigurationType** が に設定されている場合`DEFAULT`、 を使用するか、アプリケーションコードの値を設定して他の値に設定されていても AWS Command Line Interface、前述の値が使用されます。
**注記**  
Flink 1.15 以降では、Apache Flink 用 Managed Serviceは、アプリケーションの更新、スケーリング、停止などの自動スナップショット作成時に `stop-with-savepoint` を使用します。
+ `MinPauseBetweenCheckpoints` — 1 つのチェックポイント操作が終了してから別のチェックポイント操作が開始されるまでの最小時間 (ミリ秒単位)。この値を設定すると、チェックポイントオペレーションが `CheckpointInterval` よりも時間がかかる場合でも、アプリケーションは継続的にチェックポイント機能を実行できなくなります。

## チェックポインティング API の例を確認する
<a name="how-fault-examples"></a>

このセクションには、アプリケーションのチェックポイントを構成するための API アクションのリクエスト例が含まれています。JSON ファイルを API アクションの入力に使用する方法の詳細については、[Managed Service for Apache Flink API コードの例](api-examples.md) を参照してください。

### 新しいアプリケーションのチェックポインティングを設定する
<a name="how-fault-examples-create-config"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)」アクションのリクエスト例では、アプリケーションの作成時にチェックポインティングを設定しています。

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "true",
            "CheckpointInterval": 20000,
            "ConfigurationType": "CUSTOM",
            "MinPauseBetweenCheckpoints": 10000
         }
      }
}
```

### 新しいアプリケーションのチェックポインティングを無効にする
<a name="how-fault-examples-create-disable"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)」アクションのリクエスト例では、アプリケーションの作成時にチェックポインティングを無効にします。

```
{
   "ApplicationName": "MyApplication",
   "RuntimeEnvironment":"FLINK-1_19",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
        "S3ContentLocation":{
          "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
          "FileKey":"myflink.jar",
          "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
        }
      },
      "FlinkApplicationConfiguration": { 
         "CheckpointConfiguration": { 
            "CheckpointingEnabled": "false"
         }
      }
}
```

### 既存のアプリケーションのチェックポインティングを設定する
<a name="how-fault-examples-update-config"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」アクションリクエスト例では、既存のアプリケーションのチェックポインティングを設定しています。

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": true,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

### 既存のアプリケーションのチェックポインティングを無効にする
<a name="how-fault-examples-update-update-disable"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」アクションのリクエスト例では、既存のアプリケーションのチェックポインティングを無効にします。

```
{
   "ApplicationName": "MyApplication",
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "CheckpointConfigurationUpdate": { 
            "CheckpointingEnabledUpdate": false,
            "CheckpointIntervalUpdate": 20000,
            "ConfigurationTypeUpdate": "CUSTOM",
            "MinPauseBetweenCheckpointsUpdate": 10000
         }
      }
   }
}
```

# スナップショットを使用してアプリケーションバックアップを管理する
<a name="how-snapshots"></a>

*スナップショット*は、Apache Flink *セーブポイント*の Managed Service for Apache Flink 実装です。スナップショットは、ユーザーまたはサービスによってトリガーされ、作成され、管理されるアプリケーション状態のバックアップです。Apache Flink セーブポイントについては、「Apache Flink ドキュメント」の「[Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)」を参照してください。スナップショットを使用すると、アプリケーション状態の特定のスナップショットからアプリケーションを再起動できます。

**注記**  
アプリケーションが正しい状態データで正しく再起動できるように、1 日に数回スナップショットを作成することをおすすめします。スナップショットの正しい頻度は、アプリケーションのビジネスロジックによって異なります。頻繁にスナップショットを作成すると、より新しいデータを復元できますが、コストが増加して必要なシステムリソースが増えます。

Apache Flink 用 Managed Serviceでは、次の API アクションを使用してスナップショットを管理します。
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DescribeApplicationSnapshot.html)
+ [https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)

アプリケーションごとのスナップショット数の制限については、[Managed Service for Apache Flink および Studio ノートブッククォータ](limits.md) を参照してください。アプリケーションがスナップショットの上限に達すると、スナップショットを手動で作成すると失敗し、「`LimitExceededException`」が表示されます。

Apache Flink 用 Managed Serviceは決してスナップショットを削除しません。これらのスナップショットは、 [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html) アクションを使用して手動で削除する必要があります。

アプリケーションの起動時に、保存されているアプリケーション状態のスナップショットを読み込むには、「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html)」アクションの「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ApplicationRestoreConfiguration.html)」パラメータを使用します。

**Topics**
+ [自動スナップショットの作成を管理する](#how-fault-snapshot-update)
+ [互換性のない状態データを含むスナップショットから復元する](#how-fault-snapshot-restore)
+ [スナップショット API の例を確認する](#how-fault-snapshot-examples)

## 自動スナップショットの作成を管理する
<a name="how-fault-snapshot-update"></a>

アプリケーションの「[ApplicationSnapshotConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationSnapshotConfiguration.html)」で `SnapshotsEnabled` が `true` に設定されている場合、Apache Flink 用 Managed Serviceは、アプリケーションが更新、スケーリング、または停止されたときに、1 回限りの処理セマンティクスを実現するために自動的にスナップショットを作成して使用します。

**注記**  
`ApplicationSnapshotConfiguration::SnapshotsEnabled` が `false` に設定されると、アプリケーションの更新中にデータが失われます。

**注記**  
Apache Flink 用 Managed Serviceは、スナップショット作成中に中間セーブポイントをトリガーします。Flink バージョン 1.15 以降では、中間セーブポイントによる副作用は発生しなくなりました。「[Triggering Savepoints](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)」を参照してください。

自動的に作成されたスナップショットには以下の特性があります。
+ スナップショットはサービスによって管理されますが、「[ListApplicationSnapshots](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListApplicationSnapshots.html)」アクションを使用してスナップショットを表示できます。自動的に作成されたスナップショットは、スナップショットの制限に含まれます。
+ アプリケーションがスナップショットの制限を超えると、手動で作成したスナップショットは失敗しますが、Apache Flink 用 Managed Service サービスは、アプリケーションの更新、スケーリング、または停止時に引き続き正常にスナップショットを作成します。手動でさらにスナップショットを作成する前に、「[ DeleteApplicationSnapshot](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)」アクションを使用してスナップショットを手動で削除する必要があります。

## 互換性のない状態データを含むスナップショットから復元する
<a name="how-fault-snapshot-restore"></a>

スナップショットにはオペレータに関する情報が含まれているため、以前のアプリケーションバージョン以降に変更されたオペレータの状態データをスナップショットから復元すると、予期しない結果が生じることがあります。現在のオペレータに対応していないスナップショットから状態データを復元しようとすると、アプリケーションに障害が発生します。障害が発生したアプリケーションは、「`STOPPING`」または「`UPDATING`」のいずれかの状態のままになります。

互換性のない状態データを含むスナップショットからアプリケーションが復元できるようにするには、「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションを使用して「[FlinkRunConfiguration](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_FlinkRunConfiguration.html)」の `AllowNonRestoredState` パラメータを `true` に設定します。

古いスナップショットからアプリケーションを復元すると、次のような動作になります。
+ 「**オペレータ追加:**」新しいオペレータが追加されても、セーブポイントには新しいオペレータの状態データはありません。障害は発生せず、「`AllowNonRestoredState`」を設定する必要はありません。
+ 「**オペレータが削除された:**」既存のオペレータが削除されると、そのオペレータの状態データがセーブポイントに格納されます。`AllowNonRestoredState` が `true` に設定されていないと障害が発生します。
+ 「**オペレータ修正:**」パラメータのタイプを互換性のあるタイプに変更するなど、互換性のある変更が行われた場合、アプリケーションは古いスナップショットから復元できます。スナップショットからの復元の詳細については、「Apache Flink ドキュメント」の「[Savepoints](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/ops/state/savepoints/)」を参照してください。Apache Flink バージョン 1.8 以降を使用するアプリケーションは、別のスキーマのスナップショットから復元できる可能性があります。Apache Flink バージョン 1.6 を使用するアプリケーションは復元できません。2 フェーズコミットシンクでは、ユーザーが作成したスナップショット (CreateApplicationSnapshot) の代わりにシステムスナップショット (SwS) を使用することをお勧めします。

  Flink の場合、Apache Flink 用 Managed Serviceは、スナップショットの作成中に中間セーブポイントをトリガーします。Flink 1.15 以降では、中間セーブポイントによる副作用は発生しなくなりました。「[セーブポイントのトリガー](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#triggering-savepoints)」を参照してください。

既存のセーブポイントデータと互換性のないアプリケーションを再開する必要がある場合は、「[StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html)」アクションの「`ApplicationRestoreType`」パラメータを「`SKIP_RESTORE_FROM_SNAPSHOT`」に設定して、スナップショットからの復元をスキップすることをお勧めします。

Apache Flink が互換性のない状態データを処理する方法の詳細については、「*Apache Flink ドキュメント*」の「[State Schema Evolution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/)」を参照してください。

## スナップショット API の例を確認する
<a name="how-fault-snapshot-examples"></a>

このセクションには、アプリケーションでスナップショットを使用するための API アクションのリクエスト例が含まれています。JSON ファイルを API アクションの入力に使用する方法の詳細については、[Managed Service for Apache Flink API コードの例](api-examples.md)　を参照してください。

### アプリケーションのスナップショットを有効にする
<a name="how-fault-savepoint-examples-enable"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) アクションの以下のリクエスト例は、アプリケーションのスナップショットを有効にします。

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 1,
   "ApplicationConfigurationUpdate": { 
      "ApplicationSnapshotConfigurationUpdate": { 
         "SnapshotsEnabledUpdate": "true"
       }
    }
}
```

### スナップショットを作成する
<a name="how-fault-savepoint-examples-create"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplicationSnapshot.html)」アクションのリクエスト例では、現在のアプリケーション状態のスナップショットを作成します。

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### アプリケーションのスナップショットを一覧表示する
<a name="how-fault-snapshot-examples-list"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html)」アクションリクエスト例では、現在のアプリケーション状態の最初の 50 個のスナップショットが一覧表示されます。

```
{
   "ApplicationName": "MyApplication",
   "Limit": 50
}
```

### アプリケーションのスナップショットの詳細を一覧表示する
<a name="how-fault-snapshot-examples-describe"></a>

[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DescribeApplicationSnapshot.html) アクションの以下のリクエスト例では、特定のアプリケーションスナップショットの詳細を一覧表示します。

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot"
}
```

### スナップショットを削除する
<a name="how-fault-snapshot-examples-delete"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)」アクションリクエスト例では、以前に保存したスナップショットを削除します。`SnapshotCreationTimestamp` 値は、「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_ListApplicationSnapshots.html)」または「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_DeleteApplicationSnapshot.html)」を使用して取得できます。

```
{
   "ApplicationName": "MyApplication",
   "SnapshotName": "MyCustomSnapshot",
   "SnapshotCreationTimestamp": 12345678901.0,
}
```

### 名前付きスナップショットを使用したアプリケーションを再起動する
<a name="how-fault-snapshot-examples-load-custom"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)」アクションリクエスト例では、特定のスナップショットから保存された状態を使用してアプリケーションを起動します。

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_CUSTOM_SNAPSHOT",
         "SnapshotName": "MyCustomSnapshot"
      }
   }
}
```

### 最新のスナップショットを使用してアプリケーションを再起動する
<a name="how-fault-snapshot-examples-load-recent"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)」アクションリクエスト例では、最新のスナップショットを使用してアプリケーションを起動します。

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"
      }
   }
}
```

### スナップショットなしでアプリケーションを再起動する
<a name="how-fault-snapshot-examples-load-none"></a>

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_StartApplication.html)」アクションのリクエスト例では、スナップショットがあってもアプリケーションの状態をロードせずにアプリケーションを起動します。

```
{
   "ApplicationName": "MyApplication",
   "RunConfiguration": { 
      "ApplicationRestoreConfiguration": { 
         "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
      }
   }
}
```

# Apache Flink のインプレースバージョンアップグレードを使用する
<a name="how-in-place-version-upgrades"></a>

Apache Flink でインプレースバージョンアップグレードを使用して、複数の Apache Flink バージョンにわたって単一の ARN に対するアプリケーションのトレーサビリティを保持します。これには、スナップショット、ログ、メトリクス、タグ、Flink 設定、リソース制限の引き上げ、VPC などが含まれます。

Apache Flink のインプレースバージョンアップグレードを実行して、既存のアプリケーションを Amazon Managed Service for Apache Flink の新しい Flink バージョンにアップグレードできます。このタスクを実行するには、、 AWS CLI AWS CloudFormation、 AWS SDK、または を使用できます AWS マネジメントコンソール。

**注記**  
Amazon Managed Service for Apache Flink Studio で Apache Flink のインプレースバージョンアップグレードを使用することはできません。

**Topics**
+ [Apache Flink のインプレースバージョンアップグレードを使用してアプリケーションをアップグレードする](upgrading-applications.md)
+ [アプリケーションを新しい Apache Flink バージョンにアップグレードする](upgrading-application-new-version.md)
+ [アプリケーションのアップグレードをロールバックする](rollback.md)
+ [アプリケーションのアップグレードに関する一般的なベストプラクティスと推奨事項](best-practices-recommendations.md)
+ [アプリケーションのアップグレードに関する注意事項と既知の問題](precautions.md)
+ [Flink 2.2 へのアップグレード: 完全なガイド](flink-2-2-upgrade-guide.md)
+ [Flink 2.2 アップグレードのステート互換性ガイド](state-compatibility.md)

# Apache Flink のインプレースバージョンアップグレードを使用してアプリケーションをアップグレードする
<a name="upgrading-applications"></a>

開始する前に、「[In-Place Version Upgrades](https://www.youtube.com/watch?v=f1qGGdaP2XI)」の動画を見ることをお勧めします。

Apache Flink のインプレースバージョンアップグレードを実行するには、 AWS CLI、、 AWS CloudFormation AWS SDK、または を使用できます AWS マネジメントコンソール。この機能は、`READY` または `RUNNING` 状態の Managed Service for Apache Flink と組み合わせて使用する既存のアプリケーションで使用できます。UpdateApplication API を使用して、Flink ランタイムを変更する機能を追加します。

## アップグレード前: Apache Flink アプリケーションを更新する
<a name="before-upgrading"></a>

Flink アプリケーションを記述するときに、それらを依存関係とともにアプリケーション JAR にバンドルし、その JAR を Amazon S3 バケットにアップロードします。そこから、Amazon Managed Service for Apache Flink は、選択した新しい Flink ランタイムでジョブを実行します。アップグレード先の Flink ランタイムとの互換性を実現するために、アプリケーションの更新が必要になる場合があります。Flink バージョン間に不整合がある可能性があり、その場合はバージョンのアップグレードが失敗します。最も一般的には、ソース (イングレス) または送信先 (シンク、エグレス) と Scala の依存関係のコネクタを使用します。Managed Service for Apache Flink の Flink 1.15 以降のバージョンは Scala に依存しないため、JAR には使用する予定の Scala のバージョンが含まれている必要があります。

**アプリケーションを更新するには**

1. 状態ありのアプリケーションのアップグレードに関する Flink コミュニティからのアドバイスをお読みください。「[Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)」を参照してください。

1. 既知の問題と制限のリストをお読みください。「[アプリケーションのアップグレードに関する注意事項と既知の問題](precautions.md)」を参照してください。

1. 依存関係を更新し、アプリケーションをローカルでテストします。通常、これらの依存関係は次のとおりです。

   1. Flink ランタイムと API。

   1. 新しい Flink ランタイムに推奨されるコネクタ。これらは、更新する特定のランタイムの[リリースバージョン](https://docs.aws.amazon.com/managed-flink/latest/java/release-version-list.html)で確認できます。

   1. Scala – Apache Flink は、Flink 1.15 以降は Scala に依存しません。使用する Scala の依存関係をアプリケーション JAR に含める必要があります。

1. zip ファイルに新しいアプリケーション JAR をビルドし、Amazon S3 にアップロードします。前の JAR/zip ファイルとは異なる名前を使用することをお勧めします。ロールバックする必要がある場合は、この情報を使用します。

1. ステートフルアプリケーションを実行している場合は、現在のアプリケーションのスナップショットを作成することを強くお勧めします。これにより、アップグレード中またはアップグレード後に問題が発生した場合に、ステートフルにロールバックできます。

# アプリケーションを新しい Apache Flink バージョンにアップグレードする
<a name="upgrading-application-new-version"></a>

[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) アクションを使用して Flink アプリケーションをアップグレードできます。

`UpdateApplication` API は複数の方法で呼び出すことができます。
+  AWS マネジメントコンソールで既存の **[設定]** ワークフローを使用する。
  +  AWS マネジメントコンソールのアプリページに移動します。
  + [**設定**] を選択します。
  + 新しいランタイムと起動元のスナップショット (復元設定とも呼ばれる) を選択します。最新のスナップショットからアプリケーションを起動するには、最新の設定を復元設定として使用します。Amazon S3 上にある、アップグレードされた新しいアプリケーション JAR/zip をポイントします。
+  AWS CLI [update-application](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) アクションを使用します。
+  CloudFormation (CFN) を使用します。
  + [RuntimeEnvironment](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesisanalyticsv2-application.html#cfn-kinesisanalyticsv2-application-runtimeenvironment) フィールドを更新します。以前は、 によってアプリケーション CloudFormation が削除され、新しいアプリケーションが作成されていたため、スナップショットやその他のアプリケーション履歴が失われていました。これで、RuntimeEnvironment CloudFormation が更新され、アプリケーションは削除されません。
+  AWS SDK を使用します。
  + 選択したプログラミング言語については、SDK のドキュメントを参照してください。「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」を参照してください。

アップグレードは、アプリケーションが `RUNNING` 状態のとき、またはアプリケーションが `READY` 状態で停止しているときに実行できます。Amazon Managed Service for Apache Flink は、元のランタイムバージョンとターゲットランタイムバージョンの互換性を検証します。この互換性チェックは、`RUNNING` 状態のときに [UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) を実行した時点、または `READY` 状態のときにアップグレードする場合は次の [StartApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_StartApplication.html) の時点で、実行されます。

## `RUNNING` 状態のアプリケーションをアップグレードする
<a name="upgrading-running"></a>

次の例は、 を使用して `RUNNING`という名前のアプリケーションを米国東部 (バージニア北部) の Flink 1.18 `UpgradeTest`にアップグレード AWS CLI し、最新のスナップショットからアップグレードされたアプリケーションを起動する方法を示しています。

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --run-configuration-update '{"ApplicationRestoreConfiguration": '\
 '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \
 --current-application-version-id ${current_application_version}
```
+ サービススナップショットを有効にしていて、最新のスナップショットからアプリケーションを続行する場合、Amazon Managed Service for Apache Flink は、現在の `RUNNING` アプリケーションのランタイムが選択したターゲットランタイムと互換性があることを確認します。
+ ターゲットランタイムの続行元となるスナップショットを指定した場合、Amazon Managed Service for Apache Flink は、ターゲットランタイムが指定されたスナップショットと互換性があることを確認します。互換性チェックが失敗した場合、更新リクエストは拒否され、アプリケーションは `RUNNING` 状態のまま変更されません。
+ スナップショットを使用せずにアプリケーションを起動するよう選択した場合、Amazon Managed Service for Apache Flink は互換性チェックを実行しません。
+ アップグレードしたアプリケーションに障害が発生したり、過渡的な `UPDATING` 状態で停止したりした場合は、「[アプリケーションのアップグレードをロールバックする](rollback.md)」セクションの指示に従って正常な状態に戻します。

**[RUNNING 状態のアプリケーションのプロセスフロー]**

![\[次の図は、実行中のアプリケーションをアップグレードする場合に推奨するワークフローを示しています。アプリケーションがステートフルであり、スナップショットを有効にしていることを前提としています。このワークフローでは、更新する際に、更新前に Amazon Managed Service for Apache Flink によって自動的に作成された最新のスナップショットからアプリケーションを復元します。\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/in-place-update-while-running.png)


## **[READY]** 状態のアプリケーションをアップグレードする
<a name="upgrading-ready"></a>

次の例では、 AWS CLIを使用して `UpgradeTest` という名前の `READY` 状態のアプリケーションを米国東部 (バージニア北部) の Flink 1.18 にアップグレードする方法を示しています。アプリケーションは実行中ではないため、アプリケーションを起動するためのスナップショットは指定されていません。スナップショットは、アプリケーション起動リクエストを発行するときに指定できます。

```
            
aws --region us-east-1 kinesisanalyticsv2 update-application \
--application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \
--application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\
'{"CodeContentUpdate": {"S3ContentLocationUpdate": '\
'{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \
 --current-application-version-id ${current_application_version}
```
+ `READY` 状態のアプリケーションのランタイムを任意の Flink バージョンに更新できます。Amazon Managed Service for Apache Flink は、アプリケーションを起動するまでチェックを一切実行しません。
+  Amazon Managed Service for Apache Flink は、アプリケーションを起動するために選択したスナップショットに対する互換性チェックのみ実行します。これらは、[Flink 互換性テーブル](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/#compatibility-table)に従って行われる基本的な互換性チェックです。スナップショットが作成された Flink バージョンと、ターゲットとする Flink バージョンのみをチェックします。選択したスナップショットの Flink ランタイムがアプリケーションの新しいランタイムと互換性がない場合、起動リクエストが拒否されることがあります。

**[READY 状態のアプリケーションのプロセスフロー]**

![\[次の図は、READY 状態のアプリケーションをアップグレードする場合に推奨するワークフローを示しています。アプリケーションがステートフルであり、スナップショットを有効にしていることを前提としています。このワークフローでは、更新する際に、アプリケーション停止時に Amazon Managed Service for Apache Flink によって自動的に作成された最新のスナップショットからアプリケーションを復元します。\]](http://docs.aws.amazon.com/ja_jp/managed-flink/latest/java/images/in-place-update-while-ready.png)


# アプリケーションのアップグレードをロールバックする
<a name="rollback"></a>

アプリケーションに問題がある場合、または Flink バージョン間でアプリケーションコードに不整合がある場合は、 AWS CLI、、 AWS SDK AWS CloudFormation、または を使用してロールバックできます AWS マネジメントコンソール。次の例は、さまざまな障害シナリオでのロールバックの例を示しています。

## ランタイムのアップグレードは成功し、アプリケーションが `RUNNING` 状態だが、ジョブが失敗し、再起動を繰り返している
<a name="succeeded-restarting"></a>

米国東部 (バージニア北部) で、`TestApplication` という名前のステートフルアプリケーションを Flink 1.15 から Flink 1.18 にアップグレードしようとしているとします。しかし、アップグレードされた Flink 1.18 アプリケーションは、アプリケーションが `RUNNING` 状態であるのに、起動に失敗するか、再起動を繰り返しています。これはよくある障害シナリオです。ダウンタイムがこれ以上発生しないようにするために、アプリケーションを以前に実行していたバージョン (Flink 1.15) にすぐにロールバックし、その後で問題を診断することをお勧めします。

アプリケーションを以前の実行中のバージョンにロールバックするには、[rollback-application](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI コマンドまたは [RollbackApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API アクションを使用します。この API アクションは、最新バージョンになった変更をロールバックします。次に、最後に成功したスナップショットを使用してアプリケーションを再起動します。

アップグレードを試みる前に、既存のアプリケーションが含まれたスナップショットを作成することを強くお勧めします。そうすることで、データ損失やデータの再処理が必要になる事態を回避できます。

この障害シナリオでは、 CloudFormation はアプリケーションをロールバックしません。CloudFormation テンプレートを更新して、以前のランタイムと以前のコードをポイントし、CloudFormation でアプリケーションを強制的に更新するようにする必要があります。そうしない場合、CloudFormation はアプリケーションが `RUNNING` 状態に移行したときに更新されていると想定します。

## `UPDATING` でスタックしているアプリケーションをロールバックする
<a name="stuck-updating"></a>

アップグレードの試行後にアプリケーションが `UPDATING`または `AUTOSCALING`状態でスタックした場合、Amazon Managed Service for Apache Flink は [rollback-applications](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/rollback-application.html) AWS CLI コマンド、またはアプリケーションをスタック`UPDATING`または `AUTOSCALING`状態になる前のバージョンにロールバックできる [RollbackApplications](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_RollbackApplication.html) API アクションを提供します。この API は、アプリケーションが `UPDATING` 状態または過渡的な `AUTOSCALING` 状態でスタックする原因となった変更をロールバックします。

# アプリケーションのアップグレードに関する一般的なベストプラクティスと推奨事項
<a name="best-practices-recommendations"></a>
+ 本番稼働環境のアップグレードを試みる前に、本番稼働以外の環境で、状態なしの新しいジョブ/ランタイムをテストします。
+ まず、本番稼働用ではないアプリケーションでステートフルアップグレードをテストすることを検討してください。
+ 新しいジョブグラフと、アップグレードしたアプリケーションの起動に使用するスナップショットに互換性があることを確認します。
  + オペレーター状態で保存されているタイプが同じままであることを確認します。タイプが変更された場合、Apache Flink はオペレーター状態を復元できません。
  + `uid` メソッドを使用して設定したオペレーター ID が同じままであることを確認します。Apache Flink では、オペレーターに一意の ID を割り当てることを強く推奨しています。詳細については、Apache Flink ドキュメントの「[Assigning Operator IDs](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/state/savepoints/#assigning-operator-ids)」を参照してください。

    オペレーターに ID を割り当てない場合、Flink が自動的に ID を生成します。この場合、ID はプログラム構造に依存し、変更されると互換性の問題が発生する可能性があります。Flink はオペレーター ID を使用して、スナップショットの状態とオペレーターを照合します。オペレーター ID を変更すると、アプリケーションが起動しないか、またはスナップショットに保存されている状態が削除され、新しいオペレーターが状態なしで起動します。
  + キー付き状態の保存に使用されるキーを変更しないでください。
  + ウィンドウや結合などのステートフル演算子の入力タイプを変更しないでください。これにより、オペレーターの内部状態のタイプが暗黙的に変更され、状態の互換性がなくなる原因になります。

# アプリケーションのアップグレードに関する注意事項と既知の問題
<a name="precautions"></a>

## ブローカーの再起動後にチェックポイント処理時の Kafka コミットが繰り返し失敗する
<a name="apache-kafka-connector"></a>

Apache Flink 1.15 の Apache Kafka コネクタには既知のオープンソースの問題があり、これは Kafka Client 2.8.1 のオープンソース Kafka Client に存在する重大なバグが原因です。詳細については、「[Kafka Commit on checkpointing fails repeatedly after a broker restart](https://issues.apache.org/jira/browse/FLINK-28060)」と「[KafkaConsumer is unable to recover connection to group coordinator after commitOffsetAsync exception](https://issues.apache.org/jira/browse/KAFKA-13840)」を参照してください。

この問題を回避するには、Amazon Managed Service for Apache Flink で Apache Flink 1.18 以降を使用することをお勧めします。

## 状態の互換性に関する既知の制限事項
<a name="state-precautions"></a>
+ Table API を使用している場合、Apache Flink は Flink バージョン間の状態の互換性を保証しません。詳細については、Apache Flink ドキュメントの「[Stateful Upgrades and Evolution](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/overview/#stateful-upgrades-and-evolution)」を参照してください。
+ Flink 1.6 の状態は Flink 1.18 と互換性がありません。状態付きで 1.6 から 1.18 以降にアップグレードしようとすると、API はリクエストを拒否します。1.8、1.11、1.13、1.15 にアップグレードしてスナップショットを作成し、1.18 以降にアップグレードできます。詳細については、Apache Flink ドキュメントの「[Upgrading Applications and Flink Versions](https://nightlies.apache.org/flink/flink-docs-master/docs/ops/upgrading/)」を参照してください。

## Flink Kinesis コネクタの既知の問題
<a name="kinesis-connector-precautions"></a>
+ Flink 1.11 以前を使用していて、拡張ファンアウト (EFO) サポート用の `amazon-kinesis-connector-flink` コネクタを使用している場合は、Flink 1.13 以降にステートフルアップグレードするために追加の手順を実行する必要があります。これは、コネクタのパッケージ名が変更されているためです。詳細については、[amazon-kinesis-connector-flink](https://github.com/awslabs/amazon-kinesis-connector-flink) を参照してください。

  Flink 1.11 以前の `amazon-kinesis-connector-flink` コネクタはパッケージ `software.amazon.kinesis` を使用しているのに対し、Flink 1.13 以降の Kinesis コネクタは `org.apache.flink.streaming.connectors.kinesis` を使用しています。移行をサポートするこのツールを使用してください。[amazon-kinesis-connector-flink-state-migrator](https://github.com/awslabs/amazon-kinesis-connector-flink-state-migrator)
+ `FlinkKinesisProducer` で Flink 1.13 以前していて、Flink 1.15 以降にアップグレードする場合は、ステートフルアップグレードするために、新しい `KinesisStreamsSink` ではなく Flink 1.15 以降で引き続き `FlinkKinesisProducer` を使用する必要があります。ただし、シンクにカスタム `uid` が既に設定されている場合、`FlinkKinesisProducer` では状態が維持されないため、`KinesisStreamsSink` に切り替えることができます。カスタム `uid` が設定されているため、Flink はこれを同じオペレーターとして扱います。

## Scala で記述された Flink アプリケーション
<a name="scala-precautions"></a>
+ Flink 1.15 以降、Apache Flink にはランタイムに Scala は含まれません。Flink 1.15 以降にアップグレードする場合は、使用する Scala のバージョンとその他の Scala の依存関係をコード JAR/zip に含める必要があります。詳細については、「[Amazon Managed Service for Apache Flink for Apache Flink 1.15.2 リリース](https://docs.aws.amazon.com/managed-flink/latest/java/flink-1-15-2.html)」を参照してください。
+ アプリケーションで Scala を使用していて、Flink 1.11 以前 (Scala 2.11) から Flink 1.13 (Scala 2.12) にアップグレードする場合は、コードで Scala 2.12 を使用していることを確認してください。それ以外の場合、Flink 1.13 アプリケーションは Flink 1.13 ランタイムで Scala 2.11 クラスを見つけられないことがあります。

## Flink アプリケーションをダウングレードする際の考慮事項
<a name="downgrading-precautions"></a>
+ Flink アプリケーションのダウングレードは可能ですが、アプリケーションが以前の Flink バージョンで実行されていた場合に限られます。ステートフルアップグレードの場合、Managed Service for Apache Flink では、ダウングレード対象と一致するバージョンまたはそれ以前のバージョンで作成されたスナップショットを使用する必要があります。
+ ランタイムを Flink 1.13 以降から Flink 1.11 以前に更新していて、アプリケーションが HashMap 状態バックエンドを使用している場合、アプリケーションは継続的に失敗します。

# Flink 2.2 へのアップグレード: 完全なガイド
<a name="flink-2-2-upgrade-guide"></a>

このガイドでは、Amazon Managed Service for Apache Flink アプリケーションを Flink 1.x から Flink 2.2 にアップグレードするstep-by-stepについて説明します。これは、慎重な計画とテストを必要とする重大な変更を伴うメジャーバージョンアップグレードです。

**メジャーバージョンのアップグレードは一方向です**  
アップグレードオペレーションでは、状態を保存してアプリケーションを Flink 1.x から 2.2 に移動できますが、2.2 状態を使用して 2.2 から 1.x に戻すことはできません。アップグレード後にアプリケーションが異常になった場合は、Rollback API を使用して、最新のスナップショットから元の 1.x 状態の 1.x バージョンに戻ります。

## 前提条件
<a name="upgrade-guide-prerequisites"></a>

アップグレードを開始する前に:
+ 確認 [重大な変更と廃止](flink-2-2.md#flink-2-2-breaking-changes)
+ 確認 [Flink 2.2 アップグレードのステート互換性ガイド](state-compatibility.md)
+ テスト用の非本番環境があることを確認する
+ 現在のアプリケーション設定と依存関係を文書化する

## 移行パスについて
<a name="upgrade-guide-migration-paths"></a>

アップグレードエクスペリエンスは、アプリケーションの Flink 2.2 との互換性によって異なります。これらのパスを理解することは、適切に準備し、現実的な期待を設定するのに役立ちます。

**パス 1: 互換性のあるバイナリとアプリケーションの状態**

**予想されること:**
+ アップグレードオペレーションを呼び出す
+ アプリケーションステータスが遷移した状態で 2.2 への移行を完了します。 `RUNNING` → `UPDATING` → `RUNNING`
+ データ損失や再処理なしですべてのアプリケーション状態を保持する
+ マイナーバージョン移行と同じエクスペリエンス

最適: ステートレスアプリケーションまたは互換性のあるシリアル化を使用するアプリケーション (Avro、互換性のある Protobuf スキーマ、コレクションのない POJOs)

**パス 2: バイナリ非互換性**

**予想されること:**
+ アップグレードオペレーションを呼び出す
+ オペレーションが失敗し、オペレーション API とログを通じてバイナリの非互換性が表示されます。
+ 自動ロールバックが有効になっている場合: アプリケーションは介入なしで数分以内に自動的にロールバックされます
+ 自動ロールバックが無効の場合: アプリケーションはデータ処理なしで実行状態のままになります。手動で古いバージョンにロールバックします。
+ バイナリが修正されたら、[UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) を使用して、パス 1 のようなエクスペリエンスを実現します。

最適: Flink ジョブの起動中に検出された削除APIs を使用するアプリケーション

**パス 3: 互換性のないアプリケーションの状態**

**予想されること:**
+ アップグレードオペレーションを呼び出す
+ 移行が最初に成功したように見える
+ 状態復元が失敗すると、アプリケーションは数秒以内に再起動ループに入る
+ 継続的な再起動を示す CloudWatch メトリクスによる障害の検出
+ ロールバックオペレーションを手動で呼び出す
+ ロールバックを開始してから数分以内に本番環境に戻る
+ アプリケーションの [状態移行](state-compatibility.md#state-compat-migration)を確認する

最適: 状態シリアル化の非互換性を持つアプリケーション (コレクションを持つ POJOs、特定の Kryo シリアル化状態)

**注記**  
本番稼働用アプリケーションのレプリカを作成し、本番稼働用アプリケーションと同じステップを実行する前に、レプリカでアップグレードの以下の各フェーズをテストすることを強くお勧めします。

## フェーズ 1: 準備
<a name="upgrade-guide-phase-1"></a>

**アプリケーションコードを更新する**

Flink 2.2 と互換性があるようにアプリケーションコードを更新します。
+ `pom.xml` または で **Flink の依存関係をバージョン 2.2.0 に更新**する `build.gradle`
+ **コネクタの依存関係を Flink 2.2 互換バージョンに更新**する (「」を参照[コネクタの可用性](flink-2-2.md#flink-2-2-connectors))
+ **非推奨の API の使用を削除します**。
  + DataSet API を DataStream API または Table API/SQL に置き換える
  + レガシー `SourceFunction`/`SinkFunction` を FLIP-27 ソースおよび FLIP-143 シンク APIs
  + Scala API の使用を Java API に置き換える
+ **Java 17 への更新**

**更新されたアプリケーションコードをアップロードする**
+ Flink 2.2 の依存関係を使用してアプリケーション JAR を構築する
+ 現在の JAR **とは異なるファイル名**で Amazon S3 にアップロードする (例: `my-app-flink-2.2.jar`)
+ アップグレードステップで使用する S3 バケットとキーを書き留めます。

## フェーズ 2: 自動ロールバックを有効にする
<a name="upgrade-guide-phase-2"></a>

自動ロールバックにより、Amazon Managed Service for Apache Flink は、アップグレードが失敗した場合に自動的に以前のバージョンに戻すことができます。

**自動ロールバックステータスを確認する**

*AWS マネジメントコンソール:*

1. アプリケーションに移動する

1. **設定**の選択

1. **アプリケーション設定**で、**システムのロールバック**が有効になっていることを確認します。

*AWS CLI:*

```
aws kinesisanalyticsv2 describe-application \
    --application-name MyApplication \
    --query 'ApplicationDetail.ApplicationConfigurationDescription.ApplicationSystemRollbackConfigurationDescription.RollbackEnabled'
```

**自動ロールバックを有効にする (有効になっていない場合)**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --application-configuration-update '{
        "ApplicationSystemRollbackConfigurationUpdate": {
            "RollbackEnabledUpdate": true
        }
    }'
```

## フェーズ 3: スナップショットを作成する (オプション)
<a name="upgrade-guide-phase-3"></a>

アプリケーションで自動スナップショットが有効になっている場合は、このステップをスキップできます。それ以外の場合は、アップグレード前にアプリケーションのスナップショットを作成して、アプリケーションの状態を保存します。

**実行中のアプリケーションからスナップショットを作成する**

*AWS マネジメントコンソール:*

1. アプリケーションに移動する

1. **スナップショット**の選択

1. **スナップショットの作成**を選択する

1. スナップショット名を入力する (例: `pre-flink-2.2-upgrade`)

1. **作成**を選択します。

*AWS CLI:*

```
aws kinesisanalyticsv2 create-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

**スナップショットの作成を確認する**

```
aws kinesisanalyticsv2 describe-application-snapshot \
    --application-name MyApplication \
    --snapshot-name pre-flink-2.2-upgrade
```

`SnapshotStatus` が になるまで待って`READY`から次に進みます。

## フェーズ 4: アプリケーションをアップグレードする
<a name="upgrade-guide-phase-4"></a>

[https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) アクションを使用して Flink アプリケーションをアップグレードできます。

`UpdateApplication` API は複数の方法で呼び出すことができます。
+ ** AWS マネジメントコンソールを使用します。**
  +  AWS マネジメントコンソールのアプリページに移動します。
  + [**設定**] を選択します。
  + 新しいランタイムと起動元のスナップショット (復元設定とも呼ばれる) を選択します。最新のスナップショットからアプリケーションを起動するには、最新の設定を復元設定として使用します。Amazon S3 上にある、アップグレードされた新しいアプリケーション JAR/zip をポイントします。
+ [https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/kinesisanalyticsv2/update-application.html) アクション**を使用します AWS CLI**。
+ **を使用します CloudFormation。**
  + `RuntimeEnvironment` フィールドを更新します。以前は、 CloudFormation がアプリケーションを削除して新しいアプリケーションを作成していたため、スナップショットやその他のアプリケーション履歴は失われていました。これで、 が所定の`RuntimeEnvironment`位置に CloudFormation 更新され、アプリケーションは削除されません。
+ ** AWS SDK を使用します。**
  + 選択したプログラミング言語については、SDK のドキュメントを参照してください。「[https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」を参照してください。

アップグレードは、アプリケーションが `RUNNING` 状態のとき、またはアプリケーションが `READY` 状態で停止しているときに実行できます。Amazon Managed Service for Apache Flink は、元のランタイムバージョンとターゲットランタイムバージョンの互換性を検証します。この互換性チェックは、 `RUNNING` 状態`UpdateApplication`中に を実行したとき、または `READY`状態中にアップグレード`StartApplication`したときに実行されます。

**RUNNING 状態からのアップグレード**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

**READY 状態からのアップグレード**

```
aws kinesisanalyticsv2 update-application \
    --application-name MyApplication \
    --current-application-version-id <version-id> \
    --runtime-environment-update FLINK-2_2 \
    --application-configuration-update '{
        "ApplicationCodeConfigurationUpdate": {
            "CodeContentUpdate": {
                "S3ContentLocationUpdate": {
                    "FileKeyUpdate": "my-app-flink-2.2.jar"
                }
            }
        }
    }'
```

## フェーズ 5: アップグレードをモニタリングする
<a name="upgrade-guide-phase-5"></a>

**互換性チェック**
+ Operations API を使用して、アップグレードのステータスを確認します。ジョブの起動時にバイナリの非互換性や問題がある場合、アップグレードオペレーションは ログで失敗します。
+ アップグレードオペレーションは成功したが、アプリケーションが再起動ループで停止している場合、状態が新しい Flink バージョンと互換性がないか、更新されたコードに問題があることを意味します。状態の非互換性の問題を特定する方法[Flink 2.2 アップグレードのステート互換性ガイド](state-compatibility.md)を確認します。

**アプリケーションのヘルスをモニタリングする**

*アプリケーションの状態:*
+ アプリケーションステータスが遷移するはずです: `RUNNING` → `UPDATING` → `RUNNING`
+ アプリケーションのランタイムを確認します。2.2 の場合、アップグレードオペレーションは成功しました。
+ アプリケーションが にある`RUNNING`が、まだ古いランタイムにある場合、自動ロールバックが開始されました。Operations API はオペレーションを と表示します`FAILED`。ログをチェックして、失敗の例外を見つけます。

さらに、CloudWatch でこれらのメトリクスをモニタリングします。

*再起動メトリクス:*
+ `numRestarts`: 予期しない再起動をモニタリングする — `numRestarts`がゼロで、 `uptime`または が増加している場合、アップグレード`runningTime`は成功します。

*チェックポイントメトリクス:*
+ `lastCheckpointDuration`: アップグレード前の値と似ている必要があります
+ `numberOfFailedCheckpoints`: 0 のままにする必要があります

## フェーズ 6: アプリケーションの動作を検証する
<a name="upgrade-guide-phase-6"></a>

アプリケーションが Flink 2.2 で実行された後:

**機能検証**
+ ソースからデータが読み取られていることを確認する
+ シンクにデータが書き込まれていることを確認する
+ ビジネスロジックが期待される結果を生成することを確認する
+ 出力をアップグレード前のベースラインと比較する

**パフォーマンス検証**
+ レイテンシーメトリクスのモニタリング (end-to-endの処理時間)
+ スループットメトリクスのモニタリング (1 秒あたりのレコード数)
+ チェックポイントの期間とサイズをモニタリングする
+ メモリと CPU 使用率のモニタリング

**24 時間以上実行**

アプリケーションを本番環境で少なくとも 24 時間実行して、以下を確認します。
+ メモリリークなし
+ チェックポイントの動作が安定している
+ 予期しない再起動なし
+ 一貫したスループット

## フェーズ 7: ロールバック手順
<a name="upgrade-guide-phase-7"></a>

アップグレードが失敗するか、アプリケーションが実行されているが異常がある場合は、以前のバージョンにロールバックします。

**自動ロールバック**

自動ロールバックが有効で、起動中にアップグレードが失敗した場合、Amazon Managed Service for Apache Flink は自動的に以前のバージョンに戻ります。

**手動ロールバック**

アプリケーションが実行されているが異常がある場合は、 `RollbackApplication` API を使用します。

*AWS マネジメントコンソール:*

1. アプリケーションに移動する

1. **アクション**の選択 → **ロールバック**

1. ロールバックを確認する

*AWS CLI:*

```
aws kinesisanalyticsv2 rollback-application \
    --application-name MyApplication \
    --current-application-version-id <version-id>
```

**ロールバック中に何が起こるか:**
+ アプリケーションが停止する
+ ランタイムが以前の Flink バージョンに戻る
+ アプリケーションコードが以前の JAR に戻る
+ アップグレード**前に**最後に成功したスナップショットからアプリケーションが再起動する

**重要**  
Flink 1.x で Flink 2.2 スナップショットを復元することはできません
ロールバックは、アップグレード前に作成されたスナップショットを使用します。
アップグレードする前に必ずスナップショットを作成する (フェーズ 3)

## 次の手順
<a name="upgrade-guide-next-steps"></a>

アップグレード中の質問や問題については、「」を参照[Managed Service for Apache Flink をトラブルシューティングする](troubleshooting.md)するか、 AWS サポートにお問い合わせください。

# Flink 2.2 アップグレードのステート互換性ガイド
<a name="state-compatibility"></a>

Flink 1.x から Flink 2.2 にアップグレードする場合、状態互換性の問題により、アプリケーションがスナップショットから復元できなくなる可能性があります。このガイドは、潜在的な互換性の問題を特定し、移行戦略を提供するのに役立ちます。

## 状態の互換性の変更について
<a name="state-compat-understanding"></a>

Amazon Managed Service for Apache Flink 2.2 では、状態の互換性に影響するいくつかのシリアル化の変更が導入されています。主なものは次のとおりです。
+ **Kryo バージョンアップグレード**: Apache Flink 2.2 は、バンドルされた Kryo シリアライザーをバージョン 2 からバージョン 5 にアップグレードします。Kryo v5 は Kryo v2 とは異なるバイナリエンコード形式を使用するため、Flink 1.x セーブポイントで Kryo を介してシリアル化された演算子の状態を Flink 2.2 で復元することはできません。
+ **Java コレクションのシリアル化**: Flink 1.x では、POJOs 内の Java コレクション (`HashMap`、`ArrayList`、 など`HashSet`) は Kryo を使用してシリアル化されました。Flink 2.2 では、1.x の Kryo シリアル化された状態と互換性のないコレクション固有の最適化されたシリアライザーが導入されています。1.x の POJO または Kryo シリアライザーで Java コレクションを使用するアプリケーションは、Flink 2.2 でこの状態を復元できません。データ型とシリアル化の詳細については、「Flink [ドキュメント](https://nightlies.apache.org/flink/flink-docs-release-2.2/docs/dev/datastream/fault-tolerance/serialization/types_serialization/)」を参照してください。
+ **Kinesis Connector 互換性**: 5.0 より前の Kinesis Data Streams (KDS) コネクタバージョンは、Flink 2.2 Kinesis コネクタバージョン 6.0 と互換性のない状態を維持します。アップグレードする前に、コネクタバージョン 5.0 以降に移行する必要があります。

## シリアル化互換性リファレンス
<a name="state-compat-reference"></a>

アプリケーション内のすべての状態宣言を確認し、シリアル化タイプを以下の表に一致させます。いずれかの状態タイプに互換性がない場合は、アップグレードに進む前に[状態移行](#state-compat-migration)「」セクションを参照してください。


**シリアル化互換性リファレンス**  

| シリアル化タイプ | 互換性はありますか? | 詳細 | 
| --- | --- | --- | 
| Avro (SpecificRecord、GenericRecord) | はい | Kryo とは無関係に独自のバイナリ形式を使用します。Kryo シリアライザーとして登録されている Avro ではなく、Flink のネイティブ Avro タイプ情報を使用していることを確認します。 | 
| Protobuf | はい | Kryo とは無関係に独自のバイナリエンコーディングを使用します。スキーマの変更が下位互換性のある進化ルールに従っていることを確認します。 | 
| コレクションのない POJOs  | はい | Flink の POJO シリアライザーによって処理されます。ただし、クラスがパブリッククラス、パブリック no-arg コンストラクタ、すべてのフィールドがパブリックであるか、getters/setters を介してアクセス可能であり、すべてのフィールドタイプが Flink によってシリアル化可能であるすべての POJO 基準を満たしている場合に限ります。これらのいずれかに違反する POJO は、無音で Kryo にフォールバックし、互換性がなくなります。 | 
| カスタム TypeSerializers | はい | シリアライザーが内部的に Kryo に委任されていない場合にのみ互換性があります。 | 
| SQL およびテーブル API の状態 | はい (注意付き) | Flink の内部シリアライザーを使用します。ただし、Apache Flink は Table API アプリケーションのメジャーバージョン間の状態互換性を保証しません。最初に本番稼働用環境以外でテストします。 | 
| Java コレクションを含む POJOs (HashMap、ArrayList、HashSet) | いいえ | Flink 1.x では、POJOsは Kryo v2 を介してシリアル化されました。Flink 2.2 では、バイナリ形式が Kryo v2 形式と互換性のない専用のコレクションシリアライザーが導入されています。 | 
| Scala ケースクラス | いいえ | Flink 1.x の Kryo を介してシリアル化されます。Kryo v2 から v5 へのアップグレードにより、バイナリ形式が変更されます。 | 
| Java レコード | いいえ | 通常、Flink 1.x の Kryo シリアル化にフォールバックします。でテストして検証しますdisableGenericTypes()。 | 
| サードパーティーのライブラリタイプ | いいえ | カスタムシリアライザーが登録されていないタイプは Kryo にフォールバックします。Kryo v2 から v5 へのバイナリ形式の変更により、互換性が損なわれます。 | 
| Kryo フォールバックを使用する任意のタイプ | いいえ | Flink が組み込みまたは登録されたシリアライザーを使用してタイプを処理できない場合、Flink は Kryo にフォールバックします。1.x からのすべての Kryo シリアル化状態は 2.2 と互換性がありません。 | 

## 診断方法
<a name="state-compat-diagnostics"></a>

[UpdateApplication API](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) オペレーション後にアプリケーションログを確認するか、ログを調べることで、状態互換性の問題を積極的に特定できます。

**アプリケーションで Kryo フォールバックを特定する**

ログで次の正規表現パターンを使用して、アプリケーションの Kryo フォールバックを特定できます。

```
Class class (?<className>[^\s]+) cannot be used as a POJO type
```

サンプルログ:

```
Class class org.apache.flink.streaming.connectors.kinesis.model.SequenceNumber
cannot be used as a POJO type because not all fields are valid POJO fields,
and must be processed as GenericType. Please read the Flink documentation on
"Data Types & Serialization" for details of the effect on performance and
schema evolution.
```

UpdateApplication API を使用してアップグレードが失敗した場合、シリアライザーベースの状態の非互換性が発生していることを次の例外が通知することがあります。

**IndexOutOfBoundsException**

```
Caused by: java.lang.IndexOutOfBoundsException: Index 116 out of bounds for length 1
    at java.base/jdk.internal.util.Preconditions.outOfBounds(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.outOfBoundsCheckIndex(Unknown Source)
    at java.base/jdk.internal.util.Preconditions.checkIndex(Unknown Source)
    at java.base/java.util.Objects.checkIndex(Unknown Source)
    at java.base/java.util.ArrayList.get(Unknown Source)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:77)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:923)
    ... 23 more
```

**StateMigrationException (POJOSerializer)**

```
Caused by: org.apache.flink.util.StateMigrationException: The new state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@8bf85b5d) must not be
incompatible with the old state serializer
(org.apache.flink.api.java.typeutils.runtime.PojoSerializer@3282ee3).
```

## アップグレード前のチェックリスト
<a name="state-compat-checklist"></a>
+ アプリケーション内のすべての状態宣言を確認する
+ コレクションを含む POJOs を確認する (`HashMap`、`ArrayList`、`HashSet`)
+ 各状態タイプのシリアル化方法を検証する
+ このレプリカで UpdateApplication API を使用して、Prod レプリカアプリケーションを作成し、状態の互換性をテストする
+ 状態が互換性がない場合は、 から戦略を選択します。 [状態移行](#state-compat-migration)
+ 本番稼働用 Flink アプリケーション設定で自動ロールバックを有効にする

## 状態移行
<a name="state-compat-migration"></a>

**再構築完了状態**

ソースデータから状態を再構築できるアプリケーションに最適です。

アプリケーションがソースデータから状態を再構築できる場合:

1. Flink 1.x アプリケーションの停止

1. 更新されたコードを使用して Flink 2.x にアップグレードする

1. で開始する `SKIP_RESTORE_FROM_SNAPSHOT`

1. アプリケーションに 状態の再構築を許可する

```
aws kinesisanalyticsv2 start-application \
    --application-name MyApplication \
    --run-configuration '{
        "ApplicationRestoreConfiguration": {
            "ApplicationRestoreType": "SKIP_RESTORE_FROM_SNAPSHOT"
        }
    }'
```

## ベストプラクティス
<a name="state-compat-best-practices"></a>

1. **複雑な状態では常に Avro または Protobuf を使用する** — これらはスキーマの進化を提供し、Kryo に依存しません。

1. **POJOs** — 代わりに Flink のネイティブ `ListState`と `MapState`を使用します

1. **ローカルで状態復元をテスト**する — 本番稼働用アップグレードの前に、実際のスナップショットでテストします。

1. **スナップショットを頻繁に作成する** — 特にメジャーバージョンのアップグレード前

1. **自動ロールバックを有効にする** — 障害時に自動的にロールバックするように MSF アプリケーションを設定する

1. **状態タイプを文書化する** — すべての状態タイプとそのシリアル化方法のドキュメントを維持します。

1. **チェックポイントサイズのモニタリング** — チェックポイントサイズの増加はシリアル化の問題を示している可能性があります

## 次の手順
<a name="state-compat-next-steps"></a>

**アップグレードを計画する**: 「」を参照してください[Flink 2.2 へのアップグレード: 完全なガイド](flink-2-2-upgrade-guide.md)。

移行中の質問や問題については、「」を参照[Managed Service for Apache Flink をトラブルシューティングする](troubleshooting.md)するか、 AWS サポートにお問い合わせください。

# Managed Service for Apache Flink でアプリケーションスケーリングを実装する
<a name="how-scaling"></a>

スケーリングを実装するために、Amazon Managed Service for Apache Flink のタスクの並列実行とリソースの割り当てを設定できます。Apache Flink がタスクの並列インスタンスをスケジュールする方法については、Apache Flink ドキュメントの「[Parallel Execution](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/)」を参照してください。

**Topics**
+ [アプリケーションの並列処理と ParallelismPerKPU を設定する](#how-parallelism)
+ [Kinesis 処理ユニットを割り当てる](#how-scaling-kpus)
+ [アプリケーションの並列処理を更新する](#how-scaling-howto)
+ [Managed Service for Apache Flink で自動スケーリングを使用する](how-scaling-auto.md)
+ [最大並列度に関する考慮事項](#how-scaling-auto-max-parallelism)

## アプリケーションの並列処理と ParallelismPerKPU を設定する
<a name="how-parallelism"></a>

Apache Flink アプリケーション用 Managed Serviceタスク (ソースからの読み取りやオペレータの実行など) のparallel 実行は、次の「[https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ApplicationConfiguration.html)」プロパティを使用して設定します。
+ `Parallelism` — このプロパティを使用して、デフォルトの Apache Flink アプリケーション並列処理を設定します。すべてのオペレータ、ソース、シンクは、アプリケーションコードでオーバーライドされない限り、この並列処理で実行されます。デフォルトは `1` で、最大値は `256` です。
+ `ParallelismPerKPU` — このプロパティを使用して、使用しているアプリケーションの Kinesis Processing Unit (KPU) あたりにスケジュールできるparallel タスクの数を設定します。デフォルトは `1` で、最大は `8` です。ブロッキングオペレーション (I/O など) を行うアプリケーションでは、`ParallelismPerKPU` の値が大きいほど KPU リソースを最大限に活用できます。

**注記**  
`Parallelism`の上限は、KPUの上限（デフォルトは64）の `ParallelismPerKPU` 倍です（デフォルトは64）。KPU の上限は、制限の引き上げをリクエストすることで増やすことができます。制限の引き上げをリクエストする方法については、「[Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html)」 の 「制限の引き上げをリクエストするには」 を参照してください。

特定のオペレーターにタスクの並列処理を設定する方法については、Apache Flink ドキュメントの「[Setting the Parallelism: Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#operator-level)」を参照してください。

## Kinesis 処理ユニットを割り当てる
<a name="how-scaling-kpus"></a>

Apache Flink 用 Managed Service 1 つの KPU で 1 つの vCPU および 4 GB のメモリーが提供されます。割り当てられた KPU ごとに、50 GB の実行中のアプリケーションストレージも提供されます。

Apache Flink 用 Managed Serviceは、次のように `Parallelism` および `ParallelismPerKPU` プロパティを使用してアプリケーションの実行に必要な KPU を計算します。

```
Allocated KPUs for the application = Parallelism/ParallelismPerKPU
```

Apache Flink 用 Managed Service は、スループットや処理アクティビティの急増に応じて、アプリケーションリソースを迅速に提供します。アクティビティの急増が過ぎると、アプリケーションから徐々にリソースを削除します。リソースの自動割り当てを無効にするには、[アプリケーションの並列処理を更新する](#how-scaling-howto) で後述するように、`AutoScalingEnabled` 値を `false` に設定します。

アプリケーションの KPU のデフォルト制限は 64 です。制限の引き上げをリクエストする方法については、「[Service Quotas](https://docs.aws.amazon.com/general/latest/gr/aws_service_limits.html)」の 「制限の引き上げをリクエストするには」 を参照してください。

**注記**  
オーケストレーションの目的で追加の KPU が課金されます。詳細については、「[Managed Service for Apache Flink の料金](https://aws.amazon.com/kinesis/data-analytics/pricing/)」 を参照してください

## アプリケーションの並列処理を更新する
<a name="how-scaling-howto"></a>

このセクションには、アプリケーションの並列処理を設定する API アクションのサンプルリクエストが含まれています。API アクションでリクエストブロックを使用する方法のその他の例と手順については、[Managed Service for Apache Flink API コードの例](api-examples.md) を参照してください。

以下の「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_CreateApplication.html)」アクションのリクエスト例では、アプリケーションの作成時に並列処理を設定します。

```
{
   "ApplicationName": "string",
   "RuntimeEnvironment":"FLINK-1_18",
   "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole",
   "ApplicationConfiguration": { 
      "ApplicationCodeConfiguration":{
      "CodeContent":{
         "S3ContentLocation":{
            "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket",
            "FileKey":"myflink.jar",
            "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345"
            }
         },
      "CodeContentType":"ZIPFILE"
   },   
      "FlinkApplicationConfiguration": { 
         "ParallelismConfiguration": { 
            "AutoScalingEnabled": "true",
            "ConfigurationType": "CUSTOM",
            "Parallelism": 4,
            "ParallelismPerKPU": 4
         }
      }
   }
}
```

次の [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) アクションのリクエスト例では、既存のアプリケーションの並列処理を設定します。

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "true",
            "ConfigurationTypeUpdate": "CUSTOM",
            "ParallelismPerKPUUpdate": 4,
            "ParallelismUpdate": 4
         }
      }
   }
}
```

次の [https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_UpdateApplication.html) アクションのリクエストの例では、既存のアプリケーションの並列処理が無効になります。

```
{
   "ApplicationName": "MyApplication",
   "CurrentApplicationVersionId": 4,
   "ApplicationConfigurationUpdate": { 
      "FlinkApplicationConfigurationUpdate": { 
         "ParallelismConfigurationUpdate": { 
            "AutoScalingEnabledUpdate": "false"
         }
      }
   }
}
```

# Managed Service for Apache Flink で自動スケーリングを使用する
<a name="how-scaling-auto"></a>

Apache Flink 用 Managed Service は、ほとんどのシナリオでソースのデータスループットとオペレーターの複雑さに対応できるように、アプリケーションの並列度を柔軟にスケーリングします。自動スケーリングはデフォルトで有効になっている。Apache Flink 用 Managed Service は、アプリケーションのリソース (CPU) 使用状況を監視し、それに応じてアプリケーションの並列度を柔軟にスケールアップまたはスケールダウンします。
+ CloudWatch メトリクスの最大 `containerCPUUtilization` が 15 分間 75% 以上になると、アプリケーションがスケールアップ (並列処理数が増加) します。つまり、1 分間で 75 パーセント以上の連続したデータポイントが 15 個あると、`ScaleUp` アクションが開始されます。`ScaleUp` アクションはアプリケーションの `CurrentParallelism` を 2 倍にします。`ParallelismPerKPU` は変更されません。その結果、割り当てられた KPU の数も 2 倍になります。
+ CPU 使用率が 6 時間にわたって 10% を下回ると、アプリケーションはスケールダウン (並列処理が減少) します。つまり、1 分間の期間が 10% 未満の連続するデータポイントが 360 個ある場合に、`ScaleDown` アクションが開始されます。`ScaleDown` アクションは、アプリケーションの並列処理を半分にします (切り上げ)。`ParallelismPerKPU` は変更されず、割り当てられた KPU の数も半分になります (切り上げ)。

**注記**  
1 分間の `containerCPUUtilization` の最大値は、スケーリングアクションに使用するデータポイントとの相関を見つけるために参照できますが、アクションが開始された正確な瞬間を反映する必要はありません。

Apache Flink 用 Managed Service では、アプリケーションの `CurrentParallelism` 値がアプリケーションの `Parallelism` 設定を下回ることはありません。

Apache Flink 用 Managed Serviceサービスがアプリケーションをスケーリングしているときは、`AUTOSCALING` 状態になります。現在のアプリケーションのステータスは、「[DescribeApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_DescribeApplication.html)」アクションまたは「[ListApplications](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)」アクションを使用して確認できます。サービスがアプリケーションをスケーリングしている間、使用できる有効な API アクションは、`Force` パラメータを `true` に設定した「[StopApplication](https://docs.aws.amazon.com//managed-flink/latest/apiv2/API_ListApplications.html)」だけです。

`AutoScalingEnabled` プロパティ (「[https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html](https://docs.aws.amazon.com/managed-service-for-apache-flink/latest/apiv2/API_FlinkApplicationConfiguration.html)」の一部 ) を使用して、auto スケーリングの動作を有効または無効にすることができます。Managed Service for Apache Flink がプロビジョニングする KPUs に対して AWS アカウントが課金されます。これは、アプリケーションの `parallelism`および `parallelismPerKPU`設定の関数です。アクティビティが急増すると、Apache Flink 用 Managed Service のコストが増加します。

料金については、「[Amazon Managed Service for Apache Flink の料金](https://aws.amazon.com/kinesis/data-analytics/pricing/)」を参照してください。

アプリケーションのスケーリングについて、以下のことに注意してください：
+ 自動スケーリングはデフォルトで有効になっている。
+ Studio ノートブックにはスケーリングは適用されません。ただし、Studio Notebook を永続的状態のアプリケーションとしてデプロイすると、スケーリングはデプロイされたアプリケーションに適用されます。
+ 使用しているアプリケーションのデフォルトの上限は 64 KPU です。詳細については、「[Managed Service for Apache Flink および Studio ノートブッククォータ](limits.md)」を参照してください。
+ 自動スケーリングによってアプリケーションの並列度が更新されると、アプリケーションのダウンタイムが発生します。このダウンタイムを回避するには、以下を実行します。
  + 自動スケーリングを無効にする
  + 「[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html)」アクションを使用してアプリケーションの `parallelism` および `parallelismPerKPU` を設定します。使用しているアプリケーションの並列処理の設定の詳細については、「[アプリケーションの並列処理を更新する](how-scaling.md#how-scaling-howto)」を参照してください。
  + アプリケーションのリソース使用状況を定期的に監視して、アプリケーションがワークロードに適した並列度設定になっていることを確認してください。割り当てリソースの使用状況を監視する方法については、[Managed Service for Apache Flink でのメトリクスおよびディメンション](metrics-dimensions.md) を参照してください。

## カスタムの自動スケーリングを実装する
<a name="how-scaling-custom-autoscaling"></a>

自動スケーリングをよりきめ細かく制御したり、`containerCPUUtilization` 以外のトリガーメトリクスを使用したりする場合は、次の例を参考にしてください。
+ [AutoScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/AutoScaling)

  この例は、ソースまたはシンクとして使用される Amazon MSK および Amazon Kinesis Data Streams のメトリクスなど、Apache Flink アプリケーションとは異なる CloudWatch メトリクスを使用して Managed Service for Apache Flink アプリケーションをスケールする方法を示します。

追加情報については、「[Enhanced monitoring and automatic scaling for Apache Flink](https://aws.amazon.com/blogs/big-data/enhanced-monitoring-and-automatic-scaling-for-apache-flink/)」を参照してください。

## スケジュールされた自動スケーリングを実装する
<a name="how-scaling-scheduled-autoscaling"></a>

ワークロードが時間の経過に合わせて予測可能なプロファイルに従う場合は、Apache Flink アプリケーションを事前にスケールすることをお勧めします。これにより、メトリクスに基づいて事後にスケーリングするのではなく、スケジュールされた時間にアプリケーションがスケーリングされます。1 日の特定の時間にスケールアップとスケールダウンを設定するには、次の例を参考にしてください。
+ [ScheduledScaling](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/infrastructure/ScheduledScaling)

## 最大並列度に関する考慮事項
<a name="how-scaling-auto-max-parallelism"></a>

Flink ジョブがスケールできる最大並列処理数は、ジョブのすべてのオペレーターの*最小* `maxParallelism` によって制限されます。例えば、ソースとシンクのみを持つシンプルなジョブがあり、ソースの `maxParallelism` が 16 で、シンクが 8 の場合、アプリケーションは並列処理数の 8 を超えてスケールすることはできません。

オペレーターのデフォルト `maxParallelism` の計算方法とデフォルトを上書きする方法については、Apache Flink ドキュメントの「[Setting the Maximum Parallelism](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/execution/parallel/#setting-the-maximum-parallelism)」を参照してください。

基本的なルールとして、オペレーターに `maxParallelism` を定義せず、並列処理数を 128 以下に設定してアプリケーションを起動すると、すべてのオペレーターは `maxParallelism` が 128 になることに注意してください。

**注記**  
ジョブの最大並列処理数は、状態を保持しているアプリケーションをスケールする際の並列処理数の上限です。  
既存のアプリケーションの `maxParallelism` を変更すると、アプリケーションは従来の `maxParallelism` で作成された以前のスナップショットから再起動できなくなります。アプリケーションはスナップショットなしでしか再起動できません。  
アプリケーションを 128 を超える並列処理にスケールする場合は、アプリケーションで `maxParallelism` を明示的に設定する必要があります。
+ 自動スケーリングロジックは、ジョブの最大並列処理数を超える並列処理数にまで Flink ジョブをスケールすることを防ぎます。
+ カスタムの自動スケーリングまたはスケジュールされたスケーリングを使用する場合は、ジョブの最大並列処理数を超えないように設定します。
+ 最大並列処理数を超えてアプリケーションを手動でスケールすると、アプリケーションの起動に失敗します。

# Managed Service for Apache Flink アプリケーションにタグを追加する
<a name="how-tagging"></a>



このセクションでは、アプリケーションにキーバリューメタデータタグをManaged Service for Apache Flink アプリケーションに追加する方法について説明します。これらのタグは以下の目的に使用できます。
+ 個々のApache Flink アプリケーション用 Managed Serviceの課金を決定。詳細については、Billing と Cost Management ユーザーガイドの 「[コスト配分タグを使用する](https://docs.aws.amazon.com/awsaccountbilling/latest/aboutv2/cost-alloc-tags.html)」 を参照してください。
+ タグに基づいてアプリケーションリソースへのアクセスをコントロールする。詳細については、[AWS Identity and Access Management ユーザーガイド](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_tags.html)の*タグを使用したアクセス制御*を参照してください。
+ ユーザー定義の目的で。ユーザータグに基づいてアプリケーションの機能を定義できます。

タグ付けに関する以下の情報に注意してください。
+ アプリケーションタグの最大数にはシステムタグが含まれます。ユーザー定義のアプリケーションタグの最大数は 50 です。
+ アクションに含まれているタグリストで `Key` 値が重複している場合、サービスは `InvalidArgumentException` をスローします。

**Topics**
+ [アプリケーション作成時にタグを追加する](how-tagging-create.md)
+ [既存のアプリケーションに対しタグを追加または更新する](how-tagging-add.md)
+ [アプリケーションのタグを一覧表示する](how-tagging-list.md)
+ [アプリケーションからタグを削除する](how-tagging-remove.md)

# アプリケーション作成時にタグを追加する
<a name="how-tagging-create"></a>

タグの追加は、[CreateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplication.html) アクションの `tags` パラメータを使ってアプリケーションを作成する際に行います。

以下のリクエスト例では、`CreateApplication` リクエストの `Tags` ノードを示しています。

```
"Tags": [ 
    { 
        "Key": "Key1",
        "Value": "Value1"
    },
    { 
        "Key": "Key2",
        "Value": "Value2"
    }
]
```

# 既存のアプリケーションに対しタグを追加または更新する
<a name="how-tagging-add"></a>

[TagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_TagResource.html) アクションを使用して、アプリケーションにタグを追加します。[UpdateApplication](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UpdateApplication.html) アクションを使用して、アプリケーションにタグを追加することはできません。

既存のタグを更新するには、既存のタグのものと同じキーを含むタグを追加します。

`TagResource` アクションの以下のリクエスト例では、新しいタグを追加するか、既存のタグを更新します。

```
{
   "ResourceARN": "string",
   "Tags": [ 
      { 
         "Key": "NewTagKey",
         "Value": "NewTagValue"
      },
      { 
         "Key": "ExistingKeyOfTagToUpdate",
         "Value": "NewValueForExistingTag"
      }
   ]
}
```

# アプリケーションのタグを一覧表示する
<a name="how-tagging-list"></a>

既存のタグを一覧表示するには、[ListTagsForResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_ListTagsForResource.html) アクションを使用します。

`ListTagsForResource` アクションの以下のリクエスト例では、アプリケーションのタグを一覧表示します。

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication"
}
```

# アプリケーションからタグを削除する
<a name="how-tagging-remove"></a>

アプリケーションからタグを削除するには、[UntagResource](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_UntagResource.html) アクションを使用します。

`UntagResource` アクションの以下のリクエスト例では、アプリケーションからタグを削除します。

```
{
   "ResourceARN": "arn:aws:kinesisanalyticsus-west-2:012345678901:application/MyApplication",
   "TagKeys": [ "KeyOfFirstTagToRemove", "KeyOfSecondTagToRemove" ]
}
```

# Managed Service for Apache Flink で CloudFormation を使用する
<a name="lambda-cfn-flink"></a>

次の演習では、同じスタックで Lambda 関数 CloudFormation を使用して で作成された Flink アプリケーションを起動する方法を示します。

## [開始する前に]
<a name="before-you-begin"></a>

この演習を開始する前に、[AWS::KinesisAnalytics::Application](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-kinesis-analyticsapplication.html) CloudFormation で を使用して Flink アプリケーションを作成するステップに従います。

## Lambda 関数を書き込む
<a name="write-lambda-function"></a>

作成または更新後に Flink アプリケーションを起動するには、kinesisanalyticsv2 「[アプリケーションを起動](https://docs.aws.amazon.com/cli/latest/reference/kinesisanalyticsv2/start-application.html)」 API を使用します。呼び出しは、Flink アプリケーションの作成後に CloudFormation イベントによってトリガーされます。Lambda 関数をトリガーするためのスタックの設定方法については、このエクササイズの後半で説明しますが、まずは Lambda 関数の宣言とそのコードに焦点を当てます。この例では「`Python3.8`」ランタイムを使用しています。

```
StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration. 
              run_configuration = { 
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': 'RESTORE_FROM_LATEST_SNAPSHOT',
                }
              }
                            
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
```

前述のコードでは、Lambda は受信 CloudFormation イベントを処理し、 `Create`と 以外のすべてを除外し`Update`、アプリケーションの状態を取得し、状態が の場合は起動します`READY`。アプリケーションの状態を取得するには、次に示すように Lambda ロールを作成する必要があります。

## Lambda ロールを作成する
<a name="create-lambda-role"></a>

Lambda がアプリケーションと正常に「対話」してログを書き込むためのロールを作成します。このロールはデフォルトのマネージドポリシーを利用していますが、必要に応じてカスタムポリシーに絞り込むこともできます。

```
StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
```

Lambda リソースは Flink アプリケーションに依存しているため、同じスタックで Flink アプリケーションを作成した後に作成されることに注意してください。

## Lambda 関数の呼び出し
<a name="invoking-lambda-function"></a>

あとは、Lambda 関数を呼び出すだけです。これは「[カスタムリソース](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-cfn-customresource.html)」を使用して行われます。

```
StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
```

Lambdaを使って Flink アプリケーションを起動するのに必要なものはこれだけです。これで、独自のスタックを作成するか、以下の完全な例を使用して、これらすべてのステップが実際にどのように機能するかを確認する準備ができました。

## 拡張例を確認する
<a name="lambda-cfn-flink-full-example"></a>

以下の例は、前のステップを少し拡張したもので、「[テンプレートパラメーター](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/parameters-section-structure.html)」を使って `RunConfiguration` を微調整したものです。これは、試してみるための作業スタックです。添付の注意事項を必ずお読みください。

stack.yaml

```
Description: 'kinesisanalyticsv2 CloudFormation Test Application'
Parameters:
  ApplicationRestoreType:
    Description: ApplicationRestoreConfiguration option, can be SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT or RESTORE_FROM_CUSTOM_SNAPSHOT.
    Type: String
    Default: SKIP_RESTORE_FROM_SNAPSHOT
    AllowedValues: [ SKIP_RESTORE_FROM_SNAPSHOT, RESTORE_FROM_LATEST_SNAPSHOT, RESTORE_FROM_CUSTOM_SNAPSHOT ]
  SnapshotName:
    Description: ApplicationRestoreConfiguration option, name of a snapshot to restore to, used with RESTORE_FROM_CUSTOM_SNAPSHOT ApplicationRestoreType.
    Type: String
    Default: ''
  AllowNonRestoredState:
    Description: FlinkRunConfiguration option, can be true or false.
    Default: true
    Type: String
    AllowedValues: [ true, false ]
  CodeContentBucketArn:
    Description: ARN of a bucket with application code.
    Type: String
  CodeContentFileKey:
    Description: A jar filename with an application code inside a bucket.
    Type: String
Conditions:
  IsSnapshotNameEmpty: !Equals [ !Ref SnapshotName, '' ]
Resources:
  TestServiceExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service: 
                - kinesisanlaytics.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonKinesisFullAccess
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
      Path: /
  InputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  OutputKinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
  TestFlinkApplication:
    Type: 'AWS::kinesisanalyticsv2::Application'
    Properties:
      ApplicationName: 'CFNTestFlinkApplication'
      ApplicationDescription: 'Test Flink Application'
      RuntimeEnvironment: 'FLINK-1_18'
      ServiceExecutionRole: !GetAtt TestServiceExecutionRole.Arn
      ApplicationConfiguration:
        EnvironmentProperties:
          PropertyGroups:
            - PropertyGroupId: 'KinesisStreams'
              PropertyMap:
                INPUT_STREAM_NAME: !Ref InputKinesisStream
                OUTPUT_STREAM_NAME: !Ref OutputKinesisStream
                AWS_REGION: !Ref AWS::Region
        FlinkApplicationConfiguration:
          CheckpointConfiguration:
            ConfigurationType: 'CUSTOM'
            CheckpointingEnabled: True
            CheckpointInterval: 1500
            MinPauseBetweenCheckpoints: 500
          MonitoringConfiguration:
            ConfigurationType: 'CUSTOM'
            MetricsLevel: 'APPLICATION'
            LogLevel: 'INFO'
          ParallelismConfiguration:
            ConfigurationType: 'CUSTOM'
            Parallelism: 1
            ParallelismPerKPU: 1
            AutoScalingEnabled: True
        ApplicationSnapshotConfiguration:
          SnapshotsEnabled: True
        ApplicationCodeConfiguration:
          CodeContent:
            S3ContentLocation:
              BucketARN: !Ref CodeContentBucketArn
              FileKey: !Ref CodeContentFileKey
          CodeContentType: 'ZIPFILE'     
  StartApplicationLambdaRole:
    Type: AWS::IAM::Role
    DependsOn: TestFlinkApplication
    Properties:
      Description: A role for lambda to use while interacting with an application.
      AssumeRolePolicyDocument:
        Version: '2012-10-17		 	 	 '
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/Amazonmanaged-flinkFullAccess
        - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess
      Path: /
  StartApplicationLambda:
    Type: AWS::Lambda::Function
    DependsOn: StartApplicationLambdaRole
    Properties:
      Description: Starts an application when invoked.
      Runtime: python3.8
      Role: !GetAtt StartApplicationLambdaRole.Arn
      Handler: index.lambda_handler
      Timeout: 30
      Code:
        ZipFile: |
          import logging
          import cfnresponse
          import boto3
          
          logger = logging.getLogger()
          logger.setLevel(logging.INFO)
          
          def lambda_handler(event, context):
            logger.info('Incoming CFN event {}'.format(event))
            
            try:
              application_name = event['ResourceProperties']['ApplicationName']
              
              # filter out events other than Create or Update,
              # you can also omit Update in order to start an application on Create only.
              if event['RequestType'] not in ["Create", "Update"]:
                logger.info('No-op for Application {} because CFN RequestType {} is filtered'.format(application_name, event['RequestType'])) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # use kinesisanalyticsv2 API to start an application.
              client_kda = boto3.client('kinesisanalyticsv2', region_name=event['ResourceProperties']['Region'])
              
              # get application status.
              describe_response = client_kda.describe_application(ApplicationName=application_name)
              application_status = describe_response['ApplicationDetail']['ApplicationStatus']
              
              # an application can be started from 'READY' status only.
              if application_status != 'READY':
                logger.info('No-op for Application {} because ApplicationStatus {} is filtered'.format(application_name, application_status)) 
                cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
                
                return
              
              # create RunConfiguration from passed parameters. 
              run_configuration = { 
                'FlinkRunConfiguration': {
                  'AllowNonRestoredState': event['ResourceProperties']['AllowNonRestoredState'] == 'true'
                },
                'ApplicationRestoreConfiguration': {
                  'ApplicationRestoreType': event['ResourceProperties']['ApplicationRestoreType'],
                }
              }
              
              # add SnapshotName to RunConfiguration if specified.
              if event['ResourceProperties']['SnapshotName'] != '':
                run_configuration['ApplicationRestoreConfiguration']['SnapshotName'] = event['ResourceProperties']['SnapshotName']
              
              logger.info('RunConfiguration for Application {}: {}'.format(application_name, run_configuration)) 
              
              # this call doesn't wait for an application to transfer to 'RUNNING' state.
              client_kda.start_application(ApplicationName=application_name, RunConfiguration=run_configuration)
              
              logger.info('Started Application: {}'.format(application_name)) 
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {})
            except Exception as err:
              logger.error(err)
              cfnresponse.send(event,context, cfnresponse.FAILED, {"Data": str(err)})
  StartApplicationLambdaInvoke:
    Description: Invokes StartApplicationLambda to start an application.
    Type: AWS::CloudFormation::CustomResource
    DependsOn: StartApplicationLambda
    Version: "1.0"
    Properties:
      ServiceToken: !GetAtt StartApplicationLambda.Arn
      Region: !Ref AWS::Region
      ApplicationName: !Ref TestFlinkApplication
      ApplicationRestoreType: !Ref ApplicationRestoreType
      SnapshotName: !Ref SnapshotName
      AllowNonRestoredState: !Ref AllowNonRestoredState
```

繰り返しになりますが、アプリケーション自体だけでなく、Lambda のロールも調整したい場合があります。

上記のスタックを作成する前に、パラメータを指定することを忘れないでください。

parameters.json

```
[
  {
    "ParameterKey": "CodeContentBucketArn",
    "ParameterValue": "YOUR_BUCKET_ARN"
  },
  {
    "ParameterKey": "CodeContentFileKey",
    "ParameterValue": "YOUR_JAR"
  },
  {
    "ParameterKey": "ApplicationRestoreType",
    "ParameterValue": "SKIP_RESTORE_FROM_SNAPSHOT"
  },
  {
    "ParameterKey": "AllowNonRestoredState",
    "ParameterValue": "true"
  }
]
```

「`YOUR_BUCKET_ARN`」と「`YOUR_JAR`」を特定の要件に置き換えてください。この「[ガイド](https://docs.aws.amazon.com/managed-flink/latest/java/get-started-exercise.html)」に従って Amazon S3 バケットとアプリケーション jar を作成できます。

次に、スタックを作成します (YOUR\$1REGION を us-east-1 などの任意のリージョンに置き換える)。

```
aws cloudformation create-stack --region YOUR_REGION --template-body "file://stack.yaml" --parameters "file://parameters.json" --stack-name "TestManaged Service for Apache FlinkStack" --capabilities CAPABILITY_NAMED_IAM
```

これで「[https://console.aws.amazon.com/cloudformation](https://console.aws.amazon.com/cloudformation)」に移行して進行状況を確認できるようになりました。作成されると、Flinkアプリケーションが「`Starting`」状態になるはずです。「`Running`」の起動には数分かかる場合があります。

詳細については次を参照してください:
+ [AWS CloudFormation を使用して AWS サービスプロパティを取得する 4 つの方法 (パート 1/3)](https://aws.amazon.com/blogs/mt/four-ways-to-retrieve-any-aws-service-property-using-aws-cloudformation-part-1/)。
+ [チュートリアル: Amazon マシンイメージ ID を参照する](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/walkthrough-custom-resources-lambda-lookup-amiids.html)。

# Managed Service for Apache Flink で Apache Flink Dashboard を使用する
<a name="how-dashboard"></a>

アプリケーションの Apache Flink Dashboard を使用して Apache Flink 用 Managed Service アプリケーションの健全性を監視することができます。アプリケーションのダッシュボードには、以下の情報が表示されます。
+ Task Managers や Task Slots など、使用中のリソース。
+ 実行中、完了、キャンセル、失敗したジョブなど、ジョブに関する情報。

Apache Flink Task Managers、Task Slots、Jobsに関する詳細は、Apache Flink ウェブサイトの「[Apache Flink Architecture](https://flink.apache.org/what-is-flink/flink-architecture/)」を参照してください。

Apache Flink アプリケーション用 Managed Service を搭載した Apache Flink Dashboard を使用する際には、次の点に注意してください。
+ Apache Flink アプリケーションの Managed Service 用 Apache Flink Dashboard は読み取り専用です。Apache Flink Dashboard で Apache Flink アプリケーション用 Managed Service を変更することはできません。
+ Apache Flink Dashboard は Microsoft Internet Explorer と互換性がありません。

## アプリケーションの Apache Flink Dashboard にアクセスする
<a name="how-dashboard-accessing"></a>

アプリケーションの Apache Flink Dashboard には、 Apache Flink コンソール用のManaged Service を使用するか、CLI を使用して安全な URL エンドポイントをリクエストすることでアクセスすることができます。

### Managed Service for Apache Flink コンソールを使用して、アプリケーションの Apache Flink Dashboard にアクセスする
<a name="how-dashboard-accessing-console"></a>

コンソールからアプリケーションの Apache Flink Dashboard にアクセスするには、アプリケーションのページで「**Apache Flink Dashboard**」を選択します。

**注記**  
Apache Flink コンソール用 Managed Service からダッシュボードを開くと、コンソールが生成する URL は 12 時間有効になります。

### Managed Service for Apache Flink CLI を使用して、アプリケーションの Apache Flink Dashboard にアクセスする
<a name="how-dashboard-accessing-cli"></a>

Apache Flink CLI 用 Managed Service を使用して、アプリケーションダッシュボードにアクセスするための URL を生成することができます。生成した URL は、指定された期間だけ有効になります。

**注記**  
生成された URL に 3 分以内にアクセスしなければ、その URL は無効となります。

「[CreateApplicationPreSignedURL](https://docs.aws.amazon.com/managed-flink/latest/apiv2/API_CreateApplicationPresignedUrl.html)」アクションを使用してダッシュボード URL を生成します。アクションには以下のパラメータを指定できます。
+ アプリケーション名
+ URL が有効になる時間（秒単位）
+ URLタイプとして `FLINK_DASHBOARD_URL` を指定します