Solución de problemas: operadores DAGs, conexiones y otros problemas - Amazon Managed Workflows para Apache Airflow

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Solución de problemas: operadores DAGs, conexiones y otros problemas

Los temas de esta página describen las soluciones a las dependencias de Python de Apache Airflow v2 y v3, los complementos personalizados, los operadores, las conexiones DAGs, las tareas y los problemas del servidor web que pueden surgir en un entorno de Amazon Managed Workflows para Apache Airflow.

Conexiones

En el siguiente tema se describen los errores que puede recibir al utilizar una conexión de Apache Airflow o al utilizar otra AWS base de datos.

No puedo conectarme a Secrets Manager

Recomendamos los siguientes pasos:

  1. Aprenda a crear claves secretas para su conexión y variables de Apache Airflow en Configuración de una conexión Apache Airflow mediante un secreto AWS Secrets Manager.

  2. Aprenda a usar la clave secreta para una variable de Apache Airflow (test-variable) en Uso de una clave secreta AWS Secrets Manager para una variable de Apache Airflow.

  3. Aprenda a usar la clave secreta para una conexión de Apache Airflow (myconn) en Uso de una clave secreta AWS Secrets Manager para una conexión de Apache Airflow.

¿Cómo configuro las condiciones del administrador de información secreta de secretsmanager:ResourceTag/<tag-key> o una restricción de recursos en mi política de funciones de ejecución?

nota

Se aplica a la versión 2.0 y anteriores de Apache Airflow.

Actualmente no puede limitar el acceso a la información secreta de Secrets Manager mediante claves de condición u otras restricciones de recursos en el rol de ejecución de su entorno, debido a un problema conocido en Apache Airflow.

No puedo conectarme a Snowflake

Recomendamos los siguientes pasos:

  1. Pruebe sus DAGs complementos personalizados y sus dependencias de Python localmente con aws-mwaa-docker-imageson GitHub.

  2. Agregue las siguientes entradas al archivo requirements.txt de su entorno.

    apache-airflow-providers-snowflake==1.3.0
  3. Añada las siguientes importaciones al DAG:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

Asegúrese de que el objeto de conexión de Apache Airflow incluya los siguientes pares clave-valor:

  1. ID de conexión: snowflake_conn

  2. Tipo de conexión: Snowflake

  3. Host: <my account> <my region if not us-west-2>.snowflakecomputing.com

  4. Esquema: <my schema>

  5. Inicio de sesión: <my user name>

  6. Contraseña: ********

  7. Puerto: <port, if any>

  8. Extra:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

Por ejemplo:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='123456789012.us-east-1.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='123456789012', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='us-east-1')), ... )

No encuentro mi conexión en la interfaz de usuario de Airflow

Apache Airflow proporciona plantillas de conexión en la interfaz de usuario de Apache Airflow. Lo usa para generar la cadena URI de conexión, independientemente del tipo de conexión. Si no hay ninguna plantilla de conexión disponible en la interfaz de usuario de Apache Airflow, se puede utilizar una plantilla de conexión alternativa para generar una cadena URI de conexión, por ejemplo, mediante la plantilla de conexión HTTP.

Recomendamos los siguientes pasos:

  1. Acceda a los tipos de conexión que proporciona Amazon MWAA en la interfaz de usuario de Apache Airflow en. Paquetes de proveedores de Apache Airflow instalados en entornos Amazon MWAA

  2. Acceda a los comandos para crear una conexión Apache Airflow en la CLI en. Referencia de los comandos de la CLI de Apache Airflow

  3. Aprenda a utilizar las plantillas de conexión de la interfaz de usuario de Apache Airflow indistintamente para los tipos de conexión que no están disponibles en la interfaz de usuario de Apache Airflow en Amazon MWAA de Información general sobre los tipos de conexión.

Servidor web

En el siguiente tema se describen los errores que puede recibir en su servidor web Apache Airflow en Amazon MWAA.

Aparece un error de 5 veces al acceder al servidor web

Recomendamos los siguientes pasos:

  1. Compruebe las opciones de configuración de Apache Airflow. Compruebe que los pares clave-valor que especificó como opción de configuración de Apache Airflow, por ejemplo, se hayan configurado AWS Secrets Manager correctamente. Para obtener más información, consulte. No puedo conectarme a Secrets Manager

  2. Compruebe los requirements.txt. Compruebe que el paquete “extras” de Airflow y las demás bibliotecas que figuran en su requirements.txt sean compatibles con su versión de Apache Airflow.

  3. Explore las formas de especificar las dependencias de Python en un requirements.txt archivo, consulte. Administración de las dependencias de Python en requirements.txt

