

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 最佳化使用者定義的函數
<a name="optimize-user-defined-functions"></a>

PySpark `RDD.map`中的使用者定義函數 UDFs) 和 通常會大幅降低效能。 PySpark 這是因為在 Spark 的基礎 Scala 實作中準確代表 Python 程式碼所需的額外負荷。

下圖顯示 PySpark 任務的架構。當您使用 PySpark 時，Spark 驅動程式會使用 Py4j 程式庫從 Python 呼叫 Java 方法。呼叫 Spark SQL 或 DataFrame 內建函數時，Python 和 Scala 之間的效能差異不大，因為這些函數使用最佳化的執行計畫在每個執行器的 JVM 上執行。



![Spark 內容會使用 Py4J 連線至 Spark 驅動程式，而驅動程式會連線至工作者節點。](http://docs.aws.amazon.com/zh_tw/prescriptive-guidance/latest/tuning-aws-glue-for-apache-spark/images/worker-nodes.png)


如果您使用自己的 Python 邏輯，例如使用 `map/ mapPartitions/ udf`，任務將在 Python 執行時間環境中執行。管理兩個環境會產生額外負荷成本。此外，您必須轉換記憶體中的資料，以供 JVM 執行時間環境的內建函數使用。*Pickle* 是預設用於 JVM 和 Python 執行時間之間交換的序列化格式。不過，此序列化和還原序列化成本的成本非常高，因此以 Java 或 Scala 編寫的 UDFs 比 Python UDFs 更快。

若要避免 PySpark 中的序列化和還原序列化額外負荷，請考慮下列事項：
+ **使用內建 Spark SQL 函數** – 考慮使用 Spark SQL 或 DataFrame 內建函數取代您自己的 UDF 或映射函數。執行 Spark SQL 或 DataFrame 內建函數時，Python 和 Scala 之間的效能差異不大，因為任務會在每個執行器的 JVM 上處理。
+ 在 **Scala 或 Java 中實作 UDFs ** – 考慮使用以 Java 或 Scala 撰寫的 UDF，因為它們在 JVM 上執行。
+ 針對**向量化工作負載使用 Apache Arrow 型 UDFs ** – 考慮使用 Arrow 型 UDFs。此功能也稱為向量化 UDF (Pandas UDF)。[Apache Arrow](https://arrow.apache.org/) 是一種與語言無關的記憶體內資料格式， AWS Glue 可用來在 JVM 和 Python 程序之間有效率地傳輸資料。這目前對使用 Pandas 或 NumPy 資料的 Python 使用者最有益。

  Arrow 是一種單欄式 （向量化） 格式。其用量並非自動，可能需要稍微變更組態或程式碼，才能充分利用並確保相容性。如需詳細資訊和限制，請參閱 [ PySpark 中的 Apache Arrow](https://spark.apache.org/docs/latest/api/python/tutorial/sql/arrow_pandas.html)。

  下列範例會比較標準 Python、向量化 UDF 和 Spark SQL 中的基本增量 UDF。

## 標準 Python UDF
<a name="python-udf"></a>

範例時間為 3.20 （秒）。

**範例程式碼**

```
# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# UDF Example
def plus(a,b):
    return a+b
spark.udf.register("plus",plus)

df.selectExpr("count(plus(a,b))").collect()
```

**執行計畫**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#124)])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)])
+- Project [pythonUDF0#124]
+- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124]
+- Project [id#114L AS a#116L, id#114L AS b#117L]
+- Range (0, 10000000, step=1, splits=16)
```

## 向量化 UDF
<a name="vectorized-udf"></a>

範例時間為 0.59 （秒）。

向量化 UDF 比上一個 UDF 範例快 5 倍。檢查 `Physical Plan`時，您可以看到 `ArrowEvalPython`，其中顯示此應用程式是由 Apache Arrow 向量化。若要啟用向量化 UDF，您必須在程式碼`spark.sql.execution.arrow.pyspark.enabled = true`中指定 。

**範例程式碼**

```
# Vectorized UDF
from pyspark.sql.types import LongType
from pyspark.sql.functions import count, pandas_udf

# Enable Apache Arrow Support
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

# DataSet
df = spark.range(10000000).selectExpr("id AS a","id AS b")

# Annotate pandas_udf to use Vectorized UDF
@pandas_udf(LongType())
def pandas_plus(a,b):
    return a+b
spark.udf.register("pandas_plus",pandas_plus)

df.selectExpr("count(pandas_plus(a,b))").collect()
```

**執行計畫**

```
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L])
+- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985]
+- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L])
+- Project [pythonUDF0#1082L]
+- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200
+- Project [id#1072L AS a#1074L, id#1072L AS b#1075L]
+- Range (0, 10000000, step=1, splits=16)
```

## Spark SQL
<a name="spark-sql"></a>

範例時間為 0.087 （秒）。

Spark SQL 比量化 UDF 快得多，因為任務會在每個執行器的 JVM 上執行，而沒有 Python 執行時間 。如果您可以使用內建函數取代 UDF，建議您這麼做。

**範例程式碼**

```
df.createOrReplaceTempView("test")
spark.sql("select count(a+b) from test").collect()
```

## 將 panda 用於大數據
<a name="pandas"></a>

如果您已經熟悉 [pandas](https://pandas.pydata.org/docs/) 並想要將 Spark 用於大數據，則可以在 Spark. AWS Glue 4.0 上使用 pandas API 和更新版本支援它。若要開始使用，您可以使用官方筆記本[ Quickstart：Pandas API on Spark](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_ps.html)。如需詳細資訊，請參閱 [PySpark 文件](https://spark.apache.org/docs/latest/api/python/index.html)。