Ajuste de desempenho para o Apache Airflow no Amazon MWAA - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Ajuste de desempenho para o Apache Airflow no Amazon MWAA

Este tópico descreve como ajustar o desempenho de um ambiente Amazon Managed Workflows para Apache Airflow usando. Como usar opções de configuração do Apache Airflow no Amazon MWAA

Como adicionar uma opção de configuração do Apache Airflow

Use o procedimento a seguir para adicionar uma opção de configuração do Airflow ao seu ambiente.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Escolha Editar.

  4. Escolha Próximo.

  5. Escolha Adicionar configuração personalizada no painel Opções de configuração do Airflow.

  6. Escolha uma configuração na lista suspensa e insira um valor, ou digite uma configuração personalizada e insira um valor.

  7. Escolha Adicionar configuração personalizada para cada configuração que você deseja adicionar.

  8. Escolha Salvar.

Para saber mais, consulteComo usar opções de configuração do Apache Airflow no Amazon MWAA.

Agendador do Apache Airflow

O agendador do Apache Airflow é um componente central do Apache Airflow. Um problema com o agendador pode DAGs impedir a análise e o agendamento de tarefas. Para obter mais informações sobre o ajuste do agendador do Apache Airflow, consulte Como ajustar o desempenho do agendador no site de documentação do Apache Airflow.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para o agendador do Apache Airflow (Apache Airflow v2 e versões posteriores) e seus casos de uso.

Apache Airflow v3
Configuração Caso de uso

celery.sync_parallelism

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Padrão: 1

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, um valor é definido como 1 para evitar erros na entrega de registros de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

scheduler.scheduler_idle_sleep_time

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Padrão: 1

Você pode usar essa opção para liberar o uso da CPU no agendador aumentando o tempo em que o agendador dorme após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do agendador executados em um ambiente dag_processor.parsing_processes para o Apache Airflow v2 e o Apache Airflow v3. Isso pode reduzir a capacidade de análise DAGs dos agendadores e aumentar o tempo necessário DAGs para serem preenchidos no servidor web.

scheduler.max_dagruns_to_create_per_loop

O número máximo de DAGs a serem criados DagRunspor “loop” do agendador.

Padrão: 10

Você pode usar essa opção para liberar recursos para agendar tarefas diminuindo o número máximo de DagRuns“loop” do agendador.

processador_dag. Processos de análise

O número de threads que o agendador pode executar paralelamente ao agendamento DAGs.

Padrão: Usar (2 * number of vCPUs) - 1

Você pode usar essa opção para liberar recursos diminuindo o número de processos que o agendador executa paralelamente para analisar. DAGs Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Para saber mais, consulte Limites.

Apache Airflow v2
Configuração Caso de uso

celery.sync_parallelism

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Padrão: 1

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, um valor é definido como 1 para evitar erros na entrega de registros de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

scheduler.idle_sleep_time

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Padrão: 1

Você pode usar essa opção para liberar o uso da CPU no agendador aumentando o tempo em que o agendador dorme após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do agendador executados em um ambiente scheduler.parsing_processes para o Apache Airflow v2 e o Apache Airflow v3. Isso pode reduzir a capacidade de análise DAGs dos agendadores e aumentar o tempo necessário DAGs para serem preenchidos no servidor web.

scheduler.max_dagruns_to_create_per_loop

O número máximo de DAGs a serem criados DagRunspor “loop” do agendador.

Padrão: 10

Você pode usar essa opção para liberar recursos para agendar tarefas diminuindo o número máximo de DagRuns“loop” do agendador.

scheduler.parsing_processes

O número de threads que o agendador pode executar paralelamente ao agendamento DAGs.

Padrão: Usar (2 * number of vCPUs) - 1

Você pode usar essa opção para liberar recursos diminuindo o número de processos que o agendador executa paralelamente para analisar. DAGs Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Para saber mais, consulte Limites.

Limites

Esta seção descreve os limites a serem considerados ao ajustar os parâmetros padrão do agendador.

scheduler.parsing_processes, scheduler.max_threads (somente v2)

Dois threads são permitidos por vCPU para uma classe de ambiente. Pelo menos um thread deve ser reservado para o agendador de uma classe de ambiente. Se você notar um atraso no agendamento de tarefas, talvez seja necessário aumentar sua classe de ambiente. Por exemplo, um ambiente grande tem uma instância de contêiner Fargate de 4 vCPUs para seu agendador. Isso significa que um máximo total de 7 threads está disponível para uso em outros processos. Ou seja, dois threads multiplicaram quatro vCPUs, menos um para o próprio agendador. O valor que você especifica em scheduler.max_threads (somente v2) e não scheduler.parsing_processes deve exceder o número de threads disponíveis para uma classe de ambiente, conforme listado:

  • mw1.small: não deve exceder 1 thread para outros processos. O tópico restante é reservado para o agendador.

  • mw1.medium: não deve exceder 3 threads de outros processos. O tópico restante é reservado para o agendador.

  • mw1.large: não deve exceder 7 threads de outros processos. O tópico restante é reservado para o agendador.

Pastas do DAG

O agendador do Apache Airflow verifica continuamente a pasta em seu ambiente. DAGs Quaisquer arquivos plugins.zip contidos ou arquivo Python (.py) contendo instruções de importação “airflow”. Todos os objetos Python DAG resultantes são então colocados em um arquivo DagBagpara que esse arquivo seja processado pelo agendador para determinar quais tarefas, se houver, precisam ser agendadas. A análise do arquivo Dag ocorre independentemente de os arquivos conterem objetos DAG viáveis.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para a DAGs pasta (Apache Airflow v2 e posterior) e seus casos de uso.

Apache Airflow v3
Configuração Caso de uso

dag_processor.refresh_interval

O número de segundos em que a DAGs pasta deve ser examinada em busca de novos arquivos.

Padrão: 300 segundos

Você pode usar essa opção para liberar recursos aumentando o número de segundos para analisar a DAGs pasta. Recomendamos aumentar esse valor se você tiver longos tempos de análisetotal_parse_time metrics, o que pode ser devido a um grande número de arquivos em sua DAGs pasta.

dag_processor.min_file_process_interval

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Apache Airflow v2
Configuração Caso de uso

scheduler.dag_dir_list_interval

O número de segundos em que a DAGs pasta deve ser examinada em busca de novos arquivos.

Padrão: 300 segundos

Você pode usar essa opção para liberar recursos aumentando o número de segundos para analisar a DAGs pasta. Recomendamos aumentar esse valor se você tiver longos tempos de análisetotal_parse_time metrics, o que pode ser devido a um grande número de arquivos em sua DAGs pasta.

scheduler.min_file_process_interval

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Arquivos DAG

Como parte do loop do agendador do Apache Airflow, arquivos DAG individuais são analisados para extrair objetos do DAG Python. No Apache Airflow v2 e versões posteriores, o agendador analisa um número máximo de processos de análise ao mesmo tempo. O número de segundos especificado em scheduler.min_file_process_interval (v2) ou dag_processor.min_file_process_interval (v3) deve passar antes que o mesmo arquivo seja analisado novamente.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para arquivos Apache Airflow DAG (Apache Airflow v2 e versões posteriores) e seus casos de uso.

Apache Airflow v3
Configuração Caso de uso

dag_processor.dag_file_processor_timeout

O número de segundos antes do DagFileProcessortempo limite de processamento de um arquivo DAG.

Padrão: 50 segundos

Você pode usar essa opção para liberar recursos aumentando o tempo necessário antes que o tempo limite seja DagFileProcessoratingido. Recomendamos aumentar esse valor se você tiver tempos limite nos registros de processamento do DAG que resultem em impossibilidade DAGs de carregamento.

core.dagbag_import_timeout

O tempo limite do número de segundos antes de importar um arquivo do Python.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos aumentando o tempo necessário até que o agendador expire ao importar um arquivo Python para extrair os objetos do DAG. Essa opção é processada como parte do “loop” do agendador e deve conter um valor menor que o valor especificado emdag_processor.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

O número mínimo de segundos após o qual os serializados DAGs no banco de dados são atualizados.

Padrão: 30

Você pode usar essa opção para liberar recursos aumentando o número de segundos após os quais os serializados DAGs no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número DAGs grande ou complexo DAGs. Aumentar esse valor reduz a carga no agendador e no banco de dados à medida que DAGs são serializados.

core.min_serialized_dag_fetch_interval

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no. DagBag

Padrão: 10

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor web e no banco de dados à medida que DAGs são serializados.

Apache Airflow v2
Configuração Caso de uso

core.dag_file_processor_timeout

O número de segundos antes do DagFileProcessortempo limite de processamento de um arquivo DAG.

Padrão: 50 segundos

Você pode usar essa opção para liberar recursos aumentando o tempo necessário antes que o tempo limite seja DagFileProcessoratingido. Recomendamos aumentar esse valor se você tiver tempos limite nos registros de processamento do DAG que resultem em impossibilidade DAGs de carregamento.

core.dagbag_import_timeout

O tempo limite do número de segundos antes de importar um arquivo do Python.

Padrão: 30 segundos

Você pode usar essa opção para liberar recursos aumentando o tempo necessário até que o agendador expire ao importar um arquivo Python para extrair os objetos do DAG. Essa opção é processada como parte do “loop” do agendador e deve conter um valor menor que o valor especificado emcore.dag_file_processor_timeout.

core.min_serialized_dag_update_interval

O número mínimo de segundos após o qual os serializados DAGs no banco de dados são atualizados.

Padrão: 30

Você pode usar essa opção para liberar recursos aumentando o número de segundos após os quais os serializados DAGs no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número DAGs grande ou complexo DAGs. Aumentar esse valor reduz a carga no agendador e no banco de dados à medida que DAGs são serializados.

core.min_serialized_dag_fetch_interval

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no. DagBag

Padrão: 10

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor web e no banco de dados à medida que DAGs são serializados.

Tarefas

Tanto o agendador quanto os operadores do Apache Airflow estão envolvidos em tarefas de enfileiramento e desenfileiramento. O agendador leva as tarefas analisadas prontas para serem agendadas de um status vazio para um status agendado. O executor, também em execução no contêiner do agendador no Fargate, coloca essas tarefas em fila e define o status como Em fila. Quando os operadores têm capacidade, retiram a tarefa da fila e definem o status como executando, que posteriormente muda o status para Êxito ou Falha com base no sucesso ou falha da tarefa.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para tarefas do Apache Airflow e os casos de uso delas.

As opções de configuração padrão que o Amazon MWAA substitui estão marcadas em. red

Apache Airflow v3
Configuração Caso de uso

core.parallelism

O número máximo de instâncias de tarefas que podem ter um Running status.

Padrão: definido dinamicamente com base em. (maxWorkers * maxCeleryWorkers) / schedulers * 1.5

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de trabalhadores disponíveis multiplicado pela densidade de tarefas dos trabalhadores. Recomendamos alterar esse valor somente quando você tiver um grande número de tarefas paralisadas no estado “Em execução” ou “Em fila”.

core.execute_tasks_new_python_interpreter

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Padrão: True

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

celery.worker_concurrency

O Amazon MWAA substitui a instalação básica do Airflow para essa opção para escalar trabalhadores como parte de seu componente de escalonamento automático.

Padrão: Não aplicável

Any value specified for this option is ignored.

celery.worker_autoscale

A simultaneidade de tarefas para trabalhadores.

Padrões:

  • mw1.micro - 3,0

  • mw1.small: 5,0

  • mw1.medium: 10,0

  • mw1.large: 20,0

  • mw1.xlarge: 40,0

  • mw1.2xlarge: 80,0

Você pode usar essa opção para liberar recursos reduzindo a maximum simultaneidade de minimum tarefas dos trabalhadores. Os trabalhadores aceitam até as tarefas maximum simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.

Apache Airflow v2
Configuração Caso de uso

core.parallelism

O número máximo de instâncias de tarefas que podem ter um Running status.

Padrão: definido dinamicamente com base em. (maxWorkers * maxCeleryWorkers) / schedulers * 1.5

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de trabalhadores disponíveis multiplicado pela densidade de tarefas dos trabalhadores. Recomendamos alterar esse valor somente quando você tiver um grande número de tarefas paralisadas no estado “Em execução” ou “Em fila”.

core.dag_concurrency

O número de instâncias de tarefas que podem ser executadas simultaneamente para cada DAG.

Padrão: 10000

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas autorizadas a serem executadas simultaneamente. Por exemplo, se você tiver cem DAGs com dez tarefas paralelas e quiser que todas DAGs sejam executadas simultaneamente, você pode calcular o paralelismo máximo como o número de trabalhadores disponíveis multiplicado pela densidade de tarefas do trabalhador emcelery.worker_concurrency, dividido pelo número de. DAGs

core.execute_tasks_new_python_interpreter

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Padrão: True

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

celery.worker_concurrency

O Amazon MWAA substitui a instalação básica do Airflow para essa opção para escalar trabalhadores como parte de seu componente de escalonamento automático.

Padrão: Não aplicável

Any value specified for this option is ignored.

celery.worker_autoscale

A simultaneidade de tarefas para trabalhadores.

Padrões:

  • mw1.micro - 3,0

  • mw1.small: 5,0

  • mw1.medium: 10,0

  • mw1.large: 20,0

  • mw1.xlarge: 40,0

  • mw1.2xlarge: 80,0

Você pode usar essa opção para liberar recursos reduzindo a maximum simultaneidade de minimum tarefas dos trabalhadores. Os trabalhadores aceitam até as tarefas maximum simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.