Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Jalankan pekerjaan Spark menggunakan Amazon EMR Tanpa Server
Tim rekayasa data yang menjalankan beban kerja Spark — untuk pemrosesan log, rekayasa fitur, ETL kompleks, atau analisis ilmiah — sering kali memiliki data sumber pada FSx untuk volume ONTAP yang ditulis oleh pipeline konsumsi lokal, penggerak data NFS atau SMB, atau aplikasi yang memasang volume secara langsung.
Dengan jalur akses Amazon S3 yang terpasang pada volume, Amazon EMR Serverless membaca data melalui titik akses, menjalankan pekerjaan Spark terhadapnya, dan menulis hasilnya kembali ke volume yang sama. Amazon EMR Tanpa Server menangani siklus hidup klaster secara otomatis — Anda mengirimkan pekerjaan dan membayar untuk detik yang dijalankan.
Pola ini sesuai dengan beban kerja yang membutuhkan runtime Spark penuh (pustaka khusus, algoritme berulang, transformasi yang berjalan lama, atau notebook interaktif melalui Amazon EMR Studio) di mana opsi yang lebih ringan - Amazon Athena untuk SQL dan untuk ETL terkelola - tidak cocok. AWS Glue Untuk informasi tentang alternatif tersebut, lihat Kueri file dengan SQL menggunakan Amazon Athena danMembangun pipa ETL menggunakan AWS Glue.
Dalam tutorial ini, Anda mensimulasikan tim meteorologi yang mengumpulkan satu tahun pengamatan NOAA Global Surface Summary of the Day (GSOD) yang dipentaskan pada FSx untuk volume ONTAP. Anda mengirimkan PySpark pekerjaan yang membaca file CSV mentah, menghitung agregat per stasiun bulanan (suhu rata-rata, curah hujan total, dan hitungan hari dengan peristiwa curah hujan), dan menulis hasilnya sebagai Parket yang dipartisi berdasarkan bulan — semua melalui titik akses.
catatan
Tutorial ini membutuhkan waktu sekitar 30 hingga 40 menit untuk menyelesaikannya. Yang Layanan AWS digunakan dikenakan biaya untuk sumber daya yang Anda buat. Jika Anda menyelesaikan semua langkah, termasuk bagian Pembersihan segera, biaya yang diharapkan kurang dari $1 di AS Timur (Virginia Utara). Wilayah AWS Perkiraan ini tidak termasuk biaya yang sedang berlangsung untuk FSx untuk volume ONTAP itu sendiri.
Prasyarat
FSx untuk volume ONTAP dengan titik akses Amazon S3 terpasang. Jalur akses harus memiliki asal jaringan internet sehingga layanan Amazon EMR Tanpa Server dapat mencapainya. Untuk petunjuk, lihat Membuat titik akses.
AWS CLI versi 2 diinstal dan dikonfigurasi dengan kredensil yang dapat membuat peran IAM dan sumber daya Amazon EMR Tanpa Server.
Langkah 1: Unggah kumpulan data sampel ke titik akses
Dataset NOAA GSOD adalah kumpulan data publik dari pengamatan cuaca harian, satu file CSV per stasiun per tahun. Untuk tutorial ini, Anda mengunduh subset 100 stasiun dari bucket Amazon noaa-gsod-pds S3 publik dan mengunggahnya ke titik akses Anda.
-
Unduh 100 file stasiun pertama untuk tahun 2024.
$mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -lPerintah mengunduh sekitar 100 file CSV dengan total sekitar 7-8 MB.
-
Unggah file ke titik akses di bawah
gsod/2024/awalan. Gantiaccess-point-aliasdengan alias titik akses Anda.$aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
Langkah 2: Tulis PySpark pekerjaan
Pekerjaan membaca semua file CSV di bawah awalan input, menyaring nilai sentinel yang mewakili data yang hilang, mem-parsing FRSHTT bitfield (Fog, Rain, Snow, Hail, Thunder, Tornado) untuk menghitung hari peristiwa presipitasi, agregat per, dan menulis Parket yang dipartisi kembali ke titik akses. (station, month)
-
Simpan skrip berikut ke file bernama
gsod_monthly.py.# gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop() -
Unggah skrip ke titik akses di bawah
scripts/awalan.$aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"
Langkah 3: Buat peran pekerjaan Amazon EMR Tanpa Server
Amazon EMR Tanpa Server mengasumsikan peran eksekusi IAM saat menjalankan pekerjaan Anda. Peran membutuhkan izin untuk membaca dan menulis jalur akses dan menulis log ke CloudWatch Log. Perluas bagian berikut untuk langkah-langkah pengaturan.
-
Simpan kebijakan kepercayaan berikut sebagai
emr-trust-policy.json. Hal ini memungkinkan Amazon EMR Serverless untuk mengambil peran.{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] } -
Simpan kebijakan izin berikut sebagai
emr-permissions.json. Gantiregionaccount-id,, danaccess-point-namedengan nilai-nilai Anda.{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] } -
Buat peran dan lampirkan kebijakan.
$aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json
Langkah 4: Buat dan mulai aplikasi Amazon EMR Tanpa Server
Aplikasi Amazon EMR Tanpa Server adalah lingkungan komputasi berumur panjang untuk label rilis dan mesin tertentu (Spark atau Hive). Anda mengirimkan satu atau lebih pekerjaan untuk itu. Skala aplikasi menghitung naik dan turun secara otomatis berdasarkan permintaan pekerjaan dan menganggur saat tidak ada pekerjaan yang berjalan.
-
Buat aplikasi Spark menggunakan rilis Amazon EMR baru-baru ini.
$aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0Catat
applicationIddalam respons. -
Mulai aplikasi. Memulai pra-pemanasan sekelompok kecil pekerja sehingga pekerjaan pertama berjalan tanpa penundaan mulai dingin.
$aws emr-serverless start-application --application-idapplication-idTunggu sampai negara menjadi
STARTED.$aws emr-serverless get-application --application-idapplication-id\ --query 'application.state'
Langkah 5: Kirim pekerjaan Spark
Kirim pekerjaan menggunakan ID aplikasi dan peran eksekusi. Pekerjaan membaca CSV mentah dari gsod/2024/ dan menulis Parket yang dipartisi kegsod-monthly/, keduanya melalui titik akses.
-
Simpan konfigurasi driver pekerjaan sebagai
job-driver.json. Ganti placeholder.{ "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } } -
Simpan konfigurasi pemantauan berikut sebagai
job-config.json. Ini mengirimkan log driver dan eksekutor ke CloudWatch Log.{ "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } } -
Kirimkan pekerjaan.
$aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.jsonCatat
jobRunIddalam respons. -
Polling status pekerjaan. Transisi pekerjaan dari
SCHEDULEDkeRUNNINGkeSUCCESS.$aws emr-serverless get-job-run \ --application-idapplication-id\ --job-run-idjob-run-id\ --query 'jobRun.state'
catatan
Jika pekerjaan gagal, periksa log driver di CloudWatch Log di bawah grup log/aws/emr-serverless/fsxn-emr-app. Amazon EMR Tanpa Server menulis satu aliran log per pekerjaan yang dijalankan.
Langkah 6: Periksa outputnya
Verifikasi bahwa pekerjaan menulis satu partisi Parket per bulan dan bahwa output dapat dibaca.
-
Buat daftar partisi output.
$aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursiveAnda akan melihat satu file Parket per
month=YYYY-MM/partisi ditambah_SUCCESSpenanda di root. -
Baca partisi secara lokal untuk memverifikasi konten.
$aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"Skema output meliputi
station,,station_name,lat,lon,avg_temp_f,min_temp_f,max_temp_f,total_prcp_inprecip_event_days, danobservation_days.
Memperluas pola
Kueri output dengan Spark SQL. Daftarkan output yang dipartisi sebagai tabel dengan AWS Glue Data Catalog dan kueri dengan Spark SQL, Athena, atau alat lain yang membaca tabel katalog. AWS Glue Untuk petunjuk tentang mendaftarkan kumpulan data yang didukung titik akses, lihat. Kueri file dengan SQL menggunakan Amazon Athena
Gunakan Iceberg untuk menulis ACID. Untuk beban kerja yang memperbarui atau menggabungkan data, konfigurasikan pekerjaan untuk menulis ke tabel Iceberg pada titik akses, bukan Parket biasa. Amazon EMR Tanpa Server menyertakan runtime Iceberg secara default pada label rilis terbaru.
Jalankan secara interaktif dengan Amazon EMR Studio. Lampirkan notebook Jupyter ke aplikasi Amazon EMR Tanpa Server untuk menjelajahi data secara interaktif. Lihat Beban kerja interaktif dengan Amazon EMR Tanpa Server di Panduan Pengguna Tanpa Server Amazon EMR.
Jadwalkan pekerjaan. Gunakan Amazon EventBridge Scheduler atau AWS Step Functions untuk menjalankan pekerjaan pada jadwal berulang (misalnya, saat hari data baru mendarat di volume).
Pemecahan masalah
- Job gagal dengan
AccessDeniedpada titik akses Verifikasi bahwa kebijakan peran pekerjaan memberikan
s3:GetObjectdans3:ListBucketpada titik akses ARN (bukan pada ember), dan bahwa titik akses memiliki asal jaringan internet sehingga layanan Amazon EMR Tanpa Server dapat mencapainya.- Job berhasil tetapi outputnya kosong
Periksa jalur input. Amazon S3
ListObjectsV2memperlakukan awalan secara harfiah, jadis3://alias/gsod/2024(tidak ada garis miring) dans3://alias/gsod/2024/(garis miring) dapat berperilaku berbeda. Sertakan garis miring saat menunjuk ke direktori file.- Log driver tidak ada di CloudWatch Log
Konfigurasi pemantauan harus
--configuration-overridesditeruskanstart-job-run, bukan pada aplikasi. Setiap job run menulis ke aliran lognya sendiri di bawah grup log yang dikonfigurasi.
Bersihkan
Hentikan dan hapus aplikasi, hapus peran IAM, dan hapus semua data yang diunggah yang tidak lagi Anda perlukan.
$aws emr-serverless stop-application --application-idapplication-idaws emr-serverless delete-application --application-idapplication-idaws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive