

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

# Verwenden der Apache Airflow REST API
<a name="access-mwaa-apache-airflow-rest-api"></a>

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) unterstützt die direkte Interaktion mit Ihren Apache Airflow-Umgebungen mithilfe der Apache Airflow Airflow-REST-API für Umgebungen, in denen Apache Airflow v2.4.3 und höher ausgeführt wird. Auf diese Weise können Sie programmgesteuert auf Ihre Amazon MWAA-Umgebungen zugreifen und diese verwalten. Dies bietet eine standardisierte Möglichkeit, Datenorchestrierungs-Workflows aufzurufen, Ihre verschiedenen Apache Airflow Airflow-Komponenten wie die Metadatendatenbank DAGs, den Trigger und den Scheduler zu verwalten und deren Status zu überwachen.

Um die Skalierbarkeit bei der Verwendung der Apache Airflow REST-API zu unterstützen, bietet Ihnen Amazon MWAA die Möglichkeit, die Webserverkapazität horizontal zu skalieren, um den erhöhten Bedarf zu bewältigen, sei es durch REST-API-Anfragen, die Verwendung der Befehlszeilenschnittstelle (CLI) oder mehr gleichzeitige Benutzer der Apache Airflow Airflow-Benutzeroberfläche (UI). Weitere Informationen zur Skalierung von Webservern durch Amazon MWAA finden Sie unter. [Konfiguration der automatischen Skalierung des Amazon MWAA-Webservers](mwaa-web-server-autoscaling.md)

Sie können die Apache Airflow REST API verwenden, um die folgenden Anwendungsfälle für Ihre Umgebungen zu implementieren:
+ **Programmatischer Zugriff** — Sie können jetzt Apache Airflow DAG-Läufe starten, Datensätze verwalten und den Status verschiedener Komponenten wie der Metadatendatenbank, Trigger und Scheduler abrufen, ohne sich auf die Apache Airflow Airflow-Benutzeroberfläche oder CLI verlassen zu müssen.
+ **Integration mit externen Anwendungen und Microservices** — REST-API-Unterstützung, mit der Sie maßgeschneiderte Lösungen erstellen können, die Ihre Amazon MWAA-Umgebungen in andere Systeme integrieren. Sie können Workflows beispielsweise als Reaktion auf Ereignisse aus externen Systemen starten, wie z. B. abgeschlossene Datenbankaufträge oder neue Benutzeranmeldungen.
+ **Zentralisierte Überwachung** — Sie können Überwachungs-Dashboards erstellen, die den Status Ihrer DAGs verschiedenen Amazon MWAA-Umgebungen zusammenfassen und so eine zentrale Überwachung und Verwaltung ermöglichen.

Weitere Informationen zur Apache Airflow REST API finden Sie in der [Apache Airflow REST API-Referenz](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html).

Mit Hilfe `InvokeRestApi` von AWS Anmeldeinformationen können Sie auf die Apache Airflow REST API zugreifen. Alternativ können Sie auch darauf zugreifen, indem Sie ein Webserver-Zugriffstoken abrufen und es dann mit dem Token aufrufen.

Wenn `Update your environment to use InvokeRestApi` bei der Verwendung des `InvokeRestApi` Vorgangs ein Fehler mit der Meldung auftritt, bedeutet dies, dass Sie Ihre Amazon MWAA-Umgebung aktualisieren müssen. Dieser Fehler tritt auf, wenn Ihre Amazon MWAA-Umgebung nicht mit den neuesten Änderungen im Zusammenhang mit der Funktion kompatibel ist. `InvokeRestApi` Um dieses Problem zu beheben, aktualisieren Sie Ihre Amazon MWAA-Umgebung, um die erforderlichen Änderungen für die `InvokeRestApi` Funktion zu übernehmen.

Der `InvokeRestApi` Vorgang hat eine standardmäßige Timeoutdauer von 10 Sekunden. Wenn der Vorgang nicht innerhalb dieses Zeitrahmens von 10 Sekunden abgeschlossen wird, wird er automatisch beendet und es wird ein Fehler gemeldet. Stellen Sie sicher, dass Ihre REST-API-Aufrufe so konzipiert sind, dass sie innerhalb dieses Zeitlimits abgeschlossen werden, um Fehler zu vermeiden.

