여러 Amazon MWAA 환경에서 DAG 호출 - Amazon Managed Workflows for Apache Airflow

여러 Amazon MWAA 환경에서 DAG 호출

다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.

버전

이 페이지의 코드 예제는 Python 3.10Apache Airflow v2Python 3.11Apache Airflow v3에서 사용할 수 있습니다.

사전 조건

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.

  • 퍼블릭 네트워크 웹 서버에 액세스할 수 있는 두 개의 Amazon MWAA 환경(현재 환경 포함).

  • 대상 환경의 Amazon Simple Storage Service(S3) 버킷에 업로드된 샘플 DAG.

권한

이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS 관리형 정책 AmazonMWAAAirflowCliAccess를 연결하여 이 권한을 부여할 수 있습니다.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

자세한 정보는 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess 섹션을 참조하세요.

종속성

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. aws-mwaa-docker-images를 사용하여 Apache Airflow를 설치합니다.

코드 예제

다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.

  1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 예제의 내용을 복사하고 로컬에서 invoke_dag.py로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.

    • your-new-environment-name— DAG를 호출하려는 다른 환경의 이름.

    • your-target-dag-id— 호출하려는 다른 환경에서 DAG의 ID.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. DAG가 성공적으로 실행되면 invoke_dag_task의 작업 로그에 다음과 유사한 출력을 얻게 됩니다.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.

    1. DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

    2. 마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 invoke_dag에 대한 최신 타임스탬프와 거의 일치해야 합니다.

    3. 최근 작업에서 마지막 실행이 성공했는지 확인합니다.