여러 Amazon MWAA 환경에서 DAG 호출 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

여러 Amazon MWAA 환경에서 DAG 호출

다음 코드 예제는 Apache Airflow CLI 토큰을 생성합니다. 그런 다음 코드는 한 Amazon MWAA 환경에서 방향성 비순환 그래프(DAG)를 사용하여 다른 Amazon MWAA 환경에서 DAG를 호출합니다.

버전

이 페이지의 코드 예제를 Python 3.10의 Apache Airflow v2 및 Python 3.11의 Apache Airflow v3와 함께 사용할 수 있습니다. https://peps.python.org/pep-0664/

사전 조건

이 페이지에서 코드 예제를 사용하려면 다음이 필요합니다.

권한

이 페이지의 코드 예제를 사용하려면 사용자 환경의 실행 역할에 Apache Airflow CLI 토큰을 생성할 권한이 있어야 합니다. AWS관리형 정책을 연결하여이 권한을 부여AmazonMWAAAirflowCliAccess할 수 있습니다.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

자세한 정보는 Apache Airflow CLI 정책: AmazonMWAAAirflowCliAccess 섹션을 참조하세요.

종속성

Apache Airflow v2 이상에서이 코드 예제를 사용하려면 추가 종속성이 필요하지 않습니다. aws-mwaa-docker-images를 사용하여 Apache Airflow를 설치합니다.

코드 예제

다음 코드 예제는 현재 환경에서 DAG를 사용하여 다른 환경에서 DAG를 호출한다고 가정합니다.

  1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 예제의 내용을 복사하고 로컬에서 invoke_dag.py로 저장합니다. 다음 값을 사용자의 정보로 교체합니다.

    • your-new-environment-name— DAG를 호출하려는 다른 환경의 이름.

    • your-target-dag-id— 호출하려는 다른 환경에서 DAG의 ID.

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. DAG가 성공적으로 실행되면에 대한 작업 로그에서 다음과 유사한 출력을 얻게 됩니다invoke_dag_task.

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    DAG가 성공적으로 호출되었는지 확인하려면 새 환경에 대한 Apache Airflow UI로 이동한 다음 다음을 수행합니다.

    1. DAG 페이지의 DAG 목록에서 새 대상 DAG를 찾습니다.

    2. 마지막 실행에서 최신 DAG 실행의 타임스탬프를 확인합니다. 이 타임스탬프는 사용자의 다른 환경에서 invoke_dag에 대한 최신 타임스탬프와 거의 일치해야 합니다.

    3. 최근 작업에서 마지막 실행이 성공했는지 확인합니다.