Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exécutez des tâches Spark à l'aide d'Amazon EMR Serverless
Les équipes d'ingénierie des données qui exécutent les charges de travail Spark (pour le traitement des journaux, l'ingénierie des fonctionnalités, l'ETL complexe ou l'analyse scientifique) disposent souvent de données sources sur un volume FSx for ONTAP écrites par des pipelines d'ingestion locaux, des data movers NFS ou SMB, ou des applications qui montent le volume directement.
Lorsqu'un point d'accès Amazon S3 est attaché au volume, Amazon EMR Serverless lit les données via le point d'accès, exécute la tâche Spark sur celui-ci et réécrit les résultats sur le même volume. Amazon EMR Serverless gère automatiquement le cycle de vie du cluster : vous soumettez une tâche et vous payez pour les secondes pendant lesquelles elle est exécutée.
Ce modèle convient aux charges de travail qui nécessitent un environnement d'exécution Spark complet (bibliothèques personnalisées, algorithmes itératifs, transformations de longue durée ou blocs-notes interactifs via Amazon EMR Studio) pour lesquelles les options plus légères (Amazon Athena pour AWS Glue SQL et pour l'ETL géré) ne conviennent pas. Pour plus d'informations sur ces alternatives, voir Interrogez des fichiers avec SQL à l'aide d'Amazon Athena etCréez des pipelines ETL en utilisant AWS Glue.
Dans ce didacticiel, vous simulez une équipe de météorologie agrégeant une année d'observations du Global Surface Summary of the Day (GSOD) de la NOAA effectuées sur un volume FSx pour ONTAP. Vous soumettez une PySpark tâche qui lit les fichiers CSV bruts, calcule les agrégats mensuels par station (température moyenne, précipitations totales et nombre de jours de précipitations) et enregistre les résultats sous forme de parquet partitionné par mois, le tout via le point d'accès.
Note
Ce didacticiel prend environ 30 à 40 minutes. Les ressources que vous Services AWS utilisez sont facturées pour les ressources que vous créez. Si vous effectuez rapidement toutes les étapes, y compris la section Nettoyage, le coût prévu est inférieur à 1$ dans l'est des États-Unis (Virginie du Nord) Région AWS. Cette estimation n'inclut pas les frais permanents pour le volume FSx for ONTAP lui-même.
Conditions préalables
Un volume FSx for ONTAP associé à un point d'accès Amazon S3. Le point d'accès doit avoir une origine de réseau Internet pour que le service Amazon EMR Serverless puisse y accéder. Pour obtenir des instructions, veuillez consulter Création d’un point d’accès.
AWS CLI version 2 installée et configurée avec des informations d'identification permettant de créer des rôles IAM et des ressources Amazon EMR Serverless.
Étape 1 : télécharger l'exemple de jeu de données sur le point d'accès
Le jeu de données GSOD de la NOAA est un ensemble de données public d'observations météorologiques quotidiennes, un fichier CSV par station et par an. Dans le cadre de ce didacticiel, vous devez télécharger un sous-ensemble de 100 stations depuis le compartiment public noaa-gsod-pds Amazon S3 et le charger sur votre point d'accès.
-
Téléchargez les 100 premiers fichiers de stations pour 2024.
$mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -lLa commande télécharge environ 100 fichiers CSV d'une taille totale d'environ 7 à 8 Mo.
-
Téléchargez les fichiers sur le point d'accès sous le
gsod/2024/préfixe.access-point-aliasRemplacez-le par l'alias de votre point d'accès.$aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
Étape 2 : Rédiger le PySpark travail
La tâche lit tous les fichiers CSV sous le préfixe d'entrée, filtre les valeurs sentinelles qui représentent les données manquantes, analyse le FRSHTT champ de bits (brouillard, pluie, neige, grêle, tonnerre, tornade) pour compter les jours de précipitations, agrège par et renvoie le parquet partitionné au point d'(station, month)accès.
-
Enregistrez le script suivant dans un fichier nommé
gsod_monthly.py.# gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop() -
Téléchargez le script sur le point d'accès sous le
scripts/préfixe.$aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"
Étape 3 : Création du rôle de travail Amazon EMR Serverless
Amazon EMR Serverless assume un rôle d'exécution IAM lorsqu'il exécute votre tâche. Le rôle a besoin d'autorisations pour lire et écrire le point d'accès et pour écrire des CloudWatch journaux dans Logs. Développez la section suivante pour les étapes de configuration.
-
Enregistrez la politique de confiance suivante sous
emr-trust-policy.json. Cela permet à Amazon EMR Serverless d'assumer le rôle.{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] } -
Enregistrez la politique d'autorisation suivante sous
emr-permissions.json. Remplacezregionaccount-id, etaccess-point-namepar vos valeurs.{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] } -
Créez le rôle et associez la politique.
$aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json
Étape 4 : créer et démarrer l'application Amazon EMR Serverless
Une application Amazon EMR Serverless est un environnement informatique de longue durée destiné à une étiquette de version et à un moteur spécifiques (Spark ou Hive). Vous y soumettez une ou plusieurs offres d'emploi. Les applications augmentent ou diminuent automatiquement les capacités de calcul en fonction de la demande des tâches et s'interrompent lorsqu'aucune tâche n'est en cours d'exécution.
-
Créez une application Spark à l'aide d'une version récente d'Amazon EMR.
$aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0Notez le
applicationIddans la réponse. -
Lancez l'application. Le démarrage permet de préchauffer un petit groupe de travailleurs afin que la première tâche soit exécutée sans délai de démarrage à froid.
$aws emr-serverless start-application --application-idapplication-idAttendez que l'État le devienne
STARTED.$aws emr-serverless get-application --application-idapplication-id\ --query 'application.state'
Étape 5 : Soumettre le job Spark
Soumettez la tâche en utilisant l'ID de l'application et le rôle d'exécution. La tâche lit les fichiers CSV bruts gsod/2024/ et y écrit du Parquet partitionnégsod-monthly/, par le biais du point d'accès.
-
Enregistrez la configuration du pilote de tâche sous
job-driver.json. Remplacez les espaces réservés.{ "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } } -
Enregistrez la configuration de surveillance suivante sous
job-config.json. Il envoie les journaux du pilote et de l'exécuteur à CloudWatch Logs.{ "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } } -
Soumettez le job.
$aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.jsonNotez le
jobRunIddans la réponse. -
Interrogez le statut du poste. Le travail passe de
SCHEDULEDRUNNINGàSUCCESS.$aws emr-serverless get-job-run \ --application-idapplication-id\ --job-run-idjob-run-id\ --query 'jobRun.state'
Note
Si la tâche échoue, vérifiez que le pilote se connecte dans CloudWatch les journaux du groupe de journaux/aws/emr-serverless/fsxn-emr-app. Amazon EMR Serverless écrit un flux de journal par tâche exécutée.
Étape 6 : Inspecter la sortie
Vérifiez que le travail écrit une partition Parquet par mois et que le résultat est lisible.
-
Répertoriez les partitions de sortie.
$aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursiveVous devriez voir un fichier Parquet par
month=YYYY-MM/partition plus un_SUCCESSmarqueur à la racine. -
Lisez une partition localement pour vérifier le contenu.
$aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"Le schéma de sortie inclut
stationstation_name,lat,,lon,avg_temp_f,min_temp_f,max_temp_f,total_prcp_in,precip_event_days, etobservation_days.
Extension du motif
Interrogez le résultat avec Spark SQL. Enregistrez la sortie partitionnée sous forme de table auprès du AWS Glue Data Catalog et interrogez-la avec Spark SQL, Athena ou tout autre outil qui AWS Glue lit les tables de catalogue. Pour obtenir des instructions sur l'enregistrement d'un jeu de données basé sur des points d'accès, consultez. Interrogez des fichiers avec SQL à l'aide d'Amazon Athena
Utilisez Iceberg pour les écritures ACID. Pour les charges de travail qui mettent à jour ou fusionnent des données, configurez la tâche pour écrire dans une table Iceberg sur le point d'accès plutôt que dans un simple Parquet. Amazon EMR Serverless inclut le moteur d'exécution Iceberg par défaut sur les étiquettes des versions récentes.
Exécutez de manière interactive avec Amazon EMR Studio. Joignez un bloc-notes Jupyter à l'application Amazon EMR Serverless pour explorer les données de manière interactive. Consultez la section Charges de travail interactives avec Amazon EMR Serverless dans le guide de l'utilisateur Amazon EMR Serverless.
Planifiez le travail. Utilisez Amazon EventBridge Scheduler ou AWS Step Functions pour exécuter la tâche selon un calendrier récurrent (par exemple, lorsqu'un nouveau jour de données arrive sur le volume).
Résolution des problèmes
- Le job échoue
AccessDeniedsur le point d'accès Vérifiez que la politique relative aux rôles professionnels accorde
s3:GetObjectets3:ListBucketsur l'ARN du point d'accès (et non sur un compartiment) et que le point d'accès provient d'un réseau Internet afin que le service Amazon EMR Serverless puisse y accéder.- Job réussi mais la sortie est vide
Vérifiez le chemin d'entrée. Amazon S3
ListObjectsV2traite les préfixes de manière littérale, de sorte ques3://alias/gsod/2024(pas de barre oblique de fin) ets3://alias/gsod/2024/(barre oblique de fin) peuvent se comporter différemment. Incluez la barre oblique finale lorsque vous pointez sur un répertoire de fichiers.- Les journaux des pilotes ne sont pas dans CloudWatch les journaux
La configuration de surveillance doit être transmise
--configuration-overridessur l'applicationstart-job-runet non sur l'application. Chaque tâche exécutée écrit dans son propre flux de journaux sous le groupe de journaux configuré.
Nettoyage
Arrêtez et supprimez l'application, supprimez le rôle IAM et supprimez toutes les données téléchargées dont vous n'avez plus besoin.
$aws emr-serverless stop-application --application-idapplication-idaws emr-serverless delete-application --application-idapplication-idaws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive