Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Aufrufen DAGs in verschiedenen Amazon MWAA-Umgebungen
Das folgende Codebeispiel erstellt ein Apache Airflow CLI-Token. Der Code verwendet dann einen gerichteten azyklischen Graphen (DAG) in einer Amazon MWAA-Umgebung, um eine DAG in einer anderen Amazon MWAA-Umgebung aufzurufen.
Version
Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10
Voraussetzungen
Um das Codebeispiel auf dieser Seite zu verwenden, benötigen Sie Folgendes:
-
Zwei Amazon MWAA-Umgebungen mit Zugriff auf öffentliche Netzwerk-Webserver, einschließlich Ihrer aktuellen Umgebung.
-
Eine Beispiel-DAG, die in den Amazon Simple Storage Service (Amazon S3) -Bucket Ihrer Zielumgebung hochgeladen wurde.
Berechtigungen
Um das Codebeispiel auf dieser Seite verwenden zu können, muss die Ausführungsrolle Ihrer Umgebung über die Berechtigung verfügen, ein Apache Airflow CLI-Token zu erstellen. Sie können die AWS-managed Policy anhängen, um diese AmazonMWAAAirflowCliAccess Berechtigung zu erteilen.
Weitere Informationen finden Sie unter Apache Airflow CLI-Richtlinie: Amazon MWAAAirflow CliAccess.
Abhängigkeiten
Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet aws-mwaa-docker-images
Codebeispiel
Im folgenden Codebeispiel wird davon ausgegangen, dass Sie eine DAG in Ihrer aktuellen Umgebung verwenden, um eine DAG in einer anderen Umgebung aufzurufen.
-
Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Beispiel:
cd dags -
Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter
invoke_dag.py. Ersetzen Sie die folgenden Werte durch Ihre Informationen.-
your-new-environment-name— Der Name der anderen Umgebung, in der Sie die DAG aufrufen möchten. -
your-target-dag-id— Die ID der DAG in der anderen Umgebung, die Sie aufrufen möchten.
from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag() -
-
Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus.
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
Wenn die DAG erfolgreich ausgeführt wird, erhalten Sie in den Aufgabenprotokollen für
invoke_dag_taskeine Ausgabe ähnlich der folgenden.[2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
Um zu überprüfen, ob Ihre DAG erfolgreich aufgerufen wurde, navigieren Sie zur Apache Airflow Airflow-Benutzeroberfläche für Ihre neue Umgebung und gehen Sie dann wie folgt vor:
-
Suchen Sie auf der DAGsSeite Ihre neue Ziel-DAG in der Liste von. DAGs
-
Überprüfen Sie unter Letzte Ausführung den Zeitstempel für die letzte DAG-Ausführung. Dieser Zeitstempel sollte genau mit dem letzten Zeitstempel für Ihre andere
invoke_dagUmgebung übereinstimmen. -
Überprüfen Sie unter Letzte Aufgaben, ob die letzte Ausführung erfolgreich war.
-