DAG를 사용하여 CLI에서 변수 가져오기 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

DAG를 사용하여 CLI에서 변수 가져오기

다음 샘플 코드는 Amazon Managed Workflows for Apache Airflow의 CLI를 사용하여 변수를 가져옵니다.

버전

이 페이지의 코드 예제는 Python 3.10Apache Airflow v2Python 3.11Apache Airflow v3에서 사용할 수 있습니다.

사전 조건

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

권한

AWS 계정은 AmazonMWAAAirflowCliAccess 정책에 대한 액세스 권한이 필요합니다. 자세한 내용은 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess 섹션을 참조하세요.

종속성

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. aws-mwaa-docker-images를 사용하여 Apache Airflow를 설치합니다.

코드 샘플

다음 샘플 코드는 Amazon MWAA 환경 이름(mwaa_env), 환경의 AWS 리전 리전(var_file) 및 가져오려는 변수가 포함된 로컬 파일(aws_region)의 세 가지 입력을 사용합니다.

import boto3 import json import requests import base64 import getopt import sys argv = sys.argv[1:] mwaa_env='' aws_region='' var_file='' try: opts, args = getopt.getopt(argv, 'e:v:r:', ['environment', 'variable-file','region']) #if len(opts) == 0 and len(opts) > 3: if len(opts) != 3: print ('Usage: -e MWAA environment -v variable file location and filename -r aws region') else: for opt, arg in opts: if opt in ("-e"): mwaa_env=arg elif opt in ("-r"): aws_region=arg elif opt in ("-v"): var_file=arg boto3.setup_default_session(region_name="{}".format(aws_region)) mwaa_env_name = "{}".format(mwaa_env) client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) with open ("{}".format(var_file), "r") as myfile: fileconf = myfile.read().replace('\n', '') json_dictionary = json.loads(fileconf) for key in json_dictionary: print(key, " ", json_dictionary[key]) val = (key + " " + json_dictionary[key]) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "variables set {0}".format(val) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message) except: print('Use this script with the following options: -e MWAA environment -v variable file location and filename -r aws region') print("Unexpected error:", sys.exc_info()[0]) sys.exit(2)

다음 단계