ユーザー定義関数を最適化する -

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ユーザー定義関数を最適化する

PySpark のユーザー定義関数 (UDF) と RDD.map は、パフォーマンスを大幅に低下させることがあります。これは、Spark の基盤となる Scala 実装で Python コードを正確に表現するために必要なオーバーヘッドが原因です。

PySpark ジョブのアーキテクチャを次の図に示します。PySpark を使用すると、Spark ドライバーは Py4j ライブラリを使用して Python から Java メソッドを呼び出します。Spark SQL または DataFrame の組み込み関数を呼び出す場合、関数は最適化された実行計画を使用して各エグゼキュターの JVM 上で実行されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。

Spark コンテキストは Py4J を使用して Spark ドライバーに接続し、ドライバーはワーカーノードに接続します。

map/ mapPartitions/ udf の使用などを使った独自の Python ロジックを使用すると、タスクは Python ランタイム環境で実行されます。2 つの環境を管理することで、オーバーヘッドコストが発生します。さらに、JVM ランタイム環境の組み込み関数で使用するには、メモリ上のデータを変換する必要があります。Pickle は、JVM と Python のランタイム間でデータをやり取りする際に、デフォルトで使用されるシリアル化形式です。ただし、このシリアル化と逆シリアル化のコストは非常に高いため、Java または Scala で記述された UDF の方が、Python の UDF よりも高速です。

PySpark におけるシリアル化と逆シリアル化のオーバーヘッドを回避するには、次の点を検討してください。

  • Spark SQL の組み込み関数を使用する – 独自の UDF やマップ関数を Spark SQL または DataFrame の組み込み関数に置き換えることを検討してください。Spark SQL または DataFrame の組み込み関数を実行する場合、タスクは各エグゼキュターの JVM 上で処理されるため、Python と Scala の間でパフォーマンスの差はほとんどありません。

  • UDF を Scala または Java で実装する – Java または Scala で記述された UDF は JVM 上で実行されるため、それらの言語による実装を検討してください。

  • ベクトル化されたワークロードには Apache Arrow ベースの UDF を使用する – Arrow ベースの UDF の使用を検討してください。この機能は、ベクトル化された UDF (Pandas UDF) とも呼ばれます。Apache Arrow は、JVM と Python プロセス間でデータを効率的に転送するために AWS Glue が使用できる言語に依存しないインメモリデータ形式です。これは現在、Pandas や NumPy のデータを扱う Python ユーザーにとって最も有益です。

    Arrow は列指向 (ベクトル化) 形式です。その使用は自動ではなく、最大限に活用したり互換性を確保したりするには、設定やコードにわずかな変更が必要になる可能性があります。詳細と制限については、「Apache Arrow in PySpark」を参照してください。

    次の例では、標準の Python、ベクトル化された UDF、Spark SQL の基本的な増分 UDF を比較します。

標準の Python UDF

サンプルの時間 3.20 (秒) です。

コードの例

# DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # UDF Example def plus(a,b): return a+b spark.udf.register("plus",plus) df.selectExpr("count(plus(a,b))").collect()

実行計画

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#124)]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)]) +- Project [pythonUDF0#124] +- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124] +- Project [id#114L AS a#116L, id#114L AS b#117L] +- Range (0, 10000000, step=1, splits=16)

ベクトル化された UDF

サンプルの時間 0.59 (秒) です。

ベクトル化された UDF は、前の UDF のサンプルと比べて 5 倍高速です。Physical Plan を確認すると、ArrowEvalPython が表示されており、このアプリケーションが Apache Arrow によってベクトル化されていることがわかります。ベクトル化された UDF を有効にするには、コードで spark.sql.execution.arrow.pyspark.enabled = true を指定する必要があります。

コードの例

# Vectorized UDF from pyspark.sql.types import LongType from pyspark.sql.functions import count, pandas_udf # Enable Apache Arrow Support spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # Annotate pandas_udf to use Vectorized UDF @pandas_udf(LongType()) def pandas_plus(a,b): return a+b spark.udf.register("pandas_plus",pandas_plus) df.selectExpr("count(pandas_plus(a,b))").collect()

実行計画

== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L]) +- Project [pythonUDF0#1082L] +- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200 +- Project [id#1072L AS a#1074L, id#1072L AS b#1075L] +- Range (0, 10000000, step=1, splits=16)

Spark SQL

サンプルの時間 0.087 (秒) です。

タスクが Python ランタイムを介さずに各エグゼキュターの JVM 上で実行されるため、Spark SQL はベクトル化された UDF よりもはるかに高速です。UDF を組み込み関数に置き換えられる場合は、そうすることをお勧めします。

コードの例

df.createOrReplaceTempView("test") spark.sql("select count(a+b) from test").collect()

ビッグデータに pandas を使用する

pandas にすでに精通していて、ビッグデータに Spark を使用する場合は、Spark. AWS Glue 4.0 で pandas API を使用できます。開始するには、公式ノートブック「Quickstart: Pandas API on Spark」を使用できます。詳細については、PySpark のドキュメントを参照してください。