Amazon MWAA での Apache Airflow のパフォーマンス調整 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MWAA での Apache Airflow のパフォーマンス調整

このトピックでは、 を使用して Amazon Managed Workflows for Apache Airflow 環境のパフォーマンスを調整する方法について説明しますAmazon MWAA での Apache Airflow 構成オプションの使用

Apache Airflow 構成オプションの追加

次の手順を使用して、Airflow 設定オプションを環境に追加します。

  1. Amazon MWAA コンソールで、環境ページを開きます。

  2. 環境を選択します。

  3. [編集] を選択します。

  4. [次へ] を選択します。

  5. Airflow 構成オプション ペインで [カスタム構成を追加] を選択します。

  6. ドロップダウンリストから構成を選択して値を入力するか、カスタム構成を入力して値を入力します。

  7. 追加する構成ごとに [カスタム構成を追加] を選択します。

  8. [保存] を選択します。

詳細については、「Amazon MWAA での Apache Airflow 構成オプションの使用」を参照してください。

Apache Airflow スケジューラー

Apache Airflow スケジューラーは Apache Airflow のコアコンポーネントです。スケジューラーに問題があると、DAG の解析やタスクのスケジュール設定ができなくなる可能性があります。Apache Airflow スケジューラのチューニングの詳細については、Apache Airflow ドキュメントウェブサイトの「スケジューラのパフォーマンスの微調整」を参照してください。

パラメータ

このセクションでは、Apache Airflow スケジューラ (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。

Apache Airflow v3
設定 ユースケース

celery.sync_parallelism

Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。

デフォルト: 1

このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、CloudWatch Logs にタスクログを配信する際にエラーが発生しないように値が 1 に設定されています。この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。

scheduler.scheduler_idle_sleep_time

スケジューラの「ループ」で連続する DAG ファイル処理の間に待機する秒数。

デフォルト: 1

このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューイング、Executor でのキューに入れられたタスクの実行が完了した後にスケジューラがスリープする時間を増やすことで、スケジューラの CPU 使用率を解放できます。この値を増やすと、 for dag_processor.parsing_processes Apache Airflow v2 および Apache Airflow v3 の 環境で実行されるスケジューラスレッドの数が消費されます。これにより、スケジューラが DAGs を解析する容量が減少し、DAGs がウェブサーバーに入力されるまでにかかる時間が長くなる可能性があります。

scheduler.max_dagruns_to_create_per_loop

スケジューラあたりの DagRuns を作成する DAGs の最大数「ループ」。

デフォルト: 10

このオプションを使用すると、スケジューラの「ループ」の DagRuns の最大数を減らすことで、タスクをスケジュールするためのリソースを解放できます。

dag_processor.parsing_processes

スケジューラが DAGs をスケジュールするために並行して実行できるスレッドの数。

デフォルト: を使用する (2 * number of vCPUs) - 1

このオプションを使用すると、スケジューラが DAG を解析するために並行して実行するプロセスの数を減らすことで、リソースを解放できます。 DAGs DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する必要があります。詳細については、「 の制限」を参照してください。

Apache Airflow v2
設定 ユースケース

celery.sync_parallelism

Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。

デフォルト: 1

このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、CloudWatch Logs にタスクログを配信する際にエラーが発生しないように値が 1 に設定されています。この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。

scheduler.idle_sleep_time

スケジューラの「ループ」で連続する DAG ファイル処理の間に待機する秒数。

デフォルト: 1

このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューイング、Executor でのキューに入れられたタスクの実行が完了した後にスケジューラがスリープする時間を増やすことで、スケジューラの CPU 使用率を解放できます。この値を増やすと、Apache Airflow v2 および Apache Airflow v3 scheduler.parsing_processesの 環境で実行されるスケジューラスレッドの数が消費されます。これにより、スケジューラが DAGs を解析する容量が減少し、DAGs がウェブサーバーに入力されるまでにかかる時間が長くなる可能性があります。

scheduler.max_dagruns_to_create_per_loop

スケジューラあたりの DagRuns を作成する DAGs の最大数「ループ」。

デフォルト: 10

このオプションを使用すると、スケジューラの「ループ」の DagRuns の最大数を減らすことで、タスクをスケジュールするためのリソースを解放できます。

scheduler.parsing_processes

スケジューラが DAGs をスケジュールするために並行して実行できるスレッドの数。

デフォルト: を使用する (2 * number of vCPUs) - 1

このオプションを使用すると、スケジューラが DAG を解析するために並行して実行するプロセスの数を減らすことで、リソースを解放できます。 DAGs DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する必要があります。詳細については、「 の制限」を参照してください。

制限

このセクションでは、スケジューラのデフォルトパラメータを調整するときに考慮すべき制限について説明します。

scheduler.parsing_processes、scheduler.max_threads (v2 のみ)

1 つの環境クラスの vCPU ごとに 2 つのスレッドを使用できます。環境クラスのスケジューラー用に少なくとも 1 つのスレッドを予約する必要があります。タスクのスケジュールに遅延が見られる場合は、環境クラスを増やす必要がある場合があります。たとえば、大規模な環境では、スケジューラ用に 4 vCPU の Fargate コンテナインスタンスがあります。つまり、他のプロセスに使用できるスレッドの 7 合計は最大数になります。つまり、2 つのスレッドに 4 つの vCPUs を乗算した値から、スケジューラー自体の 1 を引いた値です。で指定する値 scheduler.max_threads (v2 のみ) と scheduler.parsing_processes は、次に示すように、環境クラスで使用できるスレッドの数を超えることはできません。

  • mw1.small — 他のプロセスの 1 スレッド数を超えてはいけません。残りのスレッドはスケジューラ用に予約されています。

  • mw1.medium — 他のプロセスの 3 スレッド数を超えてはいけません。残りのスレッドはスケジューラ用に予約されています。

  • mw1.large — 他のプロセスの 7 スレッド数を超えてはいけません。残りのスレッドはスケジューラ用に予約されています。

DAG フォルダー

Apache Airflow スケジューラは、環境の DAGsフォルダを継続的にスキャンします。含まれている plugins.zip ファイル、または「airflow」インポートステートメントを含む Python (.py) ファイル。その後、結果の Python DAG オブジェクトは DagBag に配置され、スケジューラがそのファイルを処理して、スケジュールする必要があるタスクがあるかどうかを判断します。DAG ファイルの解析は、そのファイルに実行可能な DAG オブジェクトが含まれているかどうかに関係なく行われます。

パラメータ

このセクションでは、DAGs フォルダ (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。

Apache Airflow v3
設定 ユースケース

dag_processor.refresh_interval

新しいファイルについて DAGs フォルダをスキャンしなければならない秒数。

デフォルト: 300 秒

このオプションを使用すると、DAGs フォルダーを解析する秒数を増やすことでリソースを解放できます。で解析時間が長い場合はtotal_parse_time metrics、この値を増やすことをお勧めします。これは、DAGsフォルダ内の多数のファイルが原因である可能性があります。

dag_processor.min_file_process_interval

スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。

デフォルト: 30 秒

このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を増やすことで、リソースを解放できます。たとえば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。

Apache Airflow v2
設定 ユースケース

scheduler.dag_dir_list_interval

新しいファイルについて DAGs フォルダをスキャンしなければならない秒数。

デフォルト: 300 秒

このオプションを使用すると、DAGs フォルダーを解析する秒数を増やすことでリソースを解放できます。で解析時間が長い場合はtotal_parse_time metrics、この値を増やすことをお勧めします。これは、DAGsフォルダ内の多数のファイルが原因である可能性があります。

scheduler.min_file_process_interval

スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。

デフォルト: 30 秒

このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を増やすことで、リソースを解放できます。たとえば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。

DAG ファイル

Apache Airflow スケジューラーループの一部として、個々の DAG ファイルが解析されて DAG Python オブジェクトが抽出されます。Apache Airflow v2 以降では、スケジューラは最大数の解析プロセスを同時に解析します。(v2) または scheduler.min_file_process_interval (dag_processor.min_file_process_intervalv3) で指定された秒数は、同じファイルが再度解析されるまで経過する必要があります。

パラメータ

このセクションでは、Apache Airflow DAG ファイル (Apache Airflow v2 以降) で使用できる設定オプションとそのユースケースについて説明します。

Apache Airflow v3
設定 ユースケース

dag_processor.dag_file_processor_timeout

DagFileProcessor が DAG ファイルの処理をタイムアウトするまでの秒数。

デフォルト: 50 秒

このオプションを使用すると、DAGFileProcessor がタイムアウトするまでの時間を増やすことでリソースを解放できます。DAG 処理ログでタイムアウトが発生し、有効な DAG がロードされない場合 DAGs は、この値を増やすことをお勧めします。

core.dagbag_import_timeout

Python ファイルをインポートするまでの秒数がタイムアウトします。

デフォルト: 30 秒

このオプションを使用すると、Python ファイルのインポート中にスケジューラがタイムアウトして DAG オブジェクトを抽出するまでの時間を長くすることで、リソースを解放できます。このオプションはスケジューラの「ループ」の一部として処理され、 で指定された値より小さい値が含まれている必要がありますdag_processor.dag_file_processor_timeout

core.min_serialized_dag_update_interval

データベース内のシリアル化された DAG が更新されるまでの最小秒数。

デフォルト: 30

このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を増やすことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を大きくすると、DAGsが軽減されます。

core.min_serialized_dag_fetch_interval

シリアル化された DAG が DAGBag に既に読み込まれているときに、データベースから再フェッチされる秒数。

デフォルト: 10

このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を増やすことでリソースを解放できます。データベースの「書き込み」レートを減らすcore.min_serialized_dag_update_intervalには、 で指定された値より大きくする必要があります。この値を大きくすると、DAGsが軽減されます。

Apache Airflow v2
設定 ユースケース

core.dag_file_processor_timeout

DagFileProcessor が DAG ファイルの処理をタイムアウトするまでの秒数。

デフォルト: 50 秒

このオプションを使用すると、DAGFileProcessor がタイムアウトするまでの時間を増やすことでリソースを解放できます。DAG 処理ログでタイムアウトが発生し、有効な DAG がロードされない場合 DAGs は、この値を増やすことをお勧めします。

core.dagbag_import_timeout

Python ファイルをインポートするまでの秒数がタイムアウトします。

デフォルト: 30 秒

このオプションを使用すると、Python ファイルのインポート中にスケジューラがタイムアウトして DAG オブジェクトを抽出するまでの時間を長くすることで、リソースを解放できます。このオプションはスケジューラの「ループ」の一部として処理され、 で指定された値より小さい値が含まれている必要がありますcore.dag_file_processor_timeout

core.min_serialized_dag_update_interval

データベース内のシリアル化された DAG が更新されるまでの最小秒数。

デフォルト: 30

このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を増やすことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を大きくすると、DAGsが軽減されます。

core.min_serialized_dag_fetch_interval

シリアル化された DAG が DAGBag に既に読み込まれているときに、データベースから再フェッチされる秒数。

デフォルト: 10

このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を増やすことでリソースを解放できます。データベースの「書き込み」レートを減らすcore.min_serialized_dag_update_intervalには、 で指定された値より大きくする必要があります。この値を大きくすると、DAGsが軽減されます。

タスク

Apache Airflow スケジューラーとワーカーはどちらもタスクのキューイングとデキューに関与します。スケジューラーは、解析済みのスケジュール設定が完了したタスクを [なし] ステータスから [スケジュール済み] ステータスに移行します。同じく Fargate のスケジューラーコンテナーで実行されているエグゼキューターは、それらのタスクをキューに入れ、そのステータスを [キューで待機中] に設定します。ワーカーにキャパシティがあると、キューからタスクを取り出してステータスを [実行中] に設定し、その後、タスクが成功したか失敗したかに基づいてステータスを [成功] または [失敗] に変更します。

パラメータ

このセクションでは、Apache Airflow タスクで使用できる構成オプションとそのユースケースについて説明します。

Amazon MWAA がオーバーライドするデフォルトの構成オプションはでマークされています。

Apache Airflow v3
設定 ユースケース

core.parallelism

Running ステータスを持つことができるタスクインスタンスの最大数。

デフォルト: に基づいて動的に設定されます(maxWorkers * maxCeleryWorkers) / schedulers * 1.5

このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を掛けた値である必要があります。この値は、多数のタスクが「実行中」または「キューに入れられた」状態でスタックしている場合にのみ変更することをお勧めします。

core.execute_tasks_new_python_interpreter

Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。

デフォルト: True

True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。

celery.worker_concurrency

Amazon MWAA は、このオプションの Airflow ベースインストールを上書きして、ワーカーを自動スケーリングコンポーネントの一部としてスケーリングします。

デフォルト: 該当なし

このオプションに指定された値はすべて無視されます。

celery.worker_autoscale

ワーカーのタスク同時実行数。

デフォルト:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

このオプションを使用すると、ワーカーの maximumminimumタスクの同時実行数を減らすことで、リソースを解放できます。ワーカーは、十分なリソースがあるかどうかにかかわらず、設定されたmaximum同時タスクまで受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。

Apache Airflow v2
設定 ユースケース

core.parallelism

Running ステータスを持つことができるタスクインスタンスの最大数。

デフォルト: に基づいて動的に設定されます(maxWorkers * maxCeleryWorkers) / schedulers * 1.5

このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を掛けた値である必要があります。この値は、多数のタスクが「実行中」または「キューに入れられた」状態でスタックしている場合にのみ変更することをお勧めします。

core.dag_concurrency

各 DAG で同時に実行できるタスクインスタンスの数。

デフォルト: 10000

このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。例えば、10 個の並列タスクを持つ 100 DAGs があり、すべての DAGs を同時に実行する場合は、使用可能なワーカーの数に のワーカータスク密度を掛けた値を DAGs の数で割ってcelery.worker_concurrency最大並列処理を計算できます。

core.execute_tasks_new_python_interpreter

Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。

デフォルト: True

True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。

celery.worker_concurrency

Amazon MWAA は、このオプションの Airflow ベースインストールを上書きして、ワーカーを自動スケーリングコンポーネントの一部としてスケーリングします。

デフォルト: 該当なし

このオプションに指定された値はすべて無視されます。

celery.worker_autoscale

ワーカーのタスク同時実行数。

デフォルト:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

このオプションを使用すると、ワーカーのタスクminimum同時実行数を減らすことでmaximumリソースを解放できます。ワーカーは、十分なリソースがあるかどうかにかかわらず、設定されたmaximum同時タスクまで受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。