Ottimizzazione delle prestazioni per Apache Airflow su Amazon MWAA - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Ottimizzazione delle prestazioni per Apache Airflow su Amazon MWAA

Questo argomento descrive come ottimizzare le prestazioni di un ambiente Amazon Managed Workflows for Apache Airflow utilizzando. Utilizzo delle opzioni di configurazione Apache Airflow su Amazon MWAA

Aggiungere un'opzione di configurazione Apache Airflow

Utilizzate la seguente procedura per aggiungere un'opzione di configurazione Airflow al vostro ambiente.

  1. Apri la pagina Ambienti sulla console Amazon MWAA.

  2. Scegli un ambiente.

  3. Scegli Modifica.

  4. Scegli Next (Successivo).

  5. Scegli Aggiungi configurazione personalizzata nel riquadro delle opzioni di configurazione Airflow.

  6. Scegli una configurazione dall'elenco a discesa e inserisci un valore, oppure inserisci una configurazione personalizzata e inserisci un valore.

  7. Scegli Aggiungi configurazione personalizzata per ogni configurazione che desideri aggiungere.

  8. Scegli Save (Salva).

Per ulteriori informazioni, fare riferimento aUtilizzo delle opzioni di configurazione Apache Airflow su Amazon MWAA.

Pianificatore Apache Airflow

Lo scheduler Apache Airflow è un componente fondamentale di Apache Airflow. Un problema con lo scheduler può DAGs impedire l'analisi e la pianificazione delle attività. Per ulteriori informazioni sull'ottimizzazione dello scheduler di Apache Airflow, consulta la sezione Ottimizzazione delle prestazioni dello scheduler nel sito Web della documentazione di Apache Airflow.

Parametri

Questa sezione descrive le opzioni di configurazione disponibili per lo scheduler Apache Airflow (Apache Airflow v2 e versioni successive) e i relativi casi d'uso.

Apache Airflow v3
Configurazione Caso d'uso

celery.sync_parallelism

Il numero di processi utilizzati da Celery Executor per sincronizzare lo stato delle attività.

Impostazione predefinita: 1

È possibile utilizzare questa opzione per prevenire i conflitti di coda limitando i processi utilizzati da Celery Executor. Per impostazione predefinita, viene impostato un valore per 1 prevenire errori nella consegna dei registri delle attività ai registri. CloudWatch Impostare il valore su 0 significa utilizzare il numero massimo di processi, ma può causare errori durante la consegna dei registri delle attività.

scheduler.scheduler_idle_sleep_time

Il numero di secondi di attesa tra un'elaborazione consecutiva dei file DAG nel «ciclo» dello scheduler.

Impostazione predefinita: 1

È possibile utilizzare questa opzione per liberare l'utilizzo della CPU sullo scheduler aumentando il tempo di inattività dello scheduler dopo aver terminato il recupero dei risultati dell'analisi DAG, la ricerca e l'accodamento delle attività e l'esecuzione delle attività in coda nell'Executor. L'aumento di questo valore consuma il numero di thread di pianificazione eseguiti su un ambiente in dag_processor.parsing_processes Apache Airflow v2 e Apache Airflow v3. Ciò può ridurre la capacità di analisi degli scheduler e aumentare il tempo necessario per il DAGs popolamento nel server web. DAGs

scheduler.max_dagruns_to_create_per_loop

Il numero massimo di elementi da creare per ogni «ciclo» dello scheduler. DAGs DagRuns

Impostazione predefinita: 10

È possibile utilizzare questa opzione per liberare risorse per la pianificazione delle attività diminuendo il numero massimo di «loop» dello DagRunsscheduler.

dag_processor.parsing_processes

Il numero di thread che lo scheduler può eseguire in parallelo alla pianificazione. DAGs

Predefinito: Usa (2 * number of vCPUs) - 1

È possibile utilizzare questa opzione per liberare risorse diminuendo il numero di processi che lo scheduler esegue in parallelo per analizzare. DAGs Si consiglia di mantenere basso questo numero se l'analisi del DAG influisce sulla pianificazione delle attività. È necessario specificare un valore inferiore al numero di vCPU nell'ambiente. Per ulteriori informazioni, consulta Limiti.

Apache Airflow v2
Configurazione Caso d'uso

celery.sync_parallelism

Il numero di processi utilizzati da Celery Executor per sincronizzare lo stato delle attività.

Impostazione predefinita: 1

È possibile utilizzare questa opzione per prevenire i conflitti di coda limitando i processi utilizzati da Celery Executor. Per impostazione predefinita, viene impostato un valore per 1 prevenire errori nella consegna dei registri delle attività ai registri. CloudWatch Impostare il valore su 0 significa utilizzare il numero massimo di processi, ma può causare errori durante la consegna dei registri delle attività.

scheduler.idle_sleep_time

Il numero di secondi di attesa tra un'elaborazione consecutiva dei file DAG nel «ciclo» dello scheduler.

Impostazione predefinita: 1

È possibile utilizzare questa opzione per liberare l'utilizzo della CPU sullo scheduler aumentando il tempo di inattività dello scheduler dopo aver terminato il recupero dei risultati dell'analisi DAG, la ricerca e l'accodamento delle attività e l'esecuzione delle attività in coda nell'Executor. L'aumento di questo valore consuma il numero di thread di pianificazione eseguiti su un ambiente in scheduler.parsing_processes Apache Airflow v2 e Apache Airflow v3. Ciò può ridurre la capacità di analisi degli scheduler e aumentare il tempo necessario per il DAGs popolamento nel server web. DAGs

scheduler.max_dagruns_to_create_per_loop

Il numero massimo di elementi da creare per ogni «ciclo» dello scheduler. DAGs DagRuns

Impostazione predefinita: 10

È possibile utilizzare questa opzione per liberare risorse per la pianificazione delle attività diminuendo il numero massimo di «loop» dello DagRunsscheduler.

scheduler.parsing_processes

Il numero di thread che lo scheduler può eseguire in parallelo alla pianificazione. DAGs

Predefinito: Usa (2 * number of vCPUs) - 1

È possibile utilizzare questa opzione per liberare risorse diminuendo il numero di processi che lo scheduler esegue in parallelo per analizzare. DAGs Si consiglia di mantenere basso questo numero se l'analisi del DAG influisce sulla pianificazione delle attività. È necessario specificare un valore inferiore al numero di vCPU nell'ambiente. Per ulteriori informazioni, consulta Limiti.

Limits

Questa sezione descrive i limiti da considerare quando si regolano i parametri predefiniti per lo scheduler.

scheduler.parsing_processes, scheduler.max_threads (solo v2)

Sono consentiti due thread per vCPU per una classe di ambiente. Almeno un thread deve essere riservato allo scheduler per una classe di ambiente. Se si nota un ritardo nella pianificazione delle attività, potrebbe essere necessario aumentare la classe di ambiente. Ad esempio, un ambiente di grandi dimensioni ha un'istanza del contenitore Fargate a 4 vCpu come scheduler. Ciò significa che è disponibile un massimo di thread 7 totali da utilizzare per altri processi. Cioè, due thread moltiplicati per quattro vCPUs, meno uno per lo scheduler stesso. Il valore specificato in scheduler.max_threads (solo v2) non scheduler.parsing_processes deve superare il numero di thread disponibili per una classe di ambiente, come elencato:

  • mw1.small — Non deve superare il numero di 1 thread per altri processi. Il thread rimanente è riservato allo scheduler.

  • mw1.medium — Non deve superare i 3 thread per altri processi. Il thread rimanente è riservato allo scheduler.

  • mw1.large — Non deve superare i 7 thread per altri processi. Il thread rimanente è riservato allo scheduler.

Cartelle DAG

L'utilità di pianificazione Apache Airflow analizza DAGs continuamente la cartella nel tuo ambiente. Qualsiasi plugins.zip file contenuto o file Python (.py) contenente istruzioni di importazione «airflow». Tutti gli oggetti Python DAG risultanti vengono quindi inseriti in un file DagBagaffinché quel file venga elaborato dallo scheduler per determinare quali attività, se del caso, devono essere pianificate. L'analisi dei file DAG avviene indipendentemente dal fatto che i file contengano oggetti DAG validi.

Parametri

Questa sezione descrive le opzioni di configurazione disponibili per la DAGs cartella (Apache Airflow v2 e versioni successive) e i relativi casi d'uso.

Apache Airflow v3
Configurazione Caso d'uso

dag_processor.refresh_interval

Il numero di secondi in cui la cartella deve essere scansionata alla ricerca di nuovi file. DAGs

Impostazione predefinita: 300 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi di analisi della DAGs cartella. Ti consigliamo di aumentare questo valore se riscontri lunghi tempi di analisi intotal_parse_time metrics, ad esempio a causa dell'elevato numero di file nella cartella DAGs .

dag_processor.min_file_process_interval

Il numero di secondi dopo i quali lo scheduler analizza un DAG e vengono riflessi gli aggiornamenti al DAG.

Impostazione predefinita: 30 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi che lo scheduler attende prima di analizzare un DAG. Ad esempio, se si specifica un valore di30, il file DAG viene analizzato ogni 30 secondi. Si consiglia di mantenere questo numero elevato per ridurre l'utilizzo della CPU nell'ambiente.

Apache Airflow v2
Configurazione Caso d'uso

scheduler.dag_dir_list_interval

Il numero di secondi in cui la cartella deve essere scansionata alla ricerca di nuovi file. DAGs

Impostazione predefinita: 300 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi di analisi della DAGs cartella. Ti consigliamo di aumentare questo valore se riscontri lunghi tempi di analisi intotal_parse_time metrics, ad esempio a causa dell'elevato numero di file nella cartella DAGs .

scheduler.min_file_process_interval

Il numero di secondi dopo i quali lo scheduler analizza un DAG e vengono riflessi gli aggiornamenti al DAG.

Impostazione predefinita: 30 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi che lo scheduler attende prima di analizzare un DAG. Ad esempio, se si specifica un valore di30, il file DAG viene analizzato ogni 30 secondi. Si consiglia di mantenere questo numero elevato per ridurre l'utilizzo della CPU nell'ambiente.

File DAG

Come parte del ciclo di pianificazione Apache Airflow, i singoli file DAG vengono analizzati per estrarre oggetti DAG Python. In Apache Airflow v2 e versioni successive, lo scheduler analizza un numero massimo di processi di analisi contemporaneamente. Il numero di secondi specificato in scheduler.min_file_process_interval (v2) o dag_processor.min_file_process_interval (v3) deve trascorrere prima che lo stesso file venga nuovamente analizzato.

Parametri

Questa sezione descrive le opzioni di configurazione disponibili per i file Apache Airflow DAG (Apache Airflow v2 e versioni successive) e i relativi casi d'uso.

Apache Airflow v3
Configurazione Caso d'uso

dag_processor.dag_file_processor_timeout

Il numero di secondi prima del timeout dell'elaborazione di un file DAG. DagFileProcessor

Impostazione predefinita: 50 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il tempo necessario prima del DagFileProcessortimeout. Si consiglia di aumentare questo valore se si verificano dei timeout nei registri di elaborazione DAG che impediscono il caricamento di file validi. DAGs

core.dagbag_import_timeout

Il numero di secondi prima dell'importazione di un file Python scade.

Impostazione predefinita: 30 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il tempo necessario prima che lo scheduler scada durante l'importazione di un file Python per estrarre gli oggetti DAG. Questa opzione viene elaborata come parte del «ciclo» dello scheduler e deve contenere un valore inferiore al valore specificato in. dag_processor.dag_file_processor_timeout

core.min_serialized_dag_update_interval

Il numero minimo di secondi dopo il quale vengono aggiornati i serializzati nel database. DAGs

Valore predefinito: 30

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi dopo i quali vengono aggiornati i dati serializzati DAGs nel database. Si consiglia di aumentare questo valore se si dispone di DAGs un numero elevato o complesso DAGs. L'aumento di questo valore riduce il carico sullo scheduler e sul database man mano che DAGs vengono serializzati.

core.min_serialized_dag_fetch_interval

Il numero di secondi in cui un DAG serializzato viene recuperato nuovamente dal database quando è già caricato in. DagBag

Impostazione predefinita: 10

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi di recupero di un DAG serializzato. Il valore deve essere maggiore del valore specificato in core.min_serialized_dag_update_interval per ridurre le velocità di «scrittura» del database. L'aumento di questo valore riduce il carico sul server web e sul database man mano che DAGs vengono serializzati.

Apache Airflow v2
Configurazione Caso d'uso

core.dag_file_processor_timeout

Il numero di secondi prima del timeout dell'elaborazione di un file DAG. DagFileProcessor

Impostazione predefinita: 50 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il tempo necessario prima del DagFileProcessortimeout. Si consiglia di aumentare questo valore se si verificano dei timeout nei registri di elaborazione DAG che impediscono il caricamento di file validi. DAGs

core.dagbag_import_timeout

Il numero di secondi prima dell'importazione di un file Python scade.

Impostazione predefinita: 30 secondi

È possibile utilizzare questa opzione per liberare risorse aumentando il tempo necessario prima che lo scheduler scada durante l'importazione di un file Python per estrarre gli oggetti DAG. Questa opzione viene elaborata come parte del «ciclo» dello scheduler e deve contenere un valore inferiore al valore specificato in. core.dag_file_processor_timeout

core.min_serialized_dag_update_interval

Il numero minimo di secondi dopo il quale vengono aggiornati i serializzati nel database. DAGs

Valore predefinito: 30

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi dopo i quali vengono aggiornati i dati serializzati DAGs nel database. Si consiglia di aumentare questo valore se si dispone di DAGs un numero elevato o complesso DAGs. L'aumento di questo valore riduce il carico sullo scheduler e sul database man mano che DAGs vengono serializzati.

core.min_serialized_dag_fetch_interval

Il numero di secondi in cui un DAG serializzato viene recuperato nuovamente dal database quando è già caricato in. DagBag

Impostazione predefinita: 10

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di secondi di recupero di un DAG serializzato. Il valore deve essere maggiore del valore specificato in core.min_serialized_dag_update_interval per ridurre le velocità di «scrittura» del database. L'aumento di questo valore riduce il carico sul server web e sul database man mano che DAGs vengono serializzati.

Processi

Lo scheduler e gli operatori di Apache Airflow sono entrambi coinvolti nelle attività di attesa e disattesa. Lo scheduler prende le attività analizzate pronte per essere pianificate dallo stato Nessuno allo stato Pianificato. L'esecutore, anch'esso in esecuzione nel contenitore scheduler di Fargate, mette in coda tali attività e ne imposta lo stato su In coda. Quando i lavoratori hanno capacità, preleva l'operazione dalla coda e imposta lo stato su In esecuzione, che successivamente modifica lo stato in Operazione completata o Non riuscita a seconda che l'attività abbia esito positivo o negativo.

Parametri

Questa sezione descrive le opzioni di configurazione disponibili per le attività di Apache Airflow e i relativi casi d'uso.

Le opzioni di configurazione predefinite che Amazon MWAA sostituisce sono contrassegnate. red

Apache Airflow v3
Configurazione Caso d'uso

core.parallelism

Il numero massimo di istanze di attività che possono avere uno stato. Running

Predefinito: impostato dinamicamente in base a. (maxWorkers * maxCeleryWorkers) / schedulers * 1.5

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di istanze di attività che possono essere eseguite contemporaneamente. Il valore specificato deve essere il numero di lavoratori disponibili moltiplicato per la densità delle attività dei lavoratori. Si consiglia di modificare questo valore solo quando si verifica un gran numero di attività bloccate nello stato «In esecuzione» o «In coda».

core.execute_tasks_new_python_interpreter

Determina se Apache Airflow esegue le attività biforcando il processo principale o creando un nuovo processo Python.

Default: True

Quando è impostato suTrue, Apache Airflow riconosce le modifiche apportate ai plugin come un nuovo processo Python creato per eseguire attività.

celery.worker_concurrency

Amazon MWAA sostituisce l'installazione di base Airflow per questa opzione per scalare i lavoratori come parte del componente di scalabilità automatica.

Impostazione predefinita: non applicabile

Any value specified for this option is ignored.

celery.worker_autoscale

La concorrenza delle attività per i lavoratori.

Valori predefiniti:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

È possibile utilizzare questa opzione per liberare risorse riducendo la concomitanza tra le mansioni dei maximum lavoratori. minimum I lavoratori accettano fino alle attività maximum simultanee configurate, indipendentemente dal fatto che vi siano risorse sufficienti per farlo. Se le attività sono pianificate senza risorse sufficienti, le attività falliscono immediatamente. Si consiglia di modificare questo valore per le attività che richiedono molte risorse riducendo i valori a un valore inferiore a quello predefinito per consentire una maggiore capacità per attività.

Apache Airflow v2
Configurazione Caso d'uso

core.parallelismo

Il numero massimo di istanze di attività che possono avere uno stato. Running

Predefinito: impostato dinamicamente in base a. (maxWorkers * maxCeleryWorkers) / schedulers * 1.5

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di istanze di attività che possono essere eseguite contemporaneamente. Il valore specificato deve essere il numero di lavoratori disponibili moltiplicato per la densità delle attività dei lavoratori. Si consiglia di modificare questo valore solo quando si verifica un gran numero di attività bloccate nello stato «In esecuzione» o «In coda».

core.dag_concurrency

Il numero di istanze di attività che possono essere eseguite contemporaneamente per ogni DAG.

Valore predefinito: 10000

È possibile utilizzare questa opzione per liberare risorse aumentando il numero di istanze di attività consentite per l'esecuzione simultanea. Ad esempio, se si dispone di cento attività parallele DAGs con dieci e si desidera che tutte vengano DAGs eseguite contemporaneamente, è possibile calcolare il parallelismo massimo come il numero di lavoratori disponibili moltiplicato per la densità delle attività dei lavoratori incelery.worker_concurrency, diviso per il numero di. DAGs

core.execute_tasks_new_python_interpreter

Determina se Apache Airflow esegue le attività biforcando il processo principale o creando un nuovo processo Python.

Default: True

Quando è impostato suTrue, Apache Airflow riconosce le modifiche apportate ai plugin come un nuovo processo Python creato per eseguire attività.

celery.worker_concurrency

Amazon MWAA sostituisce l'installazione di base Airflow per questa opzione per scalare i lavoratori come parte del componente di scalabilità automatica.

Impostazione predefinita: non applicabile

Any value specified for this option is ignored.

celery.worker_autoscale

La concorrenza delle attività per i lavoratori.

Valori predefiniti:

  • mw1.micro - 3,0

  • mw1.small - 5,0

  • mw1.medium - 10,0

  • mw1.large - 20,0

  • mw1.xlarge - 40,0

  • mw1.2xlarge - 80,0

È possibile utilizzare questa opzione per liberare risorse riducendo la concomitanza tra le mansioni dei maximum lavoratori. minimum I lavoratori accettano fino alle attività maximum simultanee configurate, indipendentemente dal fatto che vi siano risorse sufficienti per farlo. Se le attività sono pianificate senza risorse sufficienti, le attività falliscono immediatamente. Si consiglia di modificare questo valore per le attività che richiedono molte risorse riducendo i valori a un valore inferiore a quello predefinito per consentire una maggiore capacità per attività.