Verwenden der Apache Airflow REST API - Von Amazon verwaltete Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden der Apache Airflow REST API

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) unterstützt die direkte Interaktion mit Ihren Apache Airflow-Umgebungen mithilfe der Apache Airflow Airflow-REST-API für Umgebungen, in denen Apache Airflow v2.4.3 und höher ausgeführt wird. Auf diese Weise können Sie programmgesteuert auf Ihre Amazon MWAA-Umgebungen zugreifen und diese verwalten. Dies bietet eine standardisierte Möglichkeit, Datenorchestrierungs-Workflows aufzurufen, Ihre verschiedenen Apache Airflow Airflow-Komponenten wie die Metadatendatenbank DAGs, den Trigger und den Scheduler zu verwalten und deren Status zu überwachen.

Um die Skalierbarkeit bei der Verwendung der Apache Airflow REST-API zu unterstützen, bietet Ihnen Amazon MWAA die Möglichkeit, die Webserverkapazität horizontal zu skalieren, um den erhöhten Bedarf zu bewältigen, sei es durch REST-API-Anfragen, die Verwendung der Befehlszeilenschnittstelle (CLI) oder mehr gleichzeitige Benutzer der Apache Airflow Airflow-Benutzeroberfläche (UI). Weitere Informationen zur Skalierung von Webservern durch Amazon MWAA finden Sie unter. Konfiguration der automatischen Skalierung des Amazon MWAA-Webservers

Sie können die Apache Airflow REST API verwenden, um die folgenden Anwendungsfälle für Ihre Umgebungen zu implementieren:

  • Programmatischer Zugriff — Sie können jetzt Apache Airflow DAG-Läufe starten, Datensätze verwalten und den Status verschiedener Komponenten wie der Metadatendatenbank, Trigger und Scheduler abrufen, ohne sich auf die Apache Airflow Airflow-Benutzeroberfläche oder CLI verlassen zu müssen.

  • Integration mit externen Anwendungen und Microservices — REST-API-Unterstützung, mit der Sie maßgeschneiderte Lösungen erstellen können, die Ihre Amazon MWAA-Umgebungen in andere Systeme integrieren. Sie können Workflows beispielsweise als Reaktion auf Ereignisse aus externen Systemen starten, wie z. B. abgeschlossene Datenbankaufträge oder neue Benutzeranmeldungen.

  • Zentralisierte Überwachung — Sie können Überwachungs-Dashboards erstellen, die den Status Ihrer DAGs verschiedenen Amazon MWAA-Umgebungen zusammenfassen und so eine zentrale Überwachung und Verwaltung ermöglichen.

Weitere Informationen zur Apache Airflow REST API finden Sie in der Apache Airflow REST API-Referenz.

Mit Hilfe InvokeRestApi von AWS Anmeldeinformationen können Sie auf die Apache Airflow REST API zugreifen. Alternativ können Sie auch darauf zugreifen, indem Sie ein Webserver-Zugriffstoken abrufen und es dann mit dem Token aufrufen.

Wenn Update your environment to use InvokeRestApi bei der Verwendung des InvokeRestApi Vorgangs ein Fehler mit der Meldung auftritt, bedeutet dies, dass Sie Ihre Amazon MWAA-Umgebung aktualisieren müssen. Dieser Fehler tritt auf, wenn Ihre Amazon MWAA-Umgebung nicht mit den neuesten Änderungen im Zusammenhang mit der Funktion kompatibel ist. InvokeRestApi Um dieses Problem zu beheben, aktualisieren Sie Ihre Amazon MWAA-Umgebung, um die erforderlichen Änderungen für die InvokeRestApi Funktion zu übernehmen.

Der InvokeRestApi Vorgang hat eine standardmäßige Timeoutdauer von 10 Sekunden. Wenn der Vorgang nicht innerhalb dieses Zeitrahmens von 10 Sekunden abgeschlossen wird, wird er automatisch beendet und es wird ein Fehler gemeldet. Stellen Sie sicher, dass Ihre REST-API-Aufrufe so konzipiert sind, dass sie innerhalb dieses Zeitlimits abgeschlossen werden, um Fehler zu vermeiden.

Um die Skalierbarkeit bei der Verwendung der Apache Airflow REST-API zu unterstützen, bietet Ihnen Amazon MWAA die Möglichkeit, die Webserverkapazität horizontal zu skalieren, um den erhöhten Bedarf zu bewältigen, sei es durch REST-API-Anfragen, die Verwendung der Befehlszeilenschnittstelle (CLI) oder mehr gleichzeitige Benutzer der Apache Airflow Airflow-Benutzeroberfläche (UI). Weitere Informationen darüber, wie Amazon MWAA Webserver skaliert, finden Sie unter. Konfiguration der automatischen Skalierung des Amazon MWAA-Webservers

Sie können die Apache Airflow REST API verwenden, um die folgenden Anwendungsfälle für Ihre Umgebungen zu implementieren:

  • Programmatischer Zugriff — Sie können jetzt Apache Airflow DAG-Läufe starten, Datensätze verwalten und den Status verschiedener Komponenten wie der Metadatendatenbank, Trigger und Scheduler abrufen, ohne sich auf die Apache Airflow Airflow-Benutzeroberfläche oder CLI verlassen zu müssen.

  • Integration mit externen Anwendungen und Microservices — REST-API-Unterstützung, mit der Sie maßgeschneiderte Lösungen erstellen können, die Ihre Amazon MWAA-Umgebungen in andere Systeme integrieren. Sie können Workflows beispielsweise als Reaktion auf Ereignisse aus externen Systemen starten, wie z. B. abgeschlossene Datenbankaufträge oder neue Benutzeranmeldungen.

  • Zentralisierte Überwachung — Sie können Überwachungs-Dashboards erstellen, die den Status Ihrer DAGs verschiedenen Amazon MWAA-Umgebungen zusammenfassen und so eine zentrale Überwachung und Verwaltung ermöglichen.

