從 Airflow 提交 EMR Serverless 任務 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

從 Airflow 提交 EMR Serverless 任務

Apache Airflow 中的 Amazon Provider 提供 EMR Serverless 運算子。如需運算子的詳細資訊,請參閱 Apache Airflow 文件中的 Amazon EMR Serverless Operators

您可以使用 來EmrServerlessCreateApplicationOperator建立 Spark 或 Hive 應用程式。您也可以使用 EmrServerlessStartJobOperator 來啟動一或多個使用新應用程式的任務。

若要搭配 Amazon Managed Workflows for Apache Airflow (MWAA) 搭配 Airflow 2.2.2 使用 運算子,請將以下行新增至您的 requirements.txt 檔案,並更新您的 MWAA 環境以使用新的 檔案。

apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9

請注意,EMR Serverless 支援已新增至 Amazon 供應商的 5.0.0 版。6.0.0 版是與 Airflow 2.2.2 相容的最新版本。您可以在 MWAA 上使用 Airflow 2.4.3 的更新版本。

下列縮寫範例示範如何建立應用程式、執行多個 Spark 任務,然後停止應用程式。EMR Serverless Samples GitHub 儲存庫提供完整範例。如需sparkSubmit組態的其他詳細資訊,請參閱 執行 EMR Serverless 任務時使用 Spark 組態

from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)