Uso de la API de REST de Apache Airflow - Amazon Managed Workflows para Apache Airflow

Uso de la API de REST de Apache Airflow

Amazon Managed Workflows para Apache Airflow (Amazon MWAA) permite interactuar directamente con los entornos de Apache Airflow mediante la API de REST de Apache Airflow para entornos que ejecutan Apache Airflow v2.4.3 y versiones posteriores. Esto le permite acceder a sus entornos de Amazon MWAA y administrarlos mediante programación, lo que proporciona una forma estandarizada de invocar flujos de trabajo de orquestación de datos, gestionar sus DAG y supervisar el estado de varios componentes de Apache Airflow, como la base de datos de metadatos, el activador y el programador.

Para tener escalabilidad cuando se usa la API de REST de Apache Airflow, Amazon MWAA ofrece la opción de escalar horizontalmente la capacidad del servidor web para gestionar el aumento de la demanda, ya sea de solicitudes de API de REST, uso de la interfaz de la línea de comandos (CLI) o más usuarios simultáneos de la interfaz de usuario (UI) de Apache Airflow. Para obtener más información sobre cómo Amazon MWAA escala los servidores web, consulte Configuración del escalado automático del servidor web de Amazon MWAA.

Puede usar la API de REST de Apache Airflow para implementar los siguientes casos de uso en sus entornos:

  • Acceso mediante programación: ahora puede iniciar las ejecuciones del DAG de Apache Airflow, administrar conjuntos de datos y recuperar el estado de varios componentes, como la base de datos de metadatos, los activadores y los programadores, sin depender de la interfaz de usuario o la CLI de Apache Airflow.

  • Integración con aplicaciones y microservicios externos: la compatibilidad con la API de REST permite crear soluciones personalizadas que integran sus entornos de Amazon MWAA con otros sistemas. Por ejemplo, puede iniciar flujos de trabajo en respuesta a eventos de sistemas externos, como trabajos de base de datos completados o registros de nuevos usuarios.

  • Supervisión centralizada: puede crear paneles de supervisión que agreguen el estado de sus DAG en varios entornos de Amazon MWAA, lo que permite una supervisión y una administración centralizadas.

Para obtener más información sobre la API de REST de Apache Airflow, consulte la referencia de la API de REST de Apache Airflow.

Usando InvokeRestApi, puede acceder a la API de REST de Apache Airflow con credenciales de AWS. Como alternativa, también puede acceder a ella si obtiene un token de acceso al servidor web y lo usa para llamarlo.

Si obtiene un error con el mensaje Update your environment to use InvokeRestApi mientras usa la operación InvokeRestApi, significa que necesita actualizar su entorno de Amazon MWAA. Este error se produce cuando el entorno de Amazon MWAA no es compatible con los cambios más recientes relacionados con la característica InvokeRestApi. Para resolver este problema, actualice el entorno de Amazon MWAA para incorporar los cambios necesarios en la característica InvokeRestApi.

El tiempo de espera predeterminado para la operación InvokeRestApi de 10 segundos. Si la operación no se completa dentro de ese lapso, finaliza automáticamente y se produce un error. Procure que las llamadas a la API de REST estén diseñadas para completarse dentro de este período de espera para evitar errores.

Para tener escalabilidad cuando se usa la API de REST de Apache Airflow, Amazon MWAA ofrece la opción de escalar horizontalmente la capacidad del servidor web para gestionar el aumento de la demanda, ya sea de solicitudes de API de REST, uso de la interfaz de la línea de comandos (CLI) o más usuarios simultáneos de la interfaz de usuario (UI) de Apache Airflow. Para obtener más información sobre cómo Amazon MWAA escala los servidores web, consulte Configuración del escalado automático del servidor web de Amazon MWAA.

Puede usar la API de REST de Apache Airflow para implementar los siguientes casos de uso en sus entornos:

  • Acceso mediante programación: ahora puede iniciar las ejecuciones del DAG de Apache Airflow, administrar conjuntos de datos y recuperar el estado de varios componentes, como la base de datos de metadatos, los activadores y los programadores, sin depender de la interfaz de usuario o la CLI de Apache Airflow.

  • Integración con aplicaciones y microservicios externos: la compatibilidad con la API de REST permite crear soluciones personalizadas que integran sus entornos de Amazon MWAA con otros sistemas. Por ejemplo, puede iniciar flujos de trabajo en respuesta a eventos de sistemas externos, como trabajos de base de datos completados o registros de nuevos usuarios.

  • Supervisión centralizada: puede crear paneles de supervisión que agreguen el estado de sus DAG en varios entornos de Amazon MWAA, lo que permite una supervisión y una administración centralizadas.

