View a markdown version of this page

Exécutez des tâches Spark à l'aide d'Amazon EMR Serverless - FSx pour ONTAP

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exécutez des tâches Spark à l'aide d'Amazon EMR Serverless

Les équipes d'ingénierie des données qui exécutent les charges de travail Spark (pour le traitement des journaux, l'ingénierie des fonctionnalités, l'ETL complexe ou l'analyse scientifique) disposent souvent de données sources sur un volume FSx for ONTAP écrites par des pipelines d'ingestion locaux, des data movers NFS ou SMB, ou des applications qui montent le volume directement.

Lorsqu'un point d'accès Amazon S3 est attaché au volume, Amazon EMR Serverless lit les données via le point d'accès, exécute la tâche Spark sur celui-ci et réécrit les résultats sur le même volume. Amazon EMR Serverless gère automatiquement le cycle de vie du cluster : vous soumettez une tâche et vous payez pour les secondes pendant lesquelles elle est exécutée.

Ce modèle convient aux charges de travail qui nécessitent un environnement d'exécution Spark complet (bibliothèques personnalisées, algorithmes itératifs, transformations de longue durée ou blocs-notes interactifs via Amazon EMR Studio) pour lesquelles les options plus légères (Amazon Athena pour AWS Glue SQL et pour l'ETL géré) ne conviennent pas. Pour plus d'informations sur ces alternatives, voir Interrogez des fichiers avec SQL à l'aide d'Amazon Athena etCréez des pipelines ETL en utilisant AWS Glue.

Dans ce didacticiel, vous simulez une équipe de météorologie agrégeant une année d'observations du Global Surface Summary of the Day (GSOD) de la NOAA effectuées sur un volume FSx pour ONTAP. Vous soumettez une PySpark tâche qui lit les fichiers CSV bruts, calcule les agrégats mensuels par station (température moyenne, précipitations totales et nombre de jours de précipitations) et enregistre les résultats sous forme de parquet partitionné par mois, le tout via le point d'accès.

Note

Ce didacticiel prend environ 30 à 40 minutes. Les ressources que vous Services AWS utilisez sont facturées pour les ressources que vous créez. Si vous effectuez rapidement toutes les étapes, y compris la section Nettoyage, le coût prévu est inférieur à 1$ dans l'est des États-Unis (Virginie du Nord) Région AWS. Cette estimation n'inclut pas les frais permanents pour le volume FSx for ONTAP lui-même.

Conditions préalables

  • Un volume FSx for ONTAP associé à un point d'accès Amazon S3. Le point d'accès doit avoir une origine de réseau Internet pour que le service Amazon EMR Serverless puisse y accéder. Pour obtenir des instructions, veuillez consulter Création d’un point d’accès.

  • AWS CLI version 2 installée et configurée avec des informations d'identification permettant de créer des rôles IAM et des ressources Amazon EMR Serverless.

Étape 1 : télécharger l'exemple de jeu de données sur le point d'accès

Le jeu de données GSOD de la NOAA est un ensemble de données public d'observations météorologiques quotidiennes, un fichier CSV par station et par an. Dans le cadre de ce didacticiel, vous devez télécharger un sous-ensemble de 100 stations depuis le compartiment public noaa-gsod-pds Amazon S3 et le charger sur votre point d'accès.

  1. Téléchargez les 100 premiers fichiers de stations pour 2024.

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    La commande télécharge environ 100 fichiers CSV d'une taille totale d'environ 7 à 8 Mo.

  2. Téléchargez les fichiers sur le point d'accès sous le gsod/2024/ préfixe. access-point-aliasRemplacez-le par l'alias de votre point d'accès.

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

Étape 2 : Rédiger le PySpark travail

La tâche lit tous les fichiers CSV sous le préfixe d'entrée, filtre les valeurs sentinelles qui représentent les données manquantes, analyse le FRSHTT champ de bits (brouillard, pluie, neige, grêle, tonnerre, tornade) pour compter les jours de précipitations, agrège par et renvoie le parquet partitionné au point d'(station, month)accès.

  1. Enregistrez le script suivant dans un fichier nommégsod_monthly.py.

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. Téléchargez le script sur le point d'accès sous le scripts/ préfixe.

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

Étape 3 : Création du rôle de travail Amazon EMR Serverless

Amazon EMR Serverless assume un rôle d'exécution IAM lorsqu'il exécute votre tâche. Le rôle a besoin d'autorisations pour lire et écrire le point d'accès et pour écrire des CloudWatch journaux dans Logs. Développez la section suivante pour les étapes de configuration.

  1. Enregistrez la politique de confiance suivante sousemr-trust-policy.json. Cela permet à Amazon EMR Serverless d'assumer le rôle.

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. Enregistrez la politique d'autorisation suivante sousemr-permissions.json. Remplacez regionaccount-id, et access-point-name par vos valeurs.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. Créez le rôle et associez la politique.

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

Étape 4 : créer et démarrer l'application Amazon EMR Serverless

Une application Amazon EMR Serverless est un environnement informatique de longue durée destiné à une étiquette de version et à un moteur spécifiques (Spark ou Hive). Vous y soumettez une ou plusieurs offres d'emploi. Les applications augmentent ou diminuent automatiquement les capacités de calcul en fonction de la demande des tâches et s'interrompent lorsqu'aucune tâche n'est en cours d'exécution.

  1. Créez une application Spark à l'aide d'une version récente d'Amazon EMR.

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    Notez le applicationId dans la réponse.

  2. Lancez l'application. Le démarrage permet de préchauffer un petit groupe de travailleurs afin que la première tâche soit exécutée sans délai de démarrage à froid.

    $ aws emr-serverless start-application --application-id application-id

    Attendez que l'État le devienneSTARTED.

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

Étape 5 : Soumettre le job Spark

Soumettez la tâche en utilisant l'ID de l'application et le rôle d'exécution. La tâche lit les fichiers CSV bruts gsod/2024/ et y écrit du Parquet partitionnégsod-monthly/, par le biais du point d'accès.

  1. Enregistrez la configuration du pilote de tâche sousjob-driver.json. Remplacez les espaces réservés.

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. Enregistrez la configuration de surveillance suivante sousjob-config.json. Il envoie les journaux du pilote et de l'exécuteur à CloudWatch Logs.

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. Soumettez le job.

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    Notez le jobRunId dans la réponse.

  4. Interrogez le statut du poste. Le travail passe de SCHEDULED RUNNING àSUCCESS.

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
Note

Si la tâche échoue, vérifiez que le pilote se connecte dans CloudWatch les journaux du groupe de journaux/aws/emr-serverless/fsxn-emr-app. Amazon EMR Serverless écrit un flux de journal par tâche exécutée.

Étape 6 : Inspecter la sortie

Vérifiez que le travail écrit une partition Parquet par mois et que le résultat est lisible.

  1. Répertoriez les partitions de sortie.

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    Vous devriez voir un fichier Parquet par month=YYYY-MM/ partition plus un _SUCCESS marqueur à la racine.

  2. Lisez une partition localement pour vérifier le contenu.

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    Le schéma de sortie inclut stationstation_name,lat,,lon,avg_temp_f,min_temp_f,max_temp_f,total_prcp_in,precip_event_days, etobservation_days.

Extension du motif

  • Interrogez le résultat avec Spark SQL. Enregistrez la sortie partitionnée sous forme de table auprès du AWS Glue Data Catalog et interrogez-la avec Spark SQL, Athena ou tout autre outil qui AWS Glue lit les tables de catalogue. Pour obtenir des instructions sur l'enregistrement d'un jeu de données basé sur des points d'accès, consultez. Interrogez des fichiers avec SQL à l'aide d'Amazon Athena

  • Utilisez Iceberg pour les écritures ACID. Pour les charges de travail qui mettent à jour ou fusionnent des données, configurez la tâche pour écrire dans une table Iceberg sur le point d'accès plutôt que dans un simple Parquet. Amazon EMR Serverless inclut le moteur d'exécution Iceberg par défaut sur les étiquettes des versions récentes.

  • Exécutez de manière interactive avec Amazon EMR Studio. Joignez un bloc-notes Jupyter à l'application Amazon EMR Serverless pour explorer les données de manière interactive. Consultez la section Charges de travail interactives avec Amazon EMR Serverless dans le guide de l'utilisateur Amazon EMR Serverless.

  • Planifiez le travail. Utilisez Amazon EventBridge Scheduler ou AWS Step Functions pour exécuter la tâche selon un calendrier récurrent (par exemple, lorsqu'un nouveau jour de données arrive sur le volume).

Résolution des problèmes

Le job échoue AccessDenied sur le point d'accès

Vérifiez que la politique relative aux rôles professionnels accorde s3:GetObject et s3:ListBucket sur l'ARN du point d'accès (et non sur un compartiment) et que le point d'accès provient d'un réseau Internet afin que le service Amazon EMR Serverless puisse y accéder.

Job réussi mais la sortie est vide

Vérifiez le chemin d'entrée. Amazon S3 ListObjectsV2 traite les préfixes de manière littérale, de sorte que s3://alias/gsod/2024 (pas de barre oblique de fin) et s3://alias/gsod/2024/ (barre oblique de fin) peuvent se comporter différemment. Incluez la barre oblique finale lorsque vous pointez sur un répertoire de fichiers.

Les journaux des pilotes ne sont pas dans CloudWatch les journaux

La configuration de surveillance doit être transmise --configuration-overrides sur l'application start-job-run et non sur l'application. Chaque tâche exécutée écrit dans son propre flux de journaux sous le groupe de journaux configuré.

Nettoyage

Arrêtez et supprimez l'application, supprimez le rôle IAM et supprimez toutes les données téléchargées dont vous n'avez plus besoin.

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive