문제 해결: DAGs, 연산자, 연결 및 기타 문제 - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

문제 해결: DAGs, 연산자, 연결 및 기타 문제

이 페이지의 주제에서는 Amazon Managed Workflows for Apache Airflow 환경에서 발생할 수 있는 Apache Airflow v2 및 v3 Python 종속성, 사용자 지정 플러그인, DAGs, 연산자, 연결, 작업 및 웹 서버 문제에 대한 해결 방법을 설명합니다.

연결

다음 주제에서는 Apache Airflow 연결을 사용하거나 다른 AWS 데이터베이스를 사용할 때 발생할 수 있는 오류에 대해 설명합니다.

Secrets Manager에 연결할 수 없음

다음 단계를 수행하는 것이 좋습니다.

  1. AWS Secrets Manager 보안 암호를 사용하여 Apache Airflow 연결 구성에서 Apache Airflow 연결 및 변수를 위한 암호 키를 만드는 방법을 알아봅니다.

  2. Apache Airflow 변수에 AWS Secrets Manager 에서 보안 키 사용에서 Apache Airflow 변수(test-variable)에 대해 암호 키를 사용하는 방법을 알아봅니다.

  3. Apache Airflow 연결 AWS Secrets Manager 에에서 보안 키 사용에서 Apache Airflow 연결(myconn)에 대해 암호 키를 사용하는 방법을 알아봅니다.

실행 역할 정책에서 secretsmanager:ResourceTag/<tag-key> 암호 관리자 조건 또는 리소스 제한을 구성하려면 어떻게 해야 합니까?

참고

Apache Airflow 버전 2.0 이하에 적용됩니다.

Apache Airflow의 알려진 문제로 인해 현재는 환경 실행 역할의 조건 키 또는 기타 리소스 제한을 사용하여 Secrets Manager 암호에 대한 액세스를 제한할 수 없습니다.

Snowflake에 연결할 수 없음

다음 단계를 수행하는 것이 좋습니다.

  1. GitHub에서 aws-mwaa-docker-images를 사용하여 DAGs, 사용자 지정 플러그인 및 Python 종속성을 로컬에서 테스트합니다. aws-mwaa-docker-images

  2. 사용자 환경의 requirements.txt 파일에 다음 항목을 추가합니다.

    apache-airflow-providers-snowflake==1.3.0
  3. DAG에 다음 가져오기를 추가합니다.

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

Apache Airflow 연결 객체에 다음 키-값 쌍이 포함되어 있는지 확인합니다.

  1. Conn Id: snowflake_conn

  2. Conn Type: Snowflake

  3. Host: <my account>.<my region if not us-west-2>.snowflakecomputing.com

  4. Schema: <my schema>

  5. Login: <my user name>

  6. Password: ********

  7. Port: <port, if any>

  8. 추가:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

예:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='123456789012.us-east-1.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='123456789012', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='us-east-1')), ... )

Airflow UI에서 연결을 찾을 수 없음

Apache Airflow는 Apache Airflow UI에 연결 템플릿을 제공합니다. 연결 유형에 관계없이 이를 사용하여 연결 URI 문자열을 생성합니다. Apache Airflow UI에서 연결 템플릿을 사용할 수 없는 경우 HTTP 연결 템플릿 사용과 같은 대체 연결 템플릿을 사용하여 연결 URI 문자열을 생성할 수 있습니다.

다음 단계를 수행하는 것이 좋습니다.

  1. 의 Apache Airflow UI에서 Amazon MWAA가 제공하는 연결 유형에 액세스합니다Amazon MWAA 환경에 설치된 Apache Airflow 공급자 패키지.

  2. 명령에 액세스하여의 CLI에서 Apache Airflow 연결을 생성합니다Apache Airflow CLI 명령 참조.

  3. 연결 유형 개요의 Amazon MWAA의 Apache Airflow UI에서 사용할 수 없는 연결 유형에 대해 Apache Airflow UI의 연결 템플릿을 서로 바꿔서 사용하는 방법을 알아봅니다.

웹 서버

다음 주제에서는 Amazon MWAA의 Apache Airflow 웹 서버에 대해 발생할 수 있는 오류를 설명합니다.

웹 서버에 액세스하는 동안 5xx 오류가 발생합니다.

다음 단계를 수행하는 것이 좋습니다.

  1. Apache Airflow 구성 옵션을 확인합니다. 와 같이 Apache Airflow 구성 옵션으로 지정한 키-값 페어가 올바르게 구성 AWS Secrets Manager되었는지 확인합니다. 자세한 내용은 단원을 참조하십시오Secrets Manager에 연결할 수 없음.

  2. requirements.txt를 확인합니다. Airflow "extras" 패키지 및 requirements.txt에 나열된 기타 라이브러리가 Apache Airflow 버전과 호환되는지 확인합니다.

  3. requirements.txt 파일에서 Python 종속성을 지정하는 방법을 살펴봅니다. 단원을 참조하십시오requirements.txt에서의 Python 종속성 관리.

The scheduler does not seem to be running 오류가 발생합니다.

스케줄러가 실행 중이 아니거나 몇 시간 전에 마지막 '하트비트'가 수신된 경우 DAGs가 Apache Airflow에 나열되지 않을 수 있으며 새 작업이 예약되지 않습니다.

다음 단계를 수행하는 것이 좋습니다.

  1. VPC 보안 그룹이 포트 5432에 대한 인바운드 액세스를 허용하는지 확인합니다. 이 포트는 사용자 환경의 Amazon Aurora PostgreSQL 메타데이터 데이터베이스에 연결하는 데 필요합니다. 이 규칙을 추가한 후 Amazon MWAA에 몇 분 정도 시간을 주면 오류가 사라질 수 있습니다. 자세한 내용은 단원을 참조하십시오Amazon MWAA에서 VPC 보안.

    참고
    • Aurora PostgreSQL 메타데이터베이스는 Amazon MWAA 서비스 아키텍처의 일부이며에서 사용할 수 없습니다 AWS 계정.

    • 데이터베이스 관련 오류는 일반적으로 스케줄러 오류의 증상이지 근본 원인은 아닙니다.

  2. 스케줄러가 실행되지 않는 경우 종속성 설치 실패 또는 스케줄러 과부하와 같은 여러 요인 때문일 수 있습니다. CloudWatch Logs의 해당 로그 그룹에 액세스하여 DAGs, 플러그인 및 요구 사항이 올바르게 작동하는지 확인합니다. 자세한 내용은 단원을 참조하십시오Amazon Managed Workflows for Apache Airflow에 대한 모니터링 및 지표.

작업

다음 주제에서는 환경의 Apache Airflow 작업에 대해 발생할 수 있는 오류를 설명합니다.

내 작업이 멈춰 있거나 완료되지 않음

Apache Airflow 작업이 “멈춘” 상태이거나 완료되지 않는 경우 다음 단계를 따르는 것이 좋습니다.

  1. 정의된 DAGs 수가 많을 수 있습니다. DAG 수를 줄이고 환경 업데이트(예: 로그 수준 변경)를 수행하여 강제로 재설정합니다.

    1. Airflow는 DAG의 활성화 여부를 구문 분석합니다. 환경 용량의 50% 이상을 사용하는 경우 Apache Airflow 스케줄러에 부담을 주기 시작할 수 있습니다. 이로 인해 CloudWatch 지표의 총 구문 분석 시간이 길어지거나 CloudWatch Logs의 DAG 처리 시간이 길어집니다. Apache Airflow 구성을 최적화하는 다른 방법도 있으나 이 설명서에서는 다루지 않습니다.

    2. 환경의 성능을 조정하는 데 권장되는 모범 사례에 대해 자세히 알아보려면 섹션을 참조하세요Amazon MWAA의 Apache Airflow 성능 튜닝.

  2. 대기열에 많은 작업이 있을 수 있습니다. 이는 종종 None 상태의 태스크 수가 많고 증가하는 것으로 표시되거나 Queued Tasks 및/또는 Tasks Pending CloudWatch에서 많은 수로 표시됩니다. 이는 다음과 같은 이유로 발생할 수 있습니다.

    1. 환경보다 더 많은 작업을 실행할 수 있는 경우 실행할 수 있는 용량 및/또는 Auto Scaling 전에 대기열에 있는 많은 수의 작업이 작업을 감지하고 추가 작업자를 배포할 시간이 있습니다.

    2. 실행할 작업이 실행할 수 있는 용량보다 많으면 DAGs 실행되는 작업 수를 이거나 최소 Apache Airflow 작업자 수를 늘리는 것이 좋습니다.

    3. Auto Scaling이 추가 작업자를 감지하고 배포하는 데 시간이 걸리기 전에 대기 중인 작업이 많은 경우 작업 배포를 시차를 두거나 최소 Apache Airflow 작업자를 늘리는 것이 좋습니다.

    4. AWS Command Line Interface (AWS CLI)의 update-environment 명령을 사용하여 환경에서 실행되는 최소 또는 최대 작업자 수를 변경할 수 있습니다.

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. 환경의 성능을 조정하는 데 권장되는 모범 사례에 대해 자세히 알아보려면 섹션을 참조하세요Amazon MWAA의 Apache Airflow 성능 튜닝.

  3. 작업이 “실행 중” 상태에서 멈춘 경우 작업을 지우거나 성공 또는 실패로 표시할 수도 있습니다. 이렇게 하면 사용자 환경의 자동 확장 구성 요소를 통해 환경에서 실행되는 작업자 수를 줄일 수 있습니다. 다음 이미지는 복잡한 작업의 예를 보여줍니다.

    이것은 문제가 있는 작업이 포함된 이미지입니다.
    1. 문제 있는 작업의 원을 선택한 다음 지우기(그림 참조)를 선택합니다. 이렇게 하면 Amazon MWAA에서 작업자의 규모를 축소할 수 있습니다. 그렇지 않으면 Amazon MWAA에서 어떤 DAG가 활성화되었는지 또는 비활성화되었는지 확인할 수 없으며 대기 중인 작업이 있는 경우 규모를 축소할 수 없습니다.

      Apache Airflow 작업
  4. Apache Airflow 참조 가이드개념에서 Apache Airflow 작업 수명 주기에 대해 자세히 알아봅니다.

