使用 Apache Airflow REST API - Amazon Managed Workflows for Apache Airflow

使用 Apache Airflow REST API

对于运行 Apache Airflow v2.4.3 及更高版本的环境,Amazon Managed Workflows for Apache Airflow (Amazon MWAA) 支持使用 Apache Airflow REST API 直接与您的 Apache Airflow 环境进行交互。这让您能够以编程方式访问和管理 Amazon MWAA 环境,从而通过标准化调用数据编排工作流、管理 DAG 以及监控各种 Apache Airflow 组件(例如元数据数据库、触发器和调度器)的状态。

为了在使用 Apache Airflow REST API 时支持可扩展性,Amazon MWAA 提供了水平扩缩 Web 服务器容量的选项,以满足增加的需求,无论是来自 REST API 请求、命令行界面 (CLI) 的使用还是并发 Apache Airflow 用户界面 (UI) 用户数量的增加。有关 Amazon MWAA 如何扩展 Web 服务器的更多信息,请参阅 配置 Amazon MWAA Web 服务器自动扩缩

您可以使用 Apache Airflow REST API 实现环境的以下使用案例:

  • 编程访问 – 您现在可以在不依赖 Apache Airflow 用户界面或 CLI 的情况下,启动 Apache Airflow DAG 运行、管理数据集以及检索元数据数据库、触发器和调度器等各种组件的状态。

  • 与外部应用程序和微服务集成 – 由于支持 REST API,您可以构建自定义解决方案以将您的 Amazon MWAA 环境与其他系统集成。例如,您可以启动工作流以响应来自外部系统的事件,例如已完成的数据库作业或新用户注册等。

  • 集中监控 – 您可以构建监控控制面板来聚合多个 Amazon MWAA 环境中的 DAG 状态,从而实现集中监控和管理。

有关 Apache Airflow REST API 的更多信息,请参阅 Apache Airflow REST API 参考

通过使用 InvokeRestApi,您可以使用 AWS 凭证访问 Apache Airflow REST API。您也可以通过获取 Web 服务器访问令牌,然后使用该令牌进行调用的方式来访问。

如果您在使用 InvokeRestApi 操作时遇到消息 Update your environment to use InvokeRestApi 错误,则表示您需要更新 Amazon MWAA 环境。当您的 Amazon MWAA 环境与 InvokeRestApi 功能相关的最新更改不兼容时,就会发生此错误。要解决此问题,请更新您的 Amazon MWAA 环境以纳入 InvokeRestApi 功能的必要更改。

InvokeRestApi 操作的默认超时时间为 10 秒。如果操作未在这 10 秒的时间范围内完成,则会自动终止并引发错误。确保您的 REST API 调用设计为在此超时时间内完成,以避免出现错误。

为了在使用 Apache Airflow REST API 时支持可扩展性,Amazon MWAA 提供了水平扩缩 Web 服务器容量的选项,以满足增加的需求,无论是来自 REST API 请求、命令行界面 (CLI) 的使用还是并发 Apache Airflow 用户界面 (UI) 用户数量的增加。有关 Amazon MWAA 如何扩展 Web 服务器的更多信息,请参阅 配置 Amazon MWAA Web 服务器自动扩缩

您可以使用 Apache Airflow REST API 实现环境的以下使用案例:

  • 编程访问 – 您现在可以在不依赖 Apache Airflow 用户界面或 CLI 的情况下,启动 Apache Airflow DAG 运行、管理数据集以及检索元数据数据库、触发器和调度器等各种组件的状态。

  • 与外部应用程序和微服务集成 – 由于支持 REST API,您可以构建自定义解决方案以将您的 Amazon MWAA 环境与其他系统集成。例如,您可以启动工作流以响应来自外部系统的事件,例如已完成的数据库作业或新用户注册等。

  • 集中监控 – 您可以构建监控控制面板来聚合多个 Amazon MWAA 环境中的 DAG 状态,从而实现集中监控和管理。

有关 Apache Airflow REST API 的更多信息,请参阅 Apache Airflow REST API 参考

通过使用 InvokeRestApi,您可以使用 AWS 凭证访问 Apache Airflow REST API。您也可以通过获取 Web 服务器访问令牌,然后使用该令牌进行调用的方式来访问。

  • 如果您在使用 InvokeRestApi 操作时遇到消息 Update your environment to use InvokeRestApi 错误,则表示您需要更新 Amazon MWAA 环境。当您的 Amazon MWAA 环境与 InvokeRestApi 功能相关的最新更改不兼容时,就会发生此错误。要解决此问题,请更新您的 Amazon MWAA 环境以纳入 InvokeRestApi 功能的必要更改。

  • InvokeRestApi 操作的默认超时时间为 10 秒。如果操作未在这 10 秒的时间范围内完成,则会自动终止并引发错误。确保您的 REST API 调用设计为在此超时时间内完成,以避免出现错误。

重要

响应有效载荷大小不能超过 6 MB。如果超出限制,RestApi 将失败。

根据以下示例对 Apache Airflow REST API 进行 API 调用并启动新的 DAG 运行:

授予对 Apache Airflow REST API 的访问权限:airflow:InvokeRestApi

要使用 AWS 凭证访问 Apache Airflow REST API,您必须在 IAM 策略中授予 airflow:InvokeRestApi 权限。在以下策略示例中,在 {airflow-role} 中指定 AdminOpUserViewerPublic 角色以自定义用户访问权限级别。有关更多信息,请参阅《Apache Airflow 参考指南》中的默认角色

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
注意

配置私有 Web 服务器时,无法从虚拟私有云 (VPC) 之外调用 InvokeRestApi 操作。您可以使用 aws:SourceVpc 键对此操作执行更精细的访问控制。有关更多信息,请参阅 aws:SourceVpc

调用 Apache Airflow REST API

以下示例脚本说明了如何使用 Apache Airflow REST API 列出环境中可用的 DAG 以及如何创建 Apache Airflow 变量:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

创建 Web 服务器会话令牌并调用 Apache Airflow REST API

要创建 Web 服务器访问令牌,请使用以下 Python 函数。该函数会首先调用 Amazon MWAA API 来获取 Web 登录令牌。Web 登录令牌将在 60 秒后过期,然后会交换为一个 Web 会话令牌,后者可让您访问 Web 服务器和使用 Apache Airflow REST API。如果您需要每秒 10 个事务(TPS)以上的节流容量,则可以使用此方法访问 Apache Airflow REST API。

会话令牌将在 12 小时后过期。

提示

以下代码示例中从 Apache Airflow v2 到 v3 的主要变化是:

  • REST API 路径从 /api/v1 更改为 /api/v2

  • 登录路径从 /aws_maa/login 更改为 /pluginsv2/aws_mwaa/login

  • 登录 response.cookies["_token"] 的响应包含令牌信息,您必须在后续的 API 调用中使用这些信息

  • 对于 REST API 调用,您必须在标头中传递以下 jwt_token 信息:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

完成身份验证后,您将会获得开始向 API 端点发送请求的凭证。在下一部分的示例中,使用端点 dags/{dag_name}/dagRuns

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)