

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 優化 Spark 效能
<a name="emr-spark-performance"></a>

Amazon EMR 為 Spark 提供多種效能優化功能。本主題將詳細說明每個最佳化功能。

如需如何設定 Spark 組態的詳細資訊，請參閱 [設定 Spark](emr-spark-configure.md)。

## 自適應查詢執行
<a name="emr-spark-performance-aqe"></a>

自適應查詢執行是一種依據執行期統計資料對查詢計畫進行重新優化的框架。從 Amazon EMR 5.30.0 開始，以下來自 Apache Spark 3 的自適應查詢執行優化也在 Apache Amazon EMR Runtime for Spark 2 上提供。
+ 自適應聯結轉換
+ 隨機分割區的自適應合併

**自適應聯結轉換**

自適應聯結轉換透過依據查詢階段的執行期大小將 sort-merge-join 操作轉換為 broadcast-hash-joins 操作，從而提升查詢的效能。當聯結的一側小到足以將其輸出廣播到所有執行器，因此無需隨機交換並排序聯結的兩側時，broadcast-hash-joins 的效能通常會更出色。當 Spark 自動執行 broadcast-hash-joins 時，自適應聯結轉換會擴大案例的範圍。

此功能預設為啟用。它可透過將 `spark.sql.adaptive.enabled` 設為 `false` 停用，但也會停用自適應查詢執行框架。當其中某個聯結的一側之執行期統計資料不超過 `spark.sql.autoBroadcastJoinThreshold` (預設為 10,485,760 位元組，即 10 MiB) 時，Spark 會決定將 sort-merge-join 轉換為 broadcast-hash-join。

**隨機分割區的自適應合併**

隨機分割區的自適應合併小型接續隨機分割區，以避免過多小型任務產生額外負荷，從而提升查詢的效能。這可讓您預先設定較高數量的初始隨機分割區，然後在執行期縮減為目標大小，提高使分散式隨機分割區變得更均勻的機率。

除非明確設定 `spark.sql.shuffle.partitions`，否則此功能預設為啟用。它可透過將 `spark.sql.adaptive.coalescePartitions.enabled` 設為 `true` 啟用。使用 `spark.sql.adaptive.coalescePartitions.minPartitionNum` 和 `spark.sql.adaptive.advisoryPartitionSizeInBytes` 屬性，可分別調整隨機分割區的初始數量和目標分割區大小。請見下表，了解此功能相關 Spark 屬性的更詳細資訊。


**Spark 自適應合併分割區屬性**  

| 屬性 | 預設值 | Description | 
| --- | --- | --- | 
|  `spark.sql.adaptive.coalescePartitions.enabled`  |  true，除非明確設定 `spark.sql.shuffle.partitions`  |  當為 true 且 spark.sql.adaptive.enabled 也設為 true 時，Spark 會依據目標大小合併接續的隨機分割區 (由 `spark.sql.adaptive.advisoryPartitionSizeInBytes` 指定)，以避免有過多小型任務。  | 
|  `spark.sql.adaptive.advisoryPartitionSizeInBytes`  | 64 MB |  合併時隨機分割區的建議大小 (以位元組為單位)。此組態僅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 均為 `true` 時有影響。  | 
|  `spark.sql.adaptive.coalescePartitions.minPartitionNum`  | 25 |  合併後隨機分割區的最小數量。此組態僅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 均為 `true` 時有影響。  | 
|  `spark.sql.adaptive.coalescePartitions.initialPartitionNum`  | 1000 |  合併前隨機分割區的初始數量。此組態僅在 `spark.sql.adaptive.enabled` 和 `spark.sql.adaptive.coalescePartitions.enabled` 均為 `true` 時有影響。  | 

## 動態分割區剔除
<a name="emr-spark-performance-dynamic"></a>

動態分割區清除可透過更準確選取表格中需要供特定查詢讀取和處理的特定分割區，來改善任務效能。透過降低讀取和處理的資料量，即可省下任務執行所用的大量時間。使用 Amazon EMR 5.26.0，此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中，您可以透過從 Spark 設定 Spark 屬性 `spark.sql.dynamicPartitionPruning.enabled` 或在建立叢集時，來啟用此功能。


**Spark 動態分割區剔除分割區屬性**  

| 屬性 | 預設值 | Description | 
| --- | --- | --- | 
|  `spark.sql.dynamicPartitionPruning.enabled`  |  `true`  |  如果為 true，啟用動態分割區剔除。  | 
|  `spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse`  |  `true`  |  若為 `true`，Spark 會在查詢執行前執行防禦性檢查，以確保動態剔除篩選條件中廣播交換的重複使用不會被後續的準備規則 (例如使用者定義的單欄規則) 中斷。當重複使用被中斷且此組態為 `true`，Spark 會移除受影響的動態剔除篩選條件，以防範發生效能與正確性問題。當動態剔除篩選條件的廣播交換產生與對應聯結操作的廣播交換不同或不一致的結果時，即可能發生正確性問題。將此組態設為 `false` 時應保持謹慎；它支援規避一些情形，例如重複使用被使用者定義的單欄規則中斷。啟用「自適應查詢執行」時，始終強制執行廣播重複使用。  | 

此最佳化可改善 Spark 2.4.1 現有功能，此版本僅支援下推可在計劃時間解析的靜態述詞。

以下是在 Spark 2.4.2 中下推的靜態述詞範例。

```
partition_col = 5

partition_col IN (1,3,5)

partition_col between 1 and 3

partition_col = 1 + 3
```

動態分割區清除允許 Spark 引擎在執行時間動態推斷需要讀取哪些分割區，以及可安全消除哪些分割區。例如，以下查詢包含兩個表格：`store_sales` 表格，其中包含所有商店的所有總銷售，並依區域分區，而 `store_regions` 表格包含每個國家的區域對應。此表格包含分佈於世界各地的商店相關資料，但我們只會查詢北美洲的資料。

```
select ss.quarter, ss.region, ss.store, ss.total_sales 
from store_sales ss, store_regions sr
where ss.region = sr.region and sr.country = 'North America'
```

在沒有使用動態分割區清除時，這個查詢會讀取所有區域，再篩選符合子查詢結果的區域子集。使用動態分割區清除時，這個查詢只會為在子查詢中傳回的區域讀取和處理分割區。這可透過讀取較少儲存區的資料和處理較少記錄來節省時間和資源。

## 扁平化純量子查詢
<a name="emr-spark-performance-flatten"></a>

此最佳化可改善在相同表格具備純量子查詢的查詢效能。使用 Amazon EMR 5.26.0，此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中，您可以透過從 Spark 設定 Spark 屬性 `spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled` 或在建立叢集時，來啟用此功能。當此屬性設為 true，如有可能，此查詢最佳化工具會將使用相同關係的彙總純量子查詢扁平化。純量子查詢的扁平化是透過將子查詢中存在的任何述詞推送至彙總函數，然後再使用每個關係的所有彙總函數來執行一個彙總。

以下是將受益於此最佳化的查詢範本。

```
select (select avg(age) from students                    /* Subquery 1 */
                 where age between 5 and 10) as group1,
       (select avg(age) from students                    /* Subquery 2 */
                 where age between 10 and 15) as group2,
       (select avg(age) from students                    /* Subquery 3 */
                 where age between 15 and 20) as group3
```

此最佳化會透過以下方式重寫先前的查詢：

```
select c1 as group1, c2 as group2, c3 as group3
from (select avg (if(age between 5 and 10, age, null)) as c1,
             avg (if(age between 10 and 15, age, null)) as c2,
             avg (if(age between 15 and 20, age, null)) as c3 from students);
```

請注意，重寫查詢只會讀取一次學生的表格，三個子查詢的述詞會被推送至該 `avg` 函數。

