在 Amazon MWAA 上使用 Apache Airflow 配置选项
Apache Airflow 配置选项可以作为环境变量附加到 Amazon MWAA 环境中。您可以从建议的下拉列表中进行选择,也可以在 Amazon MWAA 控制台上为 Apache Airflow 版本指定自定义配置选项。本主题介绍可用的 Apache Airflow 配置选项,以及如何使用这些选项来覆盖环境中的 Apache Airflow 配置设置。
目录
先决条件
在完成本页上的步骤之前,您需要具备以下条件。
-
权限 — 您的 AWS 账户 必须已获得管理员授权,访问适用于环境的 AmazonMWAAFullConsoleAccess 访问控制策略。此外,执行角色必须允许 Amazon MWAA 环境访问环境所使用的 AWS 资源。
-
访问权限 — 如果您需要访问公共存储库以便直接在 Web 服务器上安装依赖项,则必须将环境配置为具有公共网络 Web 服务器访问权限。有关更多信息,请参阅Apache Airflow 访问模式。
-
Amazon S3 配置 — 用于存储 DAG 的 Amazon S3 存储桶、在
plugins.zip中的自定义插件和在requirements.txt中的 Python 依赖项必须配置为已阻止公共访问和已启用版本控制。
工作方式
创建环境时,Amazon MWAA 会将您在 Amazon MWAA 控制台的 Airflow 配置选项中指定的配置设置作为环境变量附加到环境 AWS Fargate 容器中。如果您在 airflow.cfg 中使用同名设置,则您在 Amazon MWAA 控制台上指定的选项将覆盖 airflow.cfg 中的值。
虽然默认情况下我们不会在 Amazon MWAA 环境的 Apache Airflow UI 中公开 airflow.cfg,但您可以直接在 Amazon MWAA 控制台上更改 Apache Airflow 配置选项,然后通过设置 webserver.expose_config 来公开配置。
使用配置选项加载插件
默认情况下,在 Apache Airflow v2 和更高版本中,使用 core.lazy_load_plugins : True 设置将插件配置为“延迟”加载。如果您使用自定义插件,则必须添加 core.lazy_load_plugins : False 为 Apache Airflow 配置选项,以便在每个 Airflow 流程开始时加载插件,从而覆盖默认设置。
配置选项概述
当您在 Amazon MWAA 控制台上添加配置时,Amazon MWAA 会将该配置作为环境变量写入。
-
列出的选项。您可以在下拉列表中从适用于 Apache Airflow 的版本中选择一项配置设置。例如
dag_concurrency:16。配置设置将环境的 Fargate 容器转化为AIRFLOW__CORE__DAG_CONCURRENCY : 16 -
自定义选项。您还可以指定未在下拉列表中列出的 Apache Airflow 版本的 Airflow 配置选项。例如
foo.user:YOUR_USER_NAME。配置设置将环境的 Fargate 容器转化为AIRFLOW__FOO__USER : YOUR_USER_NAME
Apache Airflow 配置选项
下图显示了您可以在 Amazon MWAA 控制台上自定义 Apache Airflow 配置选项的位置。
Apache Airflow 参考
有关 Apache Airflow 支持的配置选项列表,请参阅《Apache Airflow 参考指南》中的配置参考
使用 Amazon MWAA 控制台
以下过程将指导您完成将 Airflow 配置选项添加到环境中的步骤。
-
在 Amazon MWAA 控制台上打开环境页面
。 -
选择环境。
-
选择编辑。
-
选择下一步。
-
在 Airflow 配置选项窗格中选择添加自定义配置。
-
从下拉列表中选择配置并输入值,或者输入自定义配置并输入值。
-
为每个您想要添加的配置选择添加自定义配置选项。
-
选择保存。
配置参考
下一节包含 Amazon MWAA 控制台下拉列表中可用的 Apache Airflow 配置列表。
电子邮件配置
以下列表显示了 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的 Airflow 电子邮件通知配置选项。
我们建议对 SMTP 流量使用端口 587。默认情况下,AWS 会阻止所有 Amazon EC2 实例的端口 25 上的出站 SMTP 流量。要在端口 25 上发送出站流量,可请求移除此限制
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
email.email_backend |
Apache Airflow 实用工具用于在 email_backend |
airflow.utils.email.send_email_smtp |
|
smtp.smtp_host |
在 smtp_host |
localhost |
|
smtp.smtp_starttls |
在 smtp_startttls |
False |
|
smtp.smtp_sll |
安全套接字层(SSL)用于连接 smtp_ssl |
True |
|
smtp.smtp_port |
在 smtp_port |
587 |
|
smtp.smtp_mail_from |
smtp_mail_from |
myemail@domain.com |
任务配置数
以下列表显示了 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的 Airflow 任务下拉列表中可用的配置。
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
core.default_task_ret |
在 default_task_retries |
3 |
|
core.parallelism |
可以在整个环境中并行运行的最大任务实例数(并行 |
40 |
计划程序配置数
以下列表显示了 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的下拉列表中可用的 Apache Airflow 计划程序配置。
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
scheduler.catchup_by_default |
告诉计划程序创建 DAG 运行以“赶上”在 catchup_by_default |
False |
|
scheduler.scheduler_zombie_task_task_ 注意在 Apache Airflow v3 中不可用。 |
告诉计划程序是否将任务实例标记为失败并在 scheduler_zombie_task_treshold |
300 |
工作线程配置数
以下列表显示了 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的下拉列表中可用的 Airflow 工作线程配置。
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
celery.worker_autoscale |
在 worker_autoscale |
16,12 |
Web 服务器配置
以下列表显示了 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的下拉列表中可用的 Apache Airflow Web 服务器配置。
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
网络服务器.default_ui_timezone 注意在 Apache Airflow v3 中不可用。 |
在 default_ui_timezone 注意设置 |
America/New_York |
触发器配置
以下列表显示了在 Amazon MWAA 上可用于 Apache Airflow v2 和 v3 的 Apache Airflow 触发器
| Airflow 配置选项 | 描述 | 示例值 |
|---|---|---|
|
mwaa.triggerer_enabled |
用于在 Amazon MWAA 上激活和停用触发器。默认情况下,该值设置为 |
True |
|
triggerer.default_capacity(在 v2 中) trigger.capacity(在 v3 中) |
定义每个触发器可以并行运行的触发器数量。在 Amazon MWAA 上,此容量是按每个触发器和每个计划程序设置的,因为这两个组件并排运行。对于 small、medium、large、xlarge 和 2xlarge实例,每个调度器的默认值分别设置为 |
125 |
示例和示例代码
示例 DAG
您可以使用以下 DAG 来打印 email_backend Apache Airflow 配置选项。要运行以响应 Amazon MWAA 事件,请将代码复制到 Amazon S3 存储桶上环境的 DAG 文件夹。
from airflow.decorators import dag from datetime import datetime def print_var(**kwargs): email_backend = kwargs['conf'].get(section='email', key='email_backend') print("email_backend") return email_backend @dag( dag_id="print_env_variable_example", schedule_interval=None, start_date=datetime(yyyy,m,d), catchup=False, ) def print_variable_dag(): email_backend_test = PythonOperator( task_id="email_backend_test", python_callable=print_var, provide_context=True ) print_variable_test = print_variable_dag()
示例电子邮件通知设置
以下 Apache Airflow 配置选项可用于使用应用程序密码的 Gmail.com 电子邮件帐户。有关更多信息,请参阅《Gmail 帮助参考指南》中的使用应用程序密码登录
接下来做什么?
-
要了解如何将 DAG 文件夹上传到 Amazon S3 存储桶,请参阅 添加或更新 DAG。