

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Memanggil DAGs di lingkungan MWAA Amazon yang berbeda
<a name="samples-invoke-dag"></a>

Contoh kode berikut membuat token Apache Airflow CLI. Kode tersebut kemudian menggunakan grafik asiklik terarah (DAG) di satu lingkungan Amazon MWAA untuk memanggil DAG di lingkungan MWAA Amazon yang berbeda.

**Topics**
+ [Versi](#samples-invoke-dag-version)
+ [Prasyarat](#samples-invoke-dag-prereqs)
+ [Izin](#samples-invoke-dag-permissions)
+ [Dependensi](#samples-invoke-dag-dependencies)
+ [Contoh kode](#samples-invoke-dag-code)

## Versi
<a name="samples-invoke-dag-version"></a>

**[Anda dapat menggunakan contoh kode pada halaman ini dengan **Apache Airflow v2 di Python 3.10 dan Apache Airflow**[v3 di Python 3.11](https://peps.python.org/pep-0619/).](https://peps.python.org/pep-0664/)**

## Prasyarat
<a name="samples-invoke-dag-prereqs"></a>

Untuk menggunakan contoh kode pada halaman ini, Anda memerlukan yang berikut:
+ Dua [lingkungan Amazon MWAA](get-started.md) dengan akses server web **jaringan publik**, termasuk lingkungan Anda saat ini.
+ Contoh DAG yang diunggah ke bucket Amazon Simple Storage Service (Amazon S3) lingkungan target Anda.

## Izin
<a name="samples-invoke-dag-permissions"></a>

Untuk menggunakan contoh kode di halaman ini, peran eksekusi lingkungan Anda harus memiliki izin untuk membuat token CLI Apache Airflow. Anda dapat melampirkan kebijakan AWS-managed `AmazonMWAAAirflowCliAccess` untuk memberikan izin ini.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Untuk informasi lebih lanjut, lihat[Kebijakan CLI Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependensi
<a name="samples-invoke-dag-dependencies"></a>

Untuk menggunakan contoh kode ini dengan Apache Airflow v2 dan yang lebih baru, tidak diperlukan dependensi tambahan. Gunakan [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)untuk menginstal Apache Airflow.

## Contoh kode
<a name="samples-invoke-dag-code"></a>

Contoh kode berikut mengasumsikan bahwa Anda menggunakan DAG di lingkungan Anda saat ini untuk memanggil DAG di lingkungan lain.

1. Di terminal Anda, arahkan ke direktori tempat kode DAG Anda disimpan. Contoh:

   ```
   cd dags
   ```

1. Salin konten contoh kode berikut dan simpan secara lokal sebagai`invoke_dag.py`. Ganti nilai berikut dengan informasi Anda.
   + `your-new-environment-name`— Nama lingkungan lain di mana Anda ingin memanggil DAG.
   + `your-target-dag-id`— ID DAG di lingkungan lain yang ingin Anda panggil.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Jalankan AWS CLI perintah berikut untuk menyalin DAG ke bucket lingkungan Anda, lalu picu DAG menggunakan Apache Airflow UI. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Jika DAG berhasil berjalan, Anda akan mendapatkan output yang mirip dengan yang berikut di log tugas untuk`invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Untuk memverifikasi bahwa DAG Anda berhasil dipanggil, navigasikan ke Apache Airflow UI untuk lingkungan baru Anda, lalu lakukan hal berikut:

   1. Pada **DAGs**halaman, cari DAG target baru Anda dalam daftar DAGs.

   1. Di bawah **Last Run**, periksa stempel waktu untuk menjalankan DAG terbaru. Stempel waktu ini harus sangat cocok dengan stempel waktu terbaru `invoke_dag` di lingkungan Anda yang lain.

   1. Di bawah **Tugas Terbaru**, periksa apakah proses terakhir berhasil.