Uso de la API de REST de Apache Airflow - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de la API de REST de Apache Airflow

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) permite interactuar directamente con sus entornos de Apache Airflow mediante la API REST de Apache Airflow para entornos que ejecutan Apache Airflow v2.4.3 y versiones posteriores. Esto le permite acceder a sus entornos de Amazon MWAA y gestionarlos mediante programación, lo que proporciona una forma estandarizada de invocar flujos de trabajo de orquestación de datos, gestionar y supervisar el estado de varios componentes de Apache Airflow DAGs, como la base de datos de metadatos, el activador y el programador.

Para respaldar la escalabilidad al utilizar la API REST de Apache Airflow, Amazon MWAA le ofrece la opción de escalar horizontalmente la capacidad del servidor web para gestionar el aumento de la demanda, ya sea por solicitudes de API REST, por el uso de la interfaz de línea de comandos (CLI) o por más usuarios simultáneos de la interfaz de usuario (UI) de Apache Airflow. Para obtener más información sobre cómo Amazon MWAA escala los servidores web, consulte. Configuración del escalado automático del servidor web Amazon MWAA

Puede usar la API REST de Apache Airflow para implementar los siguientes casos de uso en sus entornos:

  • Acceso mediante programación: ahora puede iniciar las ejecuciones del DAG de Apache Airflow, administrar conjuntos de datos y recuperar el estado de varios componentes, como la base de datos de metadatos, los activadores y los programadores, sin depender de la interfaz de usuario o la CLI de Apache Airflow.

  • Integre con aplicaciones y microservicios externos: el soporte de la API REST lo puede utilizar para crear soluciones personalizadas que integren sus entornos de Amazon MWAA con otros sistemas. Por ejemplo, puede iniciar flujos de trabajo en respuesta a eventos de sistemas externos, como trabajos de base de datos completados o registros de nuevos usuarios.

  • Supervisión centralizada: puede crear paneles de supervisión que agreguen el estado de sus múltiples DAGs entornos de Amazon MWAA, lo que permite una supervisión y administración centralizadas.

Para obtener más información sobre la API REST de Apache Airflow, consulta la Referencia de la API REST de Apache Airflow.

Al usarloInvokeRestApi, puede acceder a la API REST de Apache Airflow mediante AWS credenciales. Como alternativa, también puede acceder a ella obteniendo un token de acceso al servidor web y, a continuación, utilizándolo para llamarlo.

Si encuentra un error en el mensaje Update your environment to use InvokeRestApi mientras utiliza la InvokeRestApi operación, indica que necesita actualizar su entorno de Amazon MWAA. Este error se produce cuando el entorno de Amazon MWAA no es compatible con los cambios más recientes relacionados con la InvokeRestApi función. Para resolver este problema, actualice su entorno Amazon MWAA para incorporar los cambios necesarios en la InvokeRestApi función.

La InvokeRestApi operación tiene un tiempo de espera predeterminado de 10 segundos. Si la operación no se completa dentro de este período de 10 segundos, finaliza automáticamente y se produce un error. Asegúrese de que las llamadas a la API REST estén diseñadas para completarse dentro de este período de tiempo de espera para evitar errores.

Para respaldar la escalabilidad al utilizar la API REST de Apache Airflow, Amazon MWAA le ofrece la opción de escalar horizontalmente la capacidad del servidor web para gestionar el aumento de la demanda, ya sea por solicitudes de API REST, por el uso de la interfaz de línea de comandos (CLI) o por más usuarios simultáneos de la interfaz de usuario (UI) de Apache Airflow. Para obtener más información sobre cómo Amazon MWAA escala los servidores web, consulte. Configuración del escalado automático del servidor web Amazon MWAA

Puede usar la API REST de Apache Airflow para implementar los siguientes casos de uso en sus entornos:

  • Acceso mediante programación: ahora puede iniciar las ejecuciones del DAG de Apache Airflow, administrar conjuntos de datos y recuperar el estado de varios componentes, como la base de datos de metadatos, los activadores y los programadores, sin depender de la interfaz de usuario o la CLI de Apache Airflow.

  • Integre con aplicaciones y microservicios externos: el soporte de la API REST lo puede utilizar para crear soluciones personalizadas que integren sus entornos de Amazon MWAA con otros sistemas. Por ejemplo, puede iniciar flujos de trabajo en respuesta a eventos de sistemas externos, como trabajos de base de datos completados o registros de nuevos usuarios.

  • Supervisión centralizada: puede crear paneles de supervisión que agreguen el estado de sus múltiples DAGs entornos de Amazon MWAA, lo que permite una supervisión y administración centralizadas.

