Utilizzo dell'API REST di Apache Airflow - Flussi di lavoro gestiti da Amazon per Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo dell'API REST di Apache Airflow

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) supporta l'interazione con gli ambienti Apache Airflow direttamente utilizzando l'API REST di Apache Airflow per ambienti che eseguono Apache Airflow v2.4.3 e versioni successive. Ciò consente di accedere e gestire gli ambienti Amazon MWAA in modo programmatico, fornendo un modo standardizzato per richiamare flussi di lavoro di orchestrazione dei dati, gestire e DAGs monitorare lo stato di vari componenti di Apache Airflow come il database di metadati, il trigger e lo scheduler.

Per supportare la scalabilità durante l'utilizzo dell'API REST di Apache Airflow, Amazon MWAA offre la possibilità di scalare orizzontalmente la capacità del server Web per gestire l'aumento della domanda, sia che si tratti di richieste API REST, utilizzo dell'interfaccia a riga di comando (CLI) o più utenti simultanei dell'interfaccia utente (UI) Apache Airflow. Per ulteriori informazioni su come Amazon MWAA ridimensiona i server web, consulta. Configurazione della scalabilità automatica del server web Amazon MWAA

Puoi utilizzare l'API REST di Apache Airflow per implementare i seguenti casi d'uso per i tuoi ambienti:

  • Accesso programmatico: ora puoi avviare le esecuzioni di Apache Airflow DAG, gestire set di dati e recuperare lo stato di vari componenti come il database dei metadati, i trigger e gli scheduler senza fare affidamento sull'interfaccia utente o sulla CLI di Apache Airflow.

  • Integrazione con applicazioni e microservizi esterni: il supporto dell'API REST ti consente di creare soluzioni personalizzate che integrano i tuoi ambienti Amazon MWAA con altri sistemi. Ad esempio, puoi avviare flussi di lavoro in risposta a eventi provenienti da sistemi esterni, come lavori di database completati o registrazioni di nuovi utenti.

  • Monitoraggio centralizzato: puoi creare dashboard di monitoraggio che aggregano lo stato del tuo ambiente in DAGs più ambienti Amazon MWAA, abilitando monitoraggio e gestione centralizzati.

Per ulteriori informazioni sull'API REST di Apache Airflow, consulta l'Apache Airflow REST API Reference.

UtilizzandoInvokeRestApi, è possibile accedere all'API REST di Apache Airflow utilizzando le credenziali. AWS In alternativa, puoi accedervi anche ottenendo un token di accesso al server web e quindi utilizzando il token per chiamarlo.

Se si verifica un errore nel messaggio Update your environment to use InvokeRestApi durante l'utilizzo dell'InvokeRestApioperazione, significa che è necessario aggiornare l'ambiente Amazon MWAA. Questo errore si verifica quando l'ambiente Amazon MWAA non è compatibile con le ultime modifiche relative alla InvokeRestApi funzionalità. Per risolvere questo problema, aggiorna il tuo ambiente Amazon MWAA per incorporare le modifiche necessarie alla InvokeRestApi funzionalità.

L'InvokeRestApioperazione ha una durata di timeout predefinita di 10 secondi. Se l'operazione non viene completata entro questo intervallo di tempo di 10 secondi, verrà terminata automaticamente e verrà generato un errore. Assicurati che le chiamate all'API REST siano progettate per essere completate entro questo periodo di timeout per evitare errori.

Per supportare la scalabilità durante l'utilizzo dell'API REST di Apache Airflow, Amazon MWAA offre la possibilità di scalare orizzontalmente la capacità del server Web per gestire l'aumento della domanda, sia che si tratti di richieste API REST, utilizzo dell'interfaccia a riga di comando (CLI) o più utenti simultanei dell'interfaccia utente (UI) Apache Airflow. Per ulteriori informazioni su come Amazon MWAA ridimensiona i server Web, consulta. Configurazione della scalabilità automatica del server web Amazon MWAA

Puoi utilizzare l'API REST di Apache Airflow per implementare i seguenti casi d'uso per i tuoi ambienti:

  • Accesso programmatico: ora puoi avviare le esecuzioni di Apache Airflow DAG, gestire set di dati e recuperare lo stato di vari componenti come il database dei metadati, i trigger e gli scheduler senza fare affidamento sull'interfaccia utente o sulla CLI di Apache Airflow.

  • Integrazione con applicazioni e microservizi esterni: il supporto dell'API REST ti consente di creare soluzioni personalizzate che integrano i tuoi ambienti Amazon MWAA con altri sistemi. Ad esempio, puoi avviare flussi di lavoro in risposta a eventi provenienti da sistemi esterni, come lavori di database completati o registrazioni di nuovi utenti.

  • Monitoraggio centralizzato: puoi creare dashboard di monitoraggio che aggregano lo stato del tuo ambiente in DAGs più ambienti Amazon MWAA, abilitando monitoraggio e gestione centralizzati.