Airflow v3에서 로그 없이 작업 실패가 발생합니다.

로그 없이 Apache Airflow 3 작업이 실패하는 경우 다음 단계를 따릅니다.

  • 작업자 로그에 작업이 실패한 Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 15 Job: 12.') 시간과 같은 오류가 표시되는 경우 이는 작업에 할당된 포크된 작업자 프로세스가 예기치 않게 종료되었을 수 있음을 나타냅니다.

    이를 해결하려면 동일한 최소값과 최대값으로 celery.worker_autoscale을 구성하는 것이 좋습니다. 예:

    celery.worker_autoscale=5,5 # for mw1.small celery.worker_autoscale=10,10 # for mw1.medium celery.worker_autoscale=20,20 # for mw1.large

    이렇게 하면 작업자 풀 크기를 고정하여 예상치 못한 작업자 종료를 방지할 수 있습니다.

CLI

다음 주제에서는에서 Airflow CLI 명령을 실행할 때 발생할 수 있는 오류에 대해 설명합니다 AWS Command Line Interface.

CLI에서 DAG를 트리거할 때 '503' 오류가 발생합니다.

Airflow CLI는 동시성이 제한된 Apache Airflow 웹 서버에서 실행됩니다. 일반적으로 최대 4개의 CLI 명령을 동시에 실행할 수 있습니다.

dags backfill Apache Airflow CLI 명령이 실패하는 이유는 무엇입니까? 해결 방법이 있나요?

참고

다음은 Apache Airflow v2.0.2 환경에만 적용됩니다.

다른 Apache Airflow CLI 명령과 마찬가지로 backfill 명령은 CLI 작업이 적용되는 DAG에 관계없이 DAG가 처리되기 전에 모든 DAG를 로컬에서 구문 분석합니다. Apache Airflow v2.0.2를 사용하는 Amazon MWAA 환경에서는 CLI 명령이 실행될 때까지 플러그인 및 요구 사항이 아직 웹 서버에 설치되지 않아 구문 분석 작업이 실패하고 backfill 작업이 호출되지 않기 때문입니다. 환경에 요구 사항이나 플러그인이 없으면 backfill 작업은 성공할 수 있습니다.

backfill CLI 명령을 실행하려면 bash 연산자에서 CLI 명령을 호출하는 것이 좋습니다. bash 연산자에서는 작업자로부터 backfill이 시작되므로, 필요한 모든 요구 사항과 플러그인을 사용하여 설치할 수 있으므로 DAG를 성공적으로 구문 분석할 수 있습니다. 다음 예제를 사용하여를 사용하여를 실행할 DAGBashOperator를 생성합니다backfill.

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

연산자

다음 주제에서는 연산자를 사용할 때 발생할 수 있는 오류에 대해 설명합니다.

S3Transform 연산자를 사용할 때 PermissionError: [Errno 13] Permission denied 오류가 발생함

S3Transform 연산자를 사용하여 쉘 스크립트를 실행하려고 하는데 PermissionError: [Errno 13] Permission denied 오류가 발생하는 경우 다음 단계를 수행하는 것이 좋습니다. 다음 단계에서는 기존 plugins.zip 파일이 있다고 가정합니다. plugins.zip을 생성하는 경우 섹션을 참조하세요사용자 지정 플러그인 설치.

  1. GitHub에서 aws-mwaa-docker-images를 사용하여 DAGs, 사용자 지정 플러그인 및 Python 종속성을 로컬에서 테스트합니다. aws-mwaa-docker-images

  2. "transform" 스크립트를 생성합니다.

    #!/bin/bash cp $1 $2
  3. (선택 사항) 스크립트를 실행하려면 macOS 및 Linux 사용자가 다음 명령을 실행해야 할 수 있습니다.

    chmod 777 transform_test.sh
  4. plugins.zip 파일에 스크립트를 추가합니다.

    zip plugins.zip transform_test.sh
  5. Amazon S3에 plugins.zip 업로드의 단계를 따릅니다.

  6. Amazon MWAA 콘솔에서 plugins.zip 버전 지정의 단계를 따릅니다.

  7. 다음 DAG 파일을 생성합니다.

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://amzn-s3-demo-bucket/files/input.txt', dest_s3_key='s3://amzn-s3-demo-bucket/files/output.txt' )
  8. Amazon S3에 DAG 코드 업로드의 단계를 따릅니다.