As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Execute trabalhos do Spark usando o Amazon EMR Serverless
As equipes de engenharia de dados que executam cargas de trabalho do Spark — para processamento de registros, engenharia de recursos, ETL complexo ou análise científica — geralmente têm dados de origem em um volume FSx for ONTAP escrito por pipelines de ingestão locais, movimentadores de dados NFS ou SMB ou aplicativos que montam o volume diretamente.
Com um ponto de acesso Amazon S3 conectado ao volume, o Amazon EMR Serverless lê os dados por meio do ponto de acesso, executa o trabalho do Spark nele e grava os resultados no mesmo volume. O Amazon EMR Serverless gerencia automaticamente o ciclo de vida do cluster — você envia um trabalho e paga pelos segundos em que ele é executado.
Esse padrão é adequado para cargas de trabalho que precisam de um tempo de execução completo do Spark (bibliotecas personalizadas, algoritmos iterativos, transformações de longa execução ou cadernos interativos via Amazon EMR Studio), onde as opções mais leves — Amazon Athena para SQL e ETL gerenciado — não são adequadas. AWS Glue Para obter informações sobre essas alternativas, consulte Consulte arquivos com SQL usando o Amazon Athena Crie pipelines de ETL usando AWS Glue e.
Neste tutorial, você simula uma equipe de meteorologia agregando um ano de observações do Resumo Global da Superfície do Dia (GSOD) da NOAA realizadas em um volume FSx for ONTAP. Você envia um PySpark trabalho que lê os arquivos CSV brutos, calcula agregados mensais por estação (temperatura média, precipitação total e contagem de dias com eventos de precipitação) e grava os resultados como Parquet particionado por mês — tudo em todo o ponto de acesso.
nota
Este tutorial leva aproximadamente 30 a 40 minutos para ser concluído. Os Serviços da AWS usados incorrem em cobranças pelos recursos que você cria. Se você concluir todas as etapas, incluindo a seção Limpeza imediatamente, o custo esperado é inferior a $1 no Leste dos EUA (Norte da Virgínia) Região da AWS. Essa estimativa não inclui cobranças contínuas do FSx para o volume do ONTAP em si.
Pré-requisitos
Um volume FSx for ONTAP com um ponto de acesso Amazon S3 conectado. O ponto de acesso deve ter uma origem de rede na Internet para que o serviço Amazon EMR Serverless possa acessá-lo. Para instruções, consulte Criar um ponto de acesso.
AWS CLI versão 2 instalada e configurada com credenciais que podem criar funções do IAM e recursos sem servidor do Amazon EMR.
Etapa 1: Carregar o conjunto de dados de amostra para o ponto de acesso
O conjunto de dados GSOD da NOAA é um conjunto de dados público de observações meteorológicas diárias, um arquivo CSV por estação por ano. Para este tutorial, você baixa um subconjunto de 100 estações do bucket público do Amazon noaa-gsod-pds S3 e o carrega em seu ponto de acesso.
-
Baixe os primeiros 100 arquivos da estação para 2024.
$mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -lO comando baixa aproximadamente 100 arquivos CSV, totalizando cerca de 7 a 8 MB.
-
Faça o upload dos arquivos para o ponto de acesso com o
gsod/2024/prefixo.access-point-aliasSubstitua pelo alias do seu ponto de acesso.$aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors
Etapa 2: escrever o PySpark trabalho
O trabalho lê todos os arquivos CSV com o prefixo de entrada, filtra valores sentinela que representam dados ausentes, analisa o FRSHTT campo de bits (neblina, chuva, neve, granizo, trovão, tornado) para contar os dias do evento de precipitação, agrega por e grava o Parquet particionado de volta no ponto de acesso. (station, month)
-
Salve o script a seguir em um arquivo chamado
gsod_monthly.py.# gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop() -
Faça o upload do script para o ponto de acesso com o
scripts/prefixo.$aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"
Etapa 3: Criar a função de trabalho sem servidor do Amazon EMR
O Amazon EMR Serverless assume uma função de execução do IAM quando executa seu trabalho. A função precisa de permissões para ler e gravar o ponto de acesso e gravar registros em CloudWatch Logs. Expanda a seção a seguir para ver as etapas de configuração.
-
Salve a seguinte política de confiança como
emr-trust-policy.json. Ele permite que o Amazon EMR Serverless assuma a função.{ "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] } -
Salve a seguinte política de permissões como
emr-permissions.json. Substituaregionaccount-id,, eaccess-point-namepor seus valores.{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] } -
Crie a função e anexe a política.
$aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json
Etapa 4: criar e iniciar o aplicativo Amazon EMR Serverless
Um aplicativo Amazon EMR Serverless é um ambiente computacional de longa duração para uma etiqueta e um mecanismo de lançamento específicos (Spark ou Hive). Você envia um ou mais trabalhos para ele. Os aplicativos aumentam e diminuem a computação automaticamente com base na demanda do trabalho e ficam ociosos quando não há trabalhos em execução.
-
Crie um aplicativo Spark usando uma versão recente do Amazon EMR.
$aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0Observe o
applicationIdna resposta. -
Inicie o aplicativo. O início pré-aquece um pequeno grupo de trabalhadores para que o primeiro trabalho seja executado sem atrasos na inicialização a frio.
$aws emr-serverless start-application --application-idapplication-idEspere até que o estado se torne
STARTED.$aws emr-serverless get-application --application-idapplication-id\ --query 'application.state'
Etapa 5: enviar a tarefa do Spark
Envie o trabalho usando o ID do aplicativo e a função de execução. O trabalho lê os CSVs brutos gsod/2024/ e grava o Parquet particionado emgsod-monthly/, ambos por meio do ponto de acesso.
-
Salve a configuração do driver de trabalho como
job-driver.json. Substitua os marcadores de posição.{ "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } } -
Salve a seguinte configuração de monitoramento como
job-config.json. Ele envia os registros do driver e do executor para o CloudWatch Logs.{ "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } } -
Envie o trabalho.
$aws emr-serverless start-job-run \ --application-idapplication-id\ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.jsonObserve o
jobRunIdna resposta. -
Pesquise o status do trabalho. O trabalho muda de
SCHEDULEDparaRUNNINGparaSUCCESS.$aws emr-serverless get-job-run \ --application-idapplication-id\ --job-run-idjob-run-id\ --query 'jobRun.state'
nota
Se o trabalho falhar, verifique os registros do driver em CloudWatch Logs, no grupo de registros/aws/emr-serverless/fsxn-emr-app. O Amazon EMR Serverless grava um stream de log por execução de trabalho.
Etapa 6: inspecionar a saída
Verifique se o trabalho gravou uma partição Parquet por mês e se a saída está legível.
-
Liste as partições de saída.
$aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursiveVocê deve ver um arquivo Parquet por
month=YYYY-MM/partição mais um_SUCCESSmarcador na raiz. -
Leia uma partição localmente para verificar o conteúdo.
$aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"O esquema de saída inclui
station,station_namelat,lon,avg_temp_f,min_temp_f,max_temp_f,total_prcp_in,precip_event_days, e.observation_days
Estendendo o padrão
Consulte a saída com o Spark SQL. Registre a saída particionada como uma tabela com o AWS Glue Data Catalog e consulte-a com Spark SQL, Athena ou qualquer outra ferramenta que leia tabelas de catálogo. AWS Glue Para obter instruções sobre como registrar um conjunto de dados baseado em pontos de acesso, consulte. Consulte arquivos com SQL usando o Amazon Athena
Use o Iceberg para gravações ACID. Para cargas de trabalho que atualizam ou mesclam dados, configure o trabalho para gravar em uma tabela Iceberg no ponto de acesso em vez do Parquet simples. O Amazon EMR Serverless inclui o tempo de execução do Iceberg por padrão nas gravadoras de lançamentos recentes.
Execute de forma interativa com o Amazon EMR Studio. Conecte um notebook Jupyter ao aplicativo Amazon EMR Serverless para explorar os dados de forma interativa. Consulte Cargas de trabalho interativas com o Amazon EMR Serverless no Guia do usuário do Amazon EMR Serverless.
Agende o trabalho. Use o Amazon EventBridge Scheduler ou o AWS Step Functions para executar o trabalho em uma programação recorrente (por exemplo, quando um novo dia de dados chega ao volume).
Solução de problemas
- Job falha
AccessDeniedno ponto de acesso Verifique se a política de função de trabalho concede
s3:GetObjectes3:ListBucketestá no ARN do ponto de acesso (não em um bucket) e se o ponto de acesso tem uma origem de rede na Internet para que o serviço Amazon EMR Serverless possa acessá-lo.- Job é bem-sucedido, mas a saída está vazia
Verifique o caminho de entrada. O Amazon S3
ListObjectsV2trata os prefixos literalmente, entãos3://alias/gsod/2024(sem barra final) es3://alias/gsod/2024/(barra final) podem se comportar de forma diferente. Inclua a barra final ao apontar para um diretório de arquivos.- Os registros do driver não estão em CloudWatch Registros
A configuração de monitoramento deve ser
--configuration-overridestransmitidastart-job-run, não no aplicativo. Cada execução de trabalho grava em seu próprio fluxo de registros no grupo de registros configurado.
Limpeza
Pare e exclua o aplicativo, remova a função do IAM e exclua todos os dados enviados que você não precisa mais.
$aws emr-serverless stop-application --application-idapplication-idaws emr-serverless delete-application --application-idapplication-idaws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive