Mengirimkan pekerjaan EMR Tanpa Server dari Airflow - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengirimkan pekerjaan EMR Tanpa Server dari Airflow

Penyedia Amazon di Apache Airflow menyediakan operator EMR Tanpa Server. Untuk informasi selengkapnya tentang operator, lihat Operator Tanpa Server Amazon EMR di dokumentasi Apache Airflow.

Anda dapat menggunakan EmrServerlessCreateApplicationOperator untuk membuat aplikasi Spark atau Hive. Anda juga dapat menggunakan EmrServerlessStartJobOperator untuk memulai satu atau lebih pekerjaan dengan aplikasi baru Anda.

Untuk menggunakan operator dengan Amazon Managed Workflows for Apache Airflow (MWAA) dengan Airflow 2.2.2, tambahkan baris berikut ke file Anda dan perbarui lingkungan MWAA requirements.txt Anda untuk menggunakan file baru.

apache-airflow-providers-amazon==6.0.0 boto3>=1.23.9

Perhatikan bahwa dukungan EMR Tanpa Server ditambahkan untuk merilis 5.0.0 dari penyedia Amazon. Rilis 6.0.0 adalah versi terakhir yang kompatibel dengan Airflow 2.2.2. Anda dapat menggunakan versi yang lebih baru dengan Airflow 2.4.3 di MWAA.

Contoh singkat berikut menunjukkan cara membuat aplikasi, menjalankan beberapa pekerjaan Spark, dan kemudian menghentikan aplikasi. Contoh lengkap tersedia di repositori EMR Serverless Sampel. GitHub Untuk detail sparkSubmit konfigurasi tambahan, lihatMenggunakan konfigurasi Spark saat Anda menjalankan pekerjaan EMR Tanpa Server.

from datetime import datetime from airflow import DAG from airflow.providers.amazon.aws.operators.emr import ( EmrServerlessCreateApplicationOperator, EmrServerlessStartJobOperator, EmrServerlessDeleteApplicationOperator, ) # Replace these with your correct values JOB_ROLE_ARN = "arn:aws:iam::account-id:role/emr_serverless_default_role" S3_LOGS_BUCKET = "amzn-s3-demo-bucket" DEFAULT_MONITORING_CONFIG = { "monitoringConfiguration": { "s3MonitoringConfiguration": {"logUri": f"s3://amzn-s3-demo-bucket/logs/"} }, } with DAG( dag_id="example_endtoend_emr_serverless_job", schedule_interval=None, start_date=datetime(2021, 1, 1), tags=["example"], catchup=False, ) as dag: create_app = EmrServerlessCreateApplicationOperator( task_id="create_spark_app", job_type="SPARK", release_label="emr-6.7.0", config={"name": "airflow-test"}, ) application_id = create_app.output job1 = EmrServerlessStartJobOperator( task_id="start_job_1", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi_fail.py", } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) job2 = EmrServerlessStartJobOperator( task_id="start_job_2", application_id=application_id, execution_role_arn=JOB_ROLE_ARN, job_driver={ "sparkSubmit": { "entryPoint": "local:///usr/lib/spark/examples/src/main/python/pi.py", "entryPointArguments": ["1000"] } }, configuration_overrides=DEFAULT_MONITORING_CONFIG, ) delete_app = EmrServerlessDeleteApplicationOperator( task_id="delete_app", application_id=application_id, trigger_rule="all_done", ) (create_app >> [job1, job2] >> delete_app)