Ajuste de desempenho para o Apache Airflow no Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Ajuste de desempenho para o Apache Airflow no Amazon MWAA

Este tópico descreve como ajustar o desempenho de um ambiente Amazon Managed Workflows for Apache Airflow usando Como usar opções de configuração do Apache Airflow no Amazon MWAA.

Como adicionar uma opção de configuração do Apache Airflow

Use o procedimento a seguir para adicionar uma opção de configuração do Airflow ao seu ambiente.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Escolha Editar.

  4. Escolha Próximo.

  5. Escolha Adicionar configuração personalizada no painel Opções de configuração do Airflow.

  6. Escolha uma configuração na lista suspensa e insira um valor, ou digite uma configuração personalizada e insira um valor.

  7. Escolha Adicionar configuração personalizada para cada configuração que você deseja adicionar.

  8. Escolha Salvar.

Consulte Como usar opções de configuração do Apache Airflow no Amazon MWAA para saber mais.

Agendador do Apache Airflow

O agendador do Apache Airflow é um componente central do Apache Airflow. Um problema com o agendador pode impedir que os DAGs sejam analisados e as tarefas sejam agendadas. Para obter mais informações sobre o ajuste do agendador do Apache Airflow, consulte Ajustar o desempenho do agendador no site de documentação do Apache Airflow.

Parameters

Esta seção descreve as opções de configuração disponíveis para o agendador do Apache Airflow (Apache Airflow v2 e versões posteriores) e os respectivos casos de uso.

Apache Airflow v3
Configuração Caso de uso

celery.sync_parallelism

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Padrão: 1

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, o valor é definido como 1 para evitar erros na entrega de logs de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

scheduler.scheduler_idle_sleep_time

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Padrão: 1

Você pode usar essa opção para liberar o uso da CPU no agendador ao aumentar o tempo de inatividade do agendador após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do agendador executados em um ambiente em dag_processor.parsing_processes para o Apache Airflow v2 e para o Apache Airflow v3. Isso pode reduzir a capacidade dos agendadores de analisar DAGs e aumentar o tempo necessário para que os DAGs apareçam no servidor Web.

scheduler.max_dagruns_to_create_per_loop

O número máximo de DAGs para criar DagRuns por “loop” do agendador.

Padrão: 10

Você pode usar essa opção para liberar recursos para agendar tarefas ao diminuir o número máximo de DagRuns para o “loop” do agendador.

dag_processor.parsing_processes

O número de threads que o agendador pode executar paralelamente para agendar DAGs.

Padrão: usar (2 * number of vCPUs) - 1

Você pode usar essa opção para liberar recursos ao diminuir o número de processos que o agendador executa paralelamente para analisar DAGs. Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Consulte Limites para saber mais.

Apache Airflow v2
Configuração Caso de uso

celery.sync_parallelism

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Padrão: 1

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, o valor é definido como 1 para evitar erros na entrega de logs de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

scheduler.idle_sleep_time

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Padrão: 1

Você pode usar essa opção para liberar o uso da CPU no agendador ao aumentar o tempo de inatividade do agendador após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do agendador executados em um ambiente em scheduler.parsing_processes para o Apache Airflow v2 e para o Apache Airflow v3. Isso pode reduzir a capacidade dos agendadores de analisar DAGs e aumentar o tempo necessário para que os DAGs apareçam no servidor Web.

scheduler.max_dagruns_to_create_per_loop

O número máximo de DAGs para criar DagRuns por “loop” do agendador.

Padrão: 10

Você pode usar essa opção para liberar recursos para agendar tarefas ao diminuir o número máximo de DagRuns para o “loop” do agendador.

scheduler.parsing_processes

O número de threads que o agendador pode executar paralelamente para agendar DAGs.

Padrão: usar (2 * number of vCPUs) - 1

Você pode usar essa opção para liberar recursos ao diminuir o número de processos que o agendador executa paralelamente para analisar DAGs. Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Consulte Limites para saber mais.

Limites

Esta seção descreve os limites que você deve considerar ao ajustar os parâmetros padrão do agendador.

scheduler.parsing_processes, scheduler.max_threads (somente v2)

