翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
ユーザー定義関数を最適化する
PySpark のユーザー定義関数 (UDF) と RDD.map は、パフォーマンスを大幅に低下させることがあります。これは、Spark の基盤となる Scala 実装で Python コードを正確に表現するために必要なオーバーヘッドが原因です。
PySpark ジョブのアーキテクチャを次の図に示します。PySpark を使用すると、Spark ドライバーは Py4j ライブラリを使用して Python から Java メソッドを呼び出します。Spark SQL または DataFrame の組み込み関数を呼び出す場合、関数は最適化された実行計画を使用して各エグゼキュターの JVM 上で実行されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。
map/ mapPartitions/ udf の使用などを使った独自の Python ロジックを使用すると、タスクは Python ランタイム環境で実行されます。2 つの環境を管理することで、オーバーヘッドコストが発生します。さらに、JVM ランタイム環境の組み込み関数で使用するには、メモリ上のデータを変換する必要があります。Pickle は、JVM と Python のランタイム間でデータをやり取りする際に、デフォルトで使用されるシリアル化形式です。ただし、このシリアル化と逆シリアル化のコストは非常に高いため、Java または Scala で記述された UDF の方が、Python の UDF よりも高速です。
PySpark におけるシリアル化と逆シリアル化のオーバーヘッドを回避するには、次の点を検討してください。
-
Spark SQL の組み込み関数を使用する – 独自の UDF やマップ関数を Spark SQL または DataFrame の組み込み関数に置き換えることを検討してください。Spark SQL または DataFrame の組み込み関数を実行する場合、タスクは各エグゼキュターの JVM 上で処理されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。
-
UDF を Scala または Java で実装する – Java または Scala で記述された UDF は JVM 上で実行されるため、それらの言語による実装を検討してください。
-
ベクトル化されたワークロードには Apache Arrow ベースの UDF を使用する – Arrow ベースの UDF の使用を検討してください。この機能は、ベクトル化された UDF (Pandas UDF) とも呼ばれます。Apache Arrow
は、JVM と Python プロセス間でデータを効率的に転送するために AWS Glue が使用できる言語に依存しないインメモリデータ形式です。これは現在、Pandas や NumPy のデータを扱う Python ユーザーにとって最も有益です。 Arrow は列指向 (ベクトル化) 形式です。その使用は自動ではなく、最大限に活用したり互換性を確保したりするには、設定やコードにわずかな変更が必要になる可能性があります。詳細と制限については、「Apache Arrow in PySpark
」を参照してください。 次の例では、標準の Python、ベクトル化された UDF、Spark SQL の基本的な増分 UDF を比較します。
標準の Python UDF
サンプルの時間 3.20 (秒) です。
コードの例
# DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # UDF Example def plus(a,b): return a+b spark.udf.register("plus",plus) df.selectExpr("count(plus(a,b))").collect()
実行計画
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#124)]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)]) +- Project [pythonUDF0#124] +- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124] +- Project [id#114L AS a#116L, id#114L AS b#117L] +- Range (0, 10000000, step=1, splits=16)
ベクトル化された UDF
サンプルの時間 0.59 (秒) です。
ベクトル化された UDF は、前の UDF のサンプルと比べて 5 倍高速です。Physical Plan を確認すると、ArrowEvalPython が表示されており、このアプリケーションが Apache Arrow によってベクトル化されていることがわかります。ベクトル化された UDF を有効にするには、コードで spark.sql.execution.arrow.pyspark.enabled = true を指定する必要があります。
コードの例
# Vectorized UDF from pyspark.sql.types import LongType from pyspark.sql.functions import count, pandas_udf # Enable Apache Arrow Support spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # Annotate pandas_udf to use Vectorized UDF @pandas_udf(LongType()) def pandas_plus(a,b): return a+b spark.udf.register("pandas_plus",pandas_plus) df.selectExpr("count(pandas_plus(a,b))").collect()
実行計画
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L]) +- Project [pythonUDF0#1082L] +- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200 +- Project [id#1072L AS a#1074L, id#1072L AS b#1075L] +- Range (0, 10000000, step=1, splits=16)
Spark SQL
サンプルの時間 0.087 (秒) です。
タスクが Python ランタイムを介さずに各エグゼキュターの JVM 上で実行されるため、Spark SQL はベクトル化された UDF よりもはるかに高速です。UDF を組み込み関数に置き換えられる場合は、そうすることをお勧めします。
コードの例
df.createOrReplaceTempView("test") spark.sql("select count(a+b) from test").collect()
ビッグデータに pandas を使用する
pandas