

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Pulizia del database Aurora PostgreSQL in un ambiente Amazon MWAA
<a name="samples-database-cleanup"></a>

Amazon Managed Workflows for Apache Airflow utilizza un database Aurora PostgreSQL come database di metadati Apache Airflow, in cui vengono eseguite le operazioni DAG e vengono archiviate le istanze delle attività. Il seguente codice di esempio cancella periodicamente le voci dal database Aurora PostgreSQL dedicato per il tuo ambiente Amazon MWAA.

**Topics**
+ [Versione](#samples-database-cleanup-version)
+ [Prerequisiti](#samples-database-cleanup-prereqs)
+ [Dipendenze](#samples-sql-server-dependencies)
+ [Esempio di codice](#samples-database-cleanup-code)

## Versione
<a name="samples-database-cleanup-version"></a>

Gli esempi di codice in questa pagina sono specifici di Apache Airflow v2 supportato su Amazon MWAA. Fai riferimento alle versioni di [Apache Airflow supportate](airflow-versions.md).

**Suggerimento**  
**Per gli utenti di Apache Airflow v3**: se desideri pulire un database (eliminare i vecchi record dalle tabelle dei metastore), esegui il comando CLI. `db clean`

## Prerequisiti
<a name="samples-database-cleanup-prereqs"></a>

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:
+ Un ambiente [Amazon MWAA](get-started.md).

## Dipendenze
<a name="samples-sql-server-dependencies"></a>

Per utilizzare questo esempio di codice con Apache Airflow v2, non sono necessarie dipendenze aggiuntive. Utilizzare [aws-mwaa-docker-images](https://github.com/aws/amazon-mwaa-docker-images)per installare Apache Airflow.

## Esempio di codice
<a name="samples-database-cleanup-code"></a>

Il seguente DAG pulisce il database dei metadati per le tabelle specificate in. `TABLES_TO_CLEAN` L'esempio elimina i dati dalle tabelle specificate che risalgono a più di 30 giorni. Per modificare la data di eliminazione delle voci, `MAX_AGE_IN_DAYS` impostate un valore diverso.

------
#### [ Apache Airflow v2.4 to 2.10.3 ]

```
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

from datetime import datetime, timedelta

# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change 
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.

MAX_AGE_IN_DAYS = 30

# To clean specific tables, please provide a comma-separated list per 
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables

TABLES_TO_CLEAN = None

with DAG(
    dag_id="clean_db_dag", 
    schedule_interval=None, 
    catchup=False, 
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),     
    }   
) as dag:
    if TABLES_TO_CLEAN:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '"+TABLES_TO_CLEAN+"' --skip-archive --yes"
    else:
        bash_command="airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
```

------
#### [ Apache Airflow v2.2 and earlier ]

```
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from airflow.utils.dates import days_ago
from time import sleep

from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job

# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# This is a list of (table, time) tuples. 
# table = the table to clean in the metadata database
# time  = the column in the table associated to the timestamp of an entry
#         or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
    [TaskInstance, TaskInstance.execution_date],
    [TaskReschedule, TaskReschedule.execution_date],
    [DagTag, None], 
    [DagModel, DagModel.last_parsed_time], 
    [DagRun, DagRun.execution_date], 
    [ImportError, ImportError.timestamp],
    [Log, Log.dttm], 
    [SlaMiss, SlaMiss.execution_date], 
    [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date], 
    [XCom, XCom.execution_date],     
]

@task()
def cleanup_db_fn(x):
    session = settings.Session()

    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            query = session.query(x[0]).filter(x[1] >= earliest_date).filter(x[1] <= oldest_date)
            query.delete(synchronize_session= False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session= False)
        session.commit()
    
    session.close()

 
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)

def clean_db_dag_fn():
    t_last=None
    for x in TABLES_TO_CLEAN:
        t=cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
```

------