將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案 - Amazon Managed Workflows for Apache Airflow

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案

下列程式碼範例示範如何建立導向無環圖 (DAG),查詢資料庫以取得各種 DAG 執行資訊,並將資料寫入 Amazon S3 上存放.csv的檔案。

您可能想要從環境的 Aurora PostgreSQL 資料庫匯出資訊,以便在本機檢查資料、將其封存在物件儲存中,或將其與 Amazon S3 等工具結合到 Amazon Redshift 運算子資料庫清除,以便將 Amazon MWAA 中繼資料移出環境,但保留以供未來分析。

您可以查詢資料庫,尋找 Apache Airflow 模型中列出的任何物件。此程式碼範例使用三種模型 、 DagRunTaskFailTaskInstance,可提供與 DAG 執行相關的資訊。

版本

  • 您可以在 Python 3.10 中使用此頁面上的程式碼範例搭配 Apache Airflow v2

先決條件

若要使用此頁面上的範例程式碼,您需要下列項目:

許可

Amazon MWAA 需要 Amazon S3 動作的許可s3:PutObject,才能將查詢的中繼資料資訊寫入 Amazon S3。將下列政策陳述式新增至您環境的執行角色。

{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::amzn-s3-demo-bucket" }

此政策僅限制對 amzn-s3-demo-bucket 的寫入存取。

要求

  • 若要搭配 Apache Airflow v2 使用此程式碼範例,不需要額外的相依性。此程式碼會在您的環境中使用 Apache Airflow v2 基本安裝

範例程式碼

下列步驟說明如何建立查詢 Aurora PostgreSQL 的 DAG,並將結果寫入新的 Amazon S3 儲存貯體。

  1. 在終端機中,導覽至存放 DAG 程式碼的目錄。例如:

    cd dags
  2. 複製下列程式碼範例的內容,並將其儲存為 metadata_to_csv.py。您可以變更指派給 的值MAX_AGE_IN_DAYS,以控制 DAG 從中繼資料資料庫查詢的最舊記錄的存留期。

    from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db()
  3. 執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache Airflow UI 觸發 DAG。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 如果成功,您將在任務的任務日誌中輸出類似以下內容export_db

    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
    [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [your-tasks]
    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    您現在可以在 的新 Amazon S3 儲存貯體中存取和下載匯出.csv的檔案/files/export/