Ändern der Zeitzone einer DAG auf Amazon MWAA - Amazon Managed Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Ändern der Zeitzone einer DAG auf Amazon MWAA

Apache Airflow plant deinen gerichteten azyklischen Graphen (DAG) standardmäßig in UTC+0. Die folgenden Schritte zeigen, wie Sie die Zeitzone ändern können, in der Amazon MWAA Ihre DAGs mit Pendulum ausführt. Optional zeigt dieses Thema, wie Sie ein benutzerdefiniertes Plugin erstellen können, um die Zeitzone für die Apache Airflow Airflow-Protokolle Ihrer Umgebung zu ändern.

Version

Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10 und Apache Airflow v3 in Python 3.11 verwenden.

Voraussetzungen

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:

Berechtigungen

Für die Verwendung des Codebeispiels auf dieser Seite sind keine zusätzlichen Berechtigungen erforderlich.

Erstellen Sie ein Plugin, um die Zeitzone in Airflow-Protokollen zu ändern

Apache Airflow führt die Python-Dateien beim Start im plugins Verzeichnis aus. Mit dem folgenden Plugin können Sie die Zeitzone des Executors überschreiben, wodurch die Zeitzone geändert wird, in der Apache Airflow Logs schreibt.

  1. Erstellen Sie ein Verzeichnis, das plugins nach Ihrem benutzerdefinierten Plugin benannt ist, und navigieren Sie zu dem Verzeichnis. Beispiel:

    $ mkdir plugins $ cd plugins
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal wie dag-timezone-plugin.py im plugins Ordner.

    import time import os os.environ['TZ'] = 'America/Los_Angeles' time.tzset()
  3. Erstellen Sie im plugins Verzeichnis eine leere Python-Datei mit dem Namen__init__.py. Ihr plugins Verzeichnis sollte dem folgenden ähnlich sein:

    plugins/ |-- __init__.py |-- dag-timezone-plugin.py

Erstellen eines plugins.zip

In den folgenden Schritten wird erklärt, wie Sie erstellenplugins.zip. Der Inhalt dieses Beispiels kann mit anderen Plugins und Binärdateien in einer einzigen plugins.zip Datei kombiniert werden.

  1. Navigieren Sie in der Befehlszeile zu dem plugins Verzeichnis aus dem vorherigen Schritt. Beispiel:

    cd plugins
  2. Komprimieren Sie den Inhalt Ihres plugins Verzeichnisses.

    zip -r ../plugins.zip ./
  3. Laden Sie plugins.zip es in Ihren S3-Bucket hoch

    aws s3 cp plugins.zip s3://your-mwaa-bucket/

Codebeispiel

Um die Standardzeitzone (UTC+0) zu ändern, in der die DAG ausgeführt wird, verwenden wir eine Bibliothek namens Pendulum, eine Python-Bibliothek für die Arbeit mit zeitzonensensitiver Datetime.

  1. Navigieren Sie in der Befehlszeile zu dem Verzeichnis, in dem Sie gespeichert sind. DAGs Beispiel:

    cd dags
  2. Kopieren Sie den Inhalt des folgenden Beispiels und speichern Sie ihn untertz-aware-dag.py.

    from airflow import DAG from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta # Import the Pendulum library. import pendulum # Instantiate Pendulum and set your timezone. local_tz = pendulum.timezone("America/Los_Angeles") with DAG( dag_id = "tz_test", schedule_interval="0 12 * * *", catchup=False, start_date=datetime(2022, 1, 1, tzinfo=local_tz) ) as dag: bash_operator_task = BashOperator( task_id="tz_aware_task", dag=dag, bash_command="date" )
  3. Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus.

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Wenn der Vorgang erfolgreich ist, geben Sie in den Task-Logs für die in der tz_test DAG eine ähnliche Ausgabe wie folgt aus: tz_aware_task

    [2022-08-01, 12:00:00 PDT] {{subprocess.py:74}} INFO - Running command: ['bash', '-c', 'date']
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:85}} INFO - Output:
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:89}} INFO - Mon Aug  1 12:00:00 PDT 2022
    [2022-08-01, 12:00:00 PDT] {{subprocess.py:93}} INFO - Command exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=tz_test, task_id=tz_aware_task, execution_date=20220801T190033, start_date=20220801T190035, end_date=20220801T190035
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-08-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

Als nächstes