Lambda 함수를 사용한 DAG 호출
다음 코드 예제는 Amazon MWAA 환경에서 AWS Lambda 함수를 사용하여 Apache Airflow CLI 토큰을 가져오고 방향성 비순환 그래프(DAG)를 호출합니다.
버전
이 페이지의 코드 예제는 Python 3.10
사전 조건
이 코드 예제를 활용하려면 다음과 같이 해야 합니다.
-
Amazon MWAA 환경에서는 퍼블릭 네트워크 액세스 모드를 사용합니다.
-
최신 Python 런타임을 사용하는 Lambda 함수를 보유합니다.
참고
Lambda 함수와 Amazon MWAA 환경이 동일한 VPC에 있는 경우, 프라이빗 네트워크에서 이 코드를 사용할 수 있습니다. 이 구성의 경우 Lambda 함수의 실행 역할에 Amazon Elastic Compute Cloud(Amazon EC2) CreateNetworkInterface API 작업을 호출할 수 있는 권한이 필요합니다. AWSLambdaVPCAccessExecutionRole
권한
이 페이지의 코드 예제를 사용하려면 Amazon MWAA 환경의 실행 역할에 airflow:CreateCliToken 작업을 수행할 수 있는 액세스 권한이 필요합니다. AmazonMWAAAirflowCliAccess AWS 관리형 정책을 사용하여 이 권한을 제공할 수 있습니다.
자세한 정보는 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess 섹션을 참조하세요.
종속성
이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. aws-mwaa-docker-images
코드 예제
-
https://console.aws.amazon.com/lambda/
에서 AWS Lambda 콘솔을 엽니다. -
함수 목록에서 Lambda 함수를 선택합니다.
-
함수 페이지에서 다음 코드를 복사하고 다음을 리소스 이름으로 바꿉니다.
-
YOUR_ENVIRONMENT_NAME– Amazon MWAA 환경의 이름입니다. -
YOUR_DAG_NAME– 호출하려는 DAG의 이름입니다.
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout']) -
-
배포(Deploy)를 선택합니다.
-
Lambda 콘솔을 사용하여 함수를 호출하려면 테스트를 선택합니다.
-
Lambda가 DAG를 성공적으로 호출했는지 확인하려면 Amazon MWAA를 사용하여 환경의 Apache Airflow UI로 이동한 후 다음을 수행합니다.
-
DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.
-
마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서
invoke_dag에 대한 최신 타임스탬프와 거의 일치해야 합니다. -
최근 작업에서 마지막 실행이 성공했는지 확인합니다.
-