さまざまな Amazon MWAA 環境での DAG の呼び出し - Amazon Managed Workflows for Apache Airflow

さまざまな Amazon MWAA 環境での DAG の呼び出し

次のコード例では、Apache Airflow CLI トークンを作成します。次に、このコードは、ある Amazon MWAA 環境の有向非巡回グラフ (DAG) を使用して、別の Amazon MWAA 環境の DAG を呼び出します。

バージョン

このページのコード例は、Python 3.10Apache Airflow v2 および Python 3.11Apache Airflow v3 で使用可能です。

前提条件

このページのコード例を使用するには、次のものが必要です。

  • パブリックネットワーク のウェブサーバーにアクセスできる 2 つの Amazon MWAA 環境 (現在の環境を含む)。

  • ターゲット環境の Amazon Simple Storage Service (Amazon S3) バケットに、サンプルの DAG。

アクセス許可

このページのコード例を使用するには、環境の実行ロールに Apache Airflow CLI トークンを作成する権限が必要です。この権限を付与するためには、AWS のマネージドポリシー AmazonMWAAAirflowCliAccess をアタッチすることができます。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

詳細については、Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess を参照してください。

依存関係

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。aws-mwaa-docker-images を使用して、Apache Airflow をインストールします。

コード例

次のコード例は、現在の環境で DAG を使用して別の環境で DAG を呼び出していると想定しています。

  1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 次のコード例の内容をコピーし、invoke_dag.py という名前でローカルに保存します。以下の値をお客様の情報に置き換えます。

    • your-new-environment-name — DAG を起動する他の環境の名前。

    • your-target-dag-id — 起動する他の環境の DAG の ID。

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. DAG が正常に実行されると、invoke_dag_task のタスクログに以下のような出力が表示されます。

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    DAG が正常に呼び出されたことを確認するには、新しい環境の Apache Airflow UI に移動し、次の操作を行います。

    1. DAG ページの DAG のリストから新しいターゲット DAG を見つけます。

    2. 前回の実行 で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における invoke_dag の最新のタイムスタンプとほぼ一致する必要があります。

    3. 最近のタスク で、前回の実行が成功したことを確認します。