## INTERSECT 前的 DISTINCT
<a name="emr-spark-performance-distinct"></a>

使用 INTERSECT 時此最佳化會將聯結最佳化。使用 Amazon EMR 5.26.0，此功能預設為啟用。在 Amazon EMR 5.24.0 和 5.25.0 中，您可以透過從 Spark 設定 Spark 屬性 `spark.sql.optimizer.distinctBeforeIntersect.enabled` 或在建立叢集時，來啟用此功能。使用 INTERSECT 的查詢會自動轉換為使用 left-semi 聯結。此屬性設為 true 時，如果查詢最佳化工具偵測到 DISTINCT 運算子可讓 left-semi 聯結變成 BroadcastHashJoin (而不是 SortMergeJoin)，則會將 DISTINCT 運算子推送到 INTERSECT 的子項。

以下是將受益於此最佳化的查詢範本。

```
(select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
```

沒有啟用此屬性 `spark.sql.optimizer.distinctBeforeIntersect.enabled` 時，查詢的重寫方式如下。

```
select distinct brand from
  (select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

啟用此屬性 `spark.sql.optimizer.distinctBeforeIntersect.enabled` 時，查詢的重寫方式如下。

```
select brand from
  (select distinct item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select distinct item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

## Bloom 篩選條件聯結
<a name="emr-spark-performance-bloom"></a>

此最佳化可以使用從聯結另一側之值產生的 [Bloom 篩選條件](https://en.wikipedia.org/wiki/Bloom_filter)，預先篩選聯結的一側，進而提升某些聯結的效能。使用 Amazon EMR 5.26.0，此功能預設為啟用。在 Amazon EMR 5.25.0 中，您可以透過從 Spark 設定 Spark 屬性 `spark.sql.bloomFilterJoin.enabled` 至 `true` 或在建立叢集時，來啟用此功能。

以下是受益於 Bloom 篩選條件的範例查詢。

```
select count(*)
from sales, item
where sales.item_id = item.id
and item.category in (1, 10, 16)
```

啟用此功能時，Bloom 篩選條件是以其類別落於正接受查詢之類別集合的所有項目 ID 所建立。掃描銷售資料表時，Bloom 篩選條件用於判定哪些銷售是肯定不在 Bloom Filter 所定義之集合中的項目。因此能夠盡早篩選掉這些已識別的銷售。

## 優化的聯結重新排序
<a name="emr-spark-performance-join-reorder"></a>

此最佳化可透過篩選條件重新排序涉及資料表的聯結，以提升查詢效能。使用 Amazon EMR 5.26.0，此功能預設為啟用。在 Amazon EMR 5.25.0 中，您可以將 Spark 組態參數 `spark.sql.optimizer.sizeBasedJoinReorder.enabled` 設為 true，來啟用此功能。如在查詢中所列出，在 Spark 中的預設行為是由左至右聯結資料表。此策略會以篩選條件跳過執行較小聯結的機會，以受益於之後成本較高的聯結。

以下範例查詢回報一國內所有商店的全部退貨項目。若無最佳化的聯結重新排序功能，Spark 會先聯結兩張大型資料表 `store_sales` 和 `store_returns`，然後將這些資料表與 `store` 聯結，最後與 `item` 聯結。

```
select ss.item_value, sr.return_date, s.name, i.desc, 
from store_sales ss, store_returns sr, store s, item i
where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id
and s.country = 'USA'
```

有了最佳化的聯結重新排序功能，由於 `store` 有篩選條件且小於 `store_returns` 和 `broadcastable`，因此 Spark 會先將 `store_sales` 與 `store` 聯結。然後，Spark 與 `store_returns` 聯結，最後與 `item` 聯結。如果 `item` 有篩選條件且是 broadcastable，它也會符合重新排序的資格，因而讓 `store_sales` 與 `store` 聯結，然後與 `item` 聯結，最後再與 `store_returns` 聯結。