Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Mengekspor metadata lingkungan ke file CSV di Amazon S3
Gunakan contoh kode berikut untuk membuat grafik asiklik terarah (DAG) yang menanyakan database untuk berbagai informasi yang dijalankan DAG, dan menulis data ke .csv file yang disimpan di Amazon S3.
Anda mungkin ingin mengekspor informasi dari database Aurora PostgreSQL lingkungan Anda untuk memeriksa data secara lokal, mengarsipkannya dalam penyimpanan objek, atau menggabungkannya dengan alat seperti Amazon S3 ke Amazon Redshift operator dan pembersihan database, untuk memindahkan metadata Amazon MWAA
Anda dapat menanyakan database untuk salah satu objek yang tercantum dalam model Apache AirflowDagRun,TaskFail, danTaskInstance, yang memberikan informasi yang relevan dengan DAG run.
Versi
Anda dapat menggunakan contoh kode pada halaman ini dengan Apache Airflow v2 di Python 3.10 dan Apache Airflowv3 di Python 3.11
Prasyarat
Untuk menggunakan kode sampel di halaman ini, Anda memerlukan yang berikut:
-
Lingkungan Amazon MWAA.
-
Bucket Amazon S3 baru tempat Anda ingin mengekspor informasi metadata Anda.
Izin
Amazon MWAA memerlukan izin untuk s3:PutObject tindakan Amazon S3 untuk menulis informasi metadata yang ditanyakan ke Amazon S3. Tambahkan pernyataan kebijakan berikut ke peran eksekusi lingkungan Anda.
{ "Effect": "Allow", "Action": "s3:PutObject*", "Resource": "arn:aws:s3:::amzn-s3-demo-bucket" }
Kebijakan ini membatasi akses tulis sajaamzn-s3-demo-bucket.
Persyaratan
Untuk menggunakan contoh kode ini dengan Apache Airflow v2 dan yang lebih baru, tidak diperlukan dependensi tambahan. Gunakan aws-mwaa-docker-images
Contoh kode
Langkah-langkah berikut menjelaskan bagaimana Anda dapat membuat DAG yang menanyakan Aurora PostgreSQL dan menulis hasilnya ke bucket Amazon S3 baru Anda.
-
Di terminal Anda, arahkan ke direktori tempat kode DAG Anda disimpan. Misalnya:
cd dags -
Salin isi contoh kode berikut dan simpan secara lokal sebagai
metadata_to_csv.py. Anda dapat mengubah nilai yang ditetapkanMAX_AGE_IN_DAYSuntuk mengontrol usia rekaman terlama kueri DAG Anda dari database metadata.from airflow.decorators import dag, task from airflow import settings import os import boto3 from airflow.utils.dates import days_ago from airflow.models import DagRun, TaskFail, TaskInstance import csv, re from io import StringIO DAG_ID = os.path.basename(__file__).replace(".py", "") MAX_AGE_IN_DAYS = 30 S3_BUCKET = '<your-export-bucket>' S3_KEY = 'files/export/{0}.csv' # You can add other objects to export from the metadatabase, OBJECTS_TO_EXPORT = [ [DagRun,DagRun.execution_date], [TaskFail,TaskFail.execution_date], [TaskInstance, TaskInstance.execution_date], ] @task() def export_db_task(**kwargs): session = settings.Session() print("session: ",str(session)) oldest_date = days_ago(MAX_AGE_IN_DAYS) print("oldest_date: ",oldest_date) s3 = boto3.client('s3') for x in OBJECTS_TO_EXPORT: query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS)) print("type",type(query)) allrows=query.all() name=re.sub("[<>']", "", str(x[0])) print(name,": ",str(allrows)) if len(allrows) > 0: outfileStr="" f = StringIO(outfileStr) w = csv.DictWriter(f, vars(allrows[0]).keys()) w.writeheader() for y in allrows: w.writerow(vars(y)) outkey = S3_KEY.format(name[6:]) s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue()) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=days_ago(1), ) def export_db(): t = export_db_task() metadb_to_s3_test = export_db() -
Jalankan AWS CLI perintah berikut untuk menyalin DAG ke bucket lingkungan Anda, lalu picu DAG menggunakan Apache Airflow UI.
aws s3 cpyour-dag.py s3://your-environment-bucket/dags/ -
Jika berhasil, Anda akan menampilkan yang serupa dengan yang berikut di log tugas untuk
export_dbtugas:[2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [
your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail : [your-tasks] [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'> [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance : [your-tasks] [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0 [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule checkAnda sekarang dapat mengakses dan mengunduh
.csvfile yang diekspor di bucket Amazon S3 baru Anda./files/export/