DAG를 사용하여 CLI에서 변수 가져오기
다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow의 CLI를 사용하여 변수를 가져옵니다.
버전
이 페이지의 코드 예제는 Python 3.10
사전 조건
이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.
권한
AWS 계정은 AmazonMWAAAirflowCliAccess 정책에 대한 액세스 권한이 필요합니다. 자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess 섹션을 참조하세요.
종속성
이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. aws-mwaa-docker-images
코드 샘플
다음 샘플 코드는 Amazon MWAA 환경 이름(mwaa_env), 환경의 AWS 리전 리전(var_file) 및 가져오려는 변수가 포함된 로컬 파일(aws_region)의 세 가지 입력을 사용합니다.
import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)
다음 단계
-
이 예제의 DAG 코드를 DAG 추가 또는 업데이트에서 Amazon S3 버킷의
dags폴더에 업로드하는 방법을 알아봅니다.