Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation d'un DAG pour importer des variables dans la CLI
L'exemple de code suivant importe des variables à l'aide de la CLI sur Amazon Managed Workflows pour Apache Airflow.
Version
Vous pouvez utiliser l'exemple de code présenté sur cette page avec Apache Airflow v2 en Python 3.10
Prérequis
Aucune autorisation supplémentaire n'est requise pour utiliser l'exemple de code présenté sur cette page.
Autorisations
Vos Compte AWS besoins ont accès à la AmazonMWAAAirflowCliAccess politique. Pour en savoir plus, reportez-vous àPolitique de la CLI Apache Airflow : Amazon MWAAAirflow CliAccess.
Dépendances
Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. aws-mwaa-docker-images
Exemple de code
L'exemple de code suivant utilise trois entrées : le nom de votre environnement Amazon MWAA (inmwaa_env), le nom Région AWS de votre environnement (inaws_region) et le fichier local contenant les variables que vous souhaitez importer (invar_file).
import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)
Quelle est la prochaine étape ?
-
Découvrez comment télécharger le code DAG dans cet exemple dans le
dagsdossier de votre compartiment Amazon S3 dansAjouter ou mettre à jour DAGs.