Managed Service for Apache Flink で耐障害性を実装する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

Managed Service for Apache Flink で耐障害性を実装する

チェックポインティングは、Amazon Managed Service for Apache Flink でフォールトトレランスを実装するために使用される方法です。「チェックポイント」とは、実行中のアプリケーションの最新のバックアップのことで、予期せぬアプリケーションの中断やフェイルオーバーから即座に回復するために使用されます。

Apache Flink アプリケーションのチェックポイントについて詳しくは、Apache Flink ドキュメントの「Checkpoints」を参照してください。

スナップショットは、アプリケーションの状態のバックアップを手動で作成して管理するものです。スナップショットを使うと、「UpdateApplication」の呼び出しによってアプリケーションを以前の状態に復元できます。詳細については、「スナップショットを使用してアプリケーションバックアップを管理する」を参照してください。

アプリケーションのチェックポイント機能が有効になっていると、アプリケーションが予期せず再起動した場合にアプリケーションデータのバックアップを作成して読み込むことができるため、耐障害性が確保されます。このような予期しないアプリケーションの再起動は、ジョブの予期しない再起動、インスタンスの障害などが原因である可能性があります。これにより、アプリケーションは、これらの再起動時に無障害実行と同じセマンティクスを持つことになります。

アプリケーションのスナップショットが有効になっていて、アプリケーションの「ApplicationRestoreConfiguration」を使用して設定されている場合、サービスはアプリケーションの更新中、またはサービス関連のスケーリングやメンテナンスの際に 1 回限りの処理セマンティクスを提供します。

Managed Service for Apache Flink でチェックポインティングを設定する

アプリケーションのチェックポインティング動作を構成できます。チェックポイントの状態を持続させるかどうか、チェックポイントに状態を保存する頻度、1 つのチェックポイント操作の終了から別のチェックポイント操作の開始までの最小間隔を定義できます。

CreateApplication」または「UpdateApplication」API 操作を使用して次の設定を行います。

  • CheckpointingEnabled」— アプリケーションでチェックポイントが有効になっているかどうかを示します。

  • CheckpointInterval」— チェックポイント (パーシスタンス) 操作の間隔をミリ秒単位で含みます。

  • ConfigurationType」— デフォルトのチェックポイント動作を使用するには、この値を「DEFAULT」に設定します。この値を「CUSTOM」に設定すると、他の値を設定できます。

    注記

    デフォルトのチェックポイント動作は次のとおりです。

    • CheckpointingEnabled: true

    • CheckpointInterval: 60000

    • MinPauseBetweenCheckpoints: 5000

    ConfigurationType」が DEFAULT に設定されている場合、AWS Command Line Interface を使用するか、アプリケーションコードで値を設定することで、他の値に設定されていても直前の値が使用されます。

    注記

    Flink 1.15 以降では、Apache Flink 用 Managed Serviceは、アプリケーションの更新、スケーリング、停止などの自動スナップショット作成時に stop-with-savepoint を使用します。

  • MinPauseBetweenCheckpoints — 1 つのチェックポイント操作が終了してから別のチェックポイント操作が開始されるまでの最小時間 (ミリ秒単位)。この値を設定すると、チェックポイントオペレーションが CheckpointInterval よりも時間がかかる場合でも、アプリケーションは継続的にチェックポイント機能を実行できなくなります。

チェックポインティング API の例を確認する

このセクションには、アプリケーションのチェックポイントを構成するための API アクションのリクエスト例が含まれています。JSON ファイルを API アクションの入力に使用する方法の詳細については、Managed Service for Apache Flink API コードの例 を参照してください。

新しいアプリケーションのチェックポインティングを設定する

以下の「CreateApplication」アクションのリクエスト例では、アプリケーションの作成時にチェックポインティングを設定しています。

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "true", "CheckpointInterval": 20000, "ConfigurationType": "CUSTOM", "MinPauseBetweenCheckpoints": 10000 } } }

新しいアプリケーションのチェックポインティングを無効にする

以下の「CreateApplication」アクションのリクエスト例では、アプリケーションの作成時にチェックポインティングを無効にします。

{ "ApplicationName": "MyApplication", "RuntimeEnvironment":"FLINK-1_19", "ServiceExecutionRole":"arn:aws:iam::123456789123:role/myrole", "ApplicationConfiguration": { "ApplicationCodeConfiguration":{ "CodeContent":{ "S3ContentLocation":{ "BucketARN":"arn:aws:s3:::amzn-s3-demo-bucket", "FileKey":"myflink.jar", "ObjectVersion":"AbCdEfGhIjKlMnOpQrStUvWxYz12345" } }, "FlinkApplicationConfiguration": { "CheckpointConfiguration": { "CheckpointingEnabled": "false" } } }

既存のアプリケーションのチェックポインティングを設定する

以下の「UpdateApplication」アクションリクエスト例では、既存のアプリケーションのチェックポインティングを設定しています。

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": true, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }

既存のアプリケーションのチェックポインティングを無効にする

以下の「UpdateApplication」アクションのリクエスト例では、既存のアプリケーションのチェックポインティングを無効にします。

{ "ApplicationName": "MyApplication", "ApplicationConfigurationUpdate": { "FlinkApplicationConfigurationUpdate": { "CheckpointConfigurationUpdate": { "CheckpointingEnabledUpdate": false, "CheckpointIntervalUpdate": 20000, "ConfigurationTypeUpdate": "CUSTOM", "MinPauseBetweenCheckpointsUpdate": 10000 } } } }