翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Apache Airflow CLI コマンドリファレンス
このトピックでは、Amazon Managed Workflows for Apache Airflow でサポートされている Apache Airflow CLI コマンドとサポートされていない Apache Airflow CLI コマンドについて説明します。
ヒント
REST API は CLI よりも最新で、外部システムとプログラムで統合するように設計されています。REST は、Apache Airflow を操作する際に推奨される方法です。
目次
前提条件
以下のセクションでは、このページのコマンドとスクリプトを使用するために必要な準備手順について説明します。
アクセス
-
AWS アカウント AWS Identity and Access Management (IAM) で の Amazon MWAA アクセス許可ポリシーにアクセスしますApache Airflow UI アクセスポリシー: AmazonMWAAWebServerAccess。
-
AWS アカウント AWS Identity and Access Management (IAM) の Amazon MWAA アクセス許可ポリシー へのアクセスAPI とコンソールのフルアクセスポリシー: AmazonMWAAFullApiAccess。
AWS CLI
AWS Command Line Interface (AWS CLI) は、コマンドラインシェルのコマンドを使用して AWS サービスとやり取りするために使用できるオープンソースツールです。このページのステップを完了するには、以下のものが必要です。
何が変わったのですか?
-
v3: Airflow アーキテクチャ。Apache Airflow v3 では、セキュリティとスケーラビリティを向上させ、メンテナンスを容易にするために、アーキテクチャの大幅な変更が導入されています。詳細については、「Airflow 3 へのアップグレード
」を参照してください。 -
v2: Airflow CLI コマンド構造。Apache Airflow v2 CLI は、関連するコマンドがサブコマンドとしてグループ化されるように設定されています。つまり、Apache Airflow v2 にアップグレードする場合は、Apache Airflow v1 スクリプトを更新する必要があります。例えば、Apache Airflow v1
unpauseの は Apache Airflow v2dags unpauseにあります。詳細については、「 2.0 の Airflow CLI の変更」を参照してください。
サポート済みの CLI コマンド
以下のセクションでは、Amazon MWAA で使用できる Apache Airflow CLI コマンドの一覧を表示しています。
サポートされているコマンド
DAGs を解析するコマンドを使用する
環境が Apache Airflow v2.0.2 を実行している場合、DAGs が を介してインストールされたパッケージに依存するプラグインを使用すると、DAG を解析する CLI コマンドは失敗しますrequirements.txt。
Apache Airflow v2.0.2
-
dags backfill -
dags list -
dags list-runs -
dags next-execution
DAG が requirements.txt を介してインストールされたパッケージに依存するプラグインを使用していない場合は、次の CLI コマンドを使用できます。
「サンプルコード」
次のセクションには、Apache Airflow CLI を使用する各種方法の例が含まれています。
Apache Airflow v2 変数を設定、取得、または削除します。
次のサンプルコードを使用して、WWH 形式で変数を設定、取得、または削除できます<script> <mwaa env name> get | set | delete <variable> <variable value> </variable> </variable>。
[ $# -eq 0 ] && echo "Usage: $0 MWAA environment name " && exit if [[ $2 == "" ]]; then dag="variables list" elif [ $2 == "get" ] || [ $2 == "delete" ] || [ $2 == "set" ]; then dag="variables $2 $3 $4 $5" else echo "Not a valid command" exit 1 fi CLI_JSON=$(aws mwaa --region $AWS_REGION create-cli-token --name $1) \ && CLI_TOKEN=$(echo $CLI_JSON | jq -r '.CliToken') \ && WEB_SERVER_HOSTNAME=$(echo $CLI_JSON | jq -r '.WebServerHostname') \ && CLI_RESULTS=$(curl --request POST "https://$WEB_SERVER_HOSTNAME/aws_mwaa/cli" \ --header "Authorization: Bearer $CLI_TOKEN" \ --header "Content-Type: text/plain" \ --data-raw "$dag" ) \ && echo "Output:" \ && echo $CLI_RESULTS | jq -r '.stdout' | base64 --decode \ && echo "Errors:" \ && echo $CLI_RESULTS | jq -r '.stderr' | base64 --decode
DAG をトリガーするときに設定を追加します。
Apache Airflow v2 で次のサンプルコードを使用して、 などの DAG をトリガーするときに設定を追加できますairflow trigger_dag 'dag_name' —conf '{"key":"value"}'。
import boto3 import json import requests import base64 mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' key = "YOUR_KEY" value = "YOUR_VALUE" conf = "{\"" + key + "\":\"" + value + "\"}" client = boto3.client('mwaa') mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) mwaa_auth_token = 'Bearer ' + mwaa_cli_token['CliToken'] mwaa_webserver_hostname = 'https://{0}/aws_mwaa/cli'.format(mwaa_cli_token['WebServerHostname']) raw_data = "trigger_dag {0} -c '{1}'".format(dag_name, conf) mwaa_response = requests.post( mwaa_webserver_hostname, headers={ 'Authorization': mwaa_auth_token, 'Content-Type': 'text/plain' }, data=raw_data ) mwaa_std_err_message = base64.b64decode(mwaa_response.json()['stderr']).decode('utf8') mwaa_std_out_message = base64.b64decode(mwaa_response.json()['stdout']).decode('utf8') print(mwaa_response.status_code) print(mwaa_std_err_message) print(mwaa_std_out_message)
拠点ホストへの SSH 踏み台ホストで CLI コマンドを実行します
次の例を使用して、Linux 踏み台ホストへの SSH トンネルプロキシを使用して Airflow CLI コマンドを実行します。
curl を使用
-
ssh -D 8080 -f -C -q -NYOUR_USER@YOUR_BASTION_HOST -
curl -x socks5h://0:8080 --request POST https://YOUR_HOST_NAME/aws_mwaa/cli --headerYOUR_HEADERS--data-rawYOUR_CLI_COMMAND