Menggunakan DAG untuk mengimpor variabel di CLI - Amazon Managed Workflows for Apache Airflow

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan DAG untuk mengimpor variabel di CLI

Contoh kode berikut mengimpor variabel menggunakan CLI di Alur Kerja Terkelola Amazon untuk Apache Airflow.

Versi

Anda dapat menggunakan contoh kode pada halaman ini dengan Apache Airflow v2 di Python 3.10 dan Apache Airflowv3 di Python 3.11.

Prasyarat

Tidak ada izin tambahan yang diperlukan untuk menggunakan contoh kode di halaman ini.

Izin

Anda Akun AWS membutuhkan akses ke AmazonMWAAAirflowCliAccess kebijakan. Untuk mempelajari lebih lanjut, lihatKebijakan CLI Apache Airflow: Amazon MWAAAirflow CliAccess.

Dependensi

Untuk menggunakan contoh kode ini dengan Apache Airflow v2 dan yang lebih baru, tidak diperlukan dependensi tambahan. Gunakan aws-mwaa-docker-imagesuntuk menginstal Apache Airflow.

Contoh kode

Kode contoh berikut mengambil tiga input: nama lingkungan Amazon MWAA Anda (inmwaa_env), lingkungan Anda (inaws_region), dan file lokal yang berisi variabel yang ingin Anda impor (in). Wilayah AWS var_file

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

Apa selanjutnya?