View a markdown version of this page

Amazon EMR Serverless를 사용하여 Spark 작업 실행 - FSx for ONTAP

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon EMR Serverless를 사용하여 Spark 작업 실행

로그 처리, 기능 엔지니어링, 복잡한 ETL 또는 과학 분석을 위해 Spark 워크로드를 실행하는 데이터 엔지니어링 팀은 온프레미스 수집 파이프라인, NFS 또는 SMB 데이터 이동기 또는 볼륨을 직접 탑재하는 애플리케이션에서 작성한 FSx for ONTAP 볼륨에 소스 데이터가 있는 경우가 많습니다.

볼륨에 연결된 Amazon S3 액세스 포인트를 사용하여 Amazon EMR Serverless는 액세스 포인트를 통해 데이터를 읽고 이에 대해 Spark 작업을 실행한 다음 결과를 동일한 볼륨에 다시 씁니다. Amazon EMR Serverless는 클러스터 수명 주기를 자동으로 처리합니다. 작업을 제출하고 실행되는 초에 대한 비용을 지불합니다.

이 패턴은 경량 옵션인 SQL 및 관리형 ETL용 Amazon Athena가 적합하지 않은 전체 Spark 런타임(사용자 지정 라이브러리, 반복 알고리즘, 장기 실행 변환 또는 Amazon EMR Studio를 통한 대화형 노트북)이 필요한 워크로드 AWS Glue 에 적합합니다. 이러한 대안에 대한 자세한 내용은 Amazon Athena를 사용하여 SQL로 파일 쿼리 및 단원을 참조하십시오를 사용하여 ETL 파이프라인 구축 AWS Glue.

이 자습서에서는 FSx for ONTAP 볼륨에 준비된 1년간의 NOAA Global Surface Summary of the Day(GSOD) 관측치를 집계하는 기상 팀을 시뮬레이션합니다. 원시 CSV 파일을 읽고, 월별 스테이션당 집계(평균 온도, 총 강수 및 강수 이벤트가 있는 일수)를 계산하고, 결과를 월별로 분할된 Parquet으로 기록하는 PySpark 작업을 제출합니다.이 모든 작업은 액세스 포인트를 통해 이루어집니다.

참고

이 자습서를 완료하는 데 약 30~40분이 걸립니다. AWS 서비스 사용한 에는 생성한 리소스에 대한 요금이 발생합니다. 정리 섹션을 포함하여 모든 단계를 즉시 완료하면 미국 동부(버지니아 북부)에서 예상 비용이 1 USD 미만입니다 AWS 리전. 이 견적에는 FSx for ONTAP 볼륨 자체에 대한 지속적인 요금은 포함되지 않습니다.

사전 조건

  • Amazon S3 액세스 포인트가 연결된 FSx for ONTAP 볼륨입니다. Amazon EMR Serverless 서비스가 액세스할 수 있도록 액세스 포인트에 인터넷 네트워크 오리진이 있어야 합니다. 지침은 액세스 포인트 생성 섹션을 참조하세요.

  • AWS CLI 버전 2는 IAM 역할 및 Amazon EMR Serverless 리소스를 생성할 수 있는 자격 증명으로 설치 및 구성됩니다.

1단계: 샘플 데이터 세트를 액세스 포인트에 업로드

NOAA GSOD 데이터 세트는 일별 기상 관측치의 공개 데이터 세트로, 매년 스테이션당 하나의 CSV 파일입니다. 이 자습서에서는 퍼블릭 noaa-gsod-pds Amazon S3 버킷에서 100-스테이션 하위 집합을 다운로드하여 액세스 포인트에 업로드합니다.

  1. 2024년 처음 100개의 스테이션 파일을 다운로드합니다.

    $ mkdir -p ~/gsod && cd ~/gsod aws s3 ls s3://noaa-gsod-pds/2024/ --no-sign-request | head -100 | awk '{print $NF}' > files.txt while read f; do aws s3 cp "s3://noaa-gsod-pds/2024/$f" "$f" --no-sign-request --only-show-errors done < files.txt ls | wc -l

    이 명령은 총 약 7~8MB에 해당하는 약 100개의 CSV 파일을 다운로드합니다.

  2. gsod/2024/ 접두사 아래의 액세스 포인트에 파일을 업로드합니다. access-point-alias를 액세스 포인트 별칭으로 바꿉니다.

    $ aws s3 cp ~/gsod/ "s3://access-point-alias/gsod/2024/" --recursive --exclude "files.txt" --only-show-errors

