Memanggil DAGs dengan fungsi Lambda - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memanggil DAGs dengan fungsi Lambda

Contoh kode berikut menggunakan AWS Lambdafungsi untuk mendapatkan token CLI Apache Airflow dan memanggil grafik asiklik terarah (DAG) di lingkungan Amazon MWAA.

Versi

Prasyarat

Untuk menggunakan contoh kode ini, Anda harus:

catatan

Jika fungsi Lambda dan lingkungan Amazon MWAA Anda berada di VPC yang sama, Anda dapat menggunakan kode ini di jaringan pribadi. Untuk konfigurasi ini, peran eksekusi fungsi Lambda memerlukan izin untuk memanggil operasi Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API. Anda dapat memberikan izin ini menggunakan kebijakan AWSLambdaVPCAccessExecutionRole AWS terkelola.

Izin

Untuk menggunakan contoh kode di halaman ini, peran eksekusi lingkungan Amazon MWAA Anda memerlukan akses untuk melakukan tindakan. airflow:CreateCliToken Anda dapat memberikan izin ini menggunakan kebijakan AmazonMWAAAirflowCliAccess AWS terkelola:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

Untuk informasi selengkapnya, lihat Kebijakan CLI Aliran Udara Apache: Amazon MWAAAirflow CliAccess.

Dependensi

Contoh kode

  1. Buka AWS Lambda konsol di https://console.aws.amazon.com/lambda/.

  2. Pilih fungsi Lambda Anda dari daftar Fungsi.

  3. Pada halaman fungsi, salin kode berikut dan ganti yang berikut ini dengan nama sumber daya Anda:

    • YOUR_ENVIRONMENT_NAME— Nama lingkungan Amazon MWAA Anda.

    • YOUR_DAG_NAME— Nama DAG yang ingin Anda panggil.

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. Pilih Deploy.

  5. Pilih Uji untuk menjalankan fungsi Anda menggunakan konsol Lambda.

  6. Untuk memverifikasi bahwa Lambda berhasil memanggil DAG, gunakan konsol Amazon MWAA untuk menavigasi ke UI Apache Airflow lingkungan Anda, lalu lakukan hal berikut:

    1. Pada DAGshalaman, cari DAG target baru Anda dalam daftar DAGs.

    2. Di bawah Last Run, periksa stempel waktu untuk menjalankan DAG terbaru. Stempel waktu ini harus sangat cocok dengan stempel waktu terbaru invoke_dag di lingkungan Anda yang lain.

    3. Di bawah Tugas Terbaru, periksa apakah proses terakhir berhasil.