View a markdown version of this page

Amazon MWAA 上的 Apache Airflow 的性能调整 - Amazon Managed Workflows for Apache Airflow

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Amazon MWAA 上的 Apache Airflow 的性能调整

本主题介绍如何使用 在 Amazon MWAA 上使用 Apache Airflow 配置选项 调整 Amazon MWAA 环境的性能。

添加 Apache Airflow 配置选项。

执行以下过程将 Airflow 配置选项添加到环境中。

  1. 在 Amazon MWAA 控制台上打开环境页面

  2. 选择环境。

  3. 选择编辑

  4. 选择下一步

  5. Airflow 配置选项窗格中选择添加自定义配置。

  6. 从下拉列表中选择配置并输入值,或者输入自定义配置并输入值。

  7. 为每个您想要添加的配置选择添加自定义配置选项

  8. 选择保存

要了解更多信息,请参阅 在 Amazon MWAA 上使用 Apache Airflow 配置选项

Apache Airflow 计划程序

Apache Airflow 计划程序是 Apache Airflow 的核心组件。计划程序出现问题可能会导致无法解析 DAG 且无法调度任务。有关 Apache Airflow 调度程序调整的更多信息,请参阅 Fine-tuning Apache Airflow 文档网站上的调度器性能。

参数

本节介绍可用于 Apache Airflow 计划程序(Apache Airflow v2 和更高版本)的配置选项及其使用案例。

Apache Airflow v3
配置 使用案例

celery.sync_parallelism

Celery 执行程序用于同步任务状态的进程数。

默认值:1

您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设置为,1以防止在将任务日志传送到 CloudWatch 日志时出错。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。

scheduler.scheduler_idle_sleep_time

在计划程序“循环”中连续处理 DAG 文件之间等待的秒数。

默认值:1

在计划程序检索 DAG 解析结果、查找任务和对任务进行排队以及在执行程序中执行排队任务后,您可以使用此选项通过延长计划程序的休眠时间来释放计划程序上的 CPU 使用率。增加此值会消耗在 Apache Airflow v2 和 Apache Airflow v3 的 dag_processor.parsing_processes 的环境中运行的计划程序线程数。这可能会降低计划程序解析 DAG 的容量,并增加 DAG 出现在 Web 服务器上的时间。

scheduler.max_dagruns_to_create_per_loop

为每个调度程序 “循环” 创建DagRuns的最大 DAG 数。

默认值:10

您可以使用此选项通过减少调度程序 “循环” 的最大数量来腾出资源DagRuns用于调度任务。

dag_processor.parsing_processes

计划程序可以并行运行以调度 DAG 的线程数。

默认:使用 (2 * number of vCPUs) - 1

您可以使用此选项通过减少计划程序并行运行解析 DAG 的进程数来释放资源。如果 DAG 解析影响任务调度,我们建议将此数量保持在较低水平。您必须指定一个小于环境中 vCPU 计数的值。要了解更多信息,请参阅限制

Apache Airflow v2
配置 使用案例

celery.sync_parallelism

Celery 执行程序用于同步任务状态的进程数。

默认值:1

您可以使用此选项通过限制 Celery 执行程序使用的进程来防止队列冲突。默认情况下,将值设置为,1以防止在将任务日志传送到 CloudWatch 日志时出错。将该值设为 0 意味着使用最大进程数,但在传送任务日志时可能会导致错误。

scheduler.idle_sleep_time

在计划程序“循环”中连续处理 DAG 文件之间等待的秒数。

默认值:1

在计划程序检索 DAG 解析结果、查找任务和对任务进行排队以及在执行程序中执行排队任务后,您可以使用此选项通过延长计划程序的休眠时间来释放计划程序上的 CPU 使用率。增加此值会消耗在 Apache Airflow v2 和 Apache Airflow v3 的 scheduler.parsing_processes 的环境中运行的计划程序线程数。这可能会降低计划程序解析 DAG 的容量,并增加 DAG 出现在 Web 服务器上的时间。

scheduler.max_dagruns_to_create_per_loop

为每个调度程序 “循环” 创建DagRuns的最大 DAG 数。

默认值:10

您可以使用此选项通过减少调度程序 “循环” 的最大数量来腾出资源DagRuns用于调度任务。

scheduler.parsing_processes

计划程序可以并行运行以调度 DAG 的线程数。

默认:使用 (2 * number of vCPUs) - 1

您可以使用此选项通过减少计划程序并行运行解析 DAG 的进程数来释放资源。如果 DAG 解析影响任务调度,我们建议将此数量保持在较低水平。您必须指定一个小于环境中 vCPU 计数的值。要了解更多信息,请参阅限制

限制

本节介绍调整计划程序的默认参数时要考虑的限制。

scheduler.parsing_processes、scheduler.max_threads(仅限于 v2)

对于环境类,每个 vCPU 允许使用两个线程。必须为环境类的计划程序保留至少一个线程。如果您发现任务计划出现延迟,则可能需要增加环境类。例如,大型环境为其计划程序设有一个 4 vCPU 的 Fargate 容器实例。这意味着可供其他进程使用的 7 个线程总数的上限。也就是说,两个线程乘以四个 vCPU,计划程序本身减一。您在 scheduler.max_threads(仅限于 v2)和 scheduler.parsing_processes 中指定的值不得超过环境类的可用线程数,如下所列:

  • mw1.small — 其他进程不得超过 1 个线程。剩下的线程是为计划程序保留的。

  • mw1.medium — 其他进程不得超过 3 个线程。剩下的线程是为计划程序保留的。

  • mw1.large — 其他进程不得超过 7 个线程。剩下的线程是为计划程序保留的。

DAG 文件夹

Apache Airflow 计划程序会持续扫描环境中的 DAG 文件夹。任何包含的 plugins.zip 文件,或包含“Airflow”导入语句的 Python (.py) 文件。然后,所有生成的 Python DAG 对象都将放入该文件中,由调度器处理,以确定需要安排哪些任务(如果有)。DagBag无论文件是否包含任何可行的 DAG 对象,都会进行 DAG 文件解析。

参数

本节介绍可用于 DAG 文件夹的配置选项(Apache Airflow v2 和更高版本)及其使用案例。

Apache Airflow v3
配置 使用案例

dag_processor.refresh_interval

必须扫描 DAG 文件夹以查找新文件的秒数。

默认值:300 秒

您可以使用此选项通过增加解析 DAG 文件夹的秒数来释放资源。如果您发现 total_parse_time metrics 中的解析时间较长(这可能是由于 DAG 文件夹中有大量文件所致),建议您加大此值。

dag_processor.min_file_process_interval

反映计划程序解析 DAG 并更新 DAG 之后的秒数。

默认值:30 秒

您可以使用此选项通过增加计划程序在解析 DAG 之前等待的秒数来释放资源。例如,如果指定 30 的值,则将每 30 秒解析一次 DAG 文件。我们建议将此秒数保持在较高水平,以减小环境上的 CPU 使用率。

Apache Airflow v2
配置 使用案例

scheduler.dag_dir_list_interval

必须扫描 DAG 文件夹以查找新文件的秒数。

默认值:300 秒

您可以使用此选项通过增加解析 DAG 文件夹的秒数来释放资源。如果您发现 total_parse_time metrics 中的解析时间较长(这可能是由于 DAG 文件夹中有大量文件所致),建议您加大此值。

scheduler.min_file_process_interval

反映计划程序解析 DAG 并更新 DAG 之后的秒数。

默认值:30 秒

您可以使用此选项通过增加计划程序在解析 DAG 之前等待的秒数来释放资源。例如,如果指定 30 的值,则将每 30 秒解析一次 DAG 文件。我们建议将此秒数保持在较高水平,以减小环境上的 CPU 使用率。

DAG 文件

作为 Apache Airflow 计划程序循环的一部分,将解析单个 DAG 文件以提取 DAG Python 对象。在 Apache Airflow v2 及更高版本中,计划程序可以同时解析最大数量的解析进程scheduler.min_file_process_interval (v2) 或 dag_processor.min_file_process_interval (v3) 中指定的秒数必须流逝后,才能再次解析同一个文件。

参数

本节介绍可用于 Apache Airflow DAG 文件的配置选项(Apache Airflow v2 和更高版本)及其使用案例。

Apache Airflow v3
配置 使用案例

dag_processor.dag_file_processor_timeout

处理 DAG 文件超DagFileProcessor时之前的秒数。

默认值:50 秒

您可以使用此选项通过增加超时之前所需的时间来释放资源。DagFileProcessor如果您在 DAG 处理日志中遇到超时导致无法加载可行的 DAG,我们建议您增大此值。

core.dagbag_import_timeout

导入 Python 文件之前的秒数超时。

默认值:30 秒

在导入 Python 文件来提取 DAG 对象的同时延长计划程序超时之前所需的时间,您可以使用此选项来释放资源。此选项作为计划程序“循环”的一部分进行处理,并且必须包含一个小于 dag_processor.dag_file_processor_timeout 中指定值的值。

core.min_serialized_dag_update_interval

更新数据库中序列化的 DAG 之后的最小秒数。

默认值:30

通过增加更新数据库中序列化的 DAG 之后的秒数,您可以使用此选项来释放资源。如果您有大量 DAG 或 DAG 复杂,我们建议您增加此值。当 DAG 被序列化时,增大此值可减少计划程序和数据库的负载。

core.min_serialized_dag_fetch_interval

序列化的 DAG 已加载到数据库中时从数据库中重新提取的秒数。 DagBag

默认值:10

通过增加序列化的 DAG 重新提取的秒数,您可以使用此选项来释放资源。该值必须大于 core.min_serialized_dag_update_interval 中指定的值才能降低数据库的“写入”速率。当 DAG 被序列化时,增大此值可减少 Web 服务器和数据库的负载。

Apache Airflow v2
配置 使用案例

core.dag_file_processor_timeout

处理 DAG 文件超DagFileProcessor时之前的秒数。

默认值:50 秒

您可以使用此选项通过增加超时之前所需的时间来释放资源。DagFileProcessor如果您在 DAG 处理日志中遇到超时导致无法加载可行的 DAG,我们建议您增大此值。

core.dagbag_import_timeout

导入 Python 文件之前的秒数超时。

默认值:30 秒

在导入 Python 文件来提取 DAG 对象的同时延长计划程序超时之前所需的时间,您可以使用此选项来释放资源。此选项作为计划程序“循环”的一部分进行处理,并且必须包含一个小于 core.dag_file_processor_timeout 中指定值的值。

core.min_serialized_dag_update_interval

更新数据库中序列化的 DAG 之后的最小秒数。

默认值:30

通过增加更新数据库中序列化的 DAG 之后的秒数,您可以使用此选项来释放资源。如果您有大量 DAG 或 DAG 复杂,我们建议您增加此值。当 DAG 被序列化时,增大此值可减少计划程序和数据库的负载。

core.min_serialized_dag_fetch_interval

序列化的 DAG 已加载到数据库中时从数据库中重新提取的秒数。 DagBag

默认值:10

通过增加序列化的 DAG 重新提取的秒数,您可以使用此选项来释放资源。该值必须大于 core.min_serialized_dag_update_interval 中指定的值才能降低数据库的“写入”速率。当 DAG 被序列化时,增大此值可减少 Web 服务器和数据库的负载。

任务

Apache Airflow 计划程序和工作线程都参与排队和出队任务。计划程序将已解析的准备调度的任务从状态变为已计划状态。也在 Fargate 的计划程序容器上运行的执行程序,对这些任务进行排队并将其状态设置为已排队。当工作线程有容量时,它会从队列中提取任务并将状态设置为正在运行,然后根据任务成功还是失败将其状态更改为成功失败

参数

本节介绍可用于 Apache Airflow 任务的配置选项及其用例。

标记 Amazon MWAA 覆盖的默认配置选项。red

Apache Airflow v3
配置 使用案例

core.parallelism

状态为 Running 的最大任务实例数。

默认值:基于 (maxWorkers * maxCeleryWorkers) / schedulers * 1.5 动态设置。

通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值必须为可用工作线程数乘以工作线程任务密度。建议您仅在有大量任务处于“正在运行”或“已排队”状态时才更改此值。

core.execute_tasks_new_python_interpreter

确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。

默认值True

设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。

celery.worker_concurrency

Amazon MWAA 会覆盖此选项的 Airflow 基础版安装,以作为自动扩缩组件的一部分扩缩工作线程。

默认值:不适用

Any value specified for this option is ignored.

celery.worker_autoscale

工作线程的任务并发度。

默认值:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge – 40,0

  • mw1.2xlarge – 80,0

通过降低工作线程的 minimummaximum 任务并发度,您可以使用此选项来释放资源。无论是否有足够的资源,工作人员最多可以接受配置的maximum并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。

Apache Airflow v2
配置 使用案例

core.parallelism

状态为 Running 的最大任务实例数。

默认值:基于 (maxWorkers * maxCeleryWorkers) / schedulers * 1.5 动态设置。

通过增加可以同时运行的任务实例数,您可以使用此选项来释放资源。指定的值必须为可用工作线程数乘以工作线程任务密度。建议您仅在有大量任务处于“正在运行”或“已排队”状态时才更改此值。

core.dag_concurrency

允许为每个 DAG 同时运行的任务实例数。

默认值:10000

通过增加可以并发运行的任务实例数,您可以使用此选项来释放资源。例如,如果您有一百个 DAG 和十个并行任务,并且您希望所有 DAG 并发运行,则可以将最大并行度计算为可用工作线程数乘以 celery.worker_concurrency 中的工作线程任务密度,除以 DAG 数量。

core.execute_tasks_new_python_interpreter

确定 Apache Airflow 是通过分叉父进程还是通过创建新的 Python 进程来执行任务。

默认值True

设置为 True 时,Apache Airflow 会将您对插件所做的更改识别为为执行任务而创建的新 Python 进程。

celery.worker_concurrency

Amazon MWAA 会覆盖此选项的 Airflow 基础版安装,以作为自动扩缩组件的一部分扩缩工作线程。

默认值:不适用

Any value specified for this option is ignored.

celery.worker_autoscale

工作线程的任务并发度。

默认值:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge – 40,0

  • mw1.2xlarge – 80,0

通过降低工作线程的 minimummaximum 任务并发度,您可以使用此选项来释放资源。无论是否有足够的资源,工作人员最多可以接受配置的maximum并发任务。如果在没有足够资源的情况下调度任务,则任务会立即失败。我们建议为资源密集型任务更改此值,方法是将该值减少到小于默认值,以允许每个任务有更多容量。