Utilizzo di un DAG per importare variabili nella CLI - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di un DAG per importare variabili nella CLI

Il codice di esempio seguente importa le variabili utilizzando la CLI su Amazon Managed Workflows for Apache Airflow.

Versione

È possibile utilizzare l'esempio di codice in questa pagina con Apache Airflow v2 in Python 3.10 e Apache Airflow v3in Python 3.11.

Prerequisiti

Non sono necessarie autorizzazioni aggiuntive per utilizzare l'esempio di codice in questa pagina.

Autorizzazioni

È Account AWS necessario accedere alla AmazonMWAAAirflowCliAccess politica. Per ulteriori informazioni, fare riferimento aPolitica della CLI di Apache Airflow: Amazon MWAAAirflow CliAccess.

Dipendenze

Per utilizzare questo esempio di codice con Apache Airflow v2 e versioni successive, non sono necessarie dipendenze aggiuntive. Utilizzare aws-mwaa-docker-imagesper installare Apache Airflow.

Esempio di codice

Il codice di esempio seguente richiede tre input: il nome dell'ambiente Amazon MWAA (inmwaa_env), il nome Regione AWS dell'ambiente (inaws_region) e il file locale che contiene le variabili che desideri importare (in). var_file

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

Fasi successive