Per ulteriori informazioni sull'API REST di Apache Airflow, consulta The Apache Airflow REST API Reference.

UtilizzandoInvokeRestApi, è possibile accedere all'API REST di Apache Airflow utilizzando le credenziali. AWS In alternativa, è possibile accedervi anche ottenendo un token di accesso al server Web e quindi utilizzando il token per chiamarlo.

  • Se si verifica un errore nel messaggio Update your environment to use InvokeRestApi durante l'utilizzo dell'InvokeRestApioperazione, significa che è necessario aggiornare l'ambiente Amazon MWAA. Questo errore si verifica quando l'ambiente Amazon MWAA non è compatibile con le ultime modifiche relative alla InvokeRestApi funzionalità. Per risolvere questo problema, aggiorna il tuo ambiente Amazon MWAA per incorporare le modifiche necessarie alla InvokeRestApi funzionalità.

  • L'InvokeRestApioperazione ha una durata di timeout predefinita di 10 secondi. Se l'operazione non viene completata entro questo intervallo di tempo di 10 secondi, verrà terminata automaticamente e verrà generato un errore. Assicurati che le chiamate all'API REST siano progettate per essere completate entro questo periodo di timeout per evitare errori.

Importante

La dimensione del payload di risposta non può superare i 6 MB. Il tuo RestApi fallisce se questo limite viene superato.

Utilizza i seguenti esempi per effettuare chiamate API all'API REST di Apache Airflow e avviare una nuova esecuzione DAG:

Concessione dell'accesso all'API REST di Apache Airflow: airflow:InvokeRestApi

Per accedere all'API REST di Apache Airflow AWS utilizzando le credenziali, devi concedere airflow:InvokeRestApi l'autorizzazione nella tua policy IAM. Nel seguente esempio di policy, specifica il Public ruoloAdmin,Op, UserViewer, o {airflow-role} per personalizzare il livello di accesso degli utenti. Per ulteriori informazioni, consulta Ruoli predefiniti nella guida di riferimento di Apache Airflow.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
Nota

Durante la configurazione di un server web privato, l'InvokeRestApiazione non può essere richiamata dall'esterno di un Virtual Private Cloud (VPC). È possibile utilizzare la aws:SourceVpc chiave per applicare un controllo di accesso più granulare per questa operazione. Per ulteriori informazioni, fare riferimento a aws: SourceVpc.

Chiamata dell'API REST di Apache Airflow

Questo script di esempio seguente illustra come utilizzare l'API REST di Apache Airflow per elencare le opzioni DAGs disponibili nell'ambiente e come creare una variabile Apache Airflow:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Creazione di un token di sessione del server Web e chiamata all'API REST di Apache Airflow

Per creare un token di accesso al server web, usa la seguente funzione Python. Questa funzione chiama innanzitutto l'API Amazon MWAA per ottenere un token di accesso Web. Il token di accesso Web, che scade dopo 60 secondi, viene quindi sostituito con un token di sessione Web, che consente di accedere al server Web e utilizzare l'API REST Apache Airflow. Se hai bisogno di più di 10 transazioni al secondo (TPS) di capacità di limitazione, puoi utilizzare questo metodo per accedere all'API REST di Apache Airflow.

Il token di sessione scade dopo 12 ore.

Suggerimento

Le principali modifiche nei seguenti esempi di codice da Apache Airflow v2 a v3 sono:

  • Il percorso dell'API REST è stato modificato da a /api/v1 /api/v2

  • Percorso di accesso modificato da /aws_maa/login a /pluginsv2/aws_mwaa/login

  • La risposta di login response.cookies["_token"] contiene informazioni sul token che è necessario utilizzare per le successive chiamate API

  • Per una chiamata all'API REST, è necessario passare jwt_token informazioni nelle intestazioni come:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Una volta completata l'autenticazione, disponi delle credenziali per iniziare a inviare richieste agli endpoint dell'API. Nell'esempio riportato nella sezione seguente, utilizzate l'endpoint. dags/{dag_name}/dagRuns

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)