

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Limpieza de bases de datos de Aurora PostgreSQL en un entorno de Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows para Apache Airflow utiliza una base de datos Aurora PostgreSQL como base de datos de metadatos de Apache Airflow, donde se ejecuta el DAG y se almacenan las instancias de tareas. La siguiente muestra de código borra periódicamente las entradas de la base de datos Aurora PostgreSQL dedicada a su entorno Amazon MWAA.

**Topics**
+ [Versión](#samples-database-cleanup-version)
+ [Requisitos previos](#samples-database-cleanup-prereqs)
+ [Dependencias](#samples-sql-server-dependencies)
+ [Código de ejemplo](#samples-database-cleanup-code)

## Versión
<a name="samples-database-cleanup-version"></a>

Los ejemplos de código de esta página son específicos de Apache Airflow v2, compatible con Amazon MWAA. Consulte las versiones de [Apache Airflow compatibles](airflow-versions.md).

**sugerencia**  
**Para los usuarios de Apache Airflow v3**: si desea limpiar una base de datos (purgar los registros antiguos de las tablas del almacén de metadatos), ejecute el comando `db clean` en la CLI.

## Requisitos previos
<a name="samples-database-cleanup-prereqs"></a>

Para usar el código de muestra de esta página, necesitará lo siguiente:
+ Un [entorno de Amazon MWAA](get-started.md).

## Dependencias
<a name="samples-sql-server-dependencies"></a>

Para usar este código de ejemplo con Apache Airflow v2, no se necesitan dependencias adicionales. [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)Utilícelo para instalar Apache Airflow.

## Código de ejemplo
<a name="samples-database-cleanup-code"></a>

El siguiente DAG limpia la base de datos de metadatos de las tablas especificadas en `TABLES_TO_CLEAN`. En el ejemplo, se eliminan los datos de las tablas especificadas con más de 30 días de antigüedad. Para ajustar el tiempo de eliminación de las entradas, establezca `MAX_AGE_IN_DAYS` en un valor diferente.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------