Amazon MWAA での DAG のタイムゾーンの変更 - Amazon Managed Workflows for Apache Airflow

Amazon MWAA での DAG のタイムゾーンの変更

Apache Airflow は、有向非巡回グラフ (DAG) をデフォルトで UTC+0 でスケジュールします。次のステップは、Amazon MWAA が Pendulum を使用して DAG を実行するタイムゾーンを変更する方法を示しています。このトピックでは、任意で、環境の Apache Airflow ログのタイムゾーンを変更するカスタムプラグインを作成する方法を示します。

バージョン

このページのコード例は、Python 3.10Apache Airflow v2 および Python 3.11Apache Airflow v3 で使用可能です。

前提条件

このページのサンプルコードを使用するには、以下が必要です。

アクセス許可

このページのコード例を使用する場合、追加のアクセス許可は必要ありません。

Airflow ログのタイムゾーンを変更するプラグインを作成する

Apache Airflow は、起動時に plugins ディレクトリ内の Python ファイルを実行します。次のプラグインを使用すると、エグゼキューターのタイムゾーンをオーバーライドできます。これにより、Apache Airflow がログを書き込むタイムゾーンが変更されます。

  1. カスタムプラグイン用に plugins という名前のディレクトリを作成し、そのディレクトリに移動します。例:

    $ mkdir plugins $ cd plugins
  2. 以下のコードサンプルの内容をコピーし、ローカルに dag-timezone-plugin.py として plugins フォルダに保存します。

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. plugins ディレクトリに、__init__.py という名前の空の Python ファイルを作成します。plugins ディレクトリは以下のようになっているはずです。

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

plugins.zip を作成する

以下のステップは、plugins.zip を作成する方法について説明します。この例の内容は、他のプラグインやバイナリと組み合わせて 1 つの plugins.zip ファイルにすることができます。

  1. コマンドプロンプトで、前の手順で作成した plugins ディレクトリに移動してください。例:

    cd plugins
  2. plugins ディレクトリ内のコンテンツを圧縮します。

    zip -r ../plugins.zip ./
  3. plugins.zip を S3 バケットにアップロードします

    aws s3 cp plugins.zip s3://your-mwaa-bucket/

コードサンプル

DAG が実行されるデフォルトのタイムゾーン (UTC+0) を変更するには、Pendulum というライブラリを使用します。これは、タイムゾーン対応の日時を処理するための Python ライブラリです。

  1. コマンドプロンプトで、DAG が保存されているディレクトリに移動します。例:

    cd dags
  2. 以下の例の内容をコピーして、tz-aware-dag.py として保存してください。

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. 以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 成功した場合、tz_test DAG で tz_aware_task のタスクログに以下のような出力が表示されます。

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

次のステップ