태스크 병렬화 -

태스크 병렬화

성능을 최적화하려면 데이터 로드 및 변환에 대한 태스크를 병렬화하는 것이 중요합니다. Apache Spark의 주요 주제에서 설명한 것처럼 복원력이 뛰어난 분산 데이터세트(RDD) 파티션의 수는 병렬화 수준을 결정하기 때문에 중요합니다. Spark가 생성하는 각 태스크는 1:1 기준으로 RDD 파티션에 해당합니다. 최상의 성능을 얻으려면 RDD 파티션 수를 결정하는 방법과 해당 수를 최적화하는 방법을 이해해야 합니다.

병렬 처리가 충분하지 않은 경우 CloudWatch 지표 및 Spark UI에 다음 증상이 기록됩니다.

CloudWatch 지표

CPU 로드메모리 사용률을 확인합니다. 작업 단계에서 일부 실행기가 처리 중이 아닌 경우 병렬 처리를 개선하는 것이 좋습니다. 이 경우 시각화된 기간에 실행기 1이 태스크를 수행했지만 나머지 실행기(2, 3, 4)는 수행하지 않았습니다. Spark 드라이버에서 해당 실행기에 태스크가 할당되지 않았다고 추론할 수 있습니다.

드라이버와 하나의 실행기만 보여주는 그래프.

Spark UI

Spark UI의 Stage 탭에서 단계의 태스크 수 볼 수 있습니다. 이 경우 Spark는 하나의 태스크만 수행했습니다.

""

또한 이벤트 타임라인에는 실행기 1이 하나의 태스크를 처리하는 것으로 표시됩니다. 즉, 이 단계의 작업은 하나의 실행기에서 완전히 수행되었고 다른 실행기는 유휴 상태였습니다.

하나의 태스크만 보여주는 이벤트 타임라인.

이러한 증상이 관찰되면 각 데이터 소스에 대해 다음 솔루션을 시도해 보세요.

Amazon S3에서 데이터 로드 병렬화

Amazon S3에서 데이터 로드를 병렬화하려면 먼저 기본 파티션 수를 확인합니다. 그런 다음 대상 파티션 수를 수동으로 결정할 수 있지만 파티션이 너무 많지 않도록 해야 합니다.

기본 파티션 수 결정

Amazon S3의 경우 초기 Spark RDD 파티션 수(각각 Spark 태스크에 해당)는 Amazon S3 데이터세트의 기능(예: 형식, 압축 및 크기)에 따라 결정됩니다. Amazon S3에 저장된 CSV 객체에서 AWS Glue DynamicFrame 또는 Spark DataFrame을 생성하면 다음과 같이 초기 RDD 파티션(NumPartitions) 수를 대략적으로 계산할 수 있습니다.

  • 객체 크기가 64MB 이하인 경우: NumPartitions = Number of Objects

  • 객체 크기가 64MB를 초과하는 경우: NumPartitions = Total Object Size / 64 MB

  • 분할 불가(gzip): NumPartitions = Number of Objects

데이터 스캔 양 감소 섹션에서 설명한 대로 Spark는 큰 S3 객체를 병렬로 처리할 수 있는 분할로 나눕니다. 객체가 분할 크기보다 크면 Spark는 객체를 분할하고 각 분할에 대해 RDD 파티션(및 태스크)을 생성합니다. Spark의 분할 크기는 데이터 형식과 런타임 환경에 기반하지만 이는 합리적인 시작 근사치입니다. 일부 객체는 gzip과 같은 분할할 수 없는 압축 형식을 사용하여 압축되므로 Spark는 이를 분할할 수 없습니다.

NumPartitions 값은 데이터 형식, 압축, AWS Glue 버전, AWS Glue 작업자 수 및 Spark 구성에 따라 다를 수 있습니다.

예를 들어 Spark DataFrame을 사용하여 단일 10GB csv.gz 객체를 로드하면 gzip은 분할할 수 없으므로 Spark 드라이버는 하나의 RDD 파티션(NumPartitions=1)만 생성합니다. 따라서 다음 그림에 설명된 대로 하나의 특정 Spark 실행기에 과도한 로드가 생기고 나머지 실행기에 태스크가 할당되지 않습니다.

Spark Web UI 단계 탭에서 단계의 실제 태스크 수(NumPartitions)를 확인하거나 코드에서 df.rdd.getNumPartitions()를 실행하여 병렬화를 확인합니다.

10GB gzip 파일이 생기면 해당 파일을 생성하는 시스템이 분할 가능한 형식으로 파일을 생성할 수 있는지 검사합니다. 옵션이 아닌 경우 클러스터 용량을 조정하여 파일을 처리해야 할 수 있습니다. 로드한 데이터에 대해 변환을 효율적으로 실행하려면 다시 분할을 사용하여 클러스터의 작업자 간에 RDD를 재조정해야 합니다.

수동으로 대상 파티션 수 확인

데이터의 속성과 Spark의 특정 기능 구현에 따라 기본 작업을 계속 병렬화할 수 있더라도 NumPartitions 값이 낮아질 수 있습니다. NumPartitions가 너무 작은 경우 df.repartition(N)을 실행하여 파티션 수를 늘리면 처리가 여러 Spark 실행기에 분산될 수 있습니다.

이 경우 df.repartition(100)을 실행하면 NumPartitions가 1에서 100으로 증가하고 각각 다른 실행기에 할당할 수 있는 태스크를 포함하는 100개의 데이터 파티션이 생성됩니다.

repartition(N) 작업은 전체 데이터를 균등하게 나누어(10GB/100개 파티션 = 100MB/파티션) 특정 파티션에 대한 데이터 스큐를 방지합니다.

참고

join과 같은 셔플 작업이 실행되면 spark.sql.shuffle.partitions 또는 spark.default.parallelism의 값에 따라 파티션 수가 동적으로 증가하거나 감소합니다. 이를 통해 Spark 실행기 사이에서 데이터를 보다 효율적으로 교환할 수 있습니다. 자세한 내용은 Spark 설명서를 참조하세요.

파티션의 목표 수를 결정할 때 목표는 프로비저닝된 AWS Glue 작업자의 사용을 극대화하는 것입니다. AWS Glue 작업자 수와 Spark 태스크 수는 vCPU 수를 통해 관련됩니다. Spark는 각 vCPU 코어에 대해 하나의 태스크를 지원합니다. AWS Glue 버전 3.0 이상에서는 다음 공식을 사용하여 대상 파티션 수를 계산할 수 있습니다.

# Calculate NumPartitions by WorkerType numExecutors = (NumberOfWorkers - 1) numSlotsPerExecutor = 4 if WorkerType is G.1X 8 if WorkerType is G.2X 16 if WorkerType is G.4X 32 if WorkerType is G.8X NumPartitions = numSlotsPerExecutor * numExecutors # Example: Glue 4.0 / G.1X / 10 Workers numExecutors = ( 10 - 1 ) = 9 # 1 Worker reserved on Spark Driver numSlotsPerExecutor = 4 # G.1X has 4 vCpu core ( Glue 3.0 or later ) NumPartitions = 9 * 4 = 36

이 예제에서 각 G.1X 작업자는 Spark 실행기(spark.executor.cores = 4)에 4개의 vCPU 코어를 제공합니다. Spark는 각 vCPU 코어에 대해 하나의 태스크를 지원하므로 G.1X Spark 실행기는 4개의 태스크(numSlotPerExecutor)를 동시에 실행할 수 있습니다. 태스크에 동일한 시간이 걸리는 경우 이 파티션 수는 클러스터를 완전히 사용합니다. 그러나 일부 태스크가 다른 태스크보다 오래 걸리므로 유휴 코어가 생성됩니다. 이 경우 병목 현상 태스크를 나누고 효율적으로 예약하려면 numPartitions에 2 또는 3을 곱하는 방법을 고려합니다.

파티션이 너무 많음

파티션 수가 너무 많으면 태스크 수가 너무 많습니다. 이로 인해 Spark 실행기 간의 관리 태스크 및 데이터 교환과 같은 분산 처리와 관련된 오버헤드 때문에 Spark 드라이버에 과부하가 발생합니다.

작업의 파티션 수가 목표 파티션 수보다 훨씬 큰 경우 파티션 수를 줄이는 방법을 고려합니다. 다음 옵션을 사용하여 파티션을 줄일 수 있습니다.

  • 파일 크기가 매우 작은 경우 AWS Glue groupFiles를 사용합니다. Apache Spark 태스크 시작으로 인한 과도한 병렬화를 줄여 각 파일을 처리할 수 있습니다.

  • coalesce(N)를 사용하여 파티션을 병합합니다. 이는 비용이 저렴한 프로세스입니다. 파티션 수를 줄이면 repartition(N)에서 셔플을 수행하여 각 파티션에 균등하게 레코드의 양을 분산하므로 coalesce(N)repartition(N)보다 선호됩니다. 이 경우 비용과 관리 오버헤드가 증가합니다.

  • Spark 3.x 적응형 쿼리 실행을 사용합니다. Apache Spark 섹션의 주요 주제에서 설명한 대로 적응형 쿼리 실행은 파티션 수를 자동으로 병합하는 함수를 제공합니다. 실행을 수행할 때까지 파티션 수를 알 수 없는 경우 이 접근 방식을 사용할 수 있습니다.

JDBC에서 데이터 로드 병렬화

Spark RDD 파티션 수는 구성에 의해 결정됩니다. 기본적으로 SELECT 쿼리를 통해 전체 소스 데이터세트를 스캔하기 위해 단일 태스크만 실행됩니다.

AWS Glue DynamicFrames와 Spark DataFrames는 모두 여러 태스크에서 병렬화된 JDBC 데이터 로드를 지원합니다. 이는 where 조건자를 사용하여 하나의 SELECT 쿼리를 여러 쿼리로 분할하여 수행됩니다. JDBC에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.

  • AWS Glue DynamicFrame의 경우 hashfield(또는 hashexpression)hashpartition을 설정합니다. 자세한 내용은 JDBC 테이블을 병렬로 읽기를 참조하세요.

    connection_mysql8_options = { "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test", "dbtable": "medicare_tb", "user": "test", "password": "XXXXXXXXX", "hashexpression":"id", "hashpartitions":"10" } datasource0 = glueContext.create_dynamic_frame.from_options( 'mysql', connection_options=connection_mysql8_options, transformation_ctx= "datasource0" )
  • Spark DataFrame의 경우 numPartitions, partitionColumn, lowerBound, upperBound를 설정합니다. 자세한 내용은 JDBC To Other Databases를 참조하세요.

    df = spark.read \ .format("jdbc") \ .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \ .option("dbtable", "medicare_tb") \ .option("user", "test") \ .option("password", "XXXXXXXXXX") \ .option("partitionColumn", "id") \ .option("numPartitions", "10") \ .option("lowerBound", "0") \ .option("upperBound", "1141455") \ .load() df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")

ETL 커넥터를 사용할 경우 DynamoDB에서 데이터 로드 병렬화

Spark RDD 파티션 수는 dynamodb.splits 파라미터에 의해 결정됩니다. Amazon DynamoDB에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.

Kinesis Data Streams에서 데이터 로드 병렬화

Spark RDD 파티션 수는 소스 Amazon Kinesis Data Streams 데이터 스트림의 샤드 수에 의해 결정됩니다. 데이터 스트림에 샤드가 몇 개뿐인 경우 Spark 태스크는 몇 개뿐입니다. 이로 인해 다운스트림 프로세스에서 병렬화가 낮아질 수 있습니다. Kinesis Data Streams에서 읽기를 병렬화하려면 다음 옵션을 구성합니다.

  • Kinesis Data Streams에서 데이터를 로드할 경우 더 많은 병렬화를 얻으려면 샤드 수를 늘립니다.

  • 마이크로 배치의 로직이 충분히 복잡한 경우 불필요한 열을 삭제한 후 배치 시작 시 데이터를 재분할하는 방법을 고려합니다.

자세한 내용은 Best practices to optimize cost and performance for AWS Glue streaming ETL jobs를 참조하세요.

데이터 로드 후 태스크 병렬화

데이터 로드 후 태스크를 병렬화하려면 다음 옵션을 사용하여 RDD 파티션 수를 늘립니다.

  • 특히 로드 자체를 병렬화할 수 없는 경우 초기 로드 직후 더 많은 수의 파티션을 생성하기 위해 데이터를 다시 분할합니다.

    파티션 수를 지정하여 DynamicFrame 또는 DataFrame에서 repartition()을 직접 호출합니다. 일반적으로 사용 가능한 코어 수의 2~3배가 적당합니다.

    그러나 분할된 테이블을 작성할 경우 파일이 크게 증가할 수 있습니다(각 파티션이 잠재적으로 각 테이블 파티션에 파일을 생성할 수 있음). 이를 방지하기 위해 열을 기준으로 DataFrame을 다시 분할할 수 있습니다. 이렇게 하면 쓰기 전에 데이터가 구성되도록 테이블 파티션 열이 사용됩니다. 테이블 파티션에서 작은 파일을 가져오지 않고도 더 많은 수의 파티션을 지정할 수 있습니다. 그러나 일부 파티션 값이 대부분의 데이터로 끝나고 작업 완료가 지연되는 데이터 스큐를 피하도록 주의하세요.

  • 셔플이 있는 경우 spark.sql.shuffle.partitions 값을 늘립니다. 또한 셔플 작업 시 메모리 문제를 해결하는 데 도움이 될 수 있습니다.

    2,001개가 넘는 셔플 파티션이 있는 경우 Spark는 압축된 메모리 형식을 사용합니다. 숫자가 이에 근사한 경우 보다 효율적인 표현을 얻기 위해 해당 한도를 초과하는 spark.sql.shuffle.partitions 값을 설정하려고 할 수 있습니다.