Para obtener más información sobre la API REST de Apache Airflow, consulte La referencia de la API REST de Apache Airflow.

Al usarloInvokeRestApi, puede acceder a la API REST de Apache Airflow mediante AWS credenciales. Como alternativa, también puede acceder a ella si obtiene un token de acceso al servidor web y, a continuación, lo utiliza para llamarlo.

  • Si encuentra un error en el mensaje Update your environment to use InvokeRestApi mientras utiliza la InvokeRestApi operación, indica que necesita actualizar su entorno de Amazon MWAA. Este error se produce cuando el entorno de Amazon MWAA no es compatible con los cambios más recientes relacionados con la InvokeRestApi función. Para resolver este problema, actualice su entorno Amazon MWAA para incorporar los cambios necesarios en la InvokeRestApi función.

  • La InvokeRestApi operación tiene un tiempo de espera predeterminado de 10 segundos. Si la operación no se completa dentro de este período de 10 segundos, finaliza automáticamente y se produce un error. Asegúrese de que las llamadas a la API REST estén diseñadas para completarse dentro de este período de tiempo de espera para evitar errores.

importante

El tamaño de la carga útil de la respuesta no puede superar los 6 MB. Si RestApi se supera este límite, se produce un error.

Utilice los siguientes ejemplos para realizar llamadas de API a la API REST de Apache Airflow e iniciar una nueva ejecución de DAG:

Concesión de acceso a la API de REST de Apache Airflow: airflow:InvokeRestApi

Para acceder a la API REST de Apache Airflow con AWS credenciales, debe conceder el airflow:InvokeRestApi permiso en su política de IAM. En el siguiente ejemplo de política, especifique elAdmin, Op UserViewer, o el Public rol {airflow-role} para personalizar el nivel de acceso de los usuarios. Para obtener más información, consulte Funciones predeterminadas en la guía de referencia de Apache Airflow.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
nota

Al configurar un servidor web privado, la InvokeRestApi acción no se puede invocar desde fuera de una Nube Privada Virtual (VPC). Puede utilizar la clave aws:SourceVpc para aplicar un control de acceso más detallado para esta operación. Para obtener más información, consulte aws:. SourceVpc

Llamado a la API de REST de Apache Airflow

El siguiente script de ejemplo explica cómo usar la API REST de Apache Airflow para enumerar las variables disponibles DAGs en su entorno y cómo crear una variable de Apache Airflow:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Crear un token de sesión de servidor web y llamar a la API REST de Apache Airflow

Para crear un token de acceso al servidor web, utilice la siguiente función de Python. Esta función primero llama a la API de Amazon MWAA para obtener un token de inicio de sesión web. El token de inicio de sesión web, que caduca a los 60 segundos, se cambia por un token de sesión web, que le permite acceder al servidor web y utilizar la API REST de Apache Airflow. Si necesita más de 10 transacciones por segundo (TPS) de capacidad de limitación, puede usar este método para acceder a la API de REST de Apache Airflow.

El token de sesión caduca después de 12 horas.

sugerencia

Los cambios clave en los siguientes ejemplos de código de Apache Airflow v2 a v3 son:

  • La ruta de la API REST cambió de a /api/v1 /api/v2

  • La ruta de inicio de sesión cambió de /aws_maa/login a /pluginsv2/aws_mwaa/login

  • La respuesta del inicio de sesión response.cookies["_token"] contiene información simbólica que debes usar para las siguientes llamadas a la API

  • Para una llamada a la API de REST, debes pasar jwt_token información en los encabezados de la siguiente manera:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Una vez completada la autenticación, dispondrá de las credenciales para empezar a enviar solicitudes a los puntos finales de la API. En el ejemplo de la siguiente sección, usa el punto finaldags/{dag_name}/dagRuns.

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)