Ajuste del desempeño de Apache Airflow en Amazon MWAA - Amazon Managed Workflows para Apache Airflow

Ajuste del desempeño de Apache Airflow en Amazon MWAA

En esta página, se describe cómo ajustar el rendimiento de un entorno de Amazon Managed Workflows para Apache Airflow mediante Uso de las opciones de configuración de Apache Airflow en Amazon MWAA.

Adición de una opción de configuración de Apache Airflow

Siga los siguientes pasos para agregar una opción de configuración de Apache Airflow a su entorno.

  1. Abra la página Entornos en la consola de Amazon MWAA.

  2. Seleccione un entorno.

  3. Elija Editar.

  4. Elija Siguiente.

  5. Seleccione Agregar configuración personalizada en el panel Opciones de configuración de Airflow.

  6. En la lista desplegable, elija una opción de configuración e introduzca un valor. También puede escribir una configuración personalizada e introducir un valor.

  7. Seleccione Agregar configuración personalizada para cada configuración que desee agregar.

  8. Seleccione Save.

Consulte Uso de las opciones de configuración de Apache Airflow en Amazon MWAA para obtener más información.

Programador de Apache Airflow

El programador de Apache Airflow es un componente básico de Apache Airflow. Si hay algún problema en el programador, es posible que no se analicen los DAG ni se programen las tareas. Para obtener más información sobre cómo ajustar el programador de Apache Airflow, consulte cómo ajustar el funcionamiento del programador en el sitio web de documentación de Apache Airflow.

Parameters

En esta sección, se describen las opciones de configuración disponibles para el programador de Apache Airflow (Apache Airflow v2 y versiones posteriores) y sus casos de uso.

Apache Airflow v3
Configuración Caso de uso

celery.sync_parallelism

Cantidad de procesos que utiliza el ejecutor Celery para sincronizar el estado de las tareas.

Predeterminado: 1

Esta opción puede utilizarse para evitar conflictos en las colas ya que limita los procesos que utiliza el ejecutor Celery. El valor 1 se utiliza de forma predeterminada para evitar errores al entregar los registros de tareas a los registros de CloudWatch. Si se emplea el valor 0, se estará usando la cantidad máxima de procesos, lo que podría provocar errores al entregar los registros de tareas.

scheduler.scheduler_idle_sleep_time

Cantidad de segundos que debe esperar entre procesamientos consecutivos de archivos DAG en el “bucle” del programador.

Predeterminado: 1

Se puede usar esta opción para reducir el uso de la CPU por parte del programador aumentando el tiempo de inactividad del programador una vez que haya terminado de recuperar los resultados del análisis de los DAG, de buscar las tareas y ponerlas en cola y de ejecutar las tareas en cola en el ejecutor. Al aumentar este valor, se consume la cantidad de hilos del programador que se ejecutan en un entorno en dag_processor.parsing_processes en el caso de Apache Airflow v2 y v3. Esto podría reducir la capacidad de los programadores de analizar los DAG y aumentar el tiempo que tardan los DAG en propagarse en el servidor web.

scheduler.max_dagruns_to_create_per_loop

Cantidad máxima de DAG por “bucle” del programador para los que se pueden crear DagRuns.

Valor predeterminado: 10

Se puede usar esta opción para liberar recursos destinados a la programación de tareas reduciendo la cantidad máxima de DagRuns para el “bucle” del programador.

dag_processor.parsing_processes

Cantidad de hilos que el programador puede ejecutar en paralelo para programar los DAG.

De forma predeterminada: use (2 * number of vCPUs) - 1

Se puede usar esta opción para liberar recursos reduciendo la cantidad de procesos que el programador ejecuta en paralelo para analizar los DAG. Recomendamos utilizar una cantidad baja si el análisis de los DAG afecta a la programación de tareas. Debe especificar un valor inferior al recuento de la CPU virtual (vCPU) de su entorno. Consulte los límites para obtener más información.

Apache Airflow v2
Configuración Caso de uso

celery.sync_parallelism

Cantidad de procesos que utiliza el ejecutor Celery para sincronizar el estado de las tareas.

Predeterminado: 1

Esta opción puede utilizarse para evitar conflictos en las colas ya que limita los procesos que utiliza el ejecutor Celery. El valor 1 se utiliza de forma predeterminada para evitar errores al entregar los registros de tareas a los registros de CloudWatch. Si se emplea el valor 0, se estará usando la cantidad máxima de procesos, lo que podría provocar errores al entregar los registros de tareas.

scheduler.idle_sleep_time

Cantidad de segundos que debe esperar entre procesamientos consecutivos de archivos DAG en el “bucle” del programador.

Predeterminado: 1

Se puede usar esta opción para reducir el uso de la CPU por parte del programador aumentando el tiempo de inactividad del programador una vez que haya terminado de recuperar los resultados del análisis de los DAG, de buscar las tareas y ponerlas en cola y de ejecutar las tareas en cola en el ejecutor. Al aumentar este valor, se consume la cantidad de hilos del programador que se ejecutan en un entorno en scheduler.parsing_processes en el caso de Apache Airflow v2 y v3. Esto podría reducir la capacidad de los programadores de analizar los DAG y aumentar el tiempo que tardan los DAG en propagarse en el servidor web.

scheduler.max_dagruns_to_create_per_loop

Cantidad máxima de DAG por “bucle” del programador para los que se pueden crear DagRuns.

Valor predeterminado: 10

Se puede usar esta opción para liberar recursos destinados a la programación de tareas reduciendo la cantidad máxima de DagRuns para el “bucle” del programador.

scheduler.parsing_processes

Cantidad de hilos que el programador puede ejecutar en paralelo para programar los DAG.

De forma predeterminada: use (2 * number of vCPUs) - 1

Se puede usar esta opción para liberar recursos reduciendo la cantidad de procesos que el programador ejecuta en paralelo para analizar los DAG. Recomendamos utilizar una cantidad baja si el análisis de los DAG afecta a la programación de tareas. Debe especificar un valor inferior al recuento de la CPU virtual (vCPU) de su entorno. Consulte los límites para obtener más información.

Límites

En esta sección, se describen los límites que debe tener en cuenta al ajustar los parámetros predeterminados del programador.

scheduler.parsing_processes, scheduler.max_threads (solo para la versión 2)

Se permiten dos hilos por vCPU en cada clase de entorno. Reserve al menos un hilo para el programador en cada clase de entorno. Si nota un retraso en la programación de las tareas, es posible que deba aumentar su clase de entorno. Por ejemplo, un entorno grande tendrá una instancia de contenedor de Fargate de 4 vCPU para el programador. Esto significa que hay un máximo de 7 hilos disponibles en total para que los utilicen otros procesos. Es decir: hay dos hilos para cada una de las cuatro vCPU, menos uno que es para el programador en sí. Tal como se muestra a continuación, el valor que especifique en scheduler.max_threads (solo para la versión 2) y en scheduler.parsing_processes no debe ser superior a la cantidad de hilos que tenga disponibles en una clase de entorno.

  • mw1.small: no debe destinarse más de 1 hilo al resto de procesos. El hilo restante debe reservarse para el programador.

  • mw1.medium: no deben destinarse más de 3 hilos al resto de procesos. El hilo restante debe reservarse para el programador.

  • mw1.large: no deben destinarse más de 7 hilos al resto de procesos. El hilo restante debe reservarse para el programador.

Carpetas de los DAG

El programador de Apache Airflow analiza de forma continua la carpeta de los DAG de su entorno. en busca de cualquier archivo que contenga plugins.zip, o cualquier archivo Python (.py) que contenga en sus instrucciones de importación la palabra “airflow”. Los objetos DAG en Python que se creen se incluirán en una DagBag para que el programador pueda procesarla a fin de determinar qué tareas deben programarse, en caso necesario. El análisis de los archivos DAG se realiza independientemente de si los archivos contienen objetos DAG viables o no.

Parameters

En esta sección, se describen las opciones de configuración disponibles para la carpeta de los DAG (Apache Airflow v2 y versiones posteriores) y sus casos de uso.

Apache Airflow v3
Configuración Caso de uso

dag_processor.refresh_interval

Cantidad de segundos durante los cuales debe escanearse la carpeta de los DAG en busca de archivos nuevos.

Valor predeterminado: 300 segundos

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos destinados a analizar la carpeta de los DAG. Recomendamos aumentar este valor si observa que los tiempos de análisis en total_parse_time metrics son muy largos; esto puede deberse a que hay una gran cantidad de archivos en su carpeta de DAG.

dag_processor.min_file_process_interval

Cantidad de segundos que transcurren desde que el programador analiza un DAG hasta que se reflejan las actualizaciones del mismo.

Valor predeterminado: 30 segundos

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que el programador espera antes de analizar un DAG. Por ejemplo, si especifica un valor de 30, el archivo DAG se analizará cada 30 segundos. Recomendamos mantener elevado dicho valor para reducir el uso de la CPU en su entorno.

Apache Airflow v2
Configuración Caso de uso

scheduler.dag_dir_list_interval

Cantidad de segundos durante los cuales debe escanearse la carpeta de los DAG en busca de archivos nuevos.

Valor predeterminado: 300 segundos

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos destinados a analizar la carpeta de los DAG. Recomendamos aumentar este valor si observa que los tiempos de análisis en total_parse_time metrics son muy largos; esto puede deberse a que hay una gran cantidad de archivos en su carpeta de DAG.

scheduler.min_file_process_interval

Cantidad de segundos que transcurren desde que el programador analiza un DAG hasta que se reflejan las actualizaciones del mismo.

Valor predeterminado: 30 segundos

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que el programador espera antes de analizar un DAG. Por ejemplo, si especifica un valor de 30, el archivo DAG se analizará cada 30 segundos. Recomendamos mantener elevado dicho valor para reducir el uso de la CPU en su entorno.

Archivos DAG

Como parte del bucle del programador de Apache Airflow, los archivos DAG individuales se analizan para extraer los objetos DAG en Python. En Apache Airflow v2 y versiones posteriores, el programador analiza simultáneamente un número máximo de procesos de análisis. Para que se vuelva a analizar el mismo archivo, deben transcurrir primero los segundos especificados en scheduler.min_file_process_interval (versión 2) o en dag_processor.min_file_process_interval (versión 3).

Parameters

En esta sección, se describen las opciones de configuración disponibles para los archivos DAG de Apache Airflow (Apache Airflow v2 y versiones posteriores) y sus casos de uso.

Apache Airflow v3
Configuración Caso de uso

dag_processor.dag_file_processor_timeout

Cantidad de segundos que transcurren antes de que el DAGFileProcessor interrumpa el procesamiento de un archivo DAG.

Valor predeterminado: 50 segundos

Esta opción puede utilizarse para liberar recursos aumentando el tiempo que transcurre hasta la interrupción del DAGFileProcessor. Le recomendamos que aumente este valor si observa interrupciones en los registros de procesamiento de los DAG que impiden que se carguen DAG viables.

core.dagbag_import_timeout

Cantidad segundos que transcurren antes de que se interrumpe la importación de un archivo en Python.

Valor predeterminado: 30 segundos

Se puede usar esta opción para liberar recursos aumentando el tiempo que transcurre hasta que el programador interrumpe la importación de un archivo en Python para extraer los objetos DAG. Esta opción se procesa como parte del “bucle” del programador y debe incluir un valor inferior al especificado en dag_processor.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

Cantidad mínima de segundos que transcurren hasta que se actualizan los DAG serializados en la base de datos.

Valor predeterminado: 30

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que deben transcurrir hasta que se actualizan los DAG serializados en la base de datos. Recomendamos aumentar este valor si dispone de una gran cantidad de DAG o DAG complejos. Al aumentar este valor, se reduce la carga del programador y de la base de datos, ya que los DAG se serializan.

core.min_serialized_dag_fetch_interval

Cantidad de segundos que se tarda en volver a extraer un DAG serializado de la base de datos una vez que ya se ha cargado en la DagBag.

Valor predeterminado: 10

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que se tarda en volver a extraer un DAG serializado. El valor debe ser superior al valor especificado en core.min_serialized_dag_update_interval para reducir las tasas de “escritura” de la base de datos. Al aumentar este valor, se reduce la carga del servidor web y de la base de datos, ya que los DAG se serializan.

Apache Airflow v2
Configuración Caso de uso

core.dag_file_processor_timeout

Cantidad de segundos que transcurren antes de que el DAGFileProcessor interrumpa el procesamiento de un archivo DAG.

Valor predeterminado: 50 segundos

Esta opción puede utilizarse para liberar recursos aumentando el tiempo que transcurre hasta la interrupción del DAGFileProcessor. Le recomendamos que aumente este valor si observa interrupciones en los registros de procesamiento de los DAG que impiden que se carguen DAG viables.

core.dagbag_import_timeout

Cantidad segundos que transcurren antes de que se interrumpe la importación de un archivo en Python.

Valor predeterminado: 30 segundos

Se puede usar esta opción para liberar recursos aumentando el tiempo que transcurre hasta que el programador interrumpe la importación de un archivo en Python para extraer los objetos DAG. Esta opción se procesa como parte del “bucle” del programador y debe incluir un valor inferior al especificado en core.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

Cantidad mínima de segundos que transcurren hasta que se actualizan los DAG serializados en la base de datos.

Valor predeterminado: 30

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que deben transcurrir hasta que se actualizan los DAG serializados en la base de datos. Recomendamos aumentar este valor si dispone de una gran cantidad de DAG o DAG complejos. Al aumentar este valor, se reduce la carga del programador y de la base de datos, ya que los DAG se serializan.

core.min_serialized_dag_fetch_interval

Cantidad de segundos que se tarda en volver a extraer un DAG serializado de la base de datos una vez que ya se ha cargado en la DagBag.

Valor predeterminado: 10

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de segundos que se tarda en volver a extraer un DAG serializado. El valor debe ser superior al valor especificado en core.min_serialized_dag_update_interval para reducir las tasas de “escritura” de la base de datos. Al aumentar este valor, se reduce la carga del servidor web y de la base de datos, ya que los DAG se serializan.

Tareas

Tanto el programador como los procesos de trabajo de Apache Airflow intervienen en la puesta de tareas en la cola y en su retirada de la misma. El programador toma las tareas analizadas que ya están a punto para ser programadas y modifica su estado de Ninguno a Programado. El ejecutor, que también se ejecuta en el contenedor del programador de Fargate, pone esas tareas en cola y cambia su estado a En cola. Cuando los procesos de trabajo tengan capacidad para ello, tomarán la tarea de la cola y cambiarán su estado a En ejecución, que posteriormente cambiará a Correcto o Error en función de si se ha logrado llevar a cabo la tarea correctamente o si algo ha fallado.

Parameters

En esta sección se describen las opciones de configuración disponibles para las tareas de Apache Airflow y sus casos de uso.

Las opciones de configuración predeterminadas que Amazon MWAA anula están marcadas en rojo.

Apache Airflow v3
Configuración Caso de uso

core.parallelism

Cantidad máxima de instancias de tareas que pueden tener el estado Running.

Valor predeterminado: Se establece dinámicamente en función de (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de instancias de tareas que se pueden ejecutar simultáneamente. El valor que se especifique debe ser igual al número de procesos de trabajo disponibles por la densidad de tareas de los procesos de trabajo. Recomendamos cambiar este valor únicamente cuando haya una gran cantidad de tareas atascadas en los estados de ejecución o de cola.

core.execute_tasks_new_python_interpreter

Determina si Apache Airflow ejecuta tareas mediante la bifurcación del proceso principal o la creación de un nuevo proceso de Python.

Valor predeterminado: True

Cuando este parámetro esté establecido en True, Apache Airflow reconocerá los cambios que realice en sus complementos como un nuevo proceso en Python creado para ejecutar tareas.

celery.worker_concurrency

Amazon MWAA anula la instalación base de Airflow para que esta opción escale procesos de trabajo como parte de su componente de escalado automático.

Valor predeterminado: No corresponde

Se pasa por alto cualquier valor especificado para esta opción.

celery.worker_autoscale

Simultaneidad de tareas de los procesos de trabajo.

Valores predeterminados:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

Se puede usar esta opción para liberar recursos reduciendo la simultaneidad minimum y maximum de tareas de los procesos de trabajo. Los procesos de trabajo aceptarán el maximum de tareas simultáneas configuradas, independientemente de si hay recursos suficientes para realizarlas. Si las tareas se programan sin que haya recursos suficientes, se producirá un error de inmediato. Recomendamos cambiar este valor en el caso de que las tareas consuman muchos recursos; reduzca los valores por debajo de los valores predeterminados para que haya una mayor capacidad por tarea.

Apache Airflow v2
Configuración Caso de uso

core.parallelism

Cantidad máxima de instancias de tareas que pueden tener el estado Running.

Valor predeterminado: Se establece dinámicamente en función de (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de instancias de tareas que se pueden ejecutar simultáneamente. El valor que se especifique debe ser igual al número de procesos de trabajo disponibles por la densidad de tareas de los procesos de trabajo. Recomendamos cambiar este valor únicamente cuando haya una gran cantidad de tareas atascadas en los estados de ejecución o de cola.

core.dag_concurrency

Cantidad de instancias de tareas que se pueden ejecutar simultáneamente para cada DAG.

Predeterminado: 10000

Esta opción puede utilizarse para liberar recursos aumentando la cantidad de instancias de tareas que pueden ejecutarse simultáneamente. Por ejemplo, si tiene cien DAG con diez tareas paralelas y quiere que todos los DAG se ejecuten simultáneamente, puede calcular el paralelismo máximo multiplicando el número de procesos de trabajo disponibles por la densidad de las tareas de los procesos de trabajo en celery.worker_concurrency y dividirlo por la cantidad de DAG.

core.execute_tasks_new_python_interpreter

Determina si Apache Airflow ejecuta tareas mediante la bifurcación del proceso principal o la creación de un nuevo proceso de Python.

Valor predeterminado: True

Cuando este parámetro esté establecido en True, Apache Airflow reconocerá los cambios que realice en sus complementos como un nuevo proceso en Python creado para ejecutar tareas.

celery.worker_concurrency

Amazon MWAA anula la instalación base de Airflow para que esta opción escale procesos de trabajo como parte de su componente de escalado automático.

Valor predeterminado: No corresponde

Se pasa por alto cualquier valor especificado para esta opción.

celery.worker_autoscale

Simultaneidad de tareas de los procesos de trabajo.

Valores predeterminados:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

Se puede usar esta opción para liberar recursos reduciendo la simultaneidad minimum y maximum de tareas de los procesos de trabajo. Los procesos de trabajo aceptarán el maximum de tareas simultáneas configuradas, independientemente de si hay recursos suficientes para realizarlas. Si las tareas se programan sin que haya recursos suficientes, se producirá un error de inmediato. Recomendamos cambiar este valor en el caso de que las tareas consuman muchos recursos; reduzca los valores por debajo de los valores predeterminados para que haya una mayor capacidad por tarea.