Dois threads são permitidos por vCPU para uma classe de ambiente. Pelo menos um thread deve ser reservado para o agendador de uma classe de ambiente. Se você notar um atraso no agendamento de tarefas, talvez seja necessário aumentar sua classe de ambiente. Por exemplo, um ambiente grande tem uma instância de contêiner Fargate de 4 vCPUs para seu agendador. Isso significa que um máximo total de 7 threads está disponível para uso em outros processos. Ou seja, dois threads multiplicaram quatro vCPUs, menos um para o próprio agendador. O valor que você especifica em scheduler.max_threads (somente v2) e scheduler.parsing_processes não pode exceder o número de threads disponíveis para uma classe de ambiente, conforme mostrado:

  • mw1.small: não deve exceder 1 thread para outros processos. O thread restante é reservado para o agendador.

  • mw1.medium: não deve exceder 3 threads de outros processos. O thread restante é reservado para o agendador.

  • mw1.large: não deve exceder 7 threads de outros processos. O thread restante é reservado para o agendador.

Pastas do DAG

O agendador do Apache Airflow verifica continuamente a pasta DAGs em seu ambiente. Quaisquer arquivos plugins.zip contidos ou arquivo Python (.py) contendo instruções de importação “airflow”. Todos os objetos Python DAG resultantes são então colocados em um DagBag para que esse arquivo seja processado pelo agendador para determinar quais tarefas, se houver, precisam ser agendadas. A análise do arquivo Dag ocorre independentemente de os arquivos conterem objetos DAG viáveis.

Parameters

Esta seção descreve as opções de configuração disponíveis para a pasta DAGs (Apache Airflow v2 e versões posteriores) e os respectivos casos de uso.

Apache Airflow v3
Configuração Caso de uso

dag_processor.refresh_interval

O número de segundos em que a pasta DAGs precisa ser examinada em busca de novos arquivos.

Padrão: 300 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos para analisar a pasta DAGs. Recomendamos aumentar esse valor se você observar longos tempos de análise em total_parse_time metrics, o que pode ser decorrente de um grande número de arquivos na sua pasta DAGs.

dag_processor.min_file_process_interval

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Apache Airflow v2
Configuração Caso de uso

scheduler.dag_dir_list_interval

O número de segundos em que a pasta DAGs precisa ser examinada em busca de novos arquivos.

Padrão: 300 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos para analisar a pasta DAGs. Recomendamos aumentar esse valor se você observar longos tempos de análise em total_parse_time metrics, o que pode ser decorrente de um grande número de arquivos na sua pasta DAGs.

scheduler.min_file_process_interval

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Arquivos DAG

Como parte do loop do agendador do Apache Airflow, arquivos DAG individuais são analisados para extrair objetos do DAG Python. No Apache Airflow v2 e versões posteriores, o agendador analisa um número máximo de processos de análise ao mesmo tempo. O número de segundos especificado em scheduler.min_file_process_interval (v2) ou dag_processor.min_file_process_interval (v3) deve passar antes que o mesmo arquivo seja analisado novamente.

Parameters

Esta seção descreve as opções de configuração disponíveis para os arquivos DAG do Apache Airflow (Apache Airflow v2 e versões posteriores) e os respectivos casos de uso.

Apache Airflow v3
Configuração Caso de uso

dag_processor.dag_file_processor_timeout

O número de segundos antes que o DagFileProcessor atinja o tempo limite de processamento de um arquivo DAG.

Padrão: 50 segundos

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o DagFileProcessor atinja o tempo limite. Recomendamos aumentar esse valor se o tempo limite for atingido continuamente nos logs de processamento do DAG e resulte na falta de carregamento de DAGs viáveis.

core.dagbag_import_timeout

O tempo limite do número de segundos antes de importar um arquivo do Python.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o agendador atinja o tempo limite ao importar um arquivo Python para extrair os objetos DAG. Essa opção é processada como parte do “loop” do agendador e deve conter um valor menor que o valor especificado em dag_processor.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

O número mínimo de segundos após os quais os DAGs serializados no banco de dados são atualizados.

Padrão: 30

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos após os quais os DAGs serializados no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número grande de DAGs, ou DAGs complexos. Aumentar esse valor reduz a carga no agendador e no banco de dados à medida que os DAGs são serializados.

core.min_serialized_dag_fetch_interval

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no DagBag.

Padrão: 10

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor Web e no banco de dados à medida que os DAGs são serializados.

Apache Airflow v2
Configuração Caso de uso

core.dag_file_processor_timeout

O número de segundos antes que o DagFileProcessor atinja o tempo limite de processamento de um arquivo DAG.

Padrão: 50 segundos

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o DagFileProcessor atinja o tempo limite. Recomendamos aumentar esse valor se o tempo limite for atingido continuamente nos logs de processamento do DAG e resulte na falta de carregamento de DAGs viáveis.

core.dagbag_import_timeout

O tempo limite do número de segundos antes de importar um arquivo do Python.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o agendador atinja o tempo limite ao importar um arquivo Python para extrair os objetos DAG. Essa opção é processada como parte do “loop” do agendador e deve conter um valor menor que o valor especificado em core.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

O número mínimo de segundos após os quais os DAGs serializados no banco de dados são atualizados.

Padrão: 30

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos após os quais os DAGs serializados no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número grande de DAGs, ou DAGs complexos. Aumentar esse valor reduz a carga no agendador e no banco de dados à medida que os DAGs são serializados.

core.min_serialized_dag_fetch_interval

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no DagBag.

Padrão: 10

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor Web e no banco de dados à medida que os DAGs são serializados.

Tarefas

Tanto o agendador quanto os operadores do Apache Airflow estão envolvidos em tarefas de enfileiramento e desenfileiramento. O agendador leva as tarefas analisadas prontas para serem agendadas de um status vazio para um status agendado. O executor, também em execução no contêiner do agendador no Fargate, coloca essas tarefas em fila e define o status como Em fila. Quando os operadores têm capacidade, retiram a tarefa da fila e definem o status como executando, que posteriormente muda o status para Êxito ou Falha com base no sucesso ou falha da tarefa.

Parameters

Esta seção descreve as opções de configuração disponíveis para tarefas do Apache Airflow e os casos de uso delas.

As opções de configuração padrão que o Amazon MWAA substitui estão marcadas em vermelho.

Apache Airflow v3
Configuração Caso de uso

core.parallelism

O número máximo de instâncias de tarefas que podem ter o status Running.

Padrão: definido dinamicamente com base em (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de operadores disponíveis multiplicado pela densidade de tarefas dos operadores. Recomendamos alterar esse valor somente quando houver um grande número de tarefas bloqueadas no estado “Em execução” ou “Em fila”.

core.execute_tasks_new_python_interpreter

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Padrão: True

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

celery.worker_concurrency

O Amazon MWAA substitui a instalação básica do Airflow por essa opção para escalar operadores como parte de seu componente de ajuste de escala automático.

Padrão: não aplicável

Qualquer valor especificado para essa opção é ignorado.

celery.worker_autoscale

A simultaneidade de tarefas para operadores.

Padrões:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

Você pode usar essa opção para liberar recursos ao reduzir a simultaneidade minimum, maximum de tarefas dos operadores. Os Operadores aceitam até maximum tarefas simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.

Apache Airflow v2
Configuração Caso de uso

core.parallelism

O número máximo de instâncias de tarefas que podem ter o status Running.

Padrão: definido dinamicamente com base em (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de operadores disponíveis multiplicado pela densidade de tarefas dos operadores. Recomendamos alterar esse valor somente quando houver um grande número de tarefas bloqueadas no estado “Em execução” ou “Em fila”.

core.dag_concurrency

O número de instâncias de tarefas que podem ser executadas simultaneamente para cada DAG.

Padrão: 10000

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas autorizadas a serem executadas simultaneamente. Por exemplo, se você tiver cem DAGs com dez tarefas paralelas e quiser que todos os DAGs sejam executados simultaneamente, é possível calcular o paralelismo máximo como o número de operadores disponíveis multiplicado pela densidade de tarefas dos operadores em celery.worker_concurrency, dividido pelo número de DAGs.

core.execute_tasks_new_python_interpreter

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Padrão: True

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

celery.worker_concurrency

O Amazon MWAA substitui a instalação básica do Airflow por essa opção para escalar operadores como parte de seu componente de ajuste de escala automático.

Padrão: não aplicável

Qualquer valor especificado para essa opção é ignorado.

celery.worker_autoscale

A simultaneidade de tarefas para operadores.

Padrões:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

Você pode usar essa opção para liberar recursos ao reduzir a simultaneidade minimum, maximum de tarefas dos operadores. Os Operadores aceitam até maximum tarefas simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.