Para obtener más información sobre la API de REST de Apache Airflow, consulte la referencia de la API de REST de Apache Airflow.

Usando InvokeRestApi, puede acceder a la API de REST de Apache Airflow con credenciales de AWS. Como alternativa, también puede acceder a ella si obtiene un token de acceso al servidor web y, a continuación, lo utiliza para llamarlo.

  • Si obtiene un error con el mensaje Update your environment to use InvokeRestApi mientras usa la operación InvokeRestApi, significa que necesita actualizar su entorno de Amazon MWAA. Este error se produce cuando el entorno de Amazon MWAA no es compatible con los cambios más recientes relacionados con la característica InvokeRestApi. Para resolver este problema, actualice el entorno de Amazon MWAA para incorporar los cambios necesarios en la característica InvokeRestApi.

  • El tiempo de espera predeterminado para la operación InvokeRestApi de 10 segundos. Si la operación no se completa dentro de ese lapso, finaliza automáticamente y se produce un error. Procure que las llamadas a la API de REST estén diseñadas para completarse dentro de este período de espera para evitar errores.

importante

El tamaño de la carga útil de respuesta no puede ser superior a 6 MB. Si RestApi se ha superado este límite, se produce un error.

En los siguientes ejemplos, se muestra cómo realizar llamadas a la API de REST de Apache Airflow e iniciar una nueva ejecución del DAG:

Concesión de acceso a la API de REST de Apache Airflow: airflow:InvokeRestApi

Para acceder a la API de REST de Apache Airflow con credenciales de AWS, debe otorgar el permiso de airflow:InvokeRestApi en su política de IAM. En el siguiente ejemplo de política, especifique el rol de Admin, Op, User, Viewer o Public en {airflow-role} para personalizar el nivel de acceso del usuario. Para obtener más información, consulte la sección Roles predeterminados en la guía de referencia de Apache Airflow.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
nota

Mientras configura un servidor web privado, la acción InvokeRestApi no se puede invocar desde fuera de una nube privada virtual (VPC). Puede utilizar la clave aws:SourceVpc para aplicar un control de acceso más detallado para esta operación. Para obtener más información, consulte aws:SourceVpc.

Llamado a la API de REST de Apache Airflow

En el siguiente script de ejemplo, se explica cómo utilizar la API de REST de Apache Airflow para enumerar los DAG disponibles en su entorno y cómo crear una variable de Apache Airflow:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Creación de un token de sesión de servidor web y llamada a la API de REST de Apache Airflow

Para crear un token de acceso al servidor web, use la siguiente función de Python. Esta función primero llama a la API de Amazon MWAA para obtener un token de inicio de sesión web. El token de inicio de sesión web, que caduca después de 60 segundos, se cambia luego por un token de sesión web, que le permite acceder al servidor web y usar la API de REST de Apache Airflow. Si necesita más de 10 transacciones por segundo (TPS) de capacidad de limitación, puede usar este método para acceder a la API de REST de Apache Airflow.

El token de sesión caduca después de 12 horas.

sugerencia

A continuación, se enumeran los cambios clave en los siguientes ejemplos de código de Apache Airflow v2 a v3:

  • La ruta de la API de REST cambió de /api/v1 a /api/v2

  • La ruta de inicio de sesión cambió de /aws_maa/login a /pluginsv2/aws_mwaa/login

  • La respuesta del inicio de sesión response.cookies["_token"] contiene información del token que debe usar para las siguientes llamadas a la API

  • Para una llamada a la API de REST, debe pasar información de jwt_token en los encabezados de la siguiente manera:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Una vez finalizada la autenticación, dispondrá de las credenciales para empezar a enviar solicitudes a los puntos de conexión de la API. En el ejemplo de la siguiente sección, use el punto de conexión dags/{dag_name}/dagRuns.

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)