

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

# Exécutez des tâches Spark à l'aide d'Amazon EMR Serverless
<a name="tutorial-run-spark-with-emr-serverless"></a>

Les équipes d'ingénierie des données qui exécutent les charges de travail Spark (pour le traitement des journaux, l'ingénierie des fonctionnalités, l'ETL complexe ou l'analyse scientifique) disposent souvent de données sources sur un volume FSx for ONTAP écrites par des pipelines d'ingestion locaux, des data movers NFS ou SMB, ou des applications qui montent le volume directement.

Lorsqu'un point d'accès Amazon S3 est attaché au volume, Amazon EMR Serverless lit les données via le point d'accès, exécute la tâche Spark sur celui-ci et réécrit les résultats sur le même volume. Amazon EMR Serverless gère automatiquement le cycle de vie du cluster : vous soumettez une tâche et vous payez pour les secondes pendant lesquelles elle est exécutée.

Ce modèle convient aux charges de travail qui nécessitent un environnement d'exécution Spark complet (bibliothèques personnalisées, algorithmes itératifs, transformations de longue durée ou blocs-notes interactifs via Amazon EMR Studio) pour lesquelles les options plus légères (Amazon Athena pour AWS Glue SQL et pour l'ETL géré) ne conviennent pas. Pour plus d'informations sur ces alternatives, voir [Interrogez des fichiers avec SQL à l'aide d'Amazon Athena](tutorial-query-data-with-athena.md) et[Créez des pipelines ETL en utilisant AWS Glue](tutorial-transform-data-with-glue.md).

Dans ce didacticiel, vous simulez une équipe de météorologie agrégeant une année d'observations du Global Surface Summary of the Day (GSOD) de la NOAA effectuées sur un volume FSx pour ONTAP. Vous soumettez une PySpark tâche qui lit les fichiers CSV bruts, calcule les agrégats mensuels par station (température moyenne, précipitations totales et nombre de jours de précipitations) et enregistre les résultats sous forme de parquet partitionné par mois, le tout via le point d'accès.

**Note**  
Ce didacticiel prend environ **30 à 40 minutes**. Les ressources que vous Services AWS utilisez sont facturées pour les ressources que vous créez. Si vous effectuez rapidement toutes les étapes, y compris la section **Nettoyage**, le coût prévu est inférieur à **1$** dans l'est des États-Unis (Virginie du Nord) Région AWS. Cette estimation n'inclut pas les frais permanents pour le volume FSx for ONTAP lui-même.

## Conditions préalables
<a name="tutorial-emr-prerequisites"></a>
+ Un volume FSx for ONTAP associé à un point d'accès Amazon S3. Le point d'accès doit avoir une origine de réseau **Internet** pour que le service Amazon EMR Serverless puisse y accéder. Pour obtenir des instructions, veuillez consulter [Création d’un point d’accès](fsxn-creating-access-points.md).
+ AWS CLI version 2 installée et configurée avec des informations d'identification permettant de créer des rôles IAM et des ressources Amazon EMR Serverless.

## Étape 1 : télécharger l'exemple de jeu de données sur le point d'accès
<a name="tutorial-emr-upload-data"></a>

Le jeu de données GSOD de la NOAA est un ensemble de données public d'observations météorologiques quotidiennes, un fichier CSV par station et par an. Dans le cadre de ce didacticiel, vous devez télécharger un sous-ensemble de 100 stations depuis le compartiment public `noaa-gsod-pds` Amazon S3 et le charger sur votre point d'accès.

1. Téléchargez les 100 premiers fichiers de stations pour 2024.

   ```
   $ mkdir -p ~/gsod && cd ~/gsod
   aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt
   while read f; do
       aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors
   done < files.txt
   ls | wc -l
   ```

   La commande télécharge environ 100 fichiers CSV d'une taille totale d'environ 7 à 8 Mo.

1. Téléchargez les fichiers sur le point d'accès sous le `gsod/2024/` préfixe. {{access-point-alias}}Remplacez-le par l'alias de votre point d'accès.

   ```
   $ aws s3 cp ~/gsod/ "s3://{{access-point-alias}}/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
   ```

## Étape 2 : Rédiger le PySpark travail
<a name="tutorial-emr-write-script"></a>

