

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# さまざまな Amazon MWAA 環境での DAG の呼び出し
<a name="samples-invoke-dag"></a>

次のコード例では、Apache Airflow CLI トークンを作成します。次に、このコードは、ある Amazon MWAA 環境の有向非巡回グラフ (DAG) を使用して、別の Amazon MWAA 環境の DAG を呼び出します。

**Topics**
+ [バージョン](#samples-invoke-dag-version)
+ [前提条件](#samples-invoke-dag-prereqs)
+ [アクセス許可](#samples-invoke-dag-permissions)
+ [依存関係](#samples-invoke-dag-dependencies)
+ [コード例](#samples-invoke-dag-code)

## バージョン
<a name="samples-invoke-dag-version"></a>

このページのコード例は、[Python 3.10](https://peps.python.org/pep-0619/) の **Apache Airflow v2** および [Python 3.11](https://peps.python.org/pep-0664/) の **Apache Airflow v3** で使用可能です。

## 前提条件
<a name="samples-invoke-dag-prereqs"></a>

このページのコード例を使用するには、次のものが必要です。
+ **パブリックネットワーク** のウェブサーバーにアクセスできる 2 つの [Amazon MWAA 環境](get-started.md) (現在の環境を含む)。
+ ターゲット環境の Amazon Simple Storage Service (Amazon S3) バケットに、サンプルの DAG。

## アクセス許可
<a name="samples-invoke-dag-permissions"></a>

このページのコード例を使用するには、環境の実行ロールに Apache Airflow CLI トークンを作成する権限が必要です。 AWS管理ポリシーをアタッチ`AmazonMWAAAirflowCliAccess`して、このアクセス許可を付与できます。

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

詳細については、[Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess](access-policies.md#cli-access) を参照してください。

## 依存関係
<a name="samples-invoke-dag-dependencies"></a>

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。[aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images) を使用して、Apache Airflow をインストールします。

## コード例
<a name="samples-invoke-dag-code"></a>

次のコード例は、現在の環境で DAG を使用して別の環境で DAG を呼び出していると想定しています。

1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例えば、次のようになります。

   ```
   cd dags
   ```

1. 次のコード例の内容をコピーし、`invoke_dag.py` という名前でローカルに保存します。以下の値をお客様の情報に置き換えます。
   + `your-new-environment-name` — DAG を起動する他の環境の名前。
   + `your-target-dag-id` — 起動する他の環境の DAG の ID。

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  次の AWS CLI コマンドを実行して DAG を環境のバケットにコピーし、Apache Airflow UI を使用して DAG をトリガーします。

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. DAG が正常に実行されると、`invoke_dag_task` のタスクログに以下のような出力が表示されます。

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   DAG が正常に呼び出されたことを確認するには、新しい環境の Apache Airflow UI に移動し、次の操作を行います。

   1. **DAG** ページの DAG のリストから新しいターゲット DAG を見つけます。

   1. **前回の実行** で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における `invoke_dag` の最新のタイムスタンプとほぼ一致する必要があります。

   1. **[最近のタスク]** で、前回の実行が成功したことを確認します。