Risoluzione dei problemi: DAGs, Operatori, Connessioni e altri problemi - Flussi di lavoro gestiti da Amazon per Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Risoluzione dei problemi: DAGs, Operatori, Connessioni e altri problemi

Gli argomenti di questa pagina descrivono le risoluzioni alle dipendenze Python di Apache Airflow v2 e v3, i plugin DAGs personalizzati, gli operatori, le connessioni, le attività e i problemi del server web che potresti riscontrare in un ambiente Amazon Managed Workflows for Apache Airflow.

Connessioni

L'argomento seguente descrive gli errori che potresti ricevere quando utilizzi una connessione Apache Airflow o usi un altro database. AWS

Non riesco a connettermi a Secrets Manager

È consigliabile eseguire le operazioni seguenti:

  1. Scopri come creare chiavi segrete per la tua connessione Apache Airflow e le variabili in. Configurazione di una connessione Apache Airflow utilizzando un segreto AWS Secrets Manager

  2. Scopri come usare la chiave segreta per una variabile test-variable Apache Airflow () in. Utilizzo di una chiave segreta AWS Secrets Manager per una variabile Apache Airflow

  3. Scopri come utilizzare la chiave segreta per una connessione myconn Apache Airflow () in. Utilizzo di una chiave segreta AWS Secrets Manager per una connessione Apache Airflow

Come posso configurare le condizioni di secretsmanager:ResourceTag/<tag-key> Secrets Manager o una restrizione delle risorse nella mia politica sui ruoli di esecuzione?

Nota

Si applica a Apache Airflow versione 2.0 e precedenti.

Attualmente, non è possibile limitare l'accesso ai segreti di Secrets Manager utilizzando chiavi di condizione o altre restrizioni relative alle risorse nel ruolo di esecuzione dell'ambiente, a causa di un problema noto in Apache Airflow.

Non riesco a connettermi a Snowflake

È consigliabile eseguire le operazioni seguenti:

  1. Testa i tuoi DAGs plugin personalizzati e le dipendenze Python localmente usando on. aws-mwaa-docker-images GitHub

  2. Aggiungi le seguenti voci al file requirements.txt relativo al tuo ambiente.

    apache-airflow-providers-snowflake==1.3.0
  3. Aggiungete le seguenti importazioni al vostro DAG:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

Assicurati che l'oggetto di connessione Apache Airflow includa le seguenti coppie chiave-valore:

  1. ID connessione: snowflake_conn

  2. Tipo di connessione: Snowflake

  3. Ospite:. <my account> <my region if not us-west-2>.snowflakecomputing.com

  4. Schema: <my schema>

  5. Accedi: <my user name>

  6. Password: ********

  7. Porta: <port, if any>

  8. Supplementare:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

Ad esempio:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='123456789012.us-east-1.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='123456789012', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='us-east-1')), ... )

Non riesco a trovare la mia connessione nell'interfaccia utente Airflow

Apache Airflow fornisce modelli di connessione nell'interfaccia utente di Apache Airflow. Lo utilizza per generare la stringa URI di connessione, indipendentemente dal tipo di connessione. Se un modello di connessione non è disponibile nell'interfaccia utente di Apache Airflow, è possibile utilizzare un modello di connessione alternativo per generare una stringa URI di connessione, ad esempio utilizzando il modello di connessione HTTP.

È consigliabile eseguire le operazioni seguenti:

  1. Accedi ai tipi di connessione forniti da Amazon MWAA nell'interfaccia utente di Apache Airflow all'indirizzo. Pacchetti del provider Apache Airflow installati in ambienti Amazon MWAA

  2. Accedi ai comandi per creare una connessione Apache Airflow nella CLI all'indirizzo. Riferimento ai comandi CLI Apache Airflow

  3. Scopri come utilizzare i modelli di connessione nell'interfaccia utente di Apache Airflow in modo intercambiabile per tipi di connessione che non sono disponibili nell'interfaccia utente di Apache Airflow su Amazon MWAA all'indirizzo. Panoramica dei tipi di connessione

Server Web

L'argomento seguente descrive gli errori che potresti ricevere per il tuo server web Apache Airflow su Amazon MWAA.

Ricevo un errore 5xx durante l'accesso al server web

È consigliabile eseguire le operazioni seguenti:

  1. Controlla le opzioni di configurazione di Apache Airflow. Verifica che le coppie chiave-valore che hai specificato come opzione di configurazione Apache Airflow, ad esempio AWS Secrets Manager, siano configurate correttamente. Per ulteriori informazioni, fare riferimento a. Non riesco a connettermi a Secrets Manager

  2. Controlla ilrequirements.txt. Verifica che il pacchetto «extras» di Airflow e le altre librerie elencate nel tuo requirements.txt siano compatibili con la tua versione di Apache Airflow.

  3. Esplora i modi per specificare le dipendenze Python in un requirements.txt file, consulta. Gestione delle dipendenze Python in requirements.txt

Ricevo un errore The scheduler does not seem to be running

Se lo scheduler non sembra funzionare o se l'ultimo «battito cardiaco» è stato ricevuto diverse ore fa, è possibile che il tuo nome non DAGs sia elencato in Apache Airflow e che non vengano pianificate nuove attività.

È consigliabile eseguire le operazioni seguenti:

  1. Verifica che il tuo gruppo di sicurezza VPC consenta l'accesso in entrata alla porta. 5432 Questa porta è necessaria per connettersi al database di metadati PostgreSQL di Amazon Aurora per il tuo ambiente. Dopo aver aggiunto questa regola, concedi alcuni minuti ad Amazon MWAA e l'errore può scomparire. Per ulteriori informazioni, consulta. Sicurezza nel tuo VPC su Amazon MWAA

    Nota
    • Il metadatabase Aurora PostgreSQL fa parte dell'architettura del servizio Amazon MWAA e non è disponibile nel tuo. Account AWS

    • Gli errori relativi al database sono in genere un sintomo di un errore dello scheduler e non la causa principale.

  2. Se lo scheduler non è in esecuzione, potrebbe essere dovuto a una serie di fattori, come errori di installazione delle dipendenze o un sovraccarico dello scheduler. Verifica che i tuoi plugin e DAGs i tuoi requisiti funzionino correttamente accedendo ai gruppi di log corrispondenti in Logs. CloudWatch Per ulteriori informazioni, fare riferimento a. Monitoraggio e parametri per Amazon Managed Workflows for Apache Airflow

Attività

L'argomento seguente descrive gli errori che potresti ricevere per le attività di Apache Airflow in un ambiente.

Le mie attività sono bloccate o non vengono completate

Se le tue attività su Apache Airflow sono «bloccate» o non vengono completate, ti consigliamo di seguire i seguenti passaggi:

  1. Potrebbe esserci un gran numero di definizioni. DAGs Riduci il numero DAGs ed esegui un aggiornamento dell'ambiente (ad esempio modificando un livello di registro) per forzare il ripristino.

    1. Airflow analizza DAGs se sono abilitati o meno. Se utilizzi più del 50% della capacità del tuo ambiente, potresti iniziare a sovraccaricare lo scheduler Apache Airflow. Ciò comporta un tempo di analisi totale elevato nelle CloudWatch metriche o lunghi tempi di elaborazione DAG nei registri. CloudWatch Esistono altri modi per ottimizzare le configurazioni di Apache Airflow che non rientrano nell'ambito di questa guida.

    2. Per ulteriori informazioni sulle migliori pratiche consigliate per ottimizzare le prestazioni del proprio ambiente, consulta. Ottimizzazione delle prestazioni per Apache Airflow su Amazon MWAA

  2. Potrebbe esserci un gran numero di attività in coda. Questo viene spesso indicato come un numero elevato e crescente di attività None nello stato o come un numero elevato di attività all'interno. Queued Tasks and/or Tasks Pending CloudWatch Ciò può verificarsi per i seguenti motivi:

    1. Se ci sono più attività da eseguire rispetto a quelle che l'ambiente è in grado di eseguire, and/or un gran numero di attività che erano in coda prima della scalabilità automatica hanno il tempo di rilevarle e impiegare altri lavoratori.

    2. Se ci sono più attività da eseguire rispetto a quelle che un ambiente ha la capacità di eseguire, consigliamo di ridurre il numero di attività DAGs eseguite contemporaneamente, and/or aumentando il numero minimo di lavoratori Apache Airflow.

    3. Se c'è un gran numero di attività che erano in coda prima che la scalabilità automatica avesse avuto il tempo di rilevare e impiegare lavoratori aggiuntivi, consigliamo di eseguire una distribuzione graduale delle attività aumentando il numero minimo di lavoratori Apache Airflow. and/or

    4. È possibile utilizzare il comando update-environment in AWS Command Line Interface (AWS CLI) per modificare il numero minimo o massimo di worker in esecuzione nel proprio ambiente.

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. Per ulteriori informazioni sulle best practice consigliate per ottimizzare le prestazioni del proprio ambiente, consulta. Ottimizzazione delle prestazioni per Apache Airflow su Amazon MWAA

  3. Se le tue attività sono bloccate nello stato «in esecuzione», puoi anche cancellarle o contrassegnarle come riuscite o non riuscite. Ciò consente al componente di scalabilità automatica dell'ambiente di ridurre il numero di lavoratori che operano nell'ambiente. L'immagine seguente mostra un esempio di attività bloccata.

    Questa è un'immagine con un'attività bloccata.
    1. Scegli il cerchio per l'attività bloccata, quindi seleziona Cancella (come mostrato). Ciò consente ad Amazon MWAA di ridimensionare i dipendenti; in caso contrario, Amazon MWAA non è in grado di determinare quali DAGs sono abilitati o disabilitati e non può ridimensionare se ci sono ancora attività in coda.

      Azioni Apache Airflow
  4. Scopri di più sul ciclo di vita delle attività di Apache Airflow su Concepts nella guida di riferimento di Apache Airflow.

Ricevo errori nelle attività senza log in Airflow v3

Se le attività di Apache Airflow 3 non funzionano senza registri, procedi nel seguente modo:

  • Se i registri di lavoro presentano un errore, ad esempio nel momento Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 15 Job: 12.') in cui l'operazione non è riuscita, ciò indica che il processo di lavoro biforcato assegnato all'attività è stato probabilmente interrotto in modo imprevisto.

    Per risolvere questo problema, prendete in considerazione la possibilità di configurare celery.worker_autoscale con gli stessi valori minimo e massimo. Ad esempio:

    celery.worker_autoscale=5,5 # for mw1.small celery.worker_autoscale=10,10 # for mw1.medium celery.worker_autoscale=20,20 # for mw1.large

    Ciò garantisce che la dimensione del pool di lavoratori rimanga fissa, evitando licenziamenti imprevisti dei lavoratori.

CLI

L'argomento seguente descrive gli errori che potresti ricevere durante l'esecuzione dei comandi CLI Airflow in. AWS Command Line Interface

Ricevo un errore '503' quando si attiva un DAG nella CLI

L'Airflow CLI funziona sul server web Apache Airflow, che ha una concorrenza limitata. In genere è possibile eseguire contemporaneamente un massimo di 4 comandi CLI.

Perché il comando CLI dags backfill Apache Airflow fallisce? Esiste una soluzione alternativa?

Nota

Quanto segue si applica solo agli ambienti Apache Airflow v2.0.2.

Il backfill comando, come altri comandi CLI di Apache Airflow, analizza DAGs tutti localmente prima DAGs che vengano elaborati, indipendentemente dal DAG a cui si applica l'operazione CLI. Negli ambienti Amazon MWAA che utilizzano Apache Airflow v2.0.2, poiché i plug-in e i requisiti non sono ancora installati sul server Web al momento dell'esecuzione del comando CLI, l'operazione di analisi fallisce e l'operazione non viene richiamata. backfill Se non disponi di requisiti o plug-in nel tuo ambiente, l'operazione avrebbe esito positivo. backfill

Per poter eseguire il comando backfill CLI, consigliamo di richiamarlo in un operatore bash. In un operatore bash, backfill viene avviato dal worker, consentendogli di eseguire correttamente l'analisi man mano che tutti i requisiti e i plugin necessari sono disponibili e installati. DAGs Utilizzate l'esempio seguente per creare un DAG con un to run. BashOperator backfill

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

Operatori

L'argomento seguente descrive gli errori che si potrebbero ricevere quando si utilizzano gli operatori.

Ho ricevuto un PermissionError: [Errno 13] Permission denied errore utilizzando l'operatore S3Transform

Ti consigliamo di seguire i seguenti passaggi se stai cercando di eseguire uno script di shell con l'operatore S3Transform e ricevi un errore. PermissionError: [Errno 13] Permission denied I passaggi seguenti presuppongono che tu abbia un file plugins.zip esistente. Se state creando un nuovo file plugins.zip, fate riferimento aInstallazione di plugin personalizzati.

  1. Testa i tuoi DAGs plugin personalizzati e le dipendenze Python localmente usando on. aws-mwaa-docker-images GitHub

  2. Crea il tuo script di «trasformazione».

    #!/bin/bash cp $1 $2
  3. (opzionale) Gli utenti macOS e Linux potrebbero dover eseguire il comando seguente per assicurarsi che lo script sia eseguibile.

    chmod 777 transform_test.sh
  4. Aggiungi lo script al tuo file plugins.zip.

    zip plugins.zip transform_test.sh
  5. Segui la procedura descritta in Caricare il file plugins.zip su Amazon S3.

  6. Segui la procedura descritta in Specificare la versione di plugins.zip sulla console Amazon MWAA.

  7. Crea il seguente DAG.

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://amzn-s3-demo-bucket/files/input.txt', dest_s3_key='s3://amzn-s3-demo-bucket/files/output.txt' )
  8. Segui la procedura descritta in Caricamento del codice DAG su Amazon S3.