2단계: PySpark 작업 작성

작업은 입력 접두사 아래의 모든 CSV 파일을 읽고, 누락된 데이터를 나타내는 감시 값을 필터링하고, FRSHTT 비트 필드(안개, 비, 눈, 손톱, 천둥, 토네이도)를 구문 분석하여 강수 이벤트 일수, 당 집계 수를 계산하고(station, month), 분할된 Parquet을 액세스 포인트에 다시 씁니다.

  1. 다음 스크립트를 라는 파일에 저장합니다gsod_monthly.py.

    # gsod_monthly.py import sys from pyspark.sql import SparkSession from pyspark.sql import functions as F INPUT_PATH, OUTPUT_PATH = sys.argv[1], sys.argv[2] # GSOD sentinels for missing data TEMP_SENTINEL = 9999.9 PRCP_SENTINEL = 99.99 spark = SparkSession.builder.appName("gsod-monthly-summary").getOrCreate() raw = spark.read.option("header", True).csv(INPUT_PATH) cleaned = (raw .select( F.col("STATION").alias("station"), F.col("NAME").alias("station_name"), F.col("LATITUDE").cast("double").alias("lat"), F.col("LONGITUDE").cast("double").alias("lon"), F.to_date("DATE", "yyyy-MM-dd").alias("date"), F.col("TEMP").cast("double").alias("temp_f"), F.col("PRCP").cast("double").alias("prcp_in"), F.col("FRSHTT").alias("frshtt"), ) .filter(F.col("temp_f") != TEMP_SENTINEL) .withColumn("month", F.date_format("date", "yyyy-MM")) .withColumn( "prcp_in", F.when(F.col("prcp_in") == PRCP_SENTINEL, None).otherwise(F.col("prcp_in")), ) # FRSHTT is a 6-char bitfield: Fog, Rain, Snow, Hail, Thunder, Tornado. # Check only positions 2-4 (Rain, Snow, Hail) for precipitation events. .withColumn( "had_precip_event", F.when(F.col("frshtt").substr(2, 3).rlike("1"), 1).otherwise(0), ) ) monthly = (cleaned .groupBy("station", "station_name", "lat", "lon", "month") .agg( F.avg("temp_f").alias("avg_temp_f"), F.min("temp_f").alias("min_temp_f"), F.max("temp_f").alias("max_temp_f"), F.sum("prcp_in").alias("total_prcp_in"), F.sum("had_precip_event").alias("precip_event_days"), F.count("*").alias("observation_days"), ) ) (monthly.write .mode("overwrite") .partitionBy("month") .parquet(OUTPUT_PATH)) spark.stop()
  2. scripts/ 접두사 아래의 액세스 포인트에 스크립트를 업로드합니다.

    $ aws s3 cp gsod_monthly.py "s3://access-point-alias/scripts/gsod_monthly.py"

3단계: Amazon EMR Serverless 작업 역할 생성