Me sale un error The scheduler does not seem to be running

Si parece que el programador no está funcionando o se recibió el último «latido» hace varias horas, es DAGs posible que no aparezca en la lista de Apache Airflow y no se programarán nuevas tareas.

Recomendamos los siguientes pasos:

  1. Confirme que su grupo de seguridad de VPC permita el acceso entrante al puerto 5432. Este puerto es necesario para conectarse a la base de datos de metadatos de PostgreSQL de Amazon Aurora de su entorno. Tras añadir esta regla, espere unos minutos a Amazon MWAA y el error desaparecerá. Para obtener más información, consulte. Seguridad en la VPC en Amazon MWAA

    nota
    • La base de metadatos de Aurora PostgreSQL forma parte de la arquitectura de servicios de Amazon MWAA y no está disponible en su. Cuenta de AWS

    • Los errores relacionados con la base de datos suelen ser síntoma de un fallo del programador, no su causa principal.

  2. Si el programador no se está ejecutando, podría deberse a varios factores, como errores en la instalación de las dependencias o a una sobrecarga del programador. Confirme que sus DAGs complementos y requisitos funcionan correctamente accediendo a los grupos de registros correspondientes en Logs. CloudWatch Para obtener más información, consultaMonitorización y métricas de Amazon Managed Workflows para Apache Airflow.

Tareas

En el siguiente tema se describen los errores que puede recibir en las tareas de Apache Airflow en un entorno.

Mis tareas se atascan o no las finalizo

Si sus tareas de Apache Airflow están “bloqueadas” o no se completan, le recomendamos que siga los siguientes pasos:

  1. Es posible que haya una gran cantidad de DAGs definiciones. Reduzca el número DAGs y actualice el entorno (por ejemplo, cambiando un nivel de registro) para forzar el restablecimiento.

    1. Airflow analiza DAGs si están activados o no. Si utiliza más del 50% de la capacidad de su entorno, podría empezar a sobrecargar el programador Apache Airflow. Esto se traduce en un mayor tiempo total de análisis en CloudWatch las métricas o en tiempos de procesamiento del DAG prolongados en CloudWatch los registros. Hay otras formas de optimizar las configuraciones de Apache Airflow que no se tratan en esta guía.

    2. Para obtener más información sobre las prácticas recomendadas que recomendamos para ajustar el rendimiento de su entorno, consulteAjuste del desempeño de Apache Airflow en Amazon MWAA.

  2. Es posible que haya una gran cantidad de tareas en cola. Suele mostrarse como una cantidad grande (y creciente) de tareas en el None estado o como una gran cantidad en el interior. Queued Tasks and/or Tasks Pending CloudWatch Esto puede producirse por varias razones:

    1. Si hay más tareas que ejecutar de las que el entorno tiene capacidad para ejecutar, and/or una gran cantidad de tareas que estaban en cola antes del escalado automático tienen tiempo de detectarlas y desplegar más trabajadores.

    2. Si hay más tareas que ejecutar de las que un entorno tiene capacidad para ejecutar, se recomienda reducir la cantidad de tareas que se DAGs ejecutan simultáneamente y and/or aumentar el número mínimo de trabajadores de Apache Airflow.

    3. Si hay un gran número de tareas en cola antes de que el escalado automático haya tenido tiempo de detectar y desplegar más trabajadores, recomendamos escalonar el despliegue de tareas y and/or aumentar el número mínimo de trabajadores de Apache Airflow.

    4. Puede usar el comando update-environment en AWS Command Line Interface (AWS CLI) para cambiar la cantidad mínima o máxima de trabajadores que se ejecutan en su entorno.

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. Para obtener más información sobre las prácticas recomendadas que recomendamos para ajustar el rendimiento de su entorno, consulte. Ajuste del desempeño de Apache Airflow en Amazon MWAA

  3. Si sus tareas están bloqueadas en el estado “en ejecución”, también puede borrarlas o marcarlas como ejecutadas o fallidas. Esto permite que el componente de escalado automático de su entorno reduzca verticalmente la cantidad de procesos de trabajo que trabajan en su entorno. La siguiente imagen muestra un ejemplo de una tarea innecesaria.

    La imagen muestra una tarea pendiente.
    1. Elija el círculo para la tarea pendiente y, a continuación, seleccione Borrar como se muestra. Esto permite a Amazon MWAA reducir el número de trabajadores; de lo contrario, Amazon MWAA no puede determinar cuáles DAGs están habilitados o deshabilitados, ni puede reducir la plantilla si todavía hay tareas en cola.

      Acciones de Apache Airflow
  4. Para más información sobre el ciclo de vida de las tareas de Apache Airflow en Conceptos, consulte la guía de referencia de Apache Airflow.

En Airflow v3 se producen errores en las tareas sin necesidad de iniciar sesión

Si sus tareas de Apache Airflow 3 fallan sin registros, siga estos pasos:

  • Si los registros de trabajo presentan un error, por ejemplo, Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 15 Job: 12.') alrededor del momento en que la tarea falló, esto indica que el proceso de trabajo bifurcado asignado a la tarea probablemente finalizó inesperadamente.

    Para solucionar este problema, considere la posibilidad de configurar celery.worker_autoscale con los mismos valores mínimos y máximos. Por ejemplo:

    celery.worker_autoscale=5,5 # for mw1.small celery.worker_autoscale=10,10 # for mw1.medium celery.worker_autoscale=20,20 # for mw1.large

    Esto garantiza que el tamaño del grupo de trabajadores permanezca fijo, lo que evita el despido inesperado de trabajadores.

CLI

En el siguiente tema se describen los errores que puede recibir al ejecutar los comandos CLI de Airflow en. AWS Command Line Interface

Aparece un error '503' al activar un DAG en la CLI

La CLI de Airflow se ejecuta en el servidor web Apache Airflow, que tiene una simultaneidad limitada. Por lo general, se pueden ejecutar un máximo de 4 comandos de CLI simultáneamente.

¿Por qué falla el comando dags backfill de la CLI de Apache Airflow? ¿Qué solución hay?

nota

Lo siguiente solo se aplica a los entornos Apache Airflow v2.0.2.

El backfill comando, al igual que otros comandos de la CLI de Apache Airflow, analiza todos DAGs localmente antes de que DAGs se procese ninguno, independientemente del DAG al que se aplique la operación de CLI. En los entornos de Amazon MWAA que utilizan Apache Airflow v2.0.2, dado que los complementos y los requisitos aún no están instalados en el servidor web cuando se ejecuta el comando CLI, la operación de análisis falla y la operación no se invoca. backfill Si no tuviera ningún requisito ni complemento en su entorno, la operación backfill se realizaría correctamente.

Para poder ejecutar el comando backfill de la CLI, recomendamos invocarlo en un operador de bash. En un operador bash, backfill se inicia desde el trabajador, lo que le permite analizarlo correctamente, ya que todos DAGs los requisitos y complementos necesarios están disponibles e instalados. Utilice el siguiente ejemplo para crear un DAG con un BashOperator modo de ejecución. backfill

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

Operadores

En el siguiente tema se describen los errores que se pueden producir al utilizar los operadores.

Se produjo un error PermissionError: [Errno 13] Permission denied al utilizar el operador S3Transform

Siga estos pasos si está intentando ejecutar un script de intérprete de comandos con el operador S3Transform y recibe un mensaje de error PermissionError: [Errno 13] Permission denied. En los pasos siguientes se supone que ya tiene un archivo plugins.zip. Si va a crear un archivo plugins.zip nuevo, consulteInstalación de complementos personalizados.

  1. Pruebe sus DAGs complementos personalizados y sus dependencias de Python localmente con aws-mwaa-docker-imageson GitHub.

  2. Cree su script de transformación.

    #!/bin/bash cp $1 $2
  3. (opcional) Es posible que los usuarios de macOS y Linux tengan que ejecutar el siguiente comando para asegurarse de que el script es ejecutable.

    chmod 777 transform_test.sh
  4. Añada el script a su plugins.zip.

    zip plugins.zip transform_test.sh
  5. Siga los pasos que se indican en Carga del archivo plugins.zip a Amazon S3.

  6. Siga los pasos que se indican en Especificación de la versión de plugins.zip en la consola de Amazon MWAA.

  7. Utilice los siguientes DAG.

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://amzn-s3-demo-bucket/files/input.txt', dest_s3_key='s3://amzn-s3-demo-bucket/files/output.txt' )
  8. Siga los pasos que se indican en Carga del código del DAG a Amazon S3.