기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
를 사용하여 ETL 파이프라인 구축 AWS Glue
데이터 엔지니어링 팀은 NFS 또는 SMB를 통한 애플리케이션, 일일 파일 삭제 또는 파트너 통합에서 FSx for ONTAP 볼륨에 대한 원시 데이터 랜딩을 수행하는 경우가 많습니다. 다운스트림 분석을 위해 데이터를 준비하려면 대규모로 데이터를 읽고, 변환하고, 보강하거나, 재분배하고, 큐레이션된 출력을 분석가와 애플리케이션에 제공해야 합니다.
FSx for ONTAP 볼륨에 연결된 Amazon S3 액세스 포인트를 사용하여는 소스 데이터를 AWS Glue 읽고 선택한 런타임(Apache Spark, Python 셸 또는 Ray)으로 변환한 다음 큐레이션된 출력을 동일한 볼륨에 다시 씁니다. 원시 데이터 세트와 큐레이션된 데이터 세트는 모두 FSx for ONTAP에 유지되므로 볼륨의 스냅샷, 백업 및 보존 정책이 파이프라인 전체에 균일하게 적용됩니다. FSx for ONTAP 볼륨은 NFS, SMB 및 Amazon S3 API를 통해 동시에 액세스할 수 있으므로 NFS 또는 SMB 클라이언트에서 원시 데이터를 생성하고 이러한 프로토콜에서 큐레이션된 출력을 사용할 수 있습니다.
이 자습서에서는 자Amazon Athena를 사용하여 SQL로 파일 쿼리습서의 NYC 택시 여행 데이터 세트를 사용합니다. AWS Glue ETL 작업은 원시 Parquet 데이터를 읽고, 계산된 열을 추가하고, 잘못된 레코드를 필터링하고, 변환된 출력을 시간대별로 분할된 볼륨에 다시 씁니다.
참고
이 자습서를 완료하는 데 약 25~35분이 걸립니다. AWS 서비스 사용한 에는 생성한 리소스에 대한 요금이 발생합니다. 정리 섹션을 포함하여 모든 단계를 즉시 완료하면 미국 동부(버지니아 북부)에서 예상 비용이 1 USD 미만입니다 AWS 리전. 이 견적에는 FSx for ONTAP 볼륨 자체에 대한 지속적인 요금은 포함되지 않습니다.
사전 조건
시작하기 전에 다음 항목이 준비되었는지 확인합니다.
자Amazon Athena를 사용하여 SQL로 파일 쿼리습서의 1~3단계를 완료합니다. 이 절차는 NYC Taxi 데이터 세트를 액세스 포인트에 업로드하고,에
fsxn_taxi_demo데이터베이스를 생성하고 AWS Glue Data Catalog,taxi_data테이블을 등록합니다. 이 자습서는 이러한 리소스를 기반으로 하므로이 자습서를 완료할 때까지 Athena 자습서의 정리 섹션을 실행하지 마십시오.CloudWatch Logs에 대한 쓰기 액세스, 액세스 포인트에 대한 읽기/쓰기 액세스,이 자습서에서 사용하는 AWS Glue Data Catalog 데이터베이스에 대한 액세스 권한을 부여하는 인라인 정책이 AWS Glue 있는에 대한 IAM 역할입니다. 다음 단계에서는이 자습서에 필요한 최소 권한을 가진 역할을 생성합니다.
다음 신뢰 정책을 로 저장합니다
glue-trust-policy.json. 이를 통해가 역할을 수임 AWS Glue 할 수 있습니다.{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "glue.amazonaws.com"}, "Action": "sts:AssumeRole" } ] }다음 권한 정책을 로 저장합니다
glue-permissions.json.,region및account-id를 값으로 바꿉니다.access-point-name{ "Version": "2012-10-17", "Statement": [ { "Sid": "Logs", "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:region:account-id:log-group:/aws-glue/*" }, { "Sid": "AccessPoint", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:PutObject", "s3:ListBucket", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:region:account-id:accesspoint/access-point-name", "arn:aws:s3:region:account-id:accesspoint/access-point-name/object/*" ] }, { "Sid": "DataCatalog", "Effect": "Allow", "Action": [ "glue:GetDatabase", "glue:GetTable", "glue:GetTables", "glue:CreateTable", "glue:UpdateTable", "glue:DeleteTable", "glue:BatchCreatePartition", "glue:BatchDeletePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions" ], "Resource": [ "arn:aws:glue:region:account-id:catalog", "arn:aws:glue:region:account-id:database/fsxn_taxi_demo", "arn:aws:glue:region:account-id:table/fsxn_taxi_demo/*" ] } ] }역할을 생성하고 인라인 정책을 연결합니다.
$aws iam create-role \ --role-namefsxn-tutorial-glue-etl-role\ --assume-role-policy-document file://glue-trust-policy.json aws iam put-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access \ --policy-document file://glue-permissions.json
이 자습서에서는 액세스 포인트 자체에 ETL 스크립트를 저장하므로 별도의 Amazon S3 버킷이 필요하지 않습니다.
AccessPoint문은 스크립트와 택시 데이터를 모두 포함합니다.DataCatalog문은 4단계의 크롤러가 업데이트하는fsxn_taxi_demo데이터베이스에 대한 카탈로그 액세스의 범위를 AWS Glue 지정합니다.
중요
Amazon S3 액세스 포인트는 VPC가 아닌 관리형 인프라에서 Amazon S3에 액세스하는 인터넷 네트워크 origin. AWS Glue jobs를 사용해야 합니다.
1단계: ETL 스크립트 생성
다음 PySpark 스크립트는 FSx for ONTAP 볼륨에서 원시 택시 이동 데이터를 읽고, 변환을 적용하고, 결과를 볼륨에 다시 씁니다. 이 스크립트를 로 저장합니다taxi_transform.py.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from pyspark.sql.functions import col, hour, dayofweek, when, round as spark_round args = getResolvedOptions(sys.argv, ['JOB_NAME', 'AP_ALIAS']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ap_alias = args['AP_ALIAS'] # Read raw taxi data from FSx through the access point df = spark.read.parquet(f"s3://{ap_alias}/taxi-data/") # Transform: filter invalid records, add computed columns transformed = df \ .filter(col("trip_distance") > 0) \ .filter(col("total_amount") > 0) \ .filter(col("passenger_count") > 0) \ .withColumn("pickup_hour", hour(col("tpep_pickup_datetime"))) \ .withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime"))) \ .withColumn("cost_per_mile", spark_round(col("total_amount") / col("trip_distance"), 2)) \ .withColumn("time_of_day", when(hour(col("tpep_pickup_datetime")).between(6, 11), "morning") .when(hour(col("tpep_pickup_datetime")).between(12, 16), "afternoon") .when(hour(col("tpep_pickup_datetime")).between(17, 21), "evening") .otherwise("night") ) \ .select( "tpep_pickup_datetime", "tpep_dropoff_datetime", "passenger_count", "trip_distance", "PULocationID", "DOLocationID", "fare_amount", "tip_amount", "total_amount", "pickup_hour", "pickup_day_of_week", "cost_per_mile", "time_of_day" ) # Write transformed data back to FSx, partitioned by time of day transformed.write \ .mode("overwrite") \ .partitionBy("time_of_day") \ .parquet(f"s3://{ap_alias}/taxi-data-transformed/") job.commit()
스크립트는 다음과 같은 변환을 수행합니다.
주행 거리, 요금 또는 승객 수가 0이거나 음수인 레코드를 필터링합니다.
계산된 열 , , , (아침, 오후, 저녁 또는 밤)을 추가합니다.
pickup_hourpickup_day_of_weekcost_per_miletime_of_day분석과 관련된 열의 하위 집합을 선택합니다.
를 기준으로 출력을 분할하여 기간을 기준으로 필터링할 때 쿼리 성능을 개선합니다.
time_of_day
2단계: 스크립트 업로드 및 작업 생성
액세스 포인트를 통해 ETL 스크립트를 FSx for ONTAP 볼륨에 업로드하고 이를 참조하는 AWS Glue 작업을 생성합니다. 표준 Amazon S3 버킷에서 스크립트를 AWS Glue 로드하는 것과 동일한 방식으로 작업 시작 시 액세스 포인트에서 스크립트를 로드합니다.
$# Upload the script to the access point aws s3 cp taxi_transform.py \ s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py # Create the Glue job aws glue create-job \ --namefsxn-taxi-transform\ --rolemy-glue-role-arn\ --command '{ "Name": "glueetl", "ScriptLocation": "s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py", "PythonVersion": "3" }' \ --default-arguments '{ "--AP_ALIAS": "my-ap-alias-ext-s3alias", "--job-language": "python" }' \ --glue-version "4.0" \ --number-of-workers 2 \ --worker-type "G.1X"
3단계: 작업 실행
$aws glue start-job-run --job-namefsxn-taxi-transform
작업 상태를 모니터링합니다. 작업은 일반적으로 2명의 G.1X 작업자와 함께 1~2분 내에 완료됩니다.
$aws glue get-job-runs --job-namefsxn-taxi-transform\ --query "JobRuns[0].{State:JobRunState,Duration:ExecutionTime,Error:ErrorMessage}"
작업이 완료되면 FSx for ONTAP 볼륨에서 변환된 출력을 확인합니다.
$aws s3 ls s3://my-ap-alias-ext-s3alias/taxi-data-transformed/PRE time_of_day=afternoon/ PRE time_of_day=evening/ PRE time_of_day=morning/ PRE time_of_day=night/
출력은 시간대별로 네 개의 디렉터리로 분할됩니다. 각 파티션에는 변환된 데이터가 포함된 Parquet 파일이 포함되어 있습니다.
4단계: 변환된 데이터 쿼리
변환된 출력에서 AWS Glue 크롤러를 실행하여에 등록한 AWS Glue Data Catalog다음 Athena로 쿼리합니다.
$# Create a crawler for the transformed data aws glue create-crawler \ --namefsxn-taxi-transformed-crawler\ --rolemy-glue-role-arn\ --database-namefsxn_taxi_demo\ --targets '{"S3Targets": [{"Path": "s3://my-ap-alias-ext-s3alias/taxi-data-transformed/"}]}' # Run the crawler aws glue start-crawler --namefsxn-taxi-transformed-crawler
크롤러가 완료되면 Athena에서 변환된 데이터를 쿼리합니다. 분할된 구조를 통해 Athena는 관련 파티션만 스캔할 수 있습니다.
-- Average cost per mile by time of day SELECT time_of_day, COUNT(*) AS trip_count, ROUND(AVG(cost_per_mile), 2) AS avg_cost_per_mile, ROUND(AVG(tip_amount), 2) AS avg_tip FROM fsxn_taxi_demo.taxi_data_transformed GROUP BY time_of_day ORDER BY trip_count DESC
-- Busiest pickup locations during morning rush SELECT PULocationID AS pickup_location, COUNT(*) AS trip_count, ROUND(AVG(trip_distance), 2) AS avg_distance FROM fsxn_taxi_demo.taxi_data_transformed WHERE time_of_day = 'morning' GROUP BY PULocationID ORDER BY trip_count DESC LIMIT 10
데이터는에 의해 분할되므로 time_of_day두 번째 쿼리는 morning 파티션만 스캔하여 데이터 읽기 양을 줄이고 쿼리 성능을 개선합니다.
고려 사항
인터넷 오리진 required. AWS Glue jobs는 VPC 외부의 관리형 인프라에서 Amazon S3에 액세스합니다. 인터넷 오리진 액세스 포인트를 사용해야 합니다.
읽기 및 쓰기. AWS Glue ETL 작업은 액세스 포인트를 통해 FSx for ONTAP 볼륨에서 읽고 쓸 수 있습니다. 액세스 포인트 정책 및 파일 시스템 사용자는
s3:GetObject및를 모두 허용해야 합니다s3:PutObject.작업자 크기 조정. AWS Glue 작업자 수와 유형은 작업 성능 및 비용에 영향을 미칩니다. 48MB 샘플 데이터 세트의 경우 2개의 G.1X 작업자로 충분합니다. 더 큰 데이터 세트의 경우 작업자 수를 늘리거나 G.2X 작업자를 사용합니다.
파티셔닝. 분할된 출력을 작성하면 Athena 및 기타 분석 서비스의 다운스트림 쿼리 성능이 향상됩니다. 일반적으로 데이터를 쿼리하는 방식에 따라 파티션 키를 선택합니다.
스크립트 storage. AWS Glue loads 작업 시작 시 Amazon S3에서 ETL 스크립트를 로드합니다. 이 자습서에서는 스크립트가 데이터와 함께 존재하도록 액세스 포인트에 스크립트를 저장하지만 표준 Amazon S3 버킷에서도 호스팅할 수 있습니다. 독립 실행형 버킷을 사용하는 경우 스크립트 버킷 ARN에서 역할의 인라인 정책을 로 확장
s3:GetObject합니다.
정리
요금이 계속 부과되지 않도록 하려면이 자습서에서 생성한 리소스를 삭제합니다.
Athena 쿼리 편집기에서 크롤러가 생성한 테이블을 삭제합니다.
DROP TABLE IF EXISTS fsxn_taxi_demo.taxi_data_transformed;
$# Delete the Glue job and crawler aws glue delete-job --namefsxn-taxi-transformaws glue delete-crawler --namefsxn-taxi-transformed-crawler# Delete the ETL script and transformed data from the access point aws s3 rm s3://my-ap-alias-ext-s3alias/glue-scripts/taxi_transform.py aws s3 rm s3://my-ap-alias-ext-s3alias/taxi-data-transformed/ --recursive # Delete the IAM role aws iam delete-role-policy \ --role-namefsxn-tutorial-glue-etl-role\ --policy-name glue-fsxn-access aws iam delete-role --role-namefsxn-tutorial-glue-etl-role