Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Exportieren von Umgebungsmetadaten in CSV-Dateien auf Amazon S3
Verwenden Sie das folgende Codebeispiel, um einen gerichteten azyklischen Graph (DAG) zu erstellen, der die Datenbank nach einer Reihe von DAG-Ausführungsinformationen abfragt und die Daten in .csv Dateien schreibt, die auf Amazon S3 gespeichert sind.
Möglicherweise möchten Sie Informationen aus der Aurora PostgreSQL-Datenbank Ihrer Umgebung exportieren, um die Daten lokal zu untersuchen, sie im Objektspeicher zu archivieren oder sie mit Tools wie dem Amazon S3 S3-to-Amazon-Redshift-Operator und der Datenbankbereinigung zu
Sie können die Datenbank nach allen Objekten abfragen, die in Apache Airflow Airflow-ModellenDagRun, verwendetTaskFail, die Informationen bereitstellenTaskInstance, die für DAG-Läufe relevant sind.
Version
Sie können das Codebeispiel auf dieser Seite mit Apache Airflow v2 in Python 3.10
Voraussetzungen
Um den Beispielcode auf dieser Seite zu verwenden, benötigen Sie Folgendes:
-
Eine Amazon MWAA-Umgebung.
-
Ein neuer Amazon S3 S3-Bucket, in den Sie Ihre Metadateninformationen exportieren möchten.
Berechtigungen
Amazon MWAA benötigt für die Amazon S3-Aktion die Erlaubnis, die abgefragten Metadateninformationen in Amazon S3 s3:PutObject zu schreiben. Fügen Sie der Ausführungsrolle Ihrer Umgebung die folgende Richtlinienerklärung hinzu.
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::amzn-s3-demo-bucket" }
Diese Richtlinie beschränkt den Schreibzugriff nur aufamzn-s3-demo-bucket.
Voraussetzungen
Um dieses Codebeispiel mit Apache Airflow v2 und höher zu verwenden, sind keine zusätzlichen Abhängigkeiten erforderlich. Wird verwendet aws-mwaa-docker-images
Codebeispiel
In den folgenden Schritten wird beschrieben, wie Sie eine DAG erstellen können, die Aurora PostgreSQL abfragt und das Ergebnis in Ihren neuen Amazon S3 S3-Bucket schreibt.
-
Navigieren Sie in Ihrem Terminal zu dem Verzeichnis, in dem Ihr DAG-Code gespeichert ist. Zum Beispiel:
cd dags -
Kopieren Sie den Inhalt des folgenden Codebeispiels und speichern Sie ihn lokal unter
metadata_to_csv.py. Sie können den zugewiesenen Wert ändern,MAX_AGE_IN_DAYSum das Alter der ältesten Datensätze zu steuern, die Ihre DAG aus der Metadatendatenbank abfragt.from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db() -
Führen Sie den folgenden AWS CLI Befehl aus, um die DAG in den Bucket Ihrer Umgebung zu kopieren, und lösen Sie dann die DAG mithilfe der Apache Airflow Airflow-Benutzeroberfläche aus.
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
Wenn der Vorgang erfolgreich ist, geben Sie in den Aufgabenprotokollen für die Aufgabe eine Ausgabe ähnlich der
export_dbfolgenden aus:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule checkSie können jetzt auf die exportierten
.csvDateien in Ihrem neuen Amazon S3 S3-Bucket in zugreifen und sie herunterladen/files/export/.