

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Spark パフォーマンスの最適化
<a name="emr-spark-performance"></a>

Amazon EMR では、Spark 向けに複数のパフォーマンス最適化機能が用意されています。このトピックでは、それぞれの最適化機能について詳しく説明します。

Spark の設定方法に関する詳細については、「[Spark の設定](emr-spark-configure.md)」を参照してください。

## アダプティブクエリ実行
<a name="emr-spark-performance-aqe"></a>

アダプティブクエリ実行は、ランタイム統計に基づいてクエリプランを再最適化するためのフレームワークです。Amazon EMR 5.30.0 以降、Apache Spark 3 からの次のアダプティブクエリ実行最適化が Spark 2 の Apache Amazon EMR ランタイムで利用できます。
+ アダプティブ結合変換
+ シャッフルパーティションのアダプティブ結合

**アダプティブ結合変換**

アダプティブ結合変換は、クエリステージのランタイムサイズに基づいて、sort-merge-join オペレーションを broadcast-hash-joins オペレーションに変換することで、クエリのパフォーマンスを向上させます。Broadcast-hash-joins は、結合の片側がすべてのエグゼキューターに出力を効率的にブロードキャストするのに十分な程度小さい場合、交換をシャッフルして結合の両側をソートする必要がなくなるため、パフォーマンスがより高くなる傾向があります。アダプティブ結合変換では、Spark が broadcast-hash-joins を自動的に実行するときのケースの範囲が広がります。

この機能は、デフォルトでご利用になれます。これは、`spark.sql.adaptive.enabled` を `false` に設定することで無効にできます。これにより、アダプティブクエリ実行フレームワークも無効になります。Spark は、結合側のいずれかのランタイムサイズ統計が `spark.sql.autoBroadcastJoinThreshold` を超えない場合 (デフォルトで 10,485,760 バイト (10 MiB))、sort-merge-join を broadcast-hash-join に変換するように決定します。

**シャッフルパーティションのアダプティブ結合**

シャッフルパーティションのアダプティブ結合により、小さな連続するシャッフルパーティションが結合され、小さなタスクが多すぎることによるオーバーヘッドを回避することで、クエリのパフォーマンスが向上します。これにより、事前に初期シャッフルパーティションの数をより高く設定することができ、ランタイムでターゲットサイズまで削減されるため、シャッフルパーティションがより均等に分散される可能性が高まります。

この機能は、`spark.sql.shuffle.partitions` が明示的に設定されていない限り、デフォルトで有効になっています。これは、`spark.sql.adaptive.coalescePartitions.enabled` を `true` に設定することで有効にできます。シャッフルパーティションの初期数とターゲットパーティションのサイズは、それぞれ `spark.sql.adaptive.coalescePartitions.minPartitionNum` プロパティと `spark.sql.adaptive.advisoryPartitionSizeInBytes` プロパティを使用して調整できます。この機能に関連する Spark プロパティの詳細については、以下の表を参照してください。


**Spark アダプティブ結合パーティションのプロパティ**  

| プロパティ | デフォルトの値 | 説明 | 
| --- | --- | --- | 
|  `spark.sql.adaptive.coalescePartitions.enabled`  |  `spark.sql.shuffle.partitions` が明示的に設定されている場合を除き、true  |  true であり、かつ、spark.sql.adaptive.enabled が true の場合、Spark はターゲットサイズ (`spark.sql.adaptive.advisoryPartitionSizeInBytes` によって指定) に従って、連続するシャッフルパーティションを結合し、小さいタスクが多くなり過ぎないようにします。  | 
|  `spark.sql.adaptive.advisoryPartitionSizeInBytes`  | 64MB |  結合時のシャッフルパーティションのアドバイザリサイズ (バイト単位)。この設定は、`spark.sql.adaptive.enabled` と `spark.sql.adaptive.coalescePartitions.enabled` が両方とも `true` の場合にのみ効果があります。  | 
|  `spark.sql.adaptive.coalescePartitions.minPartitionNum`  | 25 |  結合後のシャッフルパーティションの最小数。この設定は、`spark.sql.adaptive.enabled` と `spark.sql.adaptive.coalescePartitions.enabled` が両方とも `true` の場合にのみ効果があります。  | 
|  `spark.sql.adaptive.coalescePartitions.initialPartitionNum`  | 1,000 |  結合前のシャッフルパーティションの初期数。この設定は、`spark.sql.adaptive.enabled` と `spark.sql.adaptive.coalescePartitions.enabled` が両方とも `true` の場合にのみ効果があります。  | 

## ダイナミックパーティションプルーニング
<a name="emr-spark-performance-dynamic"></a>

ダイナミックパーティションプルーニングでは、特定のパーティションを特定のクエリを読み取って処理するテーブル内でより正確に選択し、ジョブパフォーマンスを改善します。読み取って処理するデータ量を減らすと、ジョブの実行において大幅な時間の節約になります。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark 内から Spark プロパティ `spark.sql.dynamicPartitionPruning.enabled` を設定して有効にできます。また、クラスターの作成時に有効にすることもできます。


**Spark ダイナミックパーティションプルーニングのパーティションのプロパティ**  

| プロパティ | デフォルト値 | 説明 | 
| --- | --- | --- | 
|  `spark.sql.dynamicPartitionPruning.enabled`  |  `true`  |  true の場合、ダイナミックパーティションプルーニングが有効になります。  | 
|  `spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse`  |  `true`  |  `true` の場合、Spark はクエリの実行前に防御チェックを行い、ダイナミックプルーニングフィルターでのブロードキャスト交換の再利用が、ユーザー定義の列指向ルールなどの後続の準備ルールによって破損していないことを確認します。再利用が壊れていて、この config が `true` の場合、Spark は影響を受けるダイナミックプルーニングフィルターを除去し、パフォーマンスと正確性に関する問題から保護します。ダイナミックプルーニングフィルターのブロードキャスト交換によって、対応する結合オペレーションのブロードキャスト交換とは異なる、整合しない結果が生じると、正確性に関する問題が発生します。この構成ではユーザー定義の列指向ルールによって再利用が破損している場合などのシナリオを回避できるため、これを `false` に設定する場合は慎重に行ってください。アダプティブクエリ実行が有効になっている場合、ブロードキャストの再利用は常に適用されます。  | 

この最適化は、予定時間に解決される固定述語のプッシュダウンにのみ対応している Spark 2.4.2 の既存の機能を改良するものです。

Spark 2.4.2 での固定述語のプッシュダウンの例を以下に示します。

```
partition_col = 5

partition_col IN (1,3,5)

partition_col between 1 and 3

partition_col = 1 + 3
```

ダイナミックパーティションプルーニングにより、Spark エンジンでランタイムに積極的に情報から推計でき、パーティションが読み取り、安全に削除できます。たとえば、以下のクエリには、すべてのストアの販売合計を含み、リージョンごとに分けられた `store_sales` テーブルと、各国のリージョンのマッピングを含む `store_regions` テーブルという、2 つのテーブルが含まれます。テーブルには世界中に分散しているストアに関するデータが含まれていますが、北米のデータのみをクエリします。

```
select ss.quarter, ss.region, ss.store, ss.total_sales 
from store_sales ss, store_regions sr
where ss.region = sr.region and sr.country = 'North America'
```

ダイナミックパーティションプルーニングを使用しない場合、このクエリはサブクエリ結果に一致する地域のサブセットをフィルタリングする前に、すべての地域のデータを読み取ります。ダイナミックパーティションプルーニングを使用すると、このクエリはサブクエリで返された地域のパーティションのみを読み取り処理します。このため、ストレージのデータの読み取りや履歴の処理が少なくなり、時間とリソースが削減されます。

## スカラサブクエリの平坦化
<a name="emr-spark-performance-flatten"></a>

この最適化では、同じテーブルにスカラサブクエリのあるクエリのパフォーマンスを向上させます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark プロパティ `spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled` をSpark 内で設定して有効にできます。また、クラスターの作成時に有効にすることもできます。このプロパティを true に設定すると、可能な場合に、クエリオプティマイザが同じ関係式を使用する集約スカラサブクエリを平坦化します。スカラサブクエリは、サブクエリ内に存在する述語を集約関数にプッシュし、リレーションごとにすべての集約関数を使用して 1 つの集約を実行することによって平坦化されます。

以下は、この最適化を行うメリットがあるクエリの例です。

```
select (select avg(age) from students                    /* Subquery 1 */
                 where age between 5 and 10) as group1,
       (select avg(age) from students                    /* Subquery 2 */
                 where age between 10 and 15) as group2,
       (select avg(age) from students                    /* Subquery 3 */
                 where age between 15 and 20) as group3
```

最適化により、以前のクエリが次のように書き換えられます。

```
select c1 as group1, c2 as group2, c3 as group3
from (select avg (if(age between 5 and 10, age, null)) as c1,
             avg (if(age between 10 and 15, age, null)) as c2,
             avg (if(age between 15 and 20, age, null)) as c3 from students);
```

書き換えられたクエリは一度だけ student テーブルを読み取ります。また、3 つのサブクエリの述語が `avg` 関数にプッシュされます。

## INTERSECT 前に DISTINCT
<a name="emr-spark-performance-distinct"></a>

この最適化では、INTERSECT を使用する場合の結合を最適化します。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.24.0 および 5.25.0 では、Spark プロパティ `spark.sql.optimizer.distinctBeforeIntersect.enabled` をSpark 内で設定して有効にできます。また、クラスターの作成時に有効にすることもできます。INTERSECT を使用したクエリは、左準結合を使用するように自動変換されます。このプロパティを true に設定すると、DISTINCT 演算子が左準結合を SortMergeJoin ではなく BroadcastHashJoin にできることを検出すると、DISTINCT 演算子を INTERSECT の子にプッシュします。

以下は、この最適化を行うメリットがあるクエリの例です。

```
(select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
intersect
(select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
```

このプロパティが有効になっていないと `spark.sql.optimizer.distinctBeforeIntersect.enabled` 、クエリは以下のように書き換えられます。

```
select distinct brand from
  (select item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

このプロパティを有効にすると `spark.sql.optimizer.distinctBeforeIntersect.enabled` 、クエリは以下のように書き換えられます。

```
select brand from
  (select distinct item.brand brand from store_sales, item
     where store_sales.item_id = item.item_id)
left semi join
   (select distinct item.brand cs_brand from catalog_sales, item 
     where catalog_sales.item_id = item.item_id)
 on brand <=> cs_brand
```

## ブルームフィルター結合
<a name="emr-spark-performance-bloom"></a>

この最適化では、結合の一方の側の値から生成された[Bloom フィルター](https://en.wikipedia.org/wiki/Bloom_filter)を使用して、結合のもう一方の側を事前にフィルタリングすることで、一部の結合のパフォーマンスを向上させます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.25.0 では、Spark 内で Spark プロパティ `spark.sql.bloomFilterJoin.enabled` を `true` に設定してこの機能を有効にできます。また、クラスターを作成するときにもこの機能を有効にできます。

以下は、Bloom フィルターを使用することによってメリットがあるクエリの例です。

```
select count(*)
from sales, item
where sales.item_id = item.id
and item.category in (1, 10, 16)
```

この機能を有効にすると、Bloom フィルターが、クエリの対象となるカテゴリのセットに属するカテゴリを持つすべてのアイテム ID から構築されます。sales テーブルのスキャン中に Bloom フィルターを使用して、Bloom フィルターで定義されたセット内に確実に含まれていないアイテムの売上を洗い出します。こうすることで、特定されたこれらの売上をできるだけ早い段階で除外できます。

## 結合順序の最適化
<a name="emr-spark-performance-join-reorder"></a>

この最適化では、フィルターを持つテーブルを含む結合の順序を変更することでクエリのパフォーマンスを向上させることができます。Amazon EMR 5.26.0 では、この機能はデフォルトで有効になっています。Amazon EMR 5.25.0 では、Spark 設定パラメータ `spark.sql.optimizer.sizeBasedJoinReorder.enabled` を true に設定することで、この機能を有効にすることができます。Spark のデフォルトでは、クエリにリストされている順に、テーブルと左から右に結合するようになっています。この手法では、フィルターを含む小さい結合を最初に実行し、コストの高い結合を後で行うというチャンスを逃してしまいます。

このクエリの例では、ある国のすべての店舗に返品されたすべての商品が報告されます。結合順序を最適化しなかった場合、Spark は、大きなテーブル 2 つ (`store_sales` と `store_returns`) を最初に結合した後、これを `store` と結合し、最後に `item` と結合します。

```
select ss.item_value, sr.return_date, s.name, i.desc, 
from store_sales ss, store_returns sr, store s, item i
where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id
and s.country = 'USA'
```

結合順序を最適化した場合、Spark は、まず `store_sales` を`store` と結合します。`store` にはフィルターがあり、`store_returns` および `broadcastable` よりも小さいためです。次に `store_returns`、最後に `item` と結合します。`item` にフィルターがあり、プロードキャストが可能な場合は、結合順序の変更の対象となるため、`store_sales` が `store` と結合され、続いて`item`、最後に `store_returns` と結合されます。