Apache Airflow CLI コマンドリファレンス - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Airflow CLI コマンドリファレンス

このトピックでは、Amazon Managed Workflows for Apache Airflow でサポートされている Apache Airflow CLI コマンドとサポートされていない Apache Airflow CLI コマンドについて説明します。

ヒント

REST API は CLI よりも最新で、外部システムとプログラムで統合するように設計されています。REST は、Apache Airflow を操作する際に推奨される方法です。

前提条件

以下のセクションでは、このページのコマンドとスクリプトを使用するために必要な準備手順について説明します。

アクセス

AWS CLI

AWS Command Line Interface (AWS CLI) は、コマンドラインシェルのコマンドを使用して AWS サービスとやり取りするために使用できるオープンソースツールです。このページのステップを完了するには、以下のものが必要です。

何が変わったのですか?

  • v3: Airflow アーキテクチャ。Apache Airflow v3 では、セキュリティとスケーラビリティを向上させ、メンテナンスを容易にするために、アーキテクチャの大幅な変更が導入されています。詳細については、「Airflow 3 へのアップグレード」を参照してください。

  • v2: Airflow CLI コマンド構造。Apache Airflow v2 CLI は、関連するコマンドがサブコマンドとしてグループ化されるように設定されています。つまり、Apache Airflow v2 にアップグレードする場合は、Apache Airflow v1 スクリプトを更新する必要があります。例えば、Apache Airflow v1 unpauseの は Apache Airflow v2 dags unpauseにあります。詳細については、「 2.0 の Airflow CLI の変更」を参照してください。

サポート済みの CLI コマンド

以下のセクションでは、Amazon MWAA で使用できる Apache Airflow CLI コマンドの一覧を表示しています。

サポートされているコマンド

Apache Airflow v3
Apache Airflow v2

DAGs を解析するコマンドを使用する

環境が Apache Airflow v2.0.2 を実行している場合、DAGs が を介してインストールされたパッケージに依存するプラグインを使用すると、DAG を解析する CLI コマンドは失敗しますrequirements.txt

Apache Airflow v2.0.2
  • dags backfill

  • dags list

  • dags list-runs

  • dags next-execution

DAG が requirements.txt を介してインストールされたパッケージに依存するプラグインを使用していない場合は、次の CLI コマンドを使用できます。

「サンプルコード」

次のセクションには、Apache Airflow CLI を使用する各種方法の例が含まれています。

Apache Airflow v2 変数を設定、取得、または削除します。

次のサンプルコードを使用して、WWH 形式で変数を設定、取得、または削除できます<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>

[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode

DAG をトリガーするときに設定を追加します。

Apache Airflow v2 で次のサンプルコードを使用して、 などの DAG をトリガーするときに設定を追加できますairflow trigger_dag 'dag_name' —conf '{"key":"value"}'

import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)

拠点ホストへの SSH 踏み台ホストで CLI コマンドを実行します

次の例を使用して、Linux 踏み台ホストへの SSH トンネルプロキシを使用して Airflow CLI コマンドを実行します。

curl を使用
  1. ssh -D 8080 -f -C -q -N YOUR_USER@YOUR_BASTION_HOST
  2. curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --header YOUR_HEADERS --data-raw YOUR_CLI_COMMAND