本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
使用 Lambda 函數叫DAGs
下列程式碼範例使用 AWS Lambda函數來取得 Apache Airflow CLI 權杖,並在 Amazon MWAA 環境中叫用定向非循環圖形 (DAG)。
版本
-
您可以在 Python 3.10
中使用此頁面上的程式碼範例搭配 Apache Airflow v2。
先決條件
若要使用此程式碼範例,您必須:
-
為您的 Amazon MWAA 環境使用公有網路存取模式。
-
使用最新的 Python 執行時間來擁有 Lambda 函數。
注意
如果 Lambda 函數和您的 Amazon MWAA 環境位於相同的 VPC 中,您可以在私有網路上使用此程式碼。在此組態中,Lambda 函數的執行角色需要呼叫 Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API 操作的許可。您可以使用 AWSLambdaVPCAccessExecutionRole
許可
若要使用此頁面上的程式碼範例,Amazon MWAA 環境的執行角色需要存取權才能執行airflow:CreateCliToken
動作。您可以使用 AmazonMWAAAirflowCliAccess
AWS 受管政策提供此許可:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }
如需詳細資訊,請參閱Apache Airflow CLI 政策:AmazonMWAAAirflowCliAccess。
相依性
-
若要將此程式碼範例與 Apache Airflow v2 搭配使用,不需要額外的相依性。此程式碼會在您的環境中使用 Apache Airflow v2 基本安裝
。
程式碼範例
-
在 https://https://console.aws.amazon.com/lambda/
開啟 AWS Lambda 主控台。 -
從函數清單中選擇您的 Lambda 函數。
-
在函數頁面上,複製下列程式碼,並以您的資源名稱取代下列程式碼:
-
YOUR_ENVIRONMENT_NAME
– Amazon MWAA 環境的名稱。 -
YOUR_DAG_NAME
– 您要叫用之 DAG 的名稱。
import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' client = boto3.client('mwaa') def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
-
-
選擇部署。
-
選擇測試,使用 Lambda 主控台叫用您的函數。
-
若要驗證您的 Lambda 是否已成功調用 DAG,請使用 Amazon MWAA 主控台導覽至您環境的 Apache Airflow UI,然後執行下列動作:
-
在 DAGs頁面上,在 DAG 清單中找到您的新目標 DAGs。
-
在上次執行下,檢查時間戳記以取得最新的 DAG 執行。此時間戳記應緊密符合您
invoke_dag
其他環境中 的最新時間戳記。 -
在最近任務下,檢查上次執行是否成功。
-