Lambda 関数を使用して DAG を呼び出す - Amazon Managed Workflows for Apache Airflow

Lambda 関数を使用して DAG を呼び出す

次のコード例では、AWS Lambda 関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。

バージョン

このページのコード例は、Python 3.10Apache Airflow v2 および Python 3.11Apache Airflow v3 で使用可能です。

前提条件

コードサンプルを使用するには、以下が必要です。

注記

Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この設定では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API オペレーションを呼び出すアクセス許可が必要です。AWSLambdaVPCAccessExecutionRole AWS マネージドポリシーを使用して、このアクセス許可を提供できます。

アクセス許可

このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが airflow:CreateCliToken アクションを実行するためのアクセス権が必要です。AmazonMWAAAirflowCliAccess AWS マネージドポリシーを使用して、このアクセス許可を提供できます。

JSON
{ "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

詳細については、Apache Airflow CLI ポリシー: AmazonMWAAAirflowCliAccess を参照してください。

依存関係

このコード例を Apache Airflow v2 以降で使用する場合、追加の依存関係は必要ありません。aws-mwaa-docker-images を使用して、Apache Airflow をインストールします。

コード例

  1. AWS Lambda コンソールの https://console.aws.amazon.com/lambda/ を開いてください。

  2. 関数 リストから Lambda 関数を選択します。

  3. 関数ページで次のコードをコピーし、以下をリソース名に置き換えます。

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 環境の名前。

    • YOUR_DAG_NAME — 呼び出したい DAG の名前。

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. デプロイ を選択します。

  5. テスト を選択し、Lambda コンソールを使用して関数を呼び出します。

  6. Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して、お使いの環境の Apache Airflow UI に移動し、次の操作を行います。

    1. DAG ページの DAG のリストから新しいターゲット DAG を見つけます。

    2. 前回の実行 で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における invoke_dag の最新のタイムスタンプとほぼ一致する必要があります。

    3. 最近のタスク で、前回の実行が成功したことを確認します。