トラブルシューティング: DAGs、オペレータ、接続、その他の問題 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

トラブルシューティング: DAGs、オペレータ、接続、その他の問題

このページのトピックでは、Amazon Managed Workflows for Apache Airflow 環境で発生する可能性のある Apache Airflow v2 および v3 Python 依存関係、カスタムプラグイン、DAGs、オペレータ、接続、タスク、およびウェブサーバーの問題の解決方法について説明します。

Connections

次のトピックでは、Apache Airflow 接続または別の AWS データベースを使用する場合に発生する可能性のあるエラーについて説明します。

Secrets Manager に接続できません。

次のステップを推奨します。

  1. Apache Airflow 接続と変数のシークレットキーを作成する方法を AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定 で学習できます。

  2. test-variable で ApacheAirflow 変数 (Apache Airflow 変数 AWS Secrets Manager の でのシークレットキーの使用) のシークレットキーを使用する方法を学習します。

  3. Apache Airflow 接続 AWS Secrets Manager に でシークレットキーを使用する で ApacheAirflow 接続 (myconn) のシークレットキーを使用する方法を学習します。

secretsmanager:ResourceTag/<tag-key> シークレットマネージャの条件またはリソース制限は、実行ロールポリシーでどのように設定しますか。

注記

Apache Airflow バージョン 2.0 やそれ以前のバージョンに適用されます。

現時点では、Apache Airflow の既知の問題のため、環境の実行ロールで条件キーやその他のリソース制限を使用して Secrets Manager secrets へのアクセスを制限することはできません。

Snowflake に接続できません。

次のステップを推奨します。

  1. GitHub DAGs、カスタムプラグイン、Python 依存関係をローカルでテストします。 aws-mwaa-docker-images

  2. ご使用の環境に適した requirements.txt に次のエントリを追加します。

    apache-airflow-providers-snowflake==1.3.0
  3. 以下のインポートを DAG に追加する:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

Apache Airflow 接続オブジェクトに、次のキーバリューのペアが含まれていることを確認します。

  1. 接続 ID: snowflake_conn

  2. コーンタイプ: スノーフレーク

  3. ホスト: <my account> <my region if not us-west-2>.snowflakecomputing.com

  4. スキーマ: <my schema>

  5. ログイン: <my user name>

  6. パスワード: ********

  7. ポート: <port, if any>

  8. エキストラ:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

例:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='123456789012.us-east-1.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='123456789012', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='us-east-1')), ... )

Airflow UI に接続が見つかりません

Apache Airflow は Apache Airflow UI に接続テンプレートを用意しています。接続タイプに関係なく、接続 URI 文字列を生成します。Apache Airflow UI に接続テンプレートがない場合は、HTTP 接続テンプレートを使用するなど、代替の接続テンプレートを使用して接続 URI 文字列を生成できます。

次のステップを推奨します。

  1. の Apache Airflow UI で Amazon MWAA が提供する接続タイプにアクセスしますAmazon MWAA 環境にインストールされている Apache エアフロープロバイダーパッケージ

  2. コマンドにアクセスして、 の CLI で Apache Airflow 接続を作成しますApache Airflow CLI コマンドリファレンス

  3. 接続タイプの概要 Amazon MWAA 上の Apache Airflow UI では使用できない接続タイプのために、Apache Airflow UI で接続テンプレートを交換して使用する方法を学習します。

Web サーバー

次のトピックでは、Amazon MWAA の Apache Airflow ウェブサーバーで発生する可能性のあるエラーについて説明します。

ウェブサーバーへのアクセス中に 5xx エラーが表示される

次のステップを推奨します。

  1. Apache Airflow 構成オプションを確認してください。などの Apache Airflow 設定オプションとして指定したキーと値のペアが正しく設定 AWS Secrets Managerされていることを確認します。詳細については、「Secrets Manager に接続できません。」を参照してください。

  2. requirements.txt をチェックしてください。Apache Airflow バージョンと互換性のある、requirements.txt にリストされている Airflow の「extras」パッケージおよびその他のライブラリを確認してください。

  3. requirements.txt ファイルで Python 依存関係を指定する方法については、「」を参照してくださいrequirements.txt での Python 依存関係の管理

