

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

# Esegui job Spark utilizzando Amazon EMR Serverless
<a name="tutorial-run-spark-with-emr-serverless"></a>

I team di progettazione dei dati che eseguono carichi di lavoro Spark, per l'elaborazione dei log, l'ingegneria delle funzionalità, l'ETL complessa o l'analisi scientifica, spesso dispongono di dati di origine su un volume FSx for ONTAP scritti da pipeline di ingestione locali, data mover NFS o SMB o applicazioni che montano direttamente il volume.

Con un punto di accesso Amazon S3 collegato al volume, Amazon EMR Serverless legge i dati attraverso il punto di accesso, esegue il job Spark su di esso e riscrive i risultati sullo stesso volume. Amazon EMR Serverless gestisce automaticamente il ciclo di vita del cluster: invii un lavoro e paghi per i secondi di esecuzione.

Questo modello si adatta ai carichi di lavoro che richiedono un runtime Spark completo (librerie personalizzate, algoritmi iterativi, trasformazioni a lunga durata o notebook interattivi tramite Amazon EMR Studio) in cui le opzioni più leggere, Amazon Athena per SQL e per ETL gestito, non sono la soluzione giusta. AWS Glue Per [Crea pipeline ETL utilizzando AWS Glue](tutorial-transform-data-with-glue.md) informazioni su queste alternative, consulta e. [Interroga i file con SQL utilizzando Amazon Athena](tutorial-query-data-with-athena.md)

In questo tutorial, simuli un team meteorologico che aggrega un anno di osservazioni NOAA Global Surface Summary of the Day (GSOD) organizzate su un volume FSx for ONTAP. Invii un PySpark lavoro che legge i file CSV non elaborati, calcola gli aggregati mensili per stazione (temperatura media, precipitazioni totali e conteggio dei giorni con eventi di precipitazione) e scrive i risultati come Parquet partizionato per mese, il tutto tramite il punto di accesso.

**Nota**  
**Il completamento di questo tutorial richiede dai 30 ai 40 minuti circa.** Le risorse Servizi AWS utilizzate sono soggette a costi per le risorse create. Se completi tempestivamente tutti i passaggi, inclusa la sezione **Pulizia**, il costo previsto è inferiore a **1 USD** negli Stati Uniti orientali (Virginia settentrionale). Regione AWS Questa stima non include i costi correnti per il volume FSx for ONTAP stesso.

## Prerequisiti
<a name="tutorial-emr-prerequisites"></a>
+ Un volume FSx for ONTAP con un access point Amazon S3 collegato. Il punto di accesso deve avere un'origine di rete **Internet** in modo che il servizio Amazon EMR Serverless possa raggiungerlo. Per istruzioni, consulta [Creazione di un access point](fsxn-creating-access-points.md).
+ AWS CLI versione 2 installata e configurata con credenziali in grado di creare ruoli IAM e risorse Amazon EMR Serverless.

## Passaggio 1: carica il set di dati di esempio sull'access point
<a name="tutorial-emr-upload-data"></a>

Il set di dati NOAA GSOD è un set di dati pubblico di osservazioni meteorologiche giornaliere, un file CSV per stazione all'anno. Per questo tutorial, scarichi un sottoinsieme di 100 stazioni dal bucket pubblico `noaa-gsod-pds` Amazon S3 e lo carichi sul tuo punto di accesso.

1. Scarica i primi 100 file delle stazioni per il 2024.

   ```
   $ mkdir -p ~/gsod && cd ~/gsod
   aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt
   while read f; do
       aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors
   done < files.txt
   ls | wc -l
   ```

   Il comando scarica circa 100 file CSV per un totale di circa 7—8 MB.

1. Carica i file nel punto di accesso sotto il prefisso. `gsod/2024/` {{access-point-alias}}Sostituiscilo con l'alias del tuo punto di accesso.

   ```
   $ aws s3 cp ~/gsod/ "s3://{{access-point-alias}}/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
   ```

## Fase 2: Scrivere il lavoro PySpark
<a name="tutorial-emr-write-script"></a>

Il job legge tutti i file CSV con il prefisso di input, filtra i valori sentinel che rappresentano i dati mancanti, analizza il `FRSHTT` bitfield (Fog, Rain, Snow, Hail, Thunder, Tornado) per contare i giorni associati agli eventi di precipitazione, aggrega per e riscrive Parquet partizionato sull'access point. `(station, month)`

1. Salva lo script seguente in un file denominato. `gsod_monthly.py`

   ```
   # gsod_monthly.py
   import sys
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   
   INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2]
   
   # GSOD sentinels for missing data
   TEMP_SENTINEL = 9999.9
   PRCP_SENTINEL = 99.99
   
   spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate()
   
   raw = spark.read.option("header", True).csv(INPUT_PATH)
   
   cleaned = (raw
       .select(
           F.col("STATION").alias("station"),
           F.col("NAME").alias("station_name"),
           F.col("LATITUDE").cast("double").alias("lat"),
           F.col("LONGITUDE").cast("double").alias("lon"),
           F.to_date("DATE", "yyyy-MM-dd").alias("date"),
           F.col("TEMP").cast("double").alias("temp_f"),
           F.col("PRCP").cast("double").alias("prcp_in"),
           F.col("FRSHTT").alias("frshtt"),
       )
       .filter(F.col("temp_f") != TEMP_SENTINEL)
       .withColumn("month", F.date_format("date", "yyyy-MM"))
       .withColumn(
           "prcp_in",
           F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")),
       )
       # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado.
       # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events.
       .withColumn(
           "had_precip_event",
           F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0),
       )
   )
   
   monthly = (cleaned
       .groupBy("station", "station_name", "lat", "lon", "month")
       .agg(
           F.avg("temp_f").alias("avg_temp_f"),
           F.min("temp_f").alias("min_temp_f"),
           F.max("temp_f").alias("max_temp_f"),
           F.sum("prcp_in").alias("total_prcp_in"),
           F.sum("had_precip_event").alias("precip_event_days"),
           F.count("*").alias("observation_days"),
       )
   )
   
   (monthly.write
       .mode("overwrite")
       .partitionBy("month")
       .parquet(OUTPUT_PATH))
   
   spark.stop()
   ```

1. Carica lo script nel punto di accesso sotto il `scripts/` prefisso.

   ```
   $ aws s3 cp gsod_monthly.py "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
   ```

## Fase 3: creazione del ruolo lavorativo Amazon EMR Serverless
<a name="tutorial-emr-iam-role"></a>

Amazon EMR Serverless assume un ruolo di esecuzione IAM quando esegue il tuo lavoro. Il ruolo richiede le autorizzazioni per leggere e scrivere il punto di accesso e per scrivere i log in Logs. CloudWatch Espandi la sezione seguente per i passaggi di configurazione.

### Per creare il ruolo lavorativo di Amazon EMR Serverless
<a name="tutorial-emr-iam-role-steps"></a>

1. Salva la seguente politica di fiducia come. `emr-trust-policy.json` Consente ad Amazon EMR Serverless di assumere il ruolo.

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [{
           "Effect": "Allow",
           "Principal": {"Service": "emr-serverless.amazonaws.com"},
           "Action": "sts:AssumeRole"
       }]
   }
   ```

1. Salva la seguente politica di autorizzazioni come. `emr-permissions.json` Sostituisci {{region}} {{account-id}} e {{access-point-name}} con i tuoi valori.

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [
           {
               "Sid": "Logs",
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents",
                   "logs:DescribeLogGroups",
                   "logs:DescribeLogStreams"
               ],
               "Resource": "*"
           },
           {
               "Sid": "APRead",
               "Effect": "Allow",
               "Action": ["s3:GetObject", "s3:ListBucket"],
               "Resource": [
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
               ]
           },
           {
               "Sid": "APWrite",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject", "s3:DeleteObject",
                   "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"
               ],
               "Resource": "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
           }
       ]
   }
   ```

1. Crea il ruolo e allega la politica.

   ```
   $ aws iam create-role --role-name fsxn-emr-job-role \
       --assume-role-policy-document file://emr-trust-policy.json
   aws iam put-role-policy --role-name fsxn-emr-job-role \
       --policy-name emr-access --policy-document file://emr-permissions.json
   ```

## Fase 4: Creare e avviare l'applicazione Amazon EMR Serverless
<a name="tutorial-emr-create-app"></a>

Un'applicazione Amazon EMR Serverless è un ambiente di calcolo di lunga durata per un'etichetta e un motore di rilascio specifici (Spark o Hive). Gli invii uno o più lavori. Le applicazioni scalano verso l'alto e verso il basso automaticamente in base alla domanda di lavoro e si esauriscono quando non è in esecuzione nessun lavoro.

1. Crea un'applicazione Spark utilizzando una versione recente di Amazon EMR.

   ```
   $ aws emr-serverless create-application \
       --name fsxn-emr-app --type SPARK --release-label emr-7.0.0
   ```

   Prendere nota dell'ID `applicationId` nella risposta.

1. Avvia l'applicazione. L'avvio preriscalda un piccolo gruppo di lavoratori in modo che il primo lavoro venga eseguito senza ritardi nell'avvio a freddo.

   ```
   $ aws emr-serverless start-application --application-id {{application-id}}
   ```

   Aspetta che lo stato diventi. `STARTED`

   ```
   $ aws emr-serverless get-application --application-id {{application-id}} \
       --query 'application.state'
   ```

## Fase 5: Invia il job Spark
<a name="tutorial-emr-submit-job"></a>

Invia il lavoro utilizzando l'ID dell'applicazione e il ruolo di esecuzione. Il job legge i file CSV non elaborati `gsod/2024/` e li scrive su Parquet partizionato`gsod-monthly/`, entrambi tramite il punto di accesso.

1. Salva la configurazione del driver di lavoro come. `job-driver.json` Sostituisci i segnaposto.

   ```
   {
       "sparkSubmit": {
           "entryPoint": "s3://{{access-point-alias}}/scripts/gsod_monthly.py",
           "entryPointArguments": [
               "s3://{{access-point-alias}}/gsod/2024/",
               "s3://{{access-point-alias}}/gsod-monthly/"
           ],
           "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2"
       }
   }
   ```

1. Salva la seguente configurazione di monitoraggio come. `job-config.json` Invia i log dei driver e degli esecutori a CloudWatch Logs.

   ```
   {
       "monitoringConfiguration": {
           "cloudWatchLoggingConfiguration": {
               "enabled": true,
               "logGroupName": "/aws/emr-serverless/fsxn-emr-app"
           }
       }
   }
   ```

1. Invia il lavoro.

   ```
   $ aws emr-serverless start-job-run \
       --application-id {{application-id}} \
       --execution-role-arn arn:aws:iam::{{account-id}}:role/fsxn-emr-job-role \
       --name gsod-monthly \
       --job-driver file://job-driver.json \
       --configuration-overrides file://job-config.json
   ```

   Prendere nota dell'ID `jobRunId` nella risposta.

1. Esamina lo stato del lavoro. Il lavoro passa da `SCHEDULED` a. `RUNNING` `SUCCESS`

   ```
   $ aws emr-serverless get-job-run \
       --application-id {{application-id}} \
       --job-run-id {{job-run-id}} \
       --query 'jobRun.state'
   ```

**Nota**  
Se il processo fallisce, controlla i log del driver in Logs nel CloudWatch gruppo di log. `/aws/emr-serverless/fsxn-emr-app` Amazon EMR Serverless scrive un flusso di log per ogni esecuzione del processo.

## Fase 6: Ispeziona l'output
<a name="tutorial-emr-inspect-output"></a>

Verifica che il lavoro abbia scritto una partizione Parquet al mese e che l'output sia leggibile.

1. Elenca le partizioni di output.

   ```
   $ aws s3 ls "s3://{{access-point-alias}}/gsod-monthly/" --recursive
   ```

   Dovresti vedere un file Parquet per `month=YYYY-MM/` partizione più un `_SUCCESS` marker nella radice.

1. Leggi una partizione localmente per verificarne il contenuto.

   ```
   $ aws s3 cp "s3://{{access-point-alias}}/gsod-monthly/month=2024-06/" . \
       --recursive --exclude "_SUCCESS"
   python3 -c "import pyarrow.parquet as pq; \
       t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \
       print(t.schema); print(t.to_pandas().head())"
   ```

   Lo schema di output include `station``station_name`,`lat`,`lon`,`avg_temp_f`,`min_temp_f`,`max_temp_f`, `total_prcp_in``precip_event_days`, e`observation_days`.

## Estendere il pattern
<a name="tutorial-emr-extending"></a>
+ **Interroga l'output con Spark SQL.** Registra l'output partizionato come tabella con AWS Glue Data Catalog e interrogalo con Spark SQL, Athena o qualsiasi altro strumento che legga le tabelle del catalogo. AWS Glue Per istruzioni sulla registrazione di un set di dati supportato da un punto di accesso, consulta. [Interroga i file con SQL utilizzando Amazon Athena](tutorial-query-data-with-athena.md)
+ **Usa Iceberg per le scritture ACID.** Per i carichi di lavoro che aggiornano o uniscono i dati, configura il job in modo che scriva su una tabella Iceberg sull'access point anziché su un semplice Parquet. Amazon EMR Serverless include il runtime Iceberg per impostazione predefinita nelle etichette delle release recenti.
+ **Esegui in modo interattivo con Amazon EMR Studio.** Collega un notebook Jupyter all'applicazione Amazon EMR Serverless per esplorare i dati in modo interattivo. Scopri i [carichi di lavoro interattivi con Amazon EMR Serverless nella *Amazon EMR*](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/interactive-workloads.html) Serverless User Guide.
+ **Pianifica il lavoro.** Usa Amazon EventBridge Scheduler o AWS Step Functions per eseguire il lavoro secondo una pianificazione ricorrente (ad esempio, quando un nuovo giorno di dati arriva sul volume).

## Risoluzione dei problemi
<a name="tutorial-emr-troubleshooting"></a>

Job fallisce con `AccessDenied` il punto di accesso  
Verifica che la policy relativa al ruolo lavorativo `s3:GetObject` conceda l'ARN del punto di accesso (non su un bucket) e che il punto di accesso abbia un'origine di rete Internet in modo che il servizio Amazon EMR Serverless possa raggiungerlo. `s3:ListBucket`

Job riuscito ma l'output è vuoto  
Controlla il percorso di input. Amazon S3 `ListObjectsV2` tratta i prefissi alla lettera, quindi `s3://alias/gsod/2024` (nessuna barra finale) e `s3://alias/gsod/2024/` (barra finale) possono comportarsi diversamente. Includi la barra finale quando punti a una directory di file.

I registri dei driver non si trovano nei registri CloudWatch   
La configurazione di monitoraggio deve essere trasmessa `--configuration-overrides` all'applicazione `start-job-run` e non all'applicazione. Ogni esecuzione di processo scrive nel proprio flusso di log all'interno del gruppo di log configurato.

## Eliminazione
<a name="tutorial-emr-clean-up"></a>

Arresta ed elimina l'applicazione, rimuovi il ruolo IAM ed elimina tutti i dati caricati che non ti servono più.

```
$ aws emr-serverless stop-application --application-id {{application-id}}
aws emr-serverless delete-application --application-id {{application-id}}
aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access
aws iam delete-role --role-name fsxn-emr-job-role
aws s3 rm "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
aws s3 rm "s3://{{access-point-alias}}/gsod/" --recursive
aws s3 rm "s3://{{access-point-alias}}/gsod-monthly/" --recursive
```