Amazon MWAA での Apache Airflow のパフォーマンス調整
このトピックでは、Amazon MWAA での Apache Airflow 設定オプションの使用 を使用して Amazon Managed Workflows for Apache Airflow 環境のパフォーマンスを調整する方法について説明します。
Apache Airflow 設定オプションの追加
環境に Airflow 設定オプションを追加するには、次の手順に従います。
-
Amazon MWAA コンソールで、環境ページ を開きます。
-
環境を選択します。
-
編集 を選択します。
-
次へ を選択します。
-
Airflow 設定オプション ペインで カスタム設定を追加 を選択します。
-
ドロップダウンリストから設定を選択して値を入力するか、カスタム設定を入力して値を入力します。
-
追加する設定ごとに カスタム設定を追加 を選択します。
-
保存 を選択します。
詳細については、Amazon MWAA での Apache Airflow 設定オプションの使用 を参照してください。
Apache Airflow スケジューラー
Apache Airflow スケジューラーは Apache Airflow のコアコンポーネントです。スケジューラーに問題があると、DAG の解析やタスクのスケジュール設定ができなくなる可能性があります。Apache Airflow スケジューラーの調整の詳細については、Apache Airflow ドキュメンテーションウェブサイトの スケジューラーのパフォーマンスのファインチューニング を参照してください。
パラメータ
このセクションでは、Apache Airflow スケジューラー (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。
- Apache Airflow v3
-
| 設定 |
ユースケース |
|
celery.sync_parallelism
Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。
デフォルト: 1
|
このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、CloudWatch Logs にタスクログを配信する際にエラーが発生しないように値が 1 に設定されています。この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。
|
|
scheduler.scheduler_idle_sleep_time
スケジューラーの「ループ」で DAG ファイルが連続して処理されるまでに待機する秒数。
デフォルト: 1
|
このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューへの追加、エグゼキューター でのキュー内のタスクの実行が完了した後に、スケジューラーがスリープする時間を 長くする ことで、スケジューラーの CPU 使用率を解放できます。この値を増やすと、Airflow v2 および Apache Airflow v3 の dag_processor.parsing_processes 環境で実行されるスケジューラーのスレッド数が消費されます。これにより、スケジューラーが DAG を解析する容量が減り、DAG がウェブサーバーに取り込まれるのにかかる時間が長くなる可能性があります。
|
|
scheduler.max_dagruns_to_create_per_loop
スケジューラー「ループ」ごとに作成する DagRuns の DAG の最大数。
デフォルト: 10
|
このオプションを使用すると、スケジューラーの「ループ」での DAGRun の最大数を 減らす ことで、タスクのスケジューリングに必要なリソースを解放できます。
|
|
dag_processor.parsing_processes
スケジューラーが DAG をスケジュールするために並列して実行できるスレッドの数。
デフォルト: (2 * number of vCPUs) - 1 を使用する
|
このオプションを使用すると、スケジューラーが DAG を解析するために並列して実行するプロセスの数を 減らす ことで、リソースを解放できます。DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する 必要があります。詳細については、制限 を参照してください。
|
- Apache Airflow v2
-
| 設定 |
ユースケース |
|
celery.sync_parallelism
Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。
デフォルト: 1
|
このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、CloudWatch Logs にタスクログを配信する際にエラーが発生しないように値が 1 に設定されています。この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。
|
|
scheduler.idle_sleep_time
スケジューラーの「ループ」で DAG ファイルが連続して処理されるまでに待機する秒数。
デフォルト: 1
|
このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューへの追加、エグゼキューター でのキュー内のタスクの実行が完了した後に、スケジューラーがスリープする時間を 長くする ことで、スケジューラーの CPU 使用率を解放できます。この値を増やすと、Airflow v2 および Apache Airflow v3 の scheduler.parsing_processes 環境で実行されるスケジューラーのスレッド数が消費されます。これにより、スケジューラーが DAG を解析する容量が減り、DAG がウェブサーバーに取り込まれるのにかかる時間が長くなる可能性があります。
|
|
scheduler.max_dagruns_to_create_per_loop
スケジューラー「ループ」ごとに作成する DagRuns の DAG の最大数。
デフォルト: 10
|
このオプションを使用すると、スケジューラーの「ループ」での DAGRun の最大数を 減らす ことで、タスクのスケジューリングに必要なリソースを解放できます。
|
|
scheduler.parsing_processes
スケジューラーが DAG をスケジュールするために並列して実行できるスレッドの数。
デフォルト: (2 * number of vCPUs) - 1 を使用する
|
このオプションを使用すると、スケジューラーが DAG を解析するために並列して実行するプロセスの数を 減らす ことで、リソースを解放できます。DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する 必要があります。詳細については、制限 を参照してください。
|
制限
このセクションでは、スケジューラーのデフォルトパラメータを調整する際に考慮する制限について説明します。
- scheduler.parsing_processes、scheduler.max_threads (v2のみ)
-
1 つの環境クラスの vCPU ごとに 2 つのスレッドを使用できます。環境クラスのスケジューラー用に少なくとも 1 つのスレッドを予約する必要があります。タスクのスケジュールが遅れていることに気付いた場合は、環境クラス を増やす必要があるかもしれません。例えば、大規模な環境では、スケジューラ用に 4 vCPU の Fargate コンテナインスタンスがあります。つまり、他のプロセスに使用できるスレッドの 7 合計は最大数になります。つまり、2 つのスレッドに 4 つの vCPUs を乗算した値から、スケジューラー自体の 1 を引いた値です。scheduler.max_threads (v2のみ) および scheduler.parsing_processes で指定する値は、環境クラスで使用できるスレッド数 (以下を参照) を超えてはなりません。
-
mw1.small — 他のプロセスの 1 スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.medium — 他のプロセスの 3 スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.large — 他のプロセスの 7 スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
DAG フォルダー
Apache Airflow スケジューラーは、環境内の DAG フォルダーを継続的にスキャンします。含まれている plugins.zip ファイル、または「airflow」インポートステートメントを含む Python (.py) ファイル。生成されたすべての Python DAG オブジェクトは DagBag に格納され、そのファイルがスケジューラーによって処理され、スケジュールする必要があるタスク (ある場合) が決定されます。DAG ファイルの解析は、そのファイルに実行可能な DAG オブジェクトが含まれているかどうかに関係なく行われます。
パラメータ
このセクションでは、DAG フォルダー (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。
- Apache Airflow v3
-
| 設定 |
ユースケース |
|
dag_processor.refresh_interval
DAG フォルダーをスキャンして新しいファイルを探す必要のある秒数。
デフォルト: 300 秒
|
このオプションを使用すると、DAGs フォルダーを解析する秒数を 増やす ことでリソースを解放できます。DAG フォルダーに大量のファイルがあることが原因で、total_parse_time metrics で解析時間が長くなる場合は、この値を増やすことをお勧めします。
|
|
dag_processor.min_file_process_interval
スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。
デフォルト: 30 秒
|
このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を 増やす ことで、リソースを解放できます。例えば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。
|
- Apache Airflow v2
-
| 設定 |
ユースケース |
|
scheduler.dag_dir_list_interval
DAG フォルダーをスキャンして新しいファイルを探す必要のある秒数。
デフォルト: 300 秒
|
このオプションを使用すると、DAGs フォルダーを解析する秒数を 増やす ことでリソースを解放できます。DAG フォルダーに大量のファイルがあることが原因で、total_parse_time metrics で解析時間が長くなる場合は、この値を増やすことをお勧めします。
|
|
scheduler.min_file_process_interval
スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。
デフォルト: 30 秒
|
このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を 増やす ことで、リソースを解放できます。例えば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。
|
DAG ファイル
Apache Airflow スケジューラーループの一部として、個々の DAG ファイルが解析されて DAG Python オブジェクトが抽出されます。Apache Airflow v2 以降では、スケジューラーは同時に最大数の 解析プロセス を解析します。同じファイルが再び解析される前に、scheduler.min_file_process_interval (v2) または dag_processor.min_file_process_interval (v3) で指定された秒数が経過する必要があります。
パラメータ
このセクションでは、Apache Airflow DAG ファイル (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。
- Apache Airflow v3
-
| 設定 |
ユースケース |
|
dag_processor.dag_file_processor_timeout
DagFileProcessor が DAG ファイルの処理をタイムアウトするまでの秒数。
デフォルト: 50 秒
|
このオプションを使用すると、DAGFileProcessor がタイムアウトするまでの時間を 増やす ことでリソースを解放できます。DAG 処理ログにタイムアウトが発生し、実行可能な DAG が読み込まれなくなる場合は、この値を増やすことをお勧めします。
|
|
core.dagbag_import_timeout
Python ファイルをインポートするまでの秒数がタイムアウトします。
デフォルト: 30 秒
|
このオプションを使用すると、Python ファイルをインポートして DAG オブジェクトを抽出する際にスケジューラーがタイムアウトになるまでにかかる時間を 増やす ことで、リソースを解放できます。このオプションはスケジューラーの「ループ」の一部として処理され、dag_processor.dag_file_processor_timeout で指定されている値よりも小さい値が含まれている必要があります。
|
|
core.min_serialized_dag_update_interval
データベース内のシリアル化された DAG が更新されるまでの最小秒数。
デフォルト: 30
|
このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を 増やす ことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を増やすと、DAG がシリアル化されるときのスケジューラーとデータベースへの負荷が軽減されます。
|
|
core.min_serialized_dag_fetch_interval
シリアル化された DAG が DAGBag に既に読み込まれているときに、データベースから再フェッチされる秒数。
デフォルト: 10
|
このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を 増やす ことでリソースを解放できます。データベースの「書き込み」速度を下げるには、この値を core.min_serialized_dag_update_interval で指定した値より大きくする必要があります。この値を増やすと、DAG がシリアル化される際のウェブサーバーとデータベースの負荷が軽減されます。
|
- Apache Airflow v2
-
| 設定 |
ユースケース |
|
core.dag_file_processor_timeout
DagFileProcessor が DAG ファイルの処理をタイムアウトするまでの秒数。
デフォルト: 50 秒
|
このオプションを使用すると、DAGFileProcessor がタイムアウトするまでの時間を 増やす ことでリソースを解放できます。DAG 処理ログにタイムアウトが発生し、実行可能な DAG が読み込まれなくなる場合は、この値を増やすことをお勧めします。
|
|
core.dagbag_import_timeout
Python ファイルをインポートするまでの秒数がタイムアウトします。
デフォルト: 30 秒
|
このオプションを使用すると、Python ファイルをインポートして DAG オブジェクトを抽出する際にスケジューラーがタイムアウトになるまでにかかる時間を 増やす ことで、リソースを解放できます。このオプションはスケジューラーの「ループ」の一部として処理され、core.dag_file_processor_timeout で指定されている値よりも小さい値が含まれている必要があります。
|
|
core.min_serialized_dag_update_interval
データベース内のシリアル化された DAG が更新されるまでの最小秒数。
デフォルト: 30
|
このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を 増やす ことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を増やすと、DAG がシリアル化されるときのスケジューラーとデータベースへの負荷が軽減されます。
|
|
core.min_serialized_dag_fetch_interval
シリアル化された DAG が DAGBag に既に読み込まれているときに、データベースから再フェッチされる秒数。
デフォルト: 10
|
このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を 増やす ことでリソースを解放できます。データベースの「書き込み」速度を下げるには、この値を core.min_serialized_dag_update_interval で指定した値より大きくする必要があります。この値を増やすと、DAG がシリアル化される際のウェブサーバーとデータベースの負荷が軽減されます。
|
タスク
Apache Airflow スケジューラーとワーカーはどちらもタスクのキューイングとデキューに関与します。スケジューラーは、解析済みのスケジュール設定が完了したタスクを なし ステータスから スケジュール済み ステータスに移行します。同じく Fargate のスケジューラーコンテナーで実行されているエグゼキューターは、それらのタスクをキューに入れ、そのステータスを キューで待機中 に設定します。ワーカーにキャパシティがあると、キューからタスクを取り出してステータスを 実行中 に設定し、その後、タスクが成功したか失敗したかに基づいてステータスを 成功 または 失敗 に変更します。
パラメータ
このセクションでは、Apache Airflow タスクで使用できる設定オプションとそのユースケースについて説明します。
Amazon MWAA がオーバーライドするデフォルトの設定オプションは 赤 でマークされています。
- Apache Airflow v3
-
| 設定 |
ユースケース |
|
core.parallelism
Running のステータスを持つことができるタスクインスタンスの最大数。
デフォルト: (maxWorkers * maxCeleryWorkers) / schedulers * 1.5 に基づいて動的に設定されます。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を 増やす ことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を掛けた値である必要があります。この値を変更するのは、多数のタスクが「実行中」または「キューで待機中」の状態で停止している場合のみにすることをおすすめします。
|
|
core.execute_tasks_new_python_interpreter
Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。
デフォルト: True
|
True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。
|
|
celery.worker_concurrency
Amazon MWAA は、このオプションの Airflow ベースインストールをオーバーライドして、自動スケーリングコンポーネントの一部としてワーカーをスケーリングします。
デフォルト: 該当なし
|
このオプションに指定された値はすべて無視されます。
|
|
celery.worker_autoscale
ワーカー向けのタスク同時実行性。
デフォルト:
mw1.micro - 3,0 mw1.small - 5,0 mw1.medium - 10,0 mw1.large - 20,0 mw1.xlarge - 40,0 mw1.2xlarge - 80,0
|
このオプションを使用すると、ワーカーの minimum、maximum のタスク同時実行を 減らす ことでリソースを解放できます。ワーカーは、そのための十分なリソースがあるかどうかに関係なく、構成された maximum までの同時タスク受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。
|
- Apache Airflow v2
-
| 設定 |
ユースケース |
|
core.parallelism
Running のステータスを持つことができるタスクインスタンスの最大数。
デフォルト: (maxWorkers * maxCeleryWorkers) / schedulers * 1.5 に基づいて動的に設定されます。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を 増やす ことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を掛けた値である必要があります。この値を変更するのは、多数のタスクが「実行中」または「キューで待機中」の状態で停止している場合のみにすることをおすすめします。
|
|
core.dag_concurrency
各 DAG で同時に実行できるタスクインスタンスの数。
デフォルト: 10000
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を 増やす ことでリソースを解放できます。例えば、10 個の並列タスクを含む 100 個の DAG があり、すべての DAG を同時に実行したい場合、利用可能なワーカーの数に celery.worker_concurrency でのワーカータスクの密度を掛け、それを DAG の数で割って最大並列処理を計算できます。
|
|
core.execute_tasks_new_python_interpreter
Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。
デフォルト: True
|
True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。
|
|
celery.worker_concurrency
Amazon MWAA は、このオプションの Airflow ベースインストールをオーバーライドして、自動スケーリングコンポーネントの一部としてワーカーをスケーリングします。
デフォルト: 該当なし
|
このオプションに指定された値はすべて無視されます。
|
|
celery.worker_autoscale
ワーカー向けのタスク同時実行性。
デフォルト:
mw1.micro - 3,0 mw1.small - 5,0 mw1.medium - 10,0 mw1.large - 20,0 mw1.xlarge - 40,0 mw1.2xlarge - 80,0
|
このオプションを使用すると、ワーカーの minimum、maximum のタスク同時実行を 減らす ことでリソースを解放できます。ワーカーは、そのための十分なリソースがあるかどうかに関係なく、構成された maximum までの同時タスク受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。
|