Um die Skalierbarkeit bei der Verwendung der Apache Airflow REST-API zu unterstützen, bietet Ihnen Amazon MWAA die Möglichkeit, die Webserverkapazität horizontal zu skalieren, um den erhöhten Bedarf zu bewältigen, sei es durch REST-API-Anfragen, die Verwendung der Befehlszeilenschnittstelle (CLI) oder mehr gleichzeitige Benutzer der Apache Airflow Airflow-Benutzeroberfläche (UI). Weitere Informationen darüber, wie Amazon MWAA Webserver skaliert, finden Sie unter. [Konfiguration der automatischen Skalierung des Amazon MWAA-Webservers](mwaa-web-server-autoscaling.md)

Sie können die Apache Airflow REST API verwenden, um die folgenden Anwendungsfälle für Ihre Umgebungen zu implementieren:
+ **Programmatischer Zugriff** — Sie können jetzt Apache Airflow DAG-Läufe starten, Datensätze verwalten und den Status verschiedener Komponenten wie der Metadatendatenbank, Trigger und Scheduler abrufen, ohne sich auf die Apache Airflow Airflow-Benutzeroberfläche oder CLI verlassen zu müssen.
+ **Integration mit externen Anwendungen und Microservices** — REST-API-Unterstützung, mit der Sie maßgeschneiderte Lösungen erstellen können, die Ihre Amazon MWAA-Umgebungen in andere Systeme integrieren. Sie können Workflows beispielsweise als Reaktion auf Ereignisse aus externen Systemen starten, wie z. B. abgeschlossene Datenbankaufträge oder neue Benutzeranmeldungen.
+ **Zentralisierte Überwachung** — Sie können Überwachungs-Dashboards erstellen, die den Status Ihrer DAGs verschiedenen Amazon MWAA-Umgebungen zusammenfassen und so eine zentrale Überwachung und Verwaltung ermöglichen.

Weitere Informationen zur Apache Airflow REST API finden Sie in [The Apache Airflow REST API](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html) Reference.

Mit Hilfe `InvokeRestApi` von AWS Anmeldeinformationen können Sie auf die Apache Airflow REST API zugreifen. Alternativ können Sie auch darauf zugreifen, indem Sie ein Webserver-Zugriffstoken abrufen und es dann mit dem Token aufrufen.
+ Wenn `Update your environment to use InvokeRestApi` bei der Verwendung des `InvokeRestApi` Vorgangs ein Fehler mit der Meldung auftritt, bedeutet dies, dass Sie Ihre Amazon MWAA-Umgebung aktualisieren müssen. Dieser Fehler tritt auf, wenn Ihre Amazon MWAA-Umgebung nicht mit den neuesten Änderungen im Zusammenhang mit der Funktion kompatibel ist. `InvokeRestApi` Um dieses Problem zu beheben, aktualisieren Sie Ihre Amazon MWAA-Umgebung, um die erforderlichen Änderungen für die `InvokeRestApi` Funktion zu übernehmen.
+ Der `InvokeRestApi` Vorgang hat eine standardmäßige Timeoutdauer von 10 Sekunden. Wenn der Vorgang nicht innerhalb dieses Zeitrahmens von 10 Sekunden abgeschlossen wird, wird er automatisch beendet und es wird ein Fehler gemeldet. Stellen Sie sicher, dass Ihre REST-API-Aufrufe so konzipiert sind, dass sie innerhalb dieses Zeitlimits abgeschlossen werden, um Fehler zu vermeiden.

**Wichtig**  
Die Größe der Antwort-Payload darf 6 MB nicht überschreiten. Sie `RestApi` schlagen fehl, wenn dieses Limit überschritten wird.

Verwenden Sie die folgenden Beispiele, um API-Aufrufe an die Apache Airflow REST API zu tätigen und eine neue DAG-Ausführung zu starten:

**Topics**
+ [Zugriff auf die Apache Airflow REST API gewähren: `airflow:InvokeRestApi`](#granting-access-MWAA-Enhanced-REST-API)
+ [Aufrufen der Apache Airflow REST API](#listing-DAGs-creating-variables-using-restapi-script)
+ [Erstellen eines Webserver-Sitzungstoken und Aufrufen der Apache Airflow REST API](#create-web-server-session-token)

## Zugriff auf die Apache Airflow REST API gewähren: `airflow:InvokeRestApi`
<a name="granting-access-MWAA-Enhanced-REST-API"></a>

Um mithilfe von AWS Anmeldeinformationen auf die Apache Airflow REST-API zuzugreifen, müssen Sie die `airflow:InvokeRestApi` Erlaubnis in Ihrer IAM-Richtlinie erteilen. Geben Sie im folgenden Richtlinienbeispiel die `Public` Rolle`Admin`,, `Op``User`, oder an`Viewer`, `{airflow-role}` um die Ebene des Benutzerzugriffs anzupassen. Weitere Informationen finden Sie unter [Standardrollen](https://airflow.apache.org/docs/apache-airflow/1.10.6/security.html?highlight=ldap#default-roles) im *Apache Airflow Airflow-Referenzhandbuch.*

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "AllowMwaaRestApiAccess",
            "Effect": "Allow",
            "Action": "airflow:InvokeRestApi",
            "Resource": [
            "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}"
            ]
        }
    ]
}
```

------

**Anmerkung**  
Bei der Konfiguration eines privaten Webservers kann die `InvokeRestApi` Aktion nicht von außerhalb einer Virtual Private Cloud (VPC) aufgerufen werden. Sie können den `aws:SourceVpc` Schlüssel verwenden, um eine detailliertere Zugriffskontrolle für diesen Vorgang anzuwenden. Weitere Informationen finden Sie unter [aws: SourceVpc](https://docs.aws.amazon.com//IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcevpc).

## Aufrufen der Apache Airflow REST API
<a name="listing-DAGs-creating-variables-using-restapi-script"></a>

Das folgende Beispielskript beschreibt, wie Sie die Apache Airflow REST API verwenden, um die DAGs in Ihrer Umgebung verfügbaren Optionen aufzulisten und eine Apache Airflow Airflow-Variable zu erstellen:

```
import boto3
			
  env_name = "MyAirflowEnvironment"
			
  def list_dags(client):
    request_params = {
      "Name": env_name,
      "Path": "/dags",
      "Method": "GET",
      "QueryParameters": {
        "paused": False
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  def create_variable(client):
    request_params = {
      "Name": env_name,
      "Path": "/variables",
      "Method": "POST",
      "Body": {
        "key": "test-restapi-key",
        "value": "test-restapi-value",
        "description": "Test variable created by MWAA InvokeRestApi API",
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  if __name__ == "__main__":
    client = boto3.client("mwaa")
    list_dags(client)
    create_variable(client)
```

## Erstellen eines Webserver-Sitzungstoken und Aufrufen der Apache Airflow REST API
<a name="create-web-server-session-token"></a>

Verwenden Sie die folgende Python-Funktion, um ein Webserver-Zugriffstoken zu erstellen. Diese Funktion ruft zunächst die Amazon MWAA-API auf, um ein Web-Login-Token zu erhalten. Das Web-Login-Token, das nach 60 Sekunden abläuft, wird dann gegen ein *Web-Session-Token* ausgetauscht, mit dem Sie auf den Webserver zugreifen und die Apache Airflow REST API verwenden können. Wenn Sie mehr als 10 Transaktionen pro Sekunde (TPS) an Drosselungskapazität benötigen, können Sie mit dieser Methode auf die Apache Airflow REST API zugreifen.

Das Sitzungstoken läuft nach 12 Stunden ab.

**Tipp**  
Die wichtigsten Änderungen in den folgenden Codebeispielen von Apache Airflow v2 zu v3 sind:  
Der REST-API-Pfad wurde von `/api/v1` zu geändert `/api/v2`
Der Anmeldepfad wurde von `/aws_maa/login` zu geändert `/pluginsv2/aws_mwaa/login`
Die Antwort von der Anmeldung `response.cookies["_token"]` enthält Token-Informationen, die Sie für nachfolgende API-Aufrufe verwenden müssen
Für einen REST-API-Aufruf müssen Sie `jwt_token` Informationen in Headern wie folgt übergeben:  

  ```
  headers = {
    "Authorization": f"Bearer {jwt_token}",
    "Content-Type": "application/json"
  }
  ```

------
#### [ Apache Airflow v3 ]

```
def get_token_info(region, env_name):
  logging.basicConfig(level=logging.INFO)
						
  try:
    # Initialize MWAA client and request a web login token
    mwaa = boto3.client('mwaa', region_name=region)
    response = mwaa.create_web_login_token(Name=env_name)
						
    # Extract the web server hostname and login token
    web_server_host_name = response["WebServerHostname"]
    web_token = response["WebToken"]
						
    # Construct the URL needed for authentication 
    login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
    login_payload = {"token": web_token}
						
    # Make a POST request to the MWAA login url using the login payload
    response = requests.post(
      login_url,
      data=login_payload,
      timeout=10
    )
						
    # Check if login was successful
    if response.status_code == 200:
						
    # Return the hostname and the session cookie 
    return (
      web_server_host_name,
      response.cookies['_token']
    )
    else:
      # Log an error
      logging.error("Failed to log in: HTTP %d", response.status_code)
      return None
      except requests.RequestException as e:
      
      # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
      except Exception as e:
      
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------
#### [ Apache Airflow v2 ]

```
def get_session_info(region, env_name):
  logging.basicConfig(level=logging.INFO)

  try:
      # Initialize MWAA client and request a web login token
      mwaa = boto3.client('mwaa', region_name=region)
      response = mwaa.create_web_login_token(Name=env_name)
      
      # Extract the web server hostname and login token
      web_server_host_name = response["WebServerHostname"]
      web_token = response["WebToken"]
      
      # Construct the URL needed for authentication 
      login_url = f"https://{web_server_host_name}/aws_mwaa/login"
      login_payload = {"token": web_token}

      # Make a POST request to the MWAA login url using the login payload
      response = requests.post(
          login_url,
          data=login_payload,
          timeout=10
      )

      # Check if login was succesfull 
      if response.status_code == 200:
      
          # Return the hostname and the session cookie 
          return (
              web_server_host_name,
              response.cookies["session"]
          )
      else:
          # Log an error
          logging.error("Failed to log in: HTTP %d", response.status_code)
          return None
  except requests.RequestException as e:
       # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
  except Exception as e:
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------

Nach Abschluss der Authentifizierung verfügen Sie über die Anmeldeinformationen, um mit dem Senden von Anfragen an die API-Endpunkte zu beginnen. Verwenden Sie im Beispiel im folgenden Abschnitt den Endpunkt`dags/{dag_name}/dagRuns`.

------
#### [ Apache Airflow v3 ]

```
def trigger_dag(region, env_name, dag_id):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_id (str): ID of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and token for authentication
						try:
						web_server_host_name, jwt_token = get_token_info(region, env_name)
						if not jwt_token:
						logging.error("Authentication failed, no jwt token retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving token info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						request_headers = {
						"Authorization": f"Bearer {jwt_token}",
						"Content-Type": "application/json" # Good practice to include, even for GET
						}
						
						# sample request body input
						json_body = {"logical_date": "2025-09-17T14:15:00Z"}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, headers=request_headers, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_id = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_id)
```

------
#### [ Apache Airflow v2 ]

```
def trigger_dag(region, env_name, dag_name):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_name (str): Name of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and session cookie for authentication
						try:
						web_server_host_name, session_cookie = get_session_info(region, env_name)
						if not session_cookie:
						logging.error("Authentication failed, no session cookie retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving session info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						cookies = {"session": session_cookie}
						json_body = {"conf": {}}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, cookies=cookies, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_name = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_name)
```

------