View a markdown version of this page

Esegui job Spark utilizzando Amazon EMR Serverless - FSx per ONTAP

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esegui job Spark utilizzando Amazon EMR Serverless

I team di progettazione dei dati che eseguono carichi di lavoro Spark, per l'elaborazione dei log, l'ingegneria delle funzionalità, l'ETL complessa o l'analisi scientifica, spesso dispongono di dati di origine su un volume FSx for ONTAP scritti da pipeline di ingestione locali, data mover NFS o SMB o applicazioni che montano direttamente il volume.

Con un punto di accesso Amazon S3 collegato al volume, Amazon EMR Serverless legge i dati attraverso il punto di accesso, esegue il job Spark su di esso e riscrive i risultati sullo stesso volume. Amazon EMR Serverless gestisce automaticamente il ciclo di vita del cluster: invii un lavoro e paghi per i secondi di esecuzione.

Questo modello si adatta ai carichi di lavoro che richiedono un runtime Spark completo (librerie personalizzate, algoritmi iterativi, trasformazioni a lunga durata o notebook interattivi tramite Amazon EMR Studio) in cui le opzioni più leggere, Amazon Athena per SQL e per ETL gestito, non sono la soluzione giusta. AWS Glue Per Crea pipeline ETL utilizzando AWS Glue informazioni su queste alternative, consulta e. Interroga i file con SQL utilizzando Amazon Athena

In questo tutorial, simuli un team meteorologico che aggrega un anno di osservazioni NOAA Global Surface Summary of the Day (GSOD) organizzate su un volume FSx for ONTAP. Invii un PySpark lavoro che legge i file CSV non elaborati, calcola gli aggregati mensili per stazione (temperatura media, precipitazioni totali e conteggio dei giorni con eventi di precipitazione) e scrive i risultati come Parquet partizionato per mese, il tutto tramite il punto di accesso.

Nota

Il completamento di questo tutorial richiede dai 30 ai 40 minuti circa. Le risorse Servizi AWS utilizzate sono soggette a costi per le risorse create. Se completi tempestivamente tutti i passaggi, inclusa la sezione Pulizia, il costo previsto è inferiore a 1 USD negli Stati Uniti orientali (Virginia settentrionale). Regione AWS Questa stima non include i costi correnti per il volume FSx for ONTAP stesso.

Prerequisiti

  • Un volume FSx for ONTAP con un access point Amazon S3 collegato. Il punto di accesso deve avere un'origine di rete Internet in modo che il servizio Amazon EMR Serverless possa raggiungerlo. Per istruzioni, consulta Creazione di un access point.

  • AWS CLI versione 2 installata e configurata con credenziali in grado di creare ruoli IAM e risorse Amazon EMR Serverless.

Passaggio 1: carica il set di dati di esempio sull'access point

Il set di dati NOAA GSOD è un set di dati pubblico di osservazioni meteorologiche giornaliere, un file CSV per stazione all'anno. Per questo tutorial, scarichi un sottoinsieme di 100 stazioni dal bucket pubblico noaa-gsod-pds Amazon S3 e lo carichi sul tuo punto di accesso.

  1. Scarica i primi 100 file delle stazioni per il 2024.

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    Il comando scarica circa 100 file CSV per un totale di circa 7—8 MB.

  2. Carica i file nel punto di accesso sotto il prefisso. gsod/2024/ access-point-aliasSostituiscilo con l'alias del tuo punto di accesso.

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

Fase 2: Scrivere il lavoro PySpark

Il job legge tutti i file CSV con il prefisso di input, filtra i valori sentinel che rappresentano i dati mancanti, analizza il FRSHTT bitfield (Fog, Rain, Snow, Hail, Thunder, Tornado) per contare i giorni associati agli eventi di precipitazione, aggrega per e riscrive Parquet partizionato sull'access point. (station, month)

  1. Salva lo script seguente in un file denominato. gsod_monthly.py

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. Carica lo script nel punto di accesso sotto il scripts/ prefisso.

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

Fase 3: creazione del ruolo lavorativo Amazon EMR Serverless

Amazon EMR Serverless assume un ruolo di esecuzione IAM quando esegue il tuo lavoro. Il ruolo richiede le autorizzazioni per leggere e scrivere il punto di accesso e per scrivere i log in Logs. CloudWatch Espandi la sezione seguente per i passaggi di configurazione.

  1. Salva la seguente politica di fiducia come. emr-trust-policy.json Consente ad Amazon EMR Serverless di assumere il ruolo.

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. Salva la seguente politica di autorizzazioni come. emr-permissions.json Sostituisci region account-id e access-point-name con i tuoi valori.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. Crea il ruolo e allega la politica.

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

Fase 4: Creare e avviare l'applicazione Amazon EMR Serverless

Un'applicazione Amazon EMR Serverless è un ambiente di calcolo di lunga durata per un'etichetta e un motore di rilascio specifici (Spark o Hive). Gli invii uno o più lavori. Le applicazioni scalano verso l'alto e verso il basso automaticamente in base alla domanda di lavoro e si esauriscono quando non è in esecuzione nessun lavoro.

  1. Crea un'applicazione Spark utilizzando una versione recente di Amazon EMR.

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    Prendere nota dell'ID applicationId nella risposta.

  2. Avvia l'applicazione. L'avvio preriscalda un piccolo gruppo di lavoratori in modo che il primo lavoro venga eseguito senza ritardi nell'avvio a freddo.

    $ aws emr-serverless start-application --application-id application-id

    Aspetta che lo stato diventi. STARTED

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

Fase 5: Invia il job Spark

Invia il lavoro utilizzando l'ID dell'applicazione e il ruolo di esecuzione. Il job legge i file CSV non elaborati gsod/2024/ e li scrive su Parquet partizionatogsod-monthly/, entrambi tramite il punto di accesso.

  1. Salva la configurazione del driver di lavoro come. job-driver.json Sostituisci i segnaposto.

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. Salva la seguente configurazione di monitoraggio come. job-config.json Invia i log dei driver e degli esecutori a CloudWatch Logs.

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. Invia il lavoro.

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    Prendere nota dell'ID jobRunId nella risposta.

  4. Esamina lo stato del lavoro. Il lavoro passa da SCHEDULED a. RUNNING SUCCESS

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
Nota

Se il processo fallisce, controlla i log del driver in Logs nel CloudWatch gruppo di log. /aws/emr-serverless/fsxn-emr-app Amazon EMR Serverless scrive un flusso di log per ogni esecuzione del processo.

Fase 6: Ispeziona l'output

Verifica che il lavoro abbia scritto una partizione Parquet al mese e che l'output sia leggibile.

  1. Elenca le partizioni di output.

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    Dovresti vedere un file Parquet per month=YYYY-MM/ partizione più un _SUCCESS marker nella radice.

  2. Leggi una partizione localmente per verificarne il contenuto.

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    Lo schema di output include stationstation_name,lat,lon,avg_temp_f,min_temp_f,max_temp_f, total_prcp_inprecip_event_days, eobservation_days.

Estendere il pattern

  • Interroga l'output con Spark SQL. Registra l'output partizionato come tabella con AWS Glue Data Catalog e interrogalo con Spark SQL, Athena o qualsiasi altro strumento che legga le tabelle del catalogo. AWS Glue Per istruzioni sulla registrazione di un set di dati supportato da un punto di accesso, consulta. Interroga i file con SQL utilizzando Amazon Athena

  • Usa Iceberg per le scritture ACID. Per i carichi di lavoro che aggiornano o uniscono i dati, configura il job in modo che scriva su una tabella Iceberg sull'access point anziché su un semplice Parquet. Amazon EMR Serverless include il runtime Iceberg per impostazione predefinita nelle etichette delle release recenti.

  • Esegui in modo interattivo con Amazon EMR Studio. Collega un notebook Jupyter all'applicazione Amazon EMR Serverless per esplorare i dati in modo interattivo. Scopri i carichi di lavoro interattivi con Amazon EMR Serverless nella Amazon EMR Serverless User Guide.

  • Pianifica il lavoro. Usa Amazon EventBridge Scheduler o AWS Step Functions per eseguire il lavoro secondo una pianificazione ricorrente (ad esempio, quando un nuovo giorno di dati arriva sul volume).

Risoluzione dei problemi

Job fallisce con AccessDenied il punto di accesso

Verifica che la policy relativa al ruolo lavorativo s3:GetObject conceda l'ARN del punto di accesso (non su un bucket) e che il punto di accesso abbia un'origine di rete Internet in modo che il servizio Amazon EMR Serverless possa raggiungerlo. s3:ListBucket

Job riuscito ma l'output è vuoto

Controlla il percorso di input. Amazon S3 ListObjectsV2 tratta i prefissi alla lettera, quindi s3://alias/gsod/2024 (nessuna barra finale) e s3://alias/gsod/2024/ (barra finale) possono comportarsi diversamente. Includi la barra finale quando punti a una directory di file.

I registri dei driver non si trovano nei registri CloudWatch

La configurazione di monitoraggio deve essere trasmessa --configuration-overrides all'applicazione start-job-run e non all'applicazione. Ogni esecuzione di processo scrive nel proprio flusso di log all'interno del gruppo di log configurato.

Eliminazione

Arresta ed elimina l'applicazione, rimuovi il ruolo IAM ed elimina tutti i dati caricati che non ti servono più.

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive