

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Utilizzo dell'API REST di Apache Airflow
<a name="access-mwaa-apache-airflow-rest-api"></a>

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) supporta l'interazione con gli ambienti Apache Airflow direttamente utilizzando l'API REST di Apache Airflow per ambienti che eseguono Apache Airflow v2.4.3 e versioni successive. Ciò consente di accedere e gestire gli ambienti Amazon MWAA in modo programmatico, fornendo un modo standardizzato per richiamare flussi di lavoro di orchestrazione dei dati, gestire e DAGs monitorare lo stato di vari componenti di Apache Airflow come il database di metadati, il trigger e lo scheduler.

Per supportare la scalabilità durante l'utilizzo dell'API REST di Apache Airflow, Amazon MWAA offre la possibilità di scalare orizzontalmente la capacità del server Web per gestire l'aumento della domanda, sia che si tratti di richieste API REST, utilizzo dell'interfaccia a riga di comando (CLI) o più utenti simultanei dell'interfaccia utente (UI) Apache Airflow. Per ulteriori informazioni su come Amazon MWAA ridimensiona i server web, consulta. [Configurazione della scalabilità automatica del server web Amazon MWAA](mwaa-web-server-autoscaling.md)

Puoi utilizzare l'API REST di Apache Airflow per implementare i seguenti casi d'uso per i tuoi ambienti:
+ **Accesso programmatico**: ora puoi avviare le esecuzioni di Apache Airflow DAG, gestire set di dati e recuperare lo stato di vari componenti come il database dei metadati, i trigger e gli scheduler senza fare affidamento sull'interfaccia utente o sulla CLI di Apache Airflow.
+ **Integrazione con applicazioni e microservizi esterni**: supporto API REST che puoi utilizzare per creare soluzioni personalizzate che integrano i tuoi ambienti Amazon MWAA con altri sistemi. Ad esempio, puoi avviare flussi di lavoro in risposta a eventi provenienti da sistemi esterni, come lavori di database completati o registrazioni di nuovi utenti.
+ **Monitoraggio centralizzato**: puoi creare dashboard di monitoraggio che aggregano lo stato del tuo ambiente in DAGs più ambienti Amazon MWAA, abilitando monitoraggio e gestione centralizzati.

Per ulteriori informazioni sull'API REST di Apache Airflow, consulta l'Apache Airflow REST API [Reference](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html).

Utilizzando`InvokeRestApi`, è possibile accedere all'API REST di Apache Airflow utilizzando le credenziali. AWS In alternativa, puoi accedervi anche ottenendo un token di accesso al server web e quindi utilizzando il token per chiamarlo.

Se si verifica un errore nel messaggio `Update your environment to use InvokeRestApi` durante l'utilizzo dell'`InvokeRestApi`operazione, significa che è necessario aggiornare l'ambiente Amazon MWAA. Questo errore si verifica quando l'ambiente Amazon MWAA non è compatibile con le ultime modifiche relative alla `InvokeRestApi` funzionalità. Per risolvere questo problema, aggiorna il tuo ambiente Amazon MWAA per incorporare le modifiche necessarie alla `InvokeRestApi` funzionalità.

L'`InvokeRestApi`operazione ha una durata di timeout predefinita di 10 secondi. Se l'operazione non viene completata entro questo intervallo di tempo di 10 secondi, viene terminata automaticamente e viene generato un errore. Assicurati che le chiamate all'API REST siano progettate per essere completate entro questo periodo di timeout per evitare errori.

Per supportare la scalabilità durante l'utilizzo dell'API REST di Apache Airflow, Amazon MWAA offre la possibilità di scalare orizzontalmente la capacità del server Web per gestire l'aumento della domanda, sia che si tratti di richieste API REST, utilizzo dell'interfaccia a riga di comando (CLI) o più utenti simultanei dell'interfaccia utente (UI) Apache Airflow. Per ulteriori informazioni su come Amazon MWAA ridimensiona i server Web, consulta. [Configurazione della scalabilità automatica del server web Amazon MWAA](mwaa-web-server-autoscaling.md)

Puoi utilizzare l'API REST di Apache Airflow per implementare i seguenti casi d'uso per i tuoi ambienti:
+ **Accesso programmatico**: ora puoi avviare le esecuzioni di Apache Airflow DAG, gestire set di dati e recuperare lo stato di vari componenti come il database dei metadati, i trigger e gli scheduler senza fare affidamento sull'interfaccia utente o sulla CLI di Apache Airflow.
+ **Integrazione con applicazioni e microservizi esterni**: supporto API REST che puoi utilizzare per creare soluzioni personalizzate che integrano i tuoi ambienti Amazon MWAA con altri sistemi. Ad esempio, puoi avviare flussi di lavoro in risposta a eventi provenienti da sistemi esterni, come lavori di database completati o registrazioni di nuovi utenti.
+ **Monitoraggio centralizzato**: puoi creare dashboard di monitoraggio che aggregano lo stato del tuo ambiente in DAGs più ambienti Amazon MWAA, abilitando monitoraggio e gestione centralizzati.

Per ulteriori informazioni sull'API REST di Apache Airflow, consulta The [Apache Airflow REST API Reference](https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html).

Utilizzando`InvokeRestApi`, è possibile accedere all'API REST di Apache Airflow utilizzando le credenziali. AWS In alternativa, è possibile accedervi anche ottenendo un token di accesso al server Web e quindi utilizzando il token per chiamarlo.
+ Se si verifica un errore nel messaggio `Update your environment to use InvokeRestApi` durante l'utilizzo dell'`InvokeRestApi`operazione, significa che è necessario aggiornare l'ambiente Amazon MWAA. Questo errore si verifica quando l'ambiente Amazon MWAA non è compatibile con le ultime modifiche relative alla `InvokeRestApi` funzionalità. Per risolvere questo problema, aggiorna il tuo ambiente Amazon MWAA per incorporare le modifiche necessarie alla `InvokeRestApi` funzionalità.
+ L'`InvokeRestApi`operazione ha una durata di timeout predefinita di 10 secondi. Se l'operazione non viene completata entro questo intervallo di tempo di 10 secondi, viene terminata automaticamente e viene generato un errore. Assicurati che le chiamate all'API REST siano progettate per essere completate entro questo periodo di timeout per evitare errori.

**Importante**  
La dimensione del payload di risposta non può superare i 6 MB. Il tuo `RestApi` fallisce se questo limite viene superato.

Utilizza i seguenti esempi per effettuare chiamate API all'API REST di Apache Airflow e avviare una nuova esecuzione DAG:

**Topics**
+ [Concessione dell'accesso all'API REST di Apache Airflow: `airflow:InvokeRestApi`](#granting-access-MWAA-Enhanced-REST-API)
+ [Chiamata dell'API REST di Apache Airflow](#listing-DAGs-creating-variables-using-restapi-script)
+ [Creazione di un token di sessione del server Web e chiamata all'API REST di Apache Airflow](#create-web-server-session-token)

## Concessione dell'accesso all'API REST di Apache Airflow: `airflow:InvokeRestApi`
<a name="granting-access-MWAA-Enhanced-REST-API"></a>

Per accedere all'API REST di Apache Airflow AWS utilizzando le credenziali, devi concedere `airflow:InvokeRestApi` l'autorizzazione nella tua policy IAM. Nel seguente esempio di policy, specifica il `Public` ruolo`Admin`,`Op`, `User``Viewer`, o `{airflow-role}` per personalizzare il livello di accesso degli utenti. Per ulteriori informazioni, consulta [Ruoli predefiniti](https://airflow.apache.org/docs/apache-airflow/1.10.6/security.html?highlight=ldap#default-roles) nella guida di riferimento di *Apache Airflow*.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "AllowMwaaRestApiAccess",
            "Effect": "Allow",
            "Action": "airflow:InvokeRestApi",
            "Resource": [
            "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}"
            ]
        }
    ]
}
```

------

**Nota**  
Durante la configurazione di un server web privato, l'`InvokeRestApi`azione non può essere richiamata dall'esterno di un Virtual Private Cloud (VPC). È possibile utilizzare la `aws:SourceVpc` chiave per applicare un controllo di accesso più granulare per questa operazione. Per ulteriori informazioni, fare riferimento a [aws: SourceVpc](https://docs.aws.amazon.com//IAM/latest/UserGuide/reference_policies_condition-keys.html#condition-keys-sourcevpc).

## Chiamata dell'API REST di Apache Airflow
<a name="listing-DAGs-creating-variables-using-restapi-script"></a>

Questo script di esempio seguente illustra come utilizzare l'API REST di Apache Airflow per elencare le opzioni DAGs disponibili nell'ambiente e come creare una variabile Apache Airflow:

```
import boto3
			
  env_name = "MyAirflowEnvironment"
			
  def list_dags(client):
    request_params = {
      "Name": env_name,
      "Path": "/dags",
      "Method": "GET",
      "QueryParameters": {
        "paused": False
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  def create_variable(client):
    request_params = {
      "Name": env_name,
      "Path": "/variables",
      "Method": "POST",
      "Body": {
        "key": "test-restapi-key",
        "value": "test-restapi-value",
        "description": "Test variable created by MWAA InvokeRestApi API",
      }
    }
  response = client.invoke_rest_api(
    **request_params
  )
			
  print("Airflow REST API response: ", response['RestApiResponse'])
			
  if __name__ == "__main__":
    client = boto3.client("mwaa")
    list_dags(client)
    create_variable(client)
```

## Creazione di un token di sessione del server Web e chiamata all'API REST di Apache Airflow
<a name="create-web-server-session-token"></a>

Per creare un token di accesso al server web, usa la seguente funzione Python. Questa funzione chiama innanzitutto l'API Amazon MWAA per ottenere un token di accesso Web. Il token di accesso Web, che scade dopo 60 secondi, viene quindi sostituito con un token di *sessione* Web, che consente di accedere al server Web e utilizzare l'API REST Apache Airflow. Se hai bisogno di più di 10 transazioni al secondo (TPS) di capacità di limitazione, puoi utilizzare questo metodo per accedere all'API REST di Apache Airflow.

Il token di sessione scade dopo 12 ore.

**Suggerimento**  
Le principali modifiche nei seguenti esempi di codice da Apache Airflow v2 a v3 sono:  
Il percorso dell'API REST è stato modificato da a `/api/v1` `/api/v2`
Percorso di accesso modificato da `/aws_maa/login` a `/pluginsv2/aws_mwaa/login`
La risposta di login `response.cookies["_token"]` contiene informazioni sul token che è necessario utilizzare per le successive chiamate API
Per una chiamata all'API REST, è necessario passare `jwt_token` informazioni nelle intestazioni come:  

  ```
  headers = {
    "Authorization": f"Bearer {jwt_token}",
    "Content-Type": "application/json"
  }
  ```

------
#### [ Apache Airflow v3 ]

```
def get_token_info(region, env_name):
  logging.basicConfig(level=logging.INFO)
						
  try:
    # Initialize MWAA client and request a web login token
    mwaa = boto3.client('mwaa', region_name=region)
    response = mwaa.create_web_login_token(Name=env_name)
						
    # Extract the web server hostname and login token
    web_server_host_name = response["WebServerHostname"]
    web_token = response["WebToken"]
						
    # Construct the URL needed for authentication 
    login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login"
    login_payload = {"token": web_token}
						
    # Make a POST request to the MWAA login url using the login payload
    response = requests.post(
      login_url,
      data=login_payload,
      timeout=10
    )
						
    # Check if login was successful
    if response.status_code == 200:
						
    # Return the hostname and the session cookie 
    return (
      web_server_host_name,
      response.cookies['_token']
    )
    else:
      # Log an error
      logging.error("Failed to log in: HTTP %d", response.status_code)
      return None
      except requests.RequestException as e:
      
      # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
      except Exception as e:
      
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------
#### [ Apache Airflow v2 ]

```
def get_session_info(region, env_name):
  logging.basicConfig(level=logging.INFO)

  try:
      # Initialize MWAA client and request a web login token
      mwaa = boto3.client('mwaa', region_name=region)
      response = mwaa.create_web_login_token(Name=env_name)
      
      # Extract the web server hostname and login token
      web_server_host_name = response["WebServerHostname"]
      web_token = response["WebToken"]
      
      # Construct the URL needed for authentication 
      login_url = f"https://{web_server_host_name}/aws_mwaa/login"
      login_payload = {"token": web_token}

      # Make a POST request to the MWAA login url using the login payload
      response = requests.post(
          login_url,
          data=login_payload,
          timeout=10
      )

      # Check if login was succesfull 
      if response.status_code == 200:
      
          # Return the hostname and the session cookie 
          return (
              web_server_host_name,
              response.cookies["session"]
          )
      else:
          # Log an error
          logging.error("Failed to log in: HTTP %d", response.status_code)
          return None
  except requests.RequestException as e:
       # Log any exceptions raised during the request to the MWAA login endpoint
      logging.error("Request failed: %s", str(e))
      return None
  except Exception as e:
      # Log any other unexpected exceptions
      logging.error("An unexpected error occurred: %s", str(e))
      return None
```

------

Una volta completata l'autenticazione, disponi delle credenziali per iniziare a inviare richieste agli endpoint dell'API. Nell'esempio riportato nella sezione seguente, utilizzate l'endpoint. `dags/{dag_name}/dagRuns`

------
#### [ Apache Airflow v3 ]

```
def trigger_dag(region, env_name, dag_id):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_id (str): ID of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and token for authentication
						try:
						web_server_host_name, jwt_token = get_token_info(region, env_name)
						if not jwt_token:
						logging.error("Authentication failed, no jwt token retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving token info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						request_headers = {
						"Authorization": f"Bearer {jwt_token}",
						"Content-Type": "application/json" # Good practice to include, even for GET
						}
						
						# sample request body input
						json_body = {"logical_date": "2025-09-17T14:15:00Z"}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, headers=request_headers, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_id = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_id)
```

------
#### [ Apache Airflow v2 ]

```
def trigger_dag(region, env_name, dag_name):
						"""
						Triggers a DAG in a specified MWAA environment using the Airflow REST API.
						
						Args:
						region (str): AWS region where the MWAA environment is hosted.
						env_name (str): Name of the MWAA environment.
						dag_name (str): Name of the DAG to trigger.
						"""
						
						logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}")
						
						# Retrieve the web server hostname and session cookie for authentication
						try:
						web_server_host_name, session_cookie = get_session_info(region, env_name)
						if not session_cookie:
						logging.error("Authentication failed, no session cookie retrieved.")
						return
						except Exception as e:
						logging.error(f"Error retrieving session info: {str(e)}")
						return
						
						# Prepare headers and payload for the request
						cookies = {"session": session_cookie}
						json_body = {"conf": {}}
						
						# Construct the URL for triggering the DAG
						url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns"
						
						# Send the POST request to trigger the DAG
						try:
						response = requests.post(url, cookies=cookies, json=json_body)
						# Check the response status code to determine if the DAG was triggered successfully
						if response.status_code == 200:
						logging.info("DAG triggered successfully.")
						else:
						logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}")
						except requests.RequestException as e:
						logging.error(f"Request to trigger DAG failed: {str(e)}")
						
						if __name__ == "__main__":
						logging.basicConfig(level=logging.INFO)
						
						# Check if the correct number of arguments is provided
						if len(sys.argv) != 4:
						logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}")
						sys.exit(1)
						
						region = sys.argv[1]
						env_name = sys.argv[2]
						dag_name = sys.argv[3]
						
						# Trigger the DAG with the provided arguments
						trigger_dag(region, env_name, dag_name)
```

------