Weitere Informationen zur Apache Airflow REST API finden Sie in The Apache Airflow REST API Reference.

Mit Hilfe InvokeRestApi von AWS Anmeldeinformationen können Sie auf die Apache Airflow REST API zugreifen. Alternativ können Sie auch darauf zugreifen, indem Sie ein Webserver-Zugriffstoken abrufen und es dann mit dem Token aufrufen.

  • Wenn Update your environment to use InvokeRestApi bei der Verwendung des InvokeRestApi Vorgangs ein Fehler mit der Meldung auftritt, bedeutet dies, dass Sie Ihre Amazon MWAA-Umgebung aktualisieren müssen. Dieser Fehler tritt auf, wenn Ihre Amazon MWAA-Umgebung nicht mit den neuesten Änderungen im Zusammenhang mit der Funktion kompatibel ist. InvokeRestApi Um dieses Problem zu beheben, aktualisieren Sie Ihre Amazon MWAA-Umgebung, um die erforderlichen Änderungen für die InvokeRestApi Funktion zu übernehmen.

  • Der InvokeRestApi Vorgang hat eine standardmäßige Timeoutdauer von 10 Sekunden. Wenn der Vorgang nicht innerhalb dieses Zeitrahmens von 10 Sekunden abgeschlossen wird, wird er automatisch beendet und es wird ein Fehler gemeldet. Stellen Sie sicher, dass Ihre REST-API-Aufrufe so konzipiert sind, dass sie innerhalb dieses Zeitlimits abgeschlossen werden, um Fehler zu vermeiden.

Wichtig

Die Größe der Antwort-Payload darf 6 MB nicht überschreiten. Sie RestApi schlagen fehl, wenn dieses Limit überschritten wird.

Verwenden Sie die folgenden Beispiele, um API-Aufrufe an die Apache Airflow REST API zu tätigen und eine neue DAG-Ausführung zu starten:

Zugriff auf die Apache Airflow REST API gewähren: airflow:InvokeRestApi

Um mithilfe von AWS Anmeldeinformationen auf die Apache Airflow REST-API zuzugreifen, müssen Sie die airflow:InvokeRestApi Erlaubnis in Ihrer IAM-Richtlinie erteilen. Geben Sie im folgenden Richtlinienbeispiel die Public RolleAdmin,, OpUser, oder anViewer, {airflow-role} um die Ebene des Benutzerzugriffs anzupassen. Weitere Informationen finden Sie unter Standardrollen im Apache Airflow Airflow-Referenzhandbuch.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
Anmerkung

Bei der Konfiguration eines privaten Webservers kann die InvokeRestApi Aktion nicht von außerhalb einer Virtual Private Cloud (VPC) aufgerufen werden. Sie können den aws:SourceVpc Schlüssel verwenden, um eine detailliertere Zugriffskontrolle für diesen Vorgang anzuwenden. Weitere Informationen finden Sie unter aws: SourceVpc.

Aufrufen der Apache Airflow REST API

Das folgende Beispielskript beschreibt, wie Sie die Apache Airflow REST API verwenden, um die DAGs in Ihrer Umgebung verfügbaren Optionen aufzulisten und eine Apache Airflow Airflow-Variable zu erstellen:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Erstellen eines Webserver-Sitzungstoken und Aufrufen der Apache Airflow REST API

Verwenden Sie die folgende Python-Funktion, um ein Webserver-Zugriffstoken zu erstellen. Diese Funktion ruft zunächst die Amazon MWAA-API auf, um ein Web-Login-Token zu erhalten. Das Web-Login-Token, das nach 60 Sekunden abläuft, wird dann gegen ein Web-Session-Token ausgetauscht, mit dem Sie auf den Webserver zugreifen und die Apache Airflow REST API verwenden können. Wenn Sie mehr als 10 Transaktionen pro Sekunde (TPS) an Drosselungskapazität benötigen, können Sie mit dieser Methode auf die Apache Airflow REST API zugreifen.

Das Sitzungstoken läuft nach 12 Stunden ab.

Tipp

Die wichtigsten Änderungen in den folgenden Codebeispielen von Apache Airflow v2 zu v3 sind:

  • Der REST-API-Pfad wurde von /api/v1 zu geändert /api/v2

  • Der Anmeldepfad wurde von /aws_maa/login zu geändert /pluginsv2/aws_mwaa/login

  • Die Antwort von der Anmeldung response.cookies["_token"] enthält Token-Informationen, die Sie für nachfolgende API-Aufrufe verwenden müssen

  • Für einen REST-API-Aufruf müssen Sie jwt_token Informationen in Headern wie folgt übergeben:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_session_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was succesfull if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies["session"] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Nach Abschluss der Authentifizierung verfügen Sie über die Anmeldeinformationen, um mit dem Senden von Anfragen an die API-Endpunkte zu beginnen. Verwenden Sie im Beispiel im folgenden Abschnitt den Endpunktdags/{dag_name}/dagRuns.

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)