기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Spark 성능 최적화
Amazon EMR은 Spark를 위한 여러 성능 최적화 성능을 제공합니다. 이 주제에서는 각 최적화 기능에 대해 자세히 설명합니다.
Spark 구성을 설정하는 방법에 대한 자세한 내용은 Spark 구성 단원을 참조하십시오.
적응형 쿼리 실행
적응형 쿼리 실행은 런타임 통계를 기반으로 쿼리 계획을 재최적화하기 위한 프레임워크입니다. Amazon EMR 5.30.0부터 Spark 2용 Apache Amazon EMR 런타임에서 Apache Spark 3의 다음과 같은 적응형 쿼리 실행 최적화를 사용할 수 있습니다.
-
적응형 조인 변환
-
셔플 파티션의 적응형 병합
적응형 조인 변환
적응형 조인 변환은 쿼리 스테이지의 런타임 크기에 따라 정렬-병합-조인 작업을 브로드캐스트-해시-조인 작업으로 변환하여 쿼리 성능을 개선합니다. 브로드캐스트-해시-조인은 조인의 한쪽이 출력을 모든 실행기에게 효율적으로 브로드캐스트할 수 있을 만큼 작은 경우 교환을 셔플하고 조인 양쪽을 모두 정렬할 필요가 없으므로 더 나은 성능을 지원합니다. 적응형 조인 변환은 Spark가 브로드캐스트-해시 조인을 자동으로 수행하는 경우더 다양한 사례를 지원합니다.
이 기능은 기본적으로 활성화되어 있습니다. spark.sql.adaptive.enabled
를 false
로 설정하여 비활성화할 수 있으며, 이렇게 하면 적응형 쿼리 실행 프레임워크도 비활성화됩니다. Spark는 조인 측 중 하나의 런타임 크기 통계가 spark.sql.autoBroadcastJoinThreshold
(기본값: 10,485,760바이트(10MiB))를 초과하지 않을 때 정렬-병합-조인을 브로드캐스트-해시-조인으로 변환하기로 결정합니다.
셔플 파티션의 적응형 병합
셔플 파티션의 적응형 병합은 작은 연속 셔플 파티션을 통합하여 작은 작업이 너무 많아서 발생하는 오버헤드를 방지함으로써 쿼리 성능을 개선합니다. 이렇게 하면 더 많은 수의 초기 셔플 파티션을 미리 구성한 다음, 런타임 시 목표 크기로 줄일 수 있으므로 셔플 파티션을 더 균등하게 분산할 가능성이 커집니다.
이 기능은 명시적으로 spark.sql.shuffle.partitions
를 설정하지 않는 한 기본적으로 활성화되어 있습니다. spark.sql.adaptive.coalescePartitions.enabled
를 true
로 설정하여 활성화할 수 있습니다. 셔플 파티션의 초기 수와 목표 파티션 크기는 각각 spark.sql.adaptive.coalescePartitions.minPartitionNum
및 spark.sql.adaptive.advisoryPartitionSizeInBytes
속성을 사용하여 조정할 수 있습니다. 이 기능의 관련 Spark 속성에 대한 자세한 내용은 다음 테이블을 참조하세요.
속성 | 기본값 | 설명 |
---|---|---|
|
true(명시적으로 |
이 속성이 true이고 spark.sql.adaptive.enabled가 true인 경우 Spark는 목표 크기( |
|
64MB |
병합 시 셔플 파티션의 권장 크기(바이트). 이 구성은 |
|
25 |
병합 후 셔플 파티션의 최소 수. 이 구성은 |
|
1000 |
병합 전 셔플 파티션의 초기 수. 이 구성은 |
동적 파티션 잘라내기
동적 파티션 잘라내기는 특정 쿼리를 위해 읽고 처리해야 하는 테이블 내 특정 파티션을 더 정확하게 선택하여 작업 성능을 향상시킵니다. 읽고 처리한 데이터의 양을 줄임으로써 작업 실행 시 상당한 시간이 절약됩니다. Amazon EMR 5.26.0의 경우 이 기능은 기본적으로 활성화되어 있습니다. Amazon EMR 5.24.0 및 5.25.0의 경우 Spark 내에서 또는 클러스터 생성 시 Spark 속성인 spark.sql.dynamicPartitionPruning.enabled
를 설정하여 이 기능을 활성화할 수 있습니다.
속성 | 기본 값 | 설명 |
---|---|---|
|
|
true인 경우 동적 파티션 정리를 활성화합니다. |
|
|
|
이러한 최적화를 통해 계획 시간에 해결할 수 있는 정적 조건자를 푸시다운하는 것만 지원하는 Spark 2.4.2의 기존 기능보다 나은 결과를 얻을 수 있습니다.
다음은 Spark 2.4.2이 실행하는 정적 조건자 푸시다운의 예입니다.
partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3
동적 파티션 잘라내기를 통해 Spark 엔진은 읽어야 할 파티션과 안전하게 제거할 수 있는 파티션을 실행 시간에 동적으로 추론할 수 있습니다. 예를 들어 다음 쿼리에는 두 가지 테이블이 수반됩니다. 즉 모든 스토어에 대해 총 매출 전체를 포함하고 리전에 따라 분할된 store_sales
테이블과 각 국가에 대해 리전 매핑을 포함하는 store_regions
테이블입니다. 테이블에는 전 세계에 분산된 스토어에 대한 데이터가 포함되어 있지만, 여기서는 북미에 대한 데이터만 쿼리합니다.
select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'
동적 파티션 잘라내기가 없으면 이 쿼리는 하위 쿼리의 결과와 일치하는 리전의 하위 집합을 필터링하기 전에 모든 리전을 읽습니다. 동적 파티션 잘라내기가 있는 경우 이 쿼리는 하위 쿼리에서 반환되는 리전에 대한 파티션만 읽고 처리합니다. 이를 통해 스토리지에서 더 적은 데이터를 읽고 더 적은 레코드를 처리함으로써 시간과 리소스를 절약합니다.
스칼라 하위 쿼리 평면화
이 최적화를 통해 동일한 테이블에 대한 스칼라 하위 쿼리가 있는 쿼리의 성능이 향상됩니다. Amazon EMR 5.26.0의 경우 이 기능은 기본적으로 활성화되어 있습니다. Amazon EMR 5.24.0 및 5.25.0의 경우 Spark 내에서 또는 클러스터 생성 시 Spark 속성인 spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled
를 설정하여 이 기능을 활성화할 수 있습니다. 이 속성을 true로 설정하면 이 쿼리 최적화 프로그램은 가능한 경우 동일한 관계를 사용하는 총 스칼라 하위 쿼리를 평면화합니다. 스칼라 하위 쿼리는 집계 함수에 대한 하위 쿼리에 있는 모든 조건자를 푸시한 후 관계당 모든 집계 함수로 한 가지 집계를 수행하는 방식으로 평면화됩니다.
다음은 이러한 최적화를 통해 이익을 얻을 쿼리의 샘플입니다.
select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3
최적화를 통해 이전 쿼리는 다음과 같이 재작성됩니다.
select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);
재작성된 쿼리는 학생 테이블을 한 번만 읽고 세 가지 하위 쿼리의 조건자는 avg
함수로 푸시된다는 점에 유의하십시오.
INTERSECT에 앞선 DISTINCT
이 최적화에서는 INTERSECT 사용 시 조인을 최적화합니다. Amazon EMR 5.26.0의 경우 이 기능은 기본적으로 활성화되어 있습니다. Amazon EMR 5.24.0 및 5.25.0의 경우 Spark 내에서 또는 클러스터 생성 시 Spark 속성인 spark.sql.optimizer.distinctBeforeIntersect.enabled
를 설정하여 이 기능을 활성화할 수 있습니다. INTERSECT를 사용하는 쿼리는 자동으로 좌측 세미 조인을 사용하도록 자동 변환됩니다. 이 속성이 true로 설정된 경우 DISTINCT 연산자가 좌측 세미 조인을 SortMergeJoin이 아닌 BroadcastHashJoin으로 만들 수 있음을 감지한 쿼리 최적화 프로그램은 DISTINCT 연산자를 INTERSECT의 하위 항목으로 푸시합니다.
다음은 이러한 최적화를 통해 이익을 얻을 쿼리의 샘플입니다.
(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)
이 spark.sql.optimizer.distinctBeforeIntersect.enabled
속성을 활성화하지 않으면 쿼리는 다음과 같이 재작성됩니다.
select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
이 spark.sql.optimizer.distinctBeforeIntersect.enabled
속성을 활성화하면 쿼리는 다음과 같이 재작성됩니다.
select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
블룸 필터 조인
이러한 최적화는 조인의 다른 측면 값에서 생성된 블룸 필터spark.sql.bloomFilterJoin.enabled
를 true
로 설정하여 이 기능을 활성화할 수 있습니다.
다음은 블룸 필터를 활용할 수 있는 예제 쿼리입니다.
select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)
이 기능을 사용할 때, 블룸 필터는 카테고리가 쿼리 처리되는 카테고리 세트에 있는 모든 항목 id들로 구축됩니다. 매출 테이블을 스캔하는 동안 블룸 필터를 사용해 어떤 매출이 블룸 필터에서 지정한 세트에서 분명 제외된 항목에 있는지 파악합니다. 따라서 이렇게 확인된 매출은 가능한 한 조기에 필터링될 수 있습니다.
최적화된 조인 재정렬
이 최적화로 필터와 함께 테이블에 관여하는 조인들을 재정렬하여 쿼리 성능을 향상시킬 수 있습니다. Amazon EMR 5.26.0의 경우 이 기능은 기본적으로 활성화되어 있습니다. Amazon EMR 5.25.0의 경우 Spark 구성 파라미터 spark.sql.optimizer.sizeBasedJoinReorder.enabled
를 true로 설정하여 이 기능을 활성화할 수 있습니다. Spark의 기본 동작은 쿼리에 기재된 대로 테이블을 왼쪽에서 오른쪽으로 조인하는 것입니다. 이러한 전략은 많이 사용하는 조인을 나중에 활용하기 위해 필터가 있는 소규모 조인을 우선 실행할 수 있는 기회를 놓칠 수 있습니다.
아래의 예시 쿼리는 한 국가의 모든 매장에 있는 모든 반환품을 보고합니다. 조인 재정렬을 최적화하지 않으면, Spark는 두 개의 대형 테이블인 store_sales
와 store_returns
를 우선 조인한 다음, 두 대형 테이블을 store
그리고 마지막으로 item
과 조인합니다.
select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'
최적화된 조인 재정렬을 활용해 Spark는 store_sales
와 store
를 우선 조인하는 데, 이는 store
에 필터가 있으며 store_returns
및 broadcastable
보다 소규모이기 때문입니다. 그런 다음 Spark는 store_returns
와 조인한 뒤 마지막으로 item
와 조인합니다. item
에 필터가 있고 브로드캐스팅할 수 있다면, 재정렬에 적격하며 store_sales
를 store
와, 그리고 item
, 또 결국 store_returns
와 조인하게 됩니다.