在不同的 Amazon MWAA 環境中叫用 DAGs - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在不同的 Amazon MWAA 環境中叫用 DAGs

下列程式碼範例會建立 Apache Airflow CLI 字符。然後,程式碼會在一個 Amazon MWAA 環境中使用定向無環圖 (DAG),在不同的 Amazon MWAA 環境中叫用 DAG。

版本

您可以在 Python 3.10 中使用 Apache Airflow v2 和 Python 3.11 中使用此頁面的程式碼範例。 https://peps.python.org/pep-0619/

先決條件

若要使用此頁面上的程式碼範例,您需要下列項目:

  • 兩個具有公有網路 Web 伺服器存取權的 Amazon MWAA 環境,包括您目前的環境。

  • 上傳至目標環境 Amazon Simple Storage Service (Amazon S3) 儲存貯體的範例 DAG。

許可

若要使用此頁面上的程式碼範例,您環境的執行角色必須具有建立 Apache Airflow CLI 權杖的許可。您可以連接 AWS受管政策AmazonMWAAAirflowCliAccess來授予此許可。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

如需詳細資訊,請參閱 Apache Airflow CLI 政策:AmazonMWAAAirflowCliAccess

相依性

若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例,不需要額外的相依性。使用 aws-mwaa-docker-images 來安裝 Apache Airflow。

程式碼範例

下列程式碼範例假設您在目前環境中使用 DAG 來叫用另一個環境中的 DAG。

  1. 在終端機中,導覽至存放 DAG 程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並將其儲存為 invoke_dag.py。將下列值取代為您的資訊。

    • your-new-environment-name – 您要叫用 DAG 的其他環境名稱。

    • your-target-dag-id – 您要叫用之其他環境中的 DAG ID。

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache Airflow UI 觸發 DAG。

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果 DAG 成功執行,您會在 的任務日誌中收到類似以下的輸出invoke_dag_task

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    若要確認您的 DAG 已成功調用,請導覽至新環境的 Apache Airflow UI,然後執行下列動作:

    1. DAGs頁面上,在 DAG 清單中找到您的新目標 DAGs。

    2. 上次執行下,檢查最新 DAG 執行的時間戳記。此時間戳記應緊密符合您invoke_dag其他環境中 的最新時間戳記。

    3. 最近任務下,檢查上次執行是否成功。