Menggunakan Apache Airflow REST API - Amazon Managed Workflows for Apache Airflow

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Apache Airflow REST API

Alur Kerja Terkelola Amazon untuk Apache Airflow (Amazon MWAA) mendukung interaksi dengan lingkungan Apache Airflow Anda secara langsung menggunakan API REST Apache Airflow untuk lingkungan yang menjalankan Apache Airflow v2.4.3 dan yang lebih baru. Ini memungkinkan Anda mengakses dan mengelola lingkungan Amazon MWAA Anda secara terprogram, menyediakan cara standar untuk menjalankan alur kerja orkestrasi data, mengelola, dan DAGs memantau status berbagai komponen Apache Airflow seperti database metadata, pemicu, dan penjadwal.

Untuk mendukung skalabilitas saat menggunakan Apache Airflow REST API, Amazon MWAA memberi Anda opsi untuk menskalakan kapasitas server web secara horizontal untuk menangani peningkatan permintaan, baik dari permintaan REST API, penggunaan antarmuka baris perintah (CLI), atau pengguna antarmuka pengguna Apache Airflow (UI) lainnya secara bersamaan. Untuk informasi lebih lanjut tentang bagaimana Amazon MWAA menskalakan server web, lihat. Mengkonfigurasi penskalaan otomatis server web Amazon MWAA

Anda dapat menggunakan Apache Airflow REST API untuk mengimplementasikan kasus penggunaan berikut untuk lingkungan Anda:

  • Akses terprogram - Anda sekarang dapat memulai Apache Airflow DAG berjalan, mengelola kumpulan data, dan mengambil status berbagai komponen seperti database metadata, pemicu, dan penjadwal tanpa bergantung pada Apache Airflow UI atau CLI.

  • Integrasikan dengan aplikasi eksternal dan layanan mikro - Dukungan REST API memungkinkan Anda membuat solusi khusus yang mengintegrasikan lingkungan Amazon MWAA Anda dengan sistem lain. Misalnya, Anda dapat memulai alur kerja sebagai respons terhadap peristiwa dari sistem eksternal, seperti pekerjaan database yang diselesaikan atau pendaftaran pengguna baru.

  • Pemantauan terpusat — Anda dapat membuat dasbor pemantauan yang menggabungkan status Anda di DAGs beberapa lingkungan Amazon MWAA, memungkinkan pemantauan dan pengelolaan terpusat.

Untuk informasi selengkapnya tentang Apache Airflow REST API, lihat Referensi API Apache Airflow REST.

Dengan menggunakanInvokeRestApi, Anda dapat mengakses Apache Airflow REST API menggunakan kredensi. AWS Atau, Anda juga dapat mengaksesnya dengan mendapatkan token akses server web dan kemudian menggunakan token untuk memanggilnya.

Jika Anda menemukan kesalahan dengan pesan Update your environment to use InvokeRestApi saat menggunakan InvokeRestApi operasi, ini menunjukkan bahwa Anda perlu memperbarui lingkungan Amazon MWAA Anda. Kesalahan ini terjadi ketika lingkungan Amazon MWAA Anda tidak kompatibel dengan perubahan terbaru yang terkait dengan fitur tersebutInvokeRestApi. Untuk mengatasi masalah ini, perbarui lingkungan Amazon MWAA Anda untuk memasukkan perubahan yang diperlukan untuk fitur tersebutInvokeRestApi.

InvokeRestApiOperasi ini memiliki durasi batas waktu default 10 detik. Jika operasi tidak selesai dalam jangka waktu 10 detik ini, itu akan dihentikan secara otomatis, dan kesalahan akan muncul. Pastikan bahwa panggilan REST API Anda dirancang untuk diselesaikan dalam periode waktu tunggu ini untuk menghindari kesalahan.

Untuk mendukung skalabilitas saat menggunakan Apache Airflow REST API, Amazon MWAA memberi Anda opsi untuk menskalakan kapasitas server web secara horizontal untuk menangani peningkatan permintaan, baik dari permintaan REST API, penggunaan antarmuka baris perintah (CLI), atau pengguna antarmuka pengguna Apache Airflow (UI) yang lebih bersamaan. Untuk informasi lebih lanjut tentang bagaimana Amazon MWAA menskalakan server web, lihat. Mengkonfigurasi penskalaan otomatis server web Amazon MWAA

Anda dapat menggunakan Apache Airflow REST API untuk mengimplementasikan kasus penggunaan berikut untuk lingkungan Anda:

  • Akses terprogram - Anda sekarang dapat memulai Apache Airflow DAG berjalan, mengelola kumpulan data, dan mengambil status berbagai komponen seperti database metadata, pemicu, dan penjadwal tanpa bergantung pada Apache Airflow UI atau CLI.

  • Integrasikan dengan aplikasi eksternal dan layanan mikro - Dukungan REST API memungkinkan Anda membuat solusi khusus yang mengintegrasikan lingkungan Amazon MWAA Anda dengan sistem lain. Misalnya, Anda dapat memulai alur kerja sebagai respons terhadap peristiwa dari sistem eksternal, seperti pekerjaan database yang diselesaikan atau pendaftaran pengguna baru.

  • Pemantauan terpusat — Anda dapat membuat dasbor pemantauan yang menggabungkan status Anda di DAGs beberapa lingkungan Amazon MWAA, memungkinkan pemantauan dan pengelolaan terpusat.

Untuk informasi selengkapnya tentang Apache Airflow REST API, lihat Referensi API Apache Airflow REST.

Dengan menggunakanInvokeRestApi, Anda dapat mengakses Apache Airflow REST API menggunakan kredensi. AWS Atau, Anda juga dapat mengaksesnya dengan mendapatkan token akses server web dan kemudian menggunakan token untuk memanggilnya.

  • Jika Anda menemukan kesalahan dengan pesan Update your environment to use InvokeRestApi saat menggunakan InvokeRestApi operasi, ini menunjukkan bahwa Anda perlu memperbarui lingkungan Amazon MWAA Anda. Kesalahan ini terjadi ketika lingkungan Amazon MWAA Anda tidak kompatibel dengan perubahan terbaru yang terkait dengan fitur tersebutInvokeRestApi. Untuk mengatasi masalah ini, perbarui lingkungan Amazon MWAA Anda untuk memasukkan perubahan yang diperlukan untuk fitur tersebutInvokeRestApi.

  • InvokeRestApiOperasi ini memiliki durasi batas waktu default 10 detik. Jika operasi tidak selesai dalam jangka waktu 10 detik ini, itu akan dihentikan secara otomatis, dan kesalahan akan muncul. Pastikan bahwa panggilan REST API Anda dirancang untuk diselesaikan dalam periode waktu tunggu ini untuk menghindari kesalahan.

penting

Ukuran payload respons tidak boleh melebihi 6 MB. Anda RestApi gagal jika batas ini terlampaui.

Gunakan contoh berikut untuk melakukan panggilan API ke Apache Airflow REST API dan memulai DAG run baru:

Memberikan akses ke Apache Airflow REST API: airflow:InvokeRestApi

Untuk mengakses Apache Airflow REST API AWS menggunakan kredensional, Anda harus memberikan izin airflow:InvokeRestApi dalam kebijakan IAM Anda. Dalam contoh kebijakan berikut, tentukan Public peran AdminOp, UserViewer,,, atau {airflow-role} untuk menyesuaikan tingkat akses pengguna. Untuk informasi selengkapnya, lihat Peran Default dalam panduan referensi Apache Airflow.

JSON
{ "Version":"2012-10-17", "Statement": [ { "Sid": "AllowMwaaRestApiAccess", "Effect": "Allow", "Action": "airflow:InvokeRestApi", "Resource": [ "arn:aws:airflow:us-east-1:111122223333:role/{your-environment-name}/{airflow-role}" ] } ] }
catatan

Saat mengonfigurasi server web pribadi, InvokeRestApi tindakan tidak dapat dipanggil dari luar Virtual Private Cloud (VPC). Anda dapat menggunakan aws:SourceVpc kunci untuk menerapkan kontrol akses yang lebih terperinci untuk operasi ini. Untuk informasi lebih lanjut, lihat aws: SourceVpc.

Memanggil API REST Apache Airflow

Contoh skrip berikut ini mencakup cara menggunakan Apache Airflow REST API untuk mencantumkan yang DAGs tersedia di lingkungan Anda dan cara membuat variabel Apache Airflow:

import boto3 env_name = "MyAirflowEnvironment" def list_dags(client): request_params = { "Name": env_name, "Path": "/dags", "Method": "GET", "QueryParameters": { "paused": False } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) def create_variable(client): request_params = { "Name": env_name, "Path": "/variables", "Method": "POST", "Body": { "key": "test-restapi-key", "value": "test-restapi-value", "description": "Test variable created by MWAA InvokeRestApi API", } } response = client.invoke_rest_api( **request_params ) print("Airflow REST API response: ", response['RestApiResponse']) if __name__ == "__main__": client = boto3.client("mwaa") list_dags(client) create_variable(client)

Membuat token sesi server web dan memanggil Apache Airflow REST API

Untuk membuat token akses server web, gunakan fungsi Python berikut. Fungsi ini pertama-tama memanggil Amazon MWAA API untuk mendapatkan token login web. Token login web, yang kedaluwarsa setelah 60 detik, kemudian ditukar dengan token sesi web, yang memungkinkan Anda mengakses server web dan menggunakan API REST Apache Airflow. Jika Anda memerlukan lebih dari 10 transaksi per detik (TPS) kapasitas pelambatan, Anda dapat menggunakan metode ini untuk mengakses Apache Airflow REST API.

Token sesi berakhir setelah 12 jam.

Tip

Perubahan utama dalam contoh kode berikut dari Apache Airflow v2 ke v3 adalah:

  • Jalur REST API diubah dari /api/v1 menjadi /api/v2

  • Jalur masuk diubah dari /aws_maa/login menjadi /pluginsv2/aws_mwaa/login

  • Respons dari login response.cookies["_token"] berisi informasi token yang harus Anda gunakan untuk panggilan API berikutnya

  • Untuk panggilan REST API, Anda harus meneruskan jwt_token informasi di header sebagai:

    headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" }
Apache Airflow v3
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None
Apache Airflow v2
def get_token_info(region, env_name): logging.basicConfig(level=logging.INFO) try: # Initialize MWAA client and request a web login token mwaa = boto3.client('mwaa', region_name=region) response = mwaa.create_web_login_token(Name=env_name) # Extract the web server hostname and login token web_server_host_name = response["WebServerHostname"] web_token = response["WebToken"] # Construct the URL needed for authentication login_url = f"https://{web_server_host_name}/pluginsv2/aws_mwaa/login" login_payload = {"token": web_token} # Make a POST request to the MWAA login url using the login payload response = requests.post( login_url, data=login_payload, timeout=10 ) # Check if login was successful if response.status_code == 200: # Return the hostname and the session cookie return ( web_server_host_name, response.cookies['_token'] ) else: # Log an error logging.error("Failed to log in: HTTP %d", response.status_code) return None except requests.RequestException as e: # Log any exceptions raised during the request to the MWAA login endpoint logging.error("Request failed: %s", str(e)) return None except Exception as e: # Log any other unexpected exceptions logging.error("An unexpected error occurred: %s", str(e)) return None

Setelah otentikasi selesai, Anda memiliki kredensional untuk mulai mengirim permintaan ke titik akhir API. Dalam contoh di bagian berikut, gunakan titik akhirdags/{dag_name}/dagRuns.

Apache Airflow v3
def trigger_dag(region, env_name, dag_id): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_id (str): ID of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_id} in environment {env_name} at region {region}") # Retrieve the web server hostname and token for authentication try: web_server_host_name, jwt_token = get_token_info(region, env_name) if not jwt_token: logging.error("Authentication failed, no jwt token retrieved.") return except Exception as e: logging.error(f"Error retrieving token info: {str(e)}") return # Prepare headers and payload for the request request_headers = { "Authorization": f"Bearer {jwt_token}", "Content-Type": "application/json" # Good practice to include, even for GET } # sample request body input json_body = {"logical_date": "2025-09-17T14:15:00Z"} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v2/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, headers=request_headers, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_id}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_id = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_id)
Apache Airflow v2
def trigger_dag(region, env_name, dag_name): """ Triggers a DAG in a specified MWAA environment using the Airflow REST API. Args: region (str): AWS region where the MWAA environment is hosted. env_name (str): Name of the MWAA environment. dag_name (str): Name of the DAG to trigger. """ logging.info(f"Attempting to trigger DAG {dag_name} in environment {env_name} at region {region}") # Retrieve the web server hostname and session cookie for authentication try: web_server_host_name, session_cookie = get_session_info(region, env_name) if not session_cookie: logging.error("Authentication failed, no session cookie retrieved.") return except Exception as e: logging.error(f"Error retrieving session info: {str(e)}") return # Prepare headers and payload for the request cookies = {"session": session_cookie} json_body = {"conf": {}} # Construct the URL for triggering the DAG url = f"https://{web_server_host_name}/api/v1/dags/{dag_id}/dagRuns" # Send the POST request to trigger the DAG try: response = requests.post(url, cookies=cookies, json=json_body) # Check the response status code to determine if the DAG was triggered successfully if response.status_code == 200: logging.info("DAG triggered successfully.") else: logging.error(f"Failed to trigger DAG: HTTP {response.status_code} - {response.text}") except requests.RequestException as e: logging.error(f"Request to trigger DAG failed: {str(e)}") if __name__ == "__main__": logging.basicConfig(level=logging.INFO) # Check if the correct number of arguments is provided if len(sys.argv) != 4: logging.error("Incorrect usage. Proper format: python script_name.py {region} {env_name} {dag_name}") sys.exit(1) region = sys.argv[1] env_name = sys.argv[2] dag_name = sys.argv[3] # Trigger the DAG with the provided arguments trigger_dag(region, env_name, dag_name)