Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3 - Von Amazon verwaltete Workflows für Apache Airflow

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3

Verwenden Sie das folgende Codebeispiel, um einen gerichteten azyklischen Graph (DAG) zu erstellen, der die Datenbank nach einer Reihe von DAG-Ausführungsinformationen abfragt und die Daten in .csv Dateien schreibt, die auf Amazon S3 gespeichert sind.

Möglicherweise möchten Sie Informationen aus der Aurora PostgreSQL-Datenbank Ihrer Umgebung exportieren, um die Daten lokal zu untersuchen, sie im Objektspeicher zu archivieren oder sie mit Tools wie dem Amazon S3 S3-to-Amazon-Redshift-Operator und der Datenbankbereinigung zu kombinieren, um Amazon MWAA-Metadaten aus der Umgebung zu verschieben, sie aber für future Analysen aufzubewahren.

Sie können die Datenbank nach allen Objekten abfragen, die in Apache Airflow Airflow-Modellen aufgeführt sind. In diesem Codebeispiel werden drei Modelle,, undDagRun, verwendetTaskFail, die Informationen bereitstellenTaskInstance, die für DAG-Läufe relevant sind.

Version

Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10 und Apache Airflow v3 in Python 3.11 verwenden.

Voraussetzungen

Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:

Berechtigungen

Amazon MWAA benötigt für die Amazon S3-Aktion die Erlaubnis, die abgefragten Metadateninformationen in Amazon S3 s3:PutObject zu schreiben. Fügen Sie der Ausführungsrolle Ihrer Umgebung die folgende Richtlinienerklärung hinzu.

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::amzn-s3-demo-bucket" }

Diese Richtlinie beschränkt den Schreibzugriff nur aufamzn-s3-demo-bucket.

Voraussetzungen

Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet aws-mwaa-docker-images, um Apache Airflow zu installieren.

Codebeispiel

In den folgenden Schritten wird beschrieben, wie Sie eine DAG erstellen können, die Aurora PostgreSQL abfragt und das Ergebnis in Ihren neuen Amazon S3 S3-Bucket schreibt.

  1. Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Zum Beispiel:

    cd dags
  2. Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal untermetadata_to_csv.py. Sie können den zugewiesenen Wert ändern, MAX_AGE_IN_DAYS um das Alter der ältesten Datensätze zu steuern, die Ihre DAG aus der Metadatendatenbank abfragt.

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus.

    aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Wenn der Vorgang erfolgreich ist, geben Sie in den Aufgabenprotokollen für die Aufgabe eine Ausgabe ähnlich der export_db folgenden aus:

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    Sie können jetzt auf die exportierten .csv Dateien in Ihrem neuen Amazon S3 S3-Bucket in zugreifen und sie herunterladen/files/export/.