

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exportation des métadonnées de l'environnement vers des fichiers CSV sur Amazon S3
<a name="samples-dag-run-info-to-csv"></a>

Utilisez l'exemple de code suivant pour créer un graphe acyclique dirigé (DAG) qui interroge la base de données pour obtenir une série d'informations d'exécution du DAG et écrit les données dans des `.csv` fichiers stockés sur Amazon S3.

Vous souhaiterez peut-être exporter des informations depuis la base de données Aurora PostgreSQL de votre environnement pour inspecter les données localement, les archiver dans un stockage d'objets ou les combiner avec des outils tels que l'opérateur Amazon [S3 vers Amazon Redshift](https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/operators/s3_to_redshift.html) et le [nettoyage de la base de données, afin de déplacer les](samples-database-cleanup.md) métadonnées Amazon MWAA hors de l'environnement, tout en les préservant pour une analyse future.

Vous pouvez interroger la base de données pour n'importe quel objet répertorié dans les modèles [Apache Airflow](https://github.com/apache/airflow/tree/v2-0-stable/airflow/models). Cet exemple de code utilise trois modèles,`DagRun`, et `TaskFail``TaskInstance`, qui fournissent des informations relatives aux exécutions de To DAG.

**Topics**
+ [Version](#samples-dag-run-info-to-csv-version)
+ [Conditions préalables](#samples-dag-run-info-to-csv-prereqs)
+ [Permissions](#samples-dag-run-info-to-csv-permissions)
+ [Exigences](#samples-dag-run-info-to-csv-dependencies)
+ [Exemple de code](#samples-dag-run-info-to-csv-code)

## Version
<a name="samples-dag-run-info-to-csv-version"></a>

[Vous pouvez utiliser l'exemple de code présenté sur cette page avec **Apache Airflow v2** en [Python 3.10](https://peps.python.org/pep-0619/) et **Apache Airflow v3** en Python 3.11.](https://peps.python.org/pep-0664/)

## Conditions préalables
<a name="samples-dag-run-info-to-csv-prereqs"></a>

Pour utiliser l'exemple de code présenté sur cette page, vous aurez besoin des éléments suivants :
+ Un [environnement Amazon MWAA.](get-started.md)
+ Un [nouveau compartiment Amazon S3](https://docs.aws.amazon.com/AmazonS3/latest/userguide/create-bucket-overview.html) dans lequel vous souhaitez exporter vos informations de métadonnées.

## Permissions
<a name="samples-dag-run-info-to-csv-permissions"></a>

Amazon MWAA a besoin d'une autorisation pour que l'action Amazon S3 puisse `s3:PutObject` écrire les informations de métadonnées demandées dans Amazon S3. Ajoutez la déclaration de politique suivante au rôle d'exécution de votre environnement.

```
{
  "Effect": "Allow",
  "Action": "s3:PutObject*",
  "Resource": "arn:aws:s3:::{{amzn-s3-demo-bucket}}"
}
```

Cette politique limite l'accès en écriture uniquement à{{amzn-s3-demo-bucket}}.

## Exigences
<a name="samples-dag-run-info-to-csv-dependencies"></a>

Pour utiliser cet exemple de code avec Apache Airflow v2 et versions ultérieures, aucune dépendance supplémentaire n'est requise. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)À utiliser pour installer Apache Airflow.

## Exemple de code
<a name="samples-dag-run-info-to-csv-code"></a>

Les étapes suivantes décrivent comment créer un DAG qui interroge Aurora PostgreSQL et écrit le résultat dans votre nouveau compartiment Amazon S3.

1. Dans votre terminal, accédez au répertoire dans lequel votre code DAG est enregistré. Par exemple :

   ```
   cd dags
   ```

1. Copiez le contenu de l'exemple de code suivant et enregistrez-le localement sous le nom de`metadata_to_csv.py`. Vous pouvez modifier la valeur attribuée `MAX_AGE_IN_DAYS` à pour contrôler l'âge des enregistrements les plus anciens que votre DAG interroge dans la base de données de métadonnées.

   ```
   from airflow.decorators import dag, task
   from airflow import settings
   import os
   import boto3
   from airflow.utils.dates import days_ago
   from airflow.models import DagRun, TaskFail, TaskInstance
   import csv, re
   from io import StringIO
   
   DAG_ID = os.path.basename(__file__).replace(".py", "")
   
   MAX_AGE_IN_DAYS = 30 
   S3_BUCKET = '<your-export-bucket>'
   S3_KEY = 'files/export/{0}.csv' 
   
   # You can add other objects to export from the metadatabase,
   OBJECTS_TO_EXPORT = [
       [DagRun,DagRun.execution_date], 
       [TaskFail,TaskFail.end_date], 
       [TaskInstance, TaskInstance.execution_date],
   ]
    
   @task()
   def export_db_task(**kwargs):
       session = settings.Session()
       print("session: ",str(session))
    
       oldest_date = days_ago(MAX_AGE_IN_DAYS)
       print("oldest_date: ",oldest_date)
   
       s3 = boto3.client('s3')
   
       for x in OBJECTS_TO_EXPORT:
           query = session.query(x[0]).filter(x[1] >= days_ago(MAX_AGE_IN_DAYS))
           print("type",type(query))
           allrows=query.all()
           name=re.sub("[<>']", "", str(x[0]))
           print(name,": ",str(allrows))
   
           if len(allrows) > 0:
               outfileStr=""
               f = StringIO(outfileStr)
               w = csv.DictWriter(f, vars(allrows[0]).keys())
               w.writeheader()
               for y in allrows:
                   w.writerow(vars(y))
               outkey = S3_KEY.format(name[6:])
               s3.put_object(Bucket=S3_BUCKET, Key=outkey, Body=f.getvalue())
    
   @dag(
       dag_id=DAG_ID,
       schedule_interval=None,
       start_date=days_ago(1),
       )
   def export_db():
       t = export_db_task()
   
   metadb_to_s3_test = export_db()
   ```

1.  Exécutez la AWS CLI commande suivante pour copier le DAG dans le bucket de votre environnement, puis déclenchez le DAG à l'aide de l'interface utilisateur d'Apache Airflow. 

   ```
   aws s3 cp {{your-dag}}.py s3://{{your-environment-bucket}}/dags/
   ```

1. En cas de réussite, vous obtiendrez un résultat similaire à ce qui suit dans les journaux des tâches associées à la `export_db` tâche :

   ```
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.dagrun.DagRun : [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskfail.TaskFail :  [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - type <class 'sqlalchemy.orm.query.Query'>
   [2022-01-01, 12:00:00 PDT] {{logging_mixin.py:109}} INFO - class airflow.models.taskinstance.TaskInstance :  [{{your-tasks}}]
   [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: OK
   [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=metadb_to_s3, task_id=export_db, execution_date=20220101T000000, start_date=20220101T000000, end_date=20220101T000000
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
   [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check
   ```

   Vous pouvez désormais accéder aux `.csv` fichiers exportés et les télécharger dans votre nouveau compartiment Amazon S3 dans`/files/export/`.