Aurora PostgreSQL database cleanup on an Amazon MWAA environment
Amazon Managed Workflows for Apache Airflow uses an Aurora PostgreSQL database as the Apache Airflow metadata database, where DAG runs and task instances are stored. The following sample code periodically clears out entries from the dedicated Aurora PostgreSQL database for your Amazon MWAA environment.
Version
The code examples on this page are specific to Apache Airflow v2 as supported on Amazon MWAA. See the Apache Airflow versions that Amazon MWAA supports.
For Apache Airflow v3 users: to clean up the database (purge old records from the metastore tables), run the db clean CLI command.
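For example, an invocation might look like the following; the cutoff timestamp is a placeholder for the retention boundary you want, and the flags shown are the same ones used in the v2 example later on this page:

airflow db clean --clean-before-timestamp '2024-01-01 00:00:00' --skip-archive --yes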
Prerequisites
To use the code example on this page, you need the following:
- An Amazon MWAA environment.
Dependencies
To use this code example with Apache Airflow v2, no additional dependencies are required. Use aws-mwaa-docker-images to install Apache Airflow.
Code example
The following DAG cleans the metadata database for the tables specified in TABLES_TO_CLEAN. The example deletes data more than 30 days old from the specified tables. To adjust how far back entries are deleted, set MAX_AGE_IN_DAYS to a different value.
- Apache Airflow v2.4 to 2.10.3
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import datetime, timedelta
# Note: Database commands might time out if running longer than 5 minutes. If this occurs, please increase the MAX_AGE_IN_DAYS (or change
# timestamp parameter to an earlier date) for initial runs, then reduce on subsequent runs until the desired retention is met.
MAX_AGE_IN_DAYS = 30
# To clean specific tables, please provide a comma-separated list per
# https://airflow.apache.org/docs/apache-airflow/stable/cli-and-env-variables-ref.html#clean
# A value of None will clean all tables
TABLES_TO_CLEAN = None
with DAG(
    dag_id="clean_db_dag",
    schedule_interval=None,
    catchup=False,
    start_date=days_ago(1),
    params={
        "timestamp": Param(
            default=(datetime.now()-timedelta(days=MAX_AGE_IN_DAYS)).strftime("%Y-%m-%d %H:%M:%S"),
            type="string",
            minLength=1,
            maxLength=255,
        ),
    }
) as dag:
    if TABLES_TO_CLEAN:
        bash_command = "airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --tables '" + TABLES_TO_CLEAN + "' --skip-archive --yes"
    else:
        bash_command = "airflow db clean --clean-before-timestamp '{{ params.timestamp }}' --skip-archive --yes"

    cli_command = BashOperator(
        task_id="bash_command",
        bash_command=bash_command
    )
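Because schedule_interval is None, this DAG runs only when you trigger it. As a sketch, assuming the default core setting dag_run_conf_overrides_params is enabled on your environment, you can override the timestamp param at trigger time from the Airflow CLI (the timestamp shown is a placeholder):

airflow dags trigger clean_db_dag -c '{"timestamp": "2023-01-01 00:00:00"}'

On Amazon MWAA, you would run the same dags trigger command through the environment's CLI token endpoint.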
- Apache Airflow v2.2 and earlier
from airflow import settings
from airflow.utils.dates import days_ago
from airflow.models import DagTag, DagModel, DagRun, ImportError, Log, SlaMiss, RenderedTaskInstanceFields, TaskInstance, TaskReschedule, XCom
from airflow.decorators import dag, task
from time import sleep
from airflow.version import version
major_version, minor_version = int(version.split('.')[0]), int(version.split('.')[1])
if major_version >= 2 and minor_version >= 6:
    from airflow.jobs.job import Job
else:
    # The BaseJob class was renamed as of Apache Airflow v2.6
    from airflow.jobs.base_job import BaseJob as Job
# Delete entries for the past 30 days. Adjust MAX_AGE_IN_DAYS to set how far back this DAG cleans the database.
MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7
# This is a list of (table, time) tuples.
# table = the table to clean in the metadata database
# time = the column in the table associated to the timestamp of an entry
# or None if not applicable.
TABLES_TO_CLEAN = [[Job, Job.latest_heartbeat],
                   [TaskInstance, TaskInstance.execution_date],
                   [TaskReschedule, TaskReschedule.execution_date],
                   [DagTag, None],
                   [DagModel, DagModel.last_parsed_time],
                   [DagRun, DagRun.execution_date],
                   [ImportError, ImportError.timestamp],
                   [Log, Log.dttm],
                   [SlaMiss, SlaMiss.execution_date],
                   [RenderedTaskInstanceFields, RenderedTaskInstanceFields.execution_date],
                   [XCom, XCom.execution_date],
]
@task()
def cleanup_db_fn(x):
    session = settings.Session()
    if x[1]:
        for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
            earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
            print(f"deleting {str(x[0])} entries between {earliest_days_ago} and {oldest_days_ago} days old...")
            earliest_date = days_ago(earliest_days_ago)
            oldest_date = days_ago(oldest_days_ago)
            # days_ago(n) returns a timestamp n days in the past, so oldest_date
            # precedes earliest_date; delete rows whose timestamp falls between them.
            query = session.query(x[0]).filter(x[1] >= oldest_date).filter(x[1] <= earliest_date)
            query.delete(synchronize_session=False)
            session.commit()
            sleep(5)
    else:
        # No time column specified for the table. Delete all entries.
        print("deleting", str(x[0]), "...")
        query = session.query(x[0])
        query.delete(synchronize_session=False)
        session.commit()
    session.close()
@dag(
    dag_id="cleanup_db",
    schedule_interval="@weekly",
    start_date=days_ago(7),
    catchup=False,
    is_paused_upon_creation=False
)
def clean_db_dag_fn():
    t_last = None
    for x in TABLES_TO_CLEAN:
        t = cleanup_db_fn(x)
        if t_last:
            t_last >> t
        t_last = t

clean_db_dag = clean_db_dag_fn()
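A note on the design: rather than deleting everything in a single statement, cleanup_db_fn walks the retention window in 7-day batches, committing and sleeping 5 seconds between batches so that no single transaction against the metadata database grows too large. The following standalone sketch (plain Python, no Airflow required) shows the deletion windows that range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT) produces with the defaults above:

MAX_AGE_IN_DAYS = 30
MIN_AGE_IN_DAYS = 0
DECREMENT = -7

# Print each batch's age window, mirroring the loop in cleanup_db_fn.
for oldest_days_ago in range(MAX_AGE_IN_DAYS, MIN_AGE_IN_DAYS, DECREMENT):
    earliest_days_ago = max(oldest_days_ago + DECREMENT, MIN_AGE_IN_DAYS)
    print(f"deletes entries between {earliest_days_ago} and {oldest_days_ago} days old")

# Output:
# deletes entries between 23 and 30 days old
# deletes entries between 16 and 23 days old
# deletes entries between 9 and 16 days old
# deletes entries between 2 and 9 days old
# deletes entries between 0 and 2 days old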