

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Invocação DAGs em diferentes ambientes do Amazon MWAA
<a name="samples-invoke-dag"></a>

O exemplo de código a seguir cria um token da CLI do Apache Airflow. O código, usa um gráfico acíclico direcionado (directed acyclic graph, DAG) em um ambiente do Amazon MWAA para invocar um DAG em um ambiente diferente do Amazon MWAA.

**Topics**
+ [Versão](#samples-invoke-dag-version)
+ [Pré-requisitos](#samples-invoke-dag-prereqs)
+ [Permissões](#samples-invoke-dag-permissions)
+ [Dependências](#samples-invoke-dag-dependencies)
+ [Exemplo de código](#samples-invoke-dag-code)

## Versão
<a name="samples-invoke-dag-version"></a>

É possível usar o exemplo de código nesta página com o **Apache Airflow v2** no [Python 3.10](https://peps.python.org/pep-0619/) e o **Apache Airflow v3** no [Python 3.11](https://peps.python.org/pep-0664/).

## Pré-requisitos
<a name="samples-invoke-dag-prereqs"></a>

Para usar o código de exemplo nesta página, você precisará de:
+ Dois [ambientes Amazon MWAA](get-started.md) com acesso ao servidor Web de **rede pública**, incluindo seu ambiente atual.
+ Um exemplo de DAG que foi carregado para o bucket do Amazon Simple Storage Service (Amazon S3) do seu ambiente de destino.

## Permissões
<a name="samples-invoke-dag-permissions"></a>

Para usar o exemplo de código nesta página, o perfil de execução do seu ambiente deve ter permissão para criar um token da CLI do Apache Airflow. Você pode anexar a política AWS gerenciada `AmazonMWAAAirflowCliAccess` para conceder essa permissão.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obter mais informações, consulte [Política de CLI do Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependências
<a name="samples-invoke-dag-dependencies"></a>

Para usar esse exemplo de código com o Apache Airflow v2 e versões posteriores, nenhuma dependência adicional é necessária. Use [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)para instalar o Apache Airflow.

## Exemplo de código
<a name="samples-invoke-dag-code"></a>

O exemplo de código a seguir pressupõe que você esteja usando um DAG em seu ambiente atual para invocar um DAG em outro ambiente.

1. Em seu terminal, navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

   ```
   cd dags
   ```

1. Copie o conteúdo do exemplo de código a seguir e salve-o localmente como `invoke_dag.py`. Substitua os seguintes valores por suas informações.
   + `your-new-environment-name`: o nome do outro ambiente onde deseja invocar o DAG.
   + `your-target-dag-id`: o ID do DAG no outro ambiente que você deseja invocar.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Execute o AWS CLI comando a seguir para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a interface do Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Se o DAG for executado com êxito, você verá um resultado semelhante ao seguinte nos logs de tarefas de `invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Para verificar se seu DAG foi invocado com sucesso, navegue até a IU do Apache Airflow para seu novo ambiente e faça o seguinte:

   1. Na **DAGs**página, localize seu novo DAG de destino na lista de DAGs.

   1. Em **Última execução**, verifique a data e hora da última execução do DAG. Esse carimbo de data/hora deve ser semelhante ao carimbo de data/hora mais recente para `invoke_dag` em seu outro ambiente.

   1. Em **Tarefas recentes**, verifique se a última execução foi bem-sucedida.