La tâche lit tous les fichiers CSV sous le préfixe d'entrée, filtre les valeurs sentinelles qui représentent les données manquantes, analyse le `FRSHTT` champ de bits (brouillard, pluie, neige, grêle, tonnerre, tornade) pour compter les jours de précipitations, agrège par et renvoie le parquet partitionné au point d'`(station, month)`accès.

1. Enregistrez le script suivant dans un fichier nommé`gsod_monthly.py`.

   ```
   # gsod_monthly.py
   import sys
   from pyspark.sql import SparkSession
   from pyspark.sql import functions as F
   
   INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2]
   
   # GSOD sentinels for missing data
   TEMP_SENTINEL = 9999.9
   PRCP_SENTINEL = 99.99
   
   spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate()
   
   raw = spark.read.option("header", True).csv(INPUT_PATH)
   
   cleaned = (raw
       .select(
           F.col("STATION").alias("station"),
           F.col("NAME").alias("station_name"),
           F.col("LATITUDE").cast("double").alias("lat"),
           F.col("LONGITUDE").cast("double").alias("lon"),
           F.to_date("DATE", "yyyy-MM-dd").alias("date"),
           F.col("TEMP").cast("double").alias("temp_f"),
           F.col("PRCP").cast("double").alias("prcp_in"),
           F.col("FRSHTT").alias("frshtt"),
       )
       .filter(F.col("temp_f") != TEMP_SENTINEL)
       .withColumn("month", F.date_format("date", "yyyy-MM"))
       .withColumn(
           "prcp_in",
           F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")),
       )
       # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado.
       # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events.
       .withColumn(
           "had_precip_event",
           F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0),
       )
   )
   
   monthly = (cleaned
       .groupBy("station", "station_name", "lat", "lon", "month")
       .agg(
           F.avg("temp_f").alias("avg_temp_f"),
           F.min("temp_f").alias("min_temp_f"),
           F.max("temp_f").alias("max_temp_f"),
           F.sum("prcp_in").alias("total_prcp_in"),
           F.sum("had_precip_event").alias("precip_event_days"),
           F.count("*").alias("observation_days"),
       )
   )
   
   (monthly.write
       .mode("overwrite")
       .partitionBy("month")
       .parquet(OUTPUT_PATH))
   
   spark.stop()
   ```

1. Téléchargez le script sur le point d'accès sous le `scripts/` préfixe.

   ```
   $ aws s3 cp gsod_monthly.py "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
   ```

## Étape 3 : Création du rôle de travail Amazon EMR Serverless
<a name="tutorial-emr-iam-role"></a>

Amazon EMR Serverless assume un rôle d'exécution IAM lorsqu'il exécute votre tâche. Le rôle a besoin d'autorisations pour lire et écrire le point d'accès et pour écrire des CloudWatch journaux dans Logs. Développez la section suivante pour les étapes de configuration.

### Pour créer le rôle de travail Amazon EMR Serverless
<a name="tutorial-emr-iam-role-steps"></a>

1. Enregistrez la politique de confiance suivante sous`emr-trust-policy.json`. Cela permet à Amazon EMR Serverless d'assumer le rôle.

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [{
           "Effect": "Allow",
           "Principal": {"Service": "emr-serverless.amazonaws.com"},
           "Action": "sts:AssumeRole"
       }]
   }
   ```

1. Enregistrez la politique d'autorisation suivante sous`emr-permissions.json`. Remplacez {{region}}{{account-id}}, et {{access-point-name}} par vos valeurs.

   ```
   {
       "Version": "2012-10-17", 		 	 	 
       "Statement": [
           {
               "Sid": "Logs",
               "Effect": "Allow",
               "Action": [
                   "logs:CreateLogGroup",
                   "logs:CreateLogStream",
                   "logs:PutLogEvents",
                   "logs:DescribeLogGroups",
                   "logs:DescribeLogStreams"
               ],
               "Resource": "*"
           },
           {
               "Sid": "APRead",
               "Effect": "Allow",
               "Action": ["s3:GetObject", "s3:ListBucket"],
               "Resource": [
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}",
                   "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
               ]
           },
           {
               "Sid": "APWrite",
               "Effect": "Allow",
               "Action": [
                   "s3:PutObject", "s3:DeleteObject",
                   "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts"
               ],
               "Resource": "arn:aws:s3:{{region}}:{{account-id}}:accesspoint/{{access-point-name}}/object/*"
           }
       ]
   }
   ```

1. Créez le rôle et associez la politique.

   ```
   $ aws iam create-role --role-name fsxn-emr-job-role \
       --assume-role-policy-document file://emr-trust-policy.json
   aws iam put-role-policy --role-name fsxn-emr-job-role \
       --policy-name emr-access --policy-document file://emr-permissions.json
   ```

## Étape 4 : créer et démarrer l'application Amazon EMR Serverless
<a name="tutorial-emr-create-app"></a>

Une application Amazon EMR Serverless est un environnement informatique de longue durée destiné à une étiquette de version et à un moteur spécifiques (Spark ou Hive). Vous y soumettez une ou plusieurs offres d'emploi. Les applications augmentent ou diminuent automatiquement les capacités de calcul en fonction de la demande des tâches et s'interrompent lorsqu'aucune tâche n'est en cours d'exécution.

1. Créez une application Spark à l'aide d'une version récente d'Amazon EMR.

   ```
   $ aws emr-serverless create-application \
       --name fsxn-emr-app --type SPARK --release-label emr-7.0.0
   ```

   Notez le `applicationId` dans la réponse.

1. Lancez l'application. Le démarrage permet de préchauffer un petit groupe de travailleurs afin que la première tâche soit exécutée sans délai de démarrage à froid.

   ```
   $ aws emr-serverless start-application --application-id {{application-id}}
   ```

   Attendez que l'État le devienne`STARTED`.

   ```
   $ aws emr-serverless get-application --application-id {{application-id}} \
       --query 'application.state'
   ```

## Étape 5 : Soumettre le job Spark
<a name="tutorial-emr-submit-job"></a>

Soumettez la tâche en utilisant l'ID de l'application et le rôle d'exécution. La tâche lit les fichiers CSV bruts `gsod/2024/` et y écrit du Parquet partitionné`gsod-monthly/`, par le biais du point d'accès.

1. Enregistrez la configuration du pilote de tâche sous`job-driver.json`. Remplacez les espaces réservés.

   ```
   {
       "sparkSubmit": {
           "entryPoint": "s3://{{access-point-alias}}/scripts/gsod_monthly.py",
           "entryPointArguments": [
               "s3://{{access-point-alias}}/gsod/2024/",
               "s3://{{access-point-alias}}/gsod-monthly/"
           ],
           "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2"
       }
   }
   ```

1. Enregistrez la configuration de surveillance suivante sous`job-config.json`. Il envoie les journaux du pilote et de l'exécuteur à CloudWatch Logs.

   ```
   {
       "monitoringConfiguration": {
           "cloudWatchLoggingConfiguration": {
               "enabled": true,
               "logGroupName": "/aws/emr-serverless/fsxn-emr-app"
           }
       }
   }
   ```

1. Soumettez le job.

   ```
   $ aws emr-serverless start-job-run \
       --application-id {{application-id}} \
       --execution-role-arn arn:aws:iam::{{account-id}}:role/fsxn-emr-job-role \
       --name gsod-monthly \
       --job-driver file://job-driver.json \
       --configuration-overrides file://job-config.json
   ```

   Notez le `jobRunId` dans la réponse.

1. Interrogez le statut du poste. Le travail passe de `SCHEDULED` `RUNNING` à`SUCCESS`.

   ```
   $ aws emr-serverless get-job-run \
       --application-id {{application-id}} \
       --job-run-id {{job-run-id}} \
       --query 'jobRun.state'
   ```

**Note**  
Si la tâche échoue, vérifiez que le pilote se connecte dans CloudWatch les journaux du groupe de journaux`/aws/emr-serverless/fsxn-emr-app`. Amazon EMR Serverless écrit un flux de journal par tâche exécutée.

## Étape 6 : Inspecter la sortie
<a name="tutorial-emr-inspect-output"></a>

Vérifiez que le travail écrit une partition Parquet par mois et que le résultat est lisible.

1. Répertoriez les partitions de sortie.

   ```
   $ aws s3 ls "s3://{{access-point-alias}}/gsod-monthly/" --recursive
   ```

   Vous devriez voir un fichier Parquet par `month=YYYY-MM/` partition plus un `_SUCCESS` marqueur à la racine.

1. Lisez une partition localement pour vérifier le contenu.

   ```
   $ aws s3 cp "s3://{{access-point-alias}}/gsod-monthly/month=2024-06/" . \
       --recursive --exclude "_SUCCESS"
   python3 -c "import pyarrow.parquet as pq; \
       t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \
       print(t.schema); print(t.to_pandas().head())"
   ```

   Le schéma de sortie inclut `station``station_name`,`lat`,,`lon`,`avg_temp_f`,`min_temp_f`,`max_temp_f`,`total_prcp_in`,`precip_event_days`, et`observation_days`.

## Extension du motif
<a name="tutorial-emr-extending"></a>
+ **Interrogez le résultat avec Spark SQL.** Enregistrez la sortie partitionnée sous forme de table auprès du AWS Glue Data Catalog et interrogez-la avec Spark SQL, Athena ou tout autre outil qui AWS Glue lit les tables de catalogue. Pour obtenir des instructions sur l'enregistrement d'un jeu de données basé sur des points d'accès, consultez. [Interrogez des fichiers avec SQL à l'aide d'Amazon Athena](tutorial-query-data-with-athena.md)
+ **Utilisez Iceberg pour les écritures ACID.** Pour les charges de travail qui mettent à jour ou fusionnent des données, configurez la tâche pour écrire dans une table Iceberg sur le point d'accès plutôt que dans un simple Parquet. Amazon EMR Serverless inclut le moteur d'exécution Iceberg par défaut sur les étiquettes des versions récentes.
+ **Exécutez de manière interactive avec Amazon EMR Studio.** Joignez un bloc-notes Jupyter à l'application Amazon EMR Serverless pour explorer les données de manière interactive. Consultez la section [Charges de travail interactives avec Amazon EMR](https://docs.aws.amazon.com/emr/latest/EMR-Serverless-UserGuide/interactive-workloads.html) Serverless dans le guide de l'utilisateur *Amazon EMR* Serverless.
+ **Planifiez le travail.** Utilisez Amazon EventBridge Scheduler ou AWS Step Functions pour exécuter la tâche selon un calendrier récurrent (par exemple, lorsqu'un nouveau jour de données arrive sur le volume).

## Résolution des problèmes
<a name="tutorial-emr-troubleshooting"></a>

Le job échoue `AccessDenied` sur le point d'accès  
Vérifiez que la politique relative aux rôles professionnels accorde `s3:GetObject` et `s3:ListBucket` sur l'ARN du point d'accès (et non sur un compartiment) et que le point d'accès provient d'un réseau Internet afin que le service Amazon EMR Serverless puisse y accéder.

Job réussi mais la sortie est vide  
Vérifiez le chemin d'entrée. Amazon S3 `ListObjectsV2` traite les préfixes de manière littérale, de sorte que `s3://alias/gsod/2024` (pas de barre oblique de fin) et `s3://alias/gsod/2024/` (barre oblique de fin) peuvent se comporter différemment. Incluez la barre oblique finale lorsque vous pointez sur un répertoire de fichiers.

Les journaux des pilotes ne sont pas dans CloudWatch les journaux  
La configuration de surveillance doit être transmise `--configuration-overrides` sur l'application `start-job-run` et non sur l'application. Chaque tâche exécutée écrit dans son propre flux de journaux sous le groupe de journaux configuré.

## Nettoyage
<a name="tutorial-emr-clean-up"></a>

Arrêtez et supprimez l'application, supprimez le rôle IAM et supprimez toutes les données téléchargées dont vous n'avez plus besoin.

```
$ aws emr-serverless stop-application --application-id {{application-id}}
aws emr-serverless delete-application --application-id {{application-id}}
aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access
aws iam delete-role --role-name fsxn-emr-job-role
aws s3 rm "s3://{{access-point-alias}}/scripts/gsod_monthly.py"
aws s3 rm "s3://{{access-point-alias}}/gsod/" --recursive
aws s3 rm "s3://{{access-point-alias}}/gsod-monthly/" --recursive
```