Amazon MWAA에서 DAG 시간대 변경 - Amazon Managed Workflows for Apache Airflow

Amazon MWAA에서 DAG 시간대 변경

Apache Airflow는 기본적으로 방향성 비순환 그래프(DAG)를 UTC+0으로 스케줄링합니다. 다음 단계는 Amazon MWAA가 Pendulum을 사용하여 DAG를 실행하는 시간대를 변경하는 방법을 보여줍니다. 선택적으로 이 주제에서는 사용자 지정 플러그인을 생성하여 사용자 환경의 Apache Airflow 로그의 시간대를 변경하는 방법을 보여줍니다.

버전

이 페이지의 코드 예제는 Python 3.10Apache Airflow v2Python 3.11Apache Airflow v3에서 사용할 수 있습니다.

사전 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

이 페이지의 코드 예제를 사용하는 데 추가 권한이 필요하지 않습니다.

Airflow 로그의 시간대를 변경하는 플러그인을 생성합니다.

Apache Airflow는 시작시 plugins 디렉터리의 Python 파일을 실행합니다. 다음 플러그인을 사용하면 실행자의 시간대를 재정의하여 Apache Airflow가 로그를 기록하는 시간대를 수정할 수 있습니다.

  1. 사용자 지정 플러그인에 대해 지정된 이름 plugins 디렉터리를 생성하고 디렉터리로 이동합니다. 예:

    $ mkdir plugins $ cd plugins
  2. 다음 코드 샘플의 콘텐츠를 복사하고 로컬의 plugins 폴더에 dag-timezone-plugin.py로 저장합니다.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. plugins 디렉터리에서 __init__.py라는 빈 Python 파일을 만듭니다. plugins 디렉터리는 다음과 비슷해야 합니다.

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

생성plugins.zip

다음 단계에서는 plugins.zip을 생성하는 방법에 대해 설명합니다. 이 예제의 콘텐츠를 다른 플러그인 및 바이너리와 결합하여 단일 plugins.zip 파일로 만들 수 있습니다.

  1. 명령 프롬프트에서 이전 단계의 plugins 디렉터리로 이동합니다. 예:

    cd plugins
  2. plugins 디렉터리 내의 콘텐츠를 압축합니다.

    zip -r ../plugins.zip ./
  3. S3 버킷에 plugins.zip를 업로드합니다.

    aws s3 cp plugins.zip s3://your-mwaa-bucket/

코드 샘플

DAG가 실행되는 기본 시간대(UTC+0)를 변경하려면 시간대를 인식하는 날짜/시간을 처리하는 Python 라이브러리인 Pendulum이라는 라이브러리를 사용합니다.

  1. 명령 프롬프트에서 DAG가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 예제의 콘텐츠를 복사하고 tz-aware-dag.py로 저장합니다.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공하면 tz_test DAG의 tz_aware_task에 대한 작업 로그에 다음과 비슷한 결과가 출력됩니다.

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

다음 단계