Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Optimización de las funciones definidas por el usuario
Las funciones definidas por el usuario (UDFs) yRDD.map, a menudo, reducen el rendimiento de manera significativa. PySpark Esto se debe a la sobrecarga necesaria para representar con precisión su código Python en la implementación de Scala subyacente de Spark.
El siguiente diagrama muestra la arquitectura de PySpark los trabajos. Cuando lo usas PySpark, el controlador Spark usa la biblioteca Py4j para llamar a los métodos de Java desde Python. Al llamar a Spark SQL o a funciones DataFrame integradas, hay poca diferencia de rendimiento entre Python y Scala porque las funciones se ejecutan en la JVM de cada ejecutor mediante un plan de ejecución optimizado.
Si usa su propia lógica de Python, como map/ mapPartitions/ udf, la tarea se ejecutará en un entorno de tiempo de ejecución de Python. La administración de dos entornos genera un costo general. Además, los datos en memoria deben transformarse para que las funciones integradas del entorno de tiempo de ejecución de JVM puedan utilizarlos. Pickle es un formato de serialización que se utiliza de forma predeterminada para el intercambio entre los tiempos de ejecución de JVM y Python. Sin embargo, el coste de esta serialización y deserialización es muy elevado, por lo que UDFs escribir en Java o Scala es más rápido que en Python. UDFs
Para evitar la sobrecarga de serialización y deserialización, tenga en cuenta lo siguiente: PySpark
-
Usa las funciones integradas de Spark SQL: considera reemplazar tu propia función de mapa o UDF por Spark SQL o funciones integradas. DataFrame Al ejecutar Spark SQL o funciones DataFrame integradas, hay poca diferencia de rendimiento entre Python y Scala porque las tareas se gestionan en la JVM de cada ejecutor.
-
UDFs Impleméntalo en Scala o Java: considera usar una UDF escrita en Java o Scala, ya que se ejecuta en la JVM.
-
Use Apache basado en Arrow para cargas de trabajo vectorizadas: considere usar UDFs el basado en Arrow. UDFs Esta característica también se conoce como UDF vectorizada (UDF de pandas). Apache Arrow
es un formato de datos en memoria independiente del lenguaje que se AWS Glue puede utilizar para transferir datos de manera eficiente entre los procesos de JVM y Python. Actualmente, esto es más beneficioso para los usuarios de Python que trabajan con Pandas o NumPy datos. Arrow es un formato de columnas (vectorizado). Su uso no es automático y puede requerir algunos cambios menores en la configuración o el código para aprovechar al máximo y garantizar la compatibilidad. Para obtener más detalles y conocer las limitaciones, consulte Apache Arrow en PySpark
. En el siguiente ejemplo se compara una UDF incremental básica en Python estándar, como una UDF vectorizada, y en Spark SQL.
UDF de Python estándar
El tiempo del ejemplo son 3,20 (segundos).
Código de ejemplo
# DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # UDF Example def plus(a,b): return a+b spark.udf.register("plus",plus) df.selectExpr("count(plus(a,b))").collect()
Plan de ejecución
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#124)]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#580] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#124)]) +- Project [pythonUDF0#124] +- BatchEvalPython [plus(a#116L, b#117L)], [pythonUDF0#124] +- Project [id#114L AS a#116L, id#114L AS b#117L] +- Range (0, 10000000, step=1, splits=16)
UDF vectorizada
El tiempo del ejemplo son 0,59 (segundos).
La UDF vectorizada es 5 veces más rápida que el ejemplo de la UDF anterior. Si consulta Physical Plan, puede ver ArrowEvalPython, que indica que esta aplicación está vectorizada por Apache Arrow. Para habilitar la UDF vectorizada, debe especificar spark.sql.execution.arrow.pyspark.enabled = true en el código.
Código de ejemplo
# Vectorized UDF from pyspark.sql.types import LongType from pyspark.sql.functions import count, pandas_udf # Enable Apache Arrow Support spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true") # DataSet df = spark.range(10000000).selectExpr("id AS a","id AS b") # Annotate pandas_udf to use Vectorized UDF @pandas_udf(LongType()) def pandas_plus(a,b): return a+b spark.udf.register("pandas_plus",pandas_plus) df.selectExpr("count(pandas_plus(a,b))").collect()
Plan de ejecución
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- HashAggregate(keys=[], functions=[count(pythonUDF0#1082L)], output=[count(pandas_plus(a, b))#1080L]) +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#5985] +- HashAggregate(keys=[], functions=[partial_count(pythonUDF0#1082L)], output=[count#1084L]) +- Project [pythonUDF0#1082L] +- ArrowEvalPython [pandas_plus(a#1074L, b#1075L)], [pythonUDF0#1082L], 200 +- Project [id#1072L AS a#1074L, id#1072L AS b#1075L] +- Range (0, 10000000, step=1, splits=16)
Spark SQL
El tiempo del ejemplo son 0,087 (segundos).
Spark SQL es mucho más rápido que la UDF vectorizada, ya que las tareas se ejecutan en la JVM de cada ejecutor sin un tiempo de ejecución de Python. Si puede reemplazar la UDF por una función integrada, le recomendamos que lo haga.
Código de ejemplo
df.createOrReplaceTempView("test") spark.sql("select count(a+b) from test").collect()
Uso de pandas para macrodatos
Si ya estás familiarizado con los pandas