The scheduler does not seem to be running エラーが表示される

スケジューラが実行されていないように見える場合、または最後の「ハートビートDAGs が Apache Airflow に表示されず、新しいタスクがスケジュールされない可能性があります。

次のステップを推奨します。

  1. VPC セキュリティグループがポート 5432 へのインバウンドアクセスを許可していることを確認します。これは、ご使用の環境の Amazon Aurora PostgreSQL メタデータデータベースに接続するために必要のポートです。このルールを追加したら、Amazon MWAA に数分与えると、エラーが消える可能性があります。詳細については、「Amazon MWAA の VPC のセキュリティ」を参照してください。

    注記
    • Aurora PostgreSQL メタデータベースは Amazon MWAA サービスアーキテクチャの一部であり、 では使用できません AWS アカウント。

    • データベース関連のエラーは通常、スケジューラー障害の症状であり、根本的な原因ではありません。

  2. スケジューラーが動作していない場合は、依存関係のインストールの失敗スケジューラーの過負荷 など、さまざまな要因が原因である可能性があります。CloudWatch Logs DAGs、プラグイン、要件が正しく動作していることを確認します。詳細については、「Amazon Managed Workflows for Apache Airflow のモニタリングとメトリクス」を参照してください。

タスク

次のトピックでは、 環境の Apache Airflow タスクで発生する可能性のあるエラーについて説明します。

タスクがスタックするか完了しない

Apache Airflow タスクが「行き詰まっている」か、完了していない場合は、次のステップを実行することをお勧めします。

  1. 多数の DAGs が定義されている可能性があります。DAG の数を減らし、環境の更新 (ログレベルの変更など) を実行して強制的にリセットしてください。

    1. Airflow は DAG が有効かどうかに関係なく解析します。環境の容量の 50% 以上を使用している場合は、Apache Airflow スケジューラの負担が増大する可能性があります。これにより、CloudWatch メトリクスの合計解析時間が長くなったり、CloudWatCloudWatch Logs の DAG 処理時間が長くることがあります。Apache Airflow の設定を最適化する方法は他にもありますが、このガイドの対象範囲には含まれていません。

    2. 環境のパフォーマンスを調整するために推奨されるベストプラクティスの詳細については、「」を参照してくださいAmazon MWAA での Apache Airflow のパフォーマンス調整

  2. キューには多数のタスクが存在する可能性があります。これは多くの場合、 None状態のタスクの数が大きい、増加している、または Tasks Pending CloudWatch Queued Tasksのタスクの数が多いと表示されます。これは以下のような原因で起こりうる:

    1. 実行するタスクが環境の容量よりも多い場合、および/または Auto Scaling がタスクを検出して追加のワーカーをデプロイする前にキューに入れられたタスクが多数ある場合。

    2. 実行するタスクが環境の実行容量よりも多い場合は、DAGs が同時に実行するタスクの数を減らすか、Apache Airflow ワーカーの最小数を増やすことをお勧めします。

    3. Auto Scaling が追加のワーカーを検出してデプロイする前にキューに入れられたタスクが多数ある場合は、タスクのデプロイをずらすか、Apache Airflow ワーカーの最小数を増やすことをお勧めします。

    4. AWS Command Line Interface (AWS CLI) の update-environment コマンドを使用して、環境で実行されるワーカーの最小数または最大数を変更できます。

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. 環境のパフォーマンスを調整するために推奨されるベストプラクティスの詳細については、「」を参照してくださいAmazon MWAA での Apache Airflow のパフォーマンス調整

  3. タスクが「実行中」状態のままになっている場合は、タスクをクリアしたり、成功または失敗のマークを付けることもできます。これにより、環境の自動スケーリングコンポーネントは、環境で実行されているワーカー数をスケールダウンできます。次の図は、ストランドタスクの例を示しています。

    これは課題が立ち消えになっているイメージです。
    1. 取り残されたタスクの円を選択し、クリアを選択します(図を参照)。これにより Amazon MWAA はワーカーをスケールダウンできます。そうしない場合、Amazon MWAA はどの DAG が有効か無効かを判断できず、キューにタスクが残っている場合はスケールダウンできません。

      Apache Airflow アクション
  4. Apache Airflow タスクライフサイクルの詳細については、Apache Airflow リファレンスガイド「概念」 を参照してください。

Airflow v3 のログなしでタスクの失敗が発生する

Apache Airflow 3 タスクがログなしで失敗する場合は、次の手順に従います。

  • ワーカーログにタスクが失敗した時間Task handler raised error: WorkerLostError('Worker exited prematurely: exitcode 15 Job: 12.')前後などのエラーが表示される場合は、タスクに割り当てられた分岐ワーカープロセスが予期せず終了した可能性が高いことを示します。

    これに対処するには、celery.worker_autoscale を同じ最小値と最大値で設定することを検討してください。例:

    celery.worker_autoscale=5,5 # for mw1.small celery.worker_autoscale=10,10 # for mw1.medium celery.worker_autoscale=20,20 # for mw1.large

    これにより、ワーカープールのサイズが固定され、予期しないワーカーの終了を防ぐことができます。

CLI

次のトピックでは、 で Airflow CLI コマンドを実行するときに発生する可能性のあるエラーについて説明します AWS Command Line Interface。

CLI で DAG をトリガーすると「503」というエラーが表示される

Airflow CLI は、同時実行が制限されている Apache Airflow ウェブサーバーで実行されます。通常、 CLI コマンドを最大 4 つ同時に実行できます。

dags backfill Apache エアフロー CLI コマンドが失敗した原因は? 回避策はありますか?

注記

以下は Apache Airflow v2.0.2 の環境にのみ適用されます。

他の Apache Airflow CLI コマンドと同様に、backfill コマンドはすべての DAGs を処理する前に、CLI 操作がどの DAGs に適用されているかに関係なく、すべての DAGs をローカルに解決します。Apache Airflow v2.0.2 を使用する Amazon MWAA 環境では、CLI コマンドが実行されるまでにプラグインと要件がウェブサーバーにまだインストールされていないため、解析オペレーションは失敗し、backfillオペレーションは呼び出されません。環境に要件やプラグインがない場合、backfill 操作は成功します。

backfill CLI コマンドを実行できるようにするには、bash 演算子で呼び出すことをお勧めします。bash オペレータでは、backfill はワーカーから起動されます。これにより、必要な要件とプラグインがすべて使用可能になり、インストール後、 DAG が正常に解析できるようになります。次の例を使用して、 BashOperatorを実行する で DAG を作成しますbackfill

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

演算子

次のトピックでは、 Operators の使用時に発生する可能性のあるエラーについて説明します。

S3-Transform オペレータの使用中に PermissionError: [Errno 13] Permission denied エラーが発生しました。

S3Transform オペレータを使用してシェルスクリプトを実行しようとして PermissionError: [Errno 13] Permission denied エラーが発生する場合は、次の手順を実行することをお勧めします。次のステップは、既存の plugins.zip ファイルがあることを前提としています。新しい plugins.zip を作成する場合は、「」を参照してくださいカスタムプラグインのインストール

  1. GitHub DAGs、カスタムプラグイン、Python 依存関係をローカルでテストします。 aws-mwaa-docker-images

  2. 「変換」スクリプトを作成します。

    #!/bin/bash cp $1 $2
  3. (オプション) macOS および Linux ユーザーは、スクリプトが実行可能であることを確認するために次のコマンドを実行する必要がある場合があります。

    chmod 777 transform_test.sh
  4. スクリプトを plugins.zip に追加します。

    zip plugins.zip transform_test.sh
  5. 「plugins.zip を Amazon S3 にアップロードする」 のステップに従ってください。

  6. 「Amazon MWAA コンソールのplugins.zip バージョンの指定」 のステップに従ってください。

  7. 以下の DAG を作成します。

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://amzn-s3-demo-bucket/files/input.txt', dest_s3_key='s3://amzn-s3-demo-bucket/files/output.txt' )
  8. 「Amazon S3 への DAG コードのアップロード」 のステップに従ってください。