

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

# Amazon S3의 CSV 파일로 환경 메타데이터 내보내기
<a name="samples-dag-run-info-to-csv"></a>

다음 코드 예제를 사용하여 다양한 DAG 실행 정보에 대해 데이터베이스를 쿼리하고 Amazon S3에 저장된 `.csv` 파일에 데이터를 쓰는 DAG(방향성 비순환 그래프)를 생성합니다.

데이터를 로컬에서 검사하거나, 이를 객체 스토리지에 보관하거나, [Amazon S3에서 Amazon Redshift 운영자](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) 및 [데이터베이스 정리](samples-database-cleanup.md)와 같은 도구와 결합하기 위해, Amazon MWAA 메타데이터를 환경 외부로 이동하고 향후 분석을 위해 보존하기 위해 사용자 환경의 Aurora PostgreSQL 데이터베이스에서 정보를 내보내고자 할 수 있습니다.

[Apache Airflow 모델](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models)에 나열된 모든 객체에 대해 데이터베이스를 쿼리할 수 있습니다. 이 코드 샘플은 DAG 실행과 관련된 정보를 제공하는 세 가지 모델인 `DagRun`, `TaskFail` 및 `TaskInstance`를 사용합니다.

**Topics**
+ [버전](#samples-dag-run-info-to-csv-version)
+ [사전 조건](#samples-dag-run-info-to-csv-prereqs)
+ [권한](#samples-dag-run-info-to-csv-permissions)
+ [요구 사항](#samples-dag-run-info-to-csv-dependencies)
+ [코드 샘플](#samples-dag-run-info-to-csv-code)

## 버전
<a name="samples-dag-run-info-to-csv-version"></a>

이 페이지의 코드 예제는 [Python 3.10](https://peps.python.org/pep-0619/)의 **Apache Airflow v2** 및 [Python 3.11](https://peps.python.org/pep-0664/)의 **Apache Airflow v3**에서 사용할 수 있습니다.

## 사전 조건
<a name="samples-dag-run-info-to-csv-prereqs"></a>

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.
+ [Amazon MWAA 환경](get-started.md).
+ 메타데이터 정보를 내보내고자 하는 [새 Amazon S3 버킷](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html).

## 권한
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA에는 쿼리된 메타데이터 정보를 Amazon S3에 쓰기 위한 Amazon S3 작업`s3:PutObject`에 대해 권한이 필요합니다. 다음 정책 문을 사용자 환경의 실행 역할에 추가합니다.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::{{amzn-s3-demo-bucket}}"
}
```

이 정책은 쓰기 액세스 권한을 {{amzn-s3-demo-bucket}}으로만 제한합니다.

## 요구 사항
<a name="samples-dag-run-info-to-csv-dependencies"></a>

이 코드 예제를 Apache Airflow v2 이상에 사용하려면 추가 종속성이 필요하지 않습니다. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)를 사용하여 Apache Airflow를 설치합니다.

## 코드 샘플
<a name="samples-dag-run-info-to-csv-code"></a>

다음 단계는 Aurora PostgreSQL을 쿼리하고 새 Amazon S3 버킷에 결과를 쓰는 DAG를 생성하는 방법을 설명합니다.

1. 터미널에서 DAG 코드가 저장된 디렉터리로 이동합니다. 예제:

   ```
   cd dags
   ```

1. 다음 코드 예제의 콘텐츠를 복사하고 로컬에서 `metadata_to_csv.py`로 저장합니다. 사용자 DAG가 메타데이터 데이터베이스에서 쿼리하는 가장 오래된 기록의 보존 기간을 제어하기 위해 `MAX_AGE_IN_DAYS`에 할당된 값을 변경할 수 있습니다.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  다음 AWS CLI 명령을 실행하여 DAG를 환경의 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

   ```
   aws s3 cp {{your-dag}}.py s3://{{your-environment-bucket}}/dags/
   ```

1. 성공적으로 실행되면 `export_db` 작업의 작업 로그에 다음과 유사한 출력이 표시됩니다.

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   이제 새 Amazon S3 버킷에서 내보낸 `.csv` 파일에 액세스하여 `/files/export/`에 다운로드할 수 있습니다.