Lambda 関数を使用して DAG を呼び出す
次のコード例では、AWS Lambda 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。
バージョン
このページのコード例は、Python 3.10
前提条件
コードサンプルを使用するには、以下が必要です。
-
Amazon MWAA 環境 には、パブリックネットワークアクセスモード を使用してください。
-
最新の Python ランタイムを使用する Lambda 関数 を用意してください。
注記
Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この設定では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API オペレーションを呼び出すアクセス許可が必要です。AWSLambdaVPCAccessExecutionRole
アクセス許可
このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが airflow:CreateCliToken アクションを実行するためのアクセス権が必要です。AmazonMWAAAirflowCliAccess AWS マネージドポリシーを使用して、このアクセス許可を提供できます。
詳細については、Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess を参照してください。
依存関係
このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。aws-mwaa-docker-images
コード例
-
AWS Lambda コンソールの https://console.aws.amazon.com/lambda/
を開いてください。 -
関数 リストから Lambda 関数を選択します。
-
関数ページで次のコードをコピーし、以下をリソース名に置き換えます。
-
YOUR_ENVIRONMENT_NAME– Amazon MWAA 環境の名前。 -
YOUR_DAG_NAME— 呼び出したい DAG の名前。
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout']) -
-
デプロイ を選択します。
-
テスト を選択し、Lambda コンソールを使用して関数を呼び出します。
-
Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。
-
DAG ページの DAG のリストから新しいターゲット DAG を見つけます。
-
前回の実行 で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における
invoke_dagの最新のタイムスタンプとほぼ一致する必要があります。 -
最近のタスク で、前回の実行が成功したことを確認します。
-