View a markdown version of this page

Jalankan pekerjaan Spark menggunakan Amazon EMR Tanpa Server - fsX untuk ONTAP

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Jalankan pekerjaan Spark menggunakan Amazon EMR Tanpa Server

Tim rekayasa data yang menjalankan beban kerja Spark — untuk pemrosesan log, rekayasa fitur, ETL kompleks, atau analisis ilmiah — sering kali memiliki data sumber pada FSx untuk volume ONTAP yang ditulis oleh pipeline konsumsi lokal, penggerak data NFS atau SMB, atau aplikasi yang memasang volume secara langsung.

Dengan jalur akses Amazon S3 yang terpasang pada volume, Amazon EMR Serverless membaca data melalui titik akses, menjalankan pekerjaan Spark terhadapnya, dan menulis hasilnya kembali ke volume yang sama. Amazon EMR Tanpa Server menangani siklus hidup klaster secara otomatis — Anda mengirimkan pekerjaan dan membayar untuk detik yang dijalankan.

Pola ini sesuai dengan beban kerja yang membutuhkan runtime Spark penuh (pustaka khusus, algoritme berulang, transformasi yang berjalan lama, atau notebook interaktif melalui Amazon EMR Studio) di mana opsi yang lebih ringan - Amazon Athena untuk SQL dan untuk ETL terkelola - tidak cocok. AWS Glue Untuk informasi tentang alternatif tersebut, lihat Kueri file dengan SQL menggunakan Amazon Athena danMembangun pipa ETL menggunakan AWS Glue.

Dalam tutorial ini, Anda mensimulasikan tim meteorologi yang mengumpulkan satu tahun pengamatan NOAA Global Surface Summary of the Day (GSOD) yang dipentaskan pada FSx untuk volume ONTAP. Anda mengirimkan PySpark pekerjaan yang membaca file CSV mentah, menghitung agregat per stasiun bulanan (suhu rata-rata, curah hujan total, dan hitungan hari dengan peristiwa curah hujan), dan menulis hasilnya sebagai Parket yang dipartisi berdasarkan bulan — semua melalui titik akses.

catatan

Tutorial ini membutuhkan waktu sekitar 30 hingga 40 menit untuk menyelesaikannya. Yang Layanan AWS digunakan dikenakan biaya untuk sumber daya yang Anda buat. Jika Anda menyelesaikan semua langkah, termasuk bagian Pembersihan segera, biaya yang diharapkan kurang dari $1 di AS Timur (Virginia Utara). Wilayah AWS Perkiraan ini tidak termasuk biaya yang sedang berlangsung untuk FSx untuk volume ONTAP itu sendiri.

Prasyarat

  • FSx untuk volume ONTAP dengan titik akses Amazon S3 terpasang. Jalur akses harus memiliki asal jaringan internet sehingga layanan Amazon EMR Tanpa Server dapat mencapainya. Untuk petunjuk, lihat Membuat titik akses.

  • AWS CLI versi 2 diinstal dan dikonfigurasi dengan kredensil yang dapat membuat peran IAM dan sumber daya Amazon EMR Tanpa Server.

Langkah 1: Unggah kumpulan data sampel ke titik akses

Dataset NOAA GSOD adalah kumpulan data publik dari pengamatan cuaca harian, satu file CSV per stasiun per tahun. Untuk tutorial ini, Anda mengunduh subset 100 stasiun dari bucket Amazon noaa-gsod-pds S3 publik dan mengunggahnya ke titik akses Anda.

  1. Unduh 100 file stasiun pertama untuk tahun 2024.

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    Perintah mengunduh sekitar 100 file CSV dengan total sekitar 7-8 MB.

  2. Unggah file ke titik akses di bawah gsod/2024/ awalan. Ganti access-point-alias dengan alias titik akses Anda.

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

Langkah 2: Tulis PySpark pekerjaan

Pekerjaan membaca semua file CSV di bawah awalan input, menyaring nilai sentinel yang mewakili data yang hilang, mem-parsing FRSHTT bitfield (Fog, Rain, Snow, Hail, Thunder, Tornado) untuk menghitung hari peristiwa presipitasi, agregat per, dan menulis Parket yang dipartisi kembali ke titik akses. (station, month)

  1. Simpan skrip berikut ke file bernamagsod_monthly.py.

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. Unggah skrip ke titik akses di bawah scripts/ awalan.

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

Langkah 3: Buat peran pekerjaan Amazon EMR Tanpa Server

Amazon EMR Tanpa Server mengasumsikan peran eksekusi IAM saat menjalankan pekerjaan Anda. Peran membutuhkan izin untuk membaca dan menulis jalur akses dan menulis log ke CloudWatch Log. Perluas bagian berikut untuk langkah-langkah pengaturan.

  1. Simpan kebijakan kepercayaan berikut sebagaiemr-trust-policy.json. Hal ini memungkinkan Amazon EMR Serverless untuk mengambil peran.

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. Simpan kebijakan izin berikut sebagaiemr-permissions.json. Ganti regionaccount-id,, dan access-point-name dengan nilai-nilai Anda.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. Buat peran dan lampirkan kebijakan.

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

Langkah 4: Buat dan mulai aplikasi Amazon EMR Tanpa Server

Aplikasi Amazon EMR Tanpa Server adalah lingkungan komputasi berumur panjang untuk label rilis dan mesin tertentu (Spark atau Hive). Anda mengirimkan satu atau lebih pekerjaan untuk itu. Skala aplikasi menghitung naik dan turun secara otomatis berdasarkan permintaan pekerjaan dan menganggur saat tidak ada pekerjaan yang berjalan.

  1. Buat aplikasi Spark menggunakan rilis Amazon EMR baru-baru ini.

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    Catat applicationId dalam respons.

  2. Mulai aplikasi. Memulai pra-pemanasan sekelompok kecil pekerja sehingga pekerjaan pertama berjalan tanpa penundaan mulai dingin.

    $ aws emr-serverless start-application --application-id application-id

    Tunggu sampai negara menjadiSTARTED.

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

Langkah 5: Kirim pekerjaan Spark

Kirim pekerjaan menggunakan ID aplikasi dan peran eksekusi. Pekerjaan membaca CSV mentah dari gsod/2024/ dan menulis Parket yang dipartisi kegsod-monthly/, keduanya melalui titik akses.

  1. Simpan konfigurasi driver pekerjaan sebagaijob-driver.json. Ganti placeholder.

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. Simpan konfigurasi pemantauan berikut sebagaijob-config.json. Ini mengirimkan log driver dan eksekutor ke CloudWatch Log.

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. Kirimkan pekerjaan.

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    Catat jobRunId dalam respons.

  4. Polling status pekerjaan. Transisi pekerjaan dari SCHEDULED ke RUNNING keSUCCESS.

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
catatan

Jika pekerjaan gagal, periksa log driver di CloudWatch Log di bawah grup log/aws/emr-serverless/fsxn-emr-app. Amazon EMR Tanpa Server menulis satu aliran log per pekerjaan yang dijalankan.

Langkah 6: Periksa outputnya

Verifikasi bahwa pekerjaan menulis satu partisi Parket per bulan dan bahwa output dapat dibaca.

  1. Buat daftar partisi output.

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    Anda akan melihat satu file Parket per month=YYYY-MM/ partisi ditambah _SUCCESS penanda di root.

  2. Baca partisi secara lokal untuk memverifikasi konten.

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    Skema output meliputistation,,station_name,lat,lon,avg_temp_f,min_temp_f,max_temp_f, total_prcp_inprecip_event_days, danobservation_days.

Memperluas pola

  • Kueri output dengan Spark SQL. Daftarkan output yang dipartisi sebagai tabel dengan AWS Glue Data Catalog dan kueri dengan Spark SQL, Athena, atau alat lain yang membaca tabel katalog. AWS Glue Untuk petunjuk tentang mendaftarkan kumpulan data yang didukung titik akses, lihat. Kueri file dengan SQL menggunakan Amazon Athena

  • Gunakan Iceberg untuk menulis ACID. Untuk beban kerja yang memperbarui atau menggabungkan data, konfigurasikan pekerjaan untuk menulis ke tabel Iceberg pada titik akses, bukan Parket biasa. Amazon EMR Tanpa Server menyertakan runtime Iceberg secara default pada label rilis terbaru.

  • Jalankan secara interaktif dengan Amazon EMR Studio. Lampirkan notebook Jupyter ke aplikasi Amazon EMR Tanpa Server untuk menjelajahi data secara interaktif. Lihat Beban kerja interaktif dengan Amazon EMR Tanpa Server di Panduan Pengguna Tanpa Server Amazon EMR.

  • Jadwalkan pekerjaan. Gunakan Amazon EventBridge Scheduler atau AWS Step Functions untuk menjalankan pekerjaan pada jadwal berulang (misalnya, saat hari data baru mendarat di volume).

Pemecahan masalah

Job gagal dengan AccessDenied pada titik akses

Verifikasi bahwa kebijakan peran pekerjaan memberikan s3:GetObject dan s3:ListBucket pada titik akses ARN (bukan pada ember), dan bahwa titik akses memiliki asal jaringan internet sehingga layanan Amazon EMR Tanpa Server dapat mencapainya.

Job berhasil tetapi outputnya kosong

Periksa jalur input. Amazon S3 ListObjectsV2 memperlakukan awalan secara harfiah, jadi s3://alias/gsod/2024 (tidak ada garis miring) dan s3://alias/gsod/2024/ (garis miring) dapat berperilaku berbeda. Sertakan garis miring saat menunjuk ke direktori file.

Log driver tidak ada di CloudWatch Log

Konfigurasi pemantauan harus --configuration-overrides diteruskanstart-job-run, bukan pada aplikasi. Setiap job run menulis ke aliran lognya sendiri di bawah grup log yang dikonfigurasi.

Bersihkan

Hentikan dan hapus aplikasi, hapus peran IAM, dan hapus semua data yang diunggah yang tidak lagi Anda perlukan.

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive