本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將環境中繼資料匯出至 Amazon S3 上的 CSV 檔案
使用下列程式碼範例建立定向無環圖 (DAG),查詢資料庫以取得一系列 DAG 執行資訊,並將資料寫入 Amazon S3 上存放.csv的檔案。
您可能想要從環境的 Aurora PostgreSQL 資料庫匯出資訊,以在本機檢查資料、將其封存在物件儲存中,或將其與 Amazon S3 等工具結合到 Amazon Redshift 運算子
您可以查詢資料庫,尋找 Apache Airflow 模型DagRun、 TaskFail和 TaskInstance,可提供與 DAG 執行相關的資訊。
版本
您可以在 Python 3.10 中使用 Apache Airflow v2 和 Python 3.11
先決條件
若要使用此頁面上的範例程式碼,您需要下列項目:
-
您要匯出中繼資料資訊的新 Amazon S3 儲存貯體。
權限
Amazon MWAA 需要 Amazon S3 動作的許可s3:PutObject,才能將查詢的中繼資料資訊寫入 Amazon S3。將下列政策陳述式新增至您環境的執行角色。
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::amzn-s3-demo-bucket" }
此政策僅限制對 amzn-s3-demo-bucket 的寫入存取。
要求
若要搭配 Apache Airflow v2 和更新版本使用此程式碼範例,不需要額外的相依性。使用 aws-mwaa-docker-images
範例程式碼
下列步驟說明如何建立查詢 Aurora PostgreSQL 的 DAG,並將結果寫入新的 Amazon S3 儲存貯體。
-
在終端機中,導覽至存放 DAG 程式碼的目錄。例如:
cd dags -
複製下列程式碼範例的內容,並將其儲存為
metadata_to_csv.py。您可以變更指派給 的值MAX_AGE_IN_DAYS,以控制 DAG 從中繼資料資料庫查詢的最舊記錄的存留期。from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db() -
執行下列 AWS CLI 命令,將 DAG 複製到您環境的儲存貯體,然後使用 Apache Airflow UI 觸發 DAG。
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
如果成功,您將輸出類似任務任務日誌中的下列
export_db內容:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check您現在可以在 的新 Amazon S3 儲存貯體中存取和下載匯出
.csv的檔案/files/export/。