Amazon EMR Serverless는 작업을 실행할 때 IAM 실행 역할을 수임합니다. 역할에는 액세스 포인트를 읽고 쓸 수 있는 권한과 CloudWatch Logs에 로그를 쓸 수 있는 권한이 필요합니다. 설정 단계에 대한 다음 섹션을 확장합니다.

  1. 다음 신뢰 정책을 로 저장합니다emr-trust-policy.json. 이를 통해 Amazon EMR Serverless가 역할을 수임할 수 있습니다.

    { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": {"Service": "emr-serverless.amazonaws.com"}, "Action": "sts:AssumeRole" }] }
  2. 다음 권한 정책을 로 저장합니다emr-permissions.json. region, account-idaccess-point-name을 값으로 바꿉니다.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "logs:DescribeLogGroups", "logs:DescribeLogStreams" ], "Resource": "*" }, { "Sid": "APRead", "Effect": "Allow", "Action": ["s3:GetObject", "s3:ListBucket"], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "APWrite", "Effect": "Allow", "Action": [ "s3:PutObject", "s3:DeleteObject", "s3:AbortMultipartUpload", "s3:ListMultipartUploadParts" ], "Resource": "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" } ] }
  3. 역할을 생성하고 정책을 연결합니다.

    $ aws iam create-role --role-name fsxn-emr-job-role \ --assume-role-policy-document file://emr-trust-policy.json aws iam put-role-policy --role-name fsxn-emr-job-role \ --policy-name emr-access --policy-document file://emr-permissions.json

4단계: Amazon EMR Serverless 애플리케이션 생성 및 시작

Amazon EMR Serverless 애플리케이션은 특정 릴리스 레이블 및 엔진(Spark 또는 Hive)을 위한 수명이 긴 컴퓨팅 환경입니다. 하나 이상의 작업을 제출합니다. 애플리케이션은 작업 수요에 따라 컴퓨팅을 자동으로 확장 및 축소하고 실행 중인 작업이 없을 때 유휴 상태로 전환합니다.

  1. 최신 Amazon EMR 릴리스를 사용하여 Spark 애플리케이션을 생성합니다.

    $ aws emr-serverless create-application \ --name fsxn-emr-app --type SPARK --release-label emr-7.0.0

    응답의 applicationId에 주의하세요.

  2. 애플리케이션을 시작합니다. 시작하면 콜드 스타트 지연 없이 첫 번째 작업이 실행되도록 소규모 작업자 풀이 사전 워밍됩니다.

    $ aws emr-serverless start-application --application-id application-id

    상태가가 될 때까지 기다립니다STARTED.

    $ aws emr-serverless get-application --application-id application-id \ --query 'application.state'

5단계: Spark 작업 제출

애플리케이션 ID와 실행 역할을 사용하여 작업을 제출합니다. 작업은에서 원시 CSVs 읽고 액세스 포인트를 gsod-monthly/통해 분할된 Parquet을에 gsod/2024/ 씁니다.

  1. 작업 드라이버 구성을 로 저장합니다job-driver.json. 자리 표시자를 바꿉니다.

    { "sparkSubmit": { "entryPoint": "s3://access-point-alias/scripts/gsod_monthly.py", "entryPointArguments": [ "s3://access-point-alias/gsod/2024/", "s3://access-point-alias/gsod-monthly/" ], "sparkSubmitParameters": "--conf spark.executor.cores=2 --conf spark.executor.memory=4g --conf spark.driver.cores=2 --conf spark.driver.memory=4g --conf spark.executor.instances=2" } }
  2. 다음 모니터링 구성을 로 저장합니다job-config.json. 드라이버 및 실행기 로그를 CloudWatch Logs로 전송합니다.

    { "monitoringConfiguration": { "cloudWatchLoggingConfiguration": { "enabled": true, "logGroupName": "/aws/emr-serverless/fsxn-emr-app" } } }
  3. 작업을 제출합니다.

    $ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn arn:aws:iam::account-id:role/fsxn-emr-job-role \ --name gsod-monthly \ --job-driver file://job-driver.json \ --configuration-overrides file://job-config.json

    응답의 jobRunId에 주의하세요.

  4. 작업 상태를 폴링합니다. 작업은에서 SCHEDULEDRUNNING로 전환됩니다SUCCESS.

    $ aws emr-serverless get-job-run \ --application-id application-id \ --job-run-id job-run-id \ --query 'jobRun.state'
참고

작업이 실패하면 로그 그룹 아래의 CloudWatch Logs에서 드라이버 로그를 확인합니다/aws/emr-serverless/fsxn-emr-app. Amazon EMR Serverless는 작업 실행당 하나의 로그 스트림을 씁니다.

6단계: 출력 검사

작업이 매월 하나의 Parquet 파티션을 작성하고 출력을 읽을 수 있는지 확인합니다.

  1. 출력 파티션을 나열합니다.

    $ aws s3 ls "s3://access-point-alias/gsod-monthly/" --recursive

    month=YYYY-MM/ 파티션당 하나의 Parquet 파일과 루트에 _SUCCESS 마커가 표시되어야 합니다.

  2. 로컬에서 파티션을 읽고 콘텐츠를 확인합니다.

    $ aws s3 cp "s3://access-point-alias/gsod-monthly/month=2024-06/" . \ --recursive --exclude "_SUCCESS" python3 -c "import pyarrow.parquet as pq; \ t = pq.read_table(next(__import__('glob').iglob('*.parquet'))); \ print(t.schema); print(t.to_pandas().head())"

    출력 스키마에는 station, , station_name, lat, lon, avg_temp_f, max_temp_f, min_temp_ftotal_prcp_in, 및 precip_event_days가 포함됩니다observation_days.

패턴 확장

  • Spark SQL을 사용하여 출력을 쿼리합니다. 분할된 출력을에 테이블로 등록 AWS Glue Data Catalog 하고 Spark SQL, Athena 또는 AWS Glue 카탈로그 테이블을 읽는 다른 도구를 사용하여 쿼리합니다. 액세스 포인트 지원 데이터 세트를 등록하는 방법에 대한 지침은 섹션을 참조하세요Amazon Athena를 사용하여 SQL로 파일 쿼리.

  • ACID 쓰기에 Iceberg를 사용합니다. 데이터를 업데이트하거나 병합하는 워크로드의 경우 일반 Parquet 대신 액세스 포인트의 Iceberg 테이블에 쓰도록 작업을 구성합니다. Amazon EMR Serverless는 최신 릴리스 레이블에 기본적으로 Iceberg 런타임을 포함합니다.

  • Amazon EMR Studio를 사용하여 대화형으로를 실행합니다. Jupyter 노트북을 Amazon EMR Serverless 애플리케이션에 연결하여 데이터를 대화형으로 탐색합니다. Amazon EMR Serverless 사용 설명서의 Amazon EMR Serverless를 사용한 대화형 워크로드를 참조하세요.

  • 작업을 예약합니다. Amazon EventBridge 스케줄러 또는 AWS Step Functions를 사용하여 반복 일정(예: 새 날짜의 데이터가 볼륨에 도착하는 경우)으로 작업을 실행합니다.

문제 해결

액세스 포인트AccessDenied에서 작업 실패

Amazon EMR Serverless 서비스가 액세스할 수 있도록 작업 역할 정책이 액세스 포인트 ARN(버킷 아님)s3:ListBuckets3:GetObject 및를 부여하고 액세스 포인트에 인터넷 네트워크 오리진이 있는지 확인합니다.

작업이 성공했지만 출력이 비어 있음

입력 경로를 확인합니다. Amazon S3는 접두사를 문자 그대로 ListObjectsV2 처리하므로 s3://alias/gsod/2024 (후행 슬래시 없음) 및 s3://alias/gsod/2024/ (후행 슬래시)가 다르게 작동할 수 있습니다. 파일 디렉터리를 가리킬 때 후행 슬래시를 포함합니다.

드라이버 로그가 CloudWatch Logs에 없음

모니터링 구성은 애플리케이션이 start-job-run아닌 --configuration-overrides에서 전달되어야 합니다. 각 작업 실행은 구성된 로그 그룹 아래의 자체 로그 스트림에 씁니다.

정리

애플리케이션을 중지 및 삭제하고, IAM 역할을 제거하고, 더 이상 필요하지 않은 업로드된 데이터를 삭제합니다.

$ aws emr-serverless stop-application --application-id application-id aws emr-serverless delete-application --application-id application-id aws iam delete-role-policy --role-name fsxn-emr-job-role --policy-name emr-access aws iam delete-role --role-name fsxn-emr-job-role aws s3 rm "s3://access-point-alias/scripts/gsod_monthly.py" aws s3 rm "s3://access-point-alias/gsod/" --recursive aws s3 rm "s3://access-point-alias/gsod-monthly/" --recursive