Uso de un DAG para importar variables en la CLI - Amazon Managed Workflows para Apache Airflow

Uso de un DAG para importar variables en la CLI

La siguiente muestra de código importa variables mediante la CLI de Amazon Managed Workflows para Apache Airflow.

Versión

Puede usar el código de ejemplo que aparece en esta página con Apache Airflow v2 en Python 3.10 y Apache Airflow v3 en Python 3.11.

Requisitos previos

No se necesitan permisos adicionales para usar el código de ejemplo de esta página.

Permisos

Su Cuenta de AWS necesita acceso a la política AmazonMWAAAirflowCliAccess. Consulte Política de la CLI de Apache Airflow: AmazonMWAAAirflowCliAccess para obtener más información.

Dependencias

Para usar este código de ejemplo con Apache Airflow v2 y versiones posteriores, no se necesitan dependencias adicionales. Use aws-mwaa-docker-images para instalar Apache Airflow.

Código de ejemplo

En la siguiente muestra de código, se requieren tres entradas: el nombre de su entorno de Amazon MWAA (en mwaa_env), la Región de AWS de su entorno (en aws_region) y el archivo local que contiene las variables que desea importar (en var_file).

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

Siguientes pasos