

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Invocación DAGs en diferentes entornos de Amazon MWAA
<a name="samples-invoke-dag"></a>

El siguiente código de ejemplo crea un token CLI de Apache Airflow. A continuación, el código utiliza un gráfico acíclico dirigido (DAG) en un entorno de Amazon MWAA para invocar un DAG en un entorno distinto de Amazon MWAA.

**Topics**
+ [Versión](#samples-invoke-dag-version)
+ [Requisitos previos](#samples-invoke-dag-prereqs)
+ [Permisos](#samples-invoke-dag-permissions)
+ [Dependencias](#samples-invoke-dag-dependencies)
+ [Ejemplo de código](#samples-invoke-dag-code)

## Versión
<a name="samples-invoke-dag-version"></a>

Puede usar el código de ejemplo que aparece en esta página con **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) y **Apache Airflow v3** en [Python 3.11](https://peps.python.org/pep-0664/).

## Requisitos previos
<a name="samples-invoke-dag-prereqs"></a>

Para usar el ejemplo de código en esta página, necesitará lo siguiente:
+ Dos [entornos de Amazon MWAA](get-started.md) con acceso a un servidor web de **red pública**, incluido su entorno actual.
+ Un DAG de muestra cargado en el bucket de Amazon Simple Storage Service (Amazon S3) de su entorno de destino.

## Permisos
<a name="samples-invoke-dag-permissions"></a>

Para usar el código de ejemplo de esta página, el rol de ejecución de su entorno debe tener permiso para crear un token CLI de Apache Airflow. Puedes adjuntar la política AWS gestionada `AmazonMWAAAirflowCliAccess` para conceder este permiso.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "airflow:CreateCliToken"
            ],
            "Resource": "*"
        }
    ]
}
```

------

Para obtener más información, consulta [Política de CLI de Apache Airflow: Amazon MWAAAirflow CliAccess](access-policies.md#cli-access).

## Dependencias
<a name="samples-invoke-dag-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Ejemplo de código
<a name="samples-invoke-dag-code"></a>

En el siguiente código de ejemplo se supone que está utilizando un DAG en su entorno actual para invocar un DAG en otro entorno.

1. En su terminal, vaya hasta el directorio en el que está almacenado el código de DAG. Por ejemplo:

   ```
   cd dags
   ```

1. Copie el contenido del siguiente código de ejemplo y guárdelo localmente como `invoke_dag.py`. Reemplace los valores siguientes por sus propios valores.
   + `your-new-environment-name`: nombre del otro entorno en el que desea invocar el DAG.
   + `your-target-dag-id`: ID del DAG del otro entorno que desea invocar.

   ```
   from airflow.decorators import dag, task
   import boto3
   from datetime import datetime, timedelta
   import os, requests
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   @task()
   def invoke_dag_task(**kwargs):
       client = boto3.client('mwaa')
       token = client.create_cli_token(Name='your-new-environment-name')
       url = f"https://{token['WebServerHostname']}/aws_mwaa/cli"
       body = 'dags trigger your-target-dag-id'
       headers = {
           'Authorization' : 'Bearer ' + token['CliToken'],
           'Content-Type': 'text/plain'
           }
       requests.post(url, data=body, headers=headers)
   
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=datetime(2022, 1, 1),
       dagrun_timeout=timedelta(minutes=60),
       catchup=False
       )
   def invoke_dag():
       t = invoke_dag_task()
   
   invoke_dag_test = invoke_dag()
   ```

1.  Ejecute el siguiente AWS CLI comando para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow. 

   ```
   aws s3 cp your-dag.py s3://your-environment-bucket/dags/
   ```

1. Si el DAG se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas de `invoke_dag_task`.

   ```
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Para comprobar que el DAG se haya invocado correctamente, vaya a la interfaz de usuario de Apache Airflow del nuevo entorno y, a continuación, haga lo siguiente:

   1. En la **DAGs**página, busque su nuevo DAG de destino en la lista de DAGs.

   1. En **Última ejecución**, compruebe la marca de tiempo de la última ejecución del DAG. Esta marca de tiempo debe acercarse lo máximo posible a la última marca de tiempo para `invoke_dag` en su otro entorno.

   1. En **Tareas recientes**, compruebe que la última ejecución se haya realizado correctamente.