Optimización de las mezclas
Algunas operaciones, como join() y groupByKey(), requieren que Spark lleve a cabo una mezcla. La mezcla es el mecanismo de Spark para redistribuir los datos de forma que se agrupen de forma diferente en las particiones de RDD. Las mezclas pueden ayudarlo a corregir los cuellos de botella de rendimiento. Sin embargo, dado que la mezcla suele implicar la copia de datos entre los ejecutores de Spark, se trata de una operación compleja y costosa. Por ejemplo, las mezclas generan los siguientes costos:
-
E/S de disco:
-
Genera una gran cantidad de archivos intermedios en el disco.
-
-
E/S de red:
-
Necesita muchas conexiones de red (número de conexiones =
Mapper × Reducer). -
Como los registros se agregan a nuevas particiones de RDD que podrían alojarse en un ejecutor de Spark diferente, una parte sustancial del conjunto de datos podría moverse entre los ejecutores de Spark a través de la red.
-
-
Carga de CPU y memoria:
-
Ordena los valores y fusiona conjuntos de datos. Estas operaciones se planifican en el ejecutor, lo que supone una gran carga para este.
-
La mezcla es uno de los factores más importantes en la degradación del rendimiento de su aplicación de Spark. Al almacenar los datos intermedios, puede agotar el espacio en el disco local del ejecutor, lo que provoca un error en el trabajo de Spark.
Puede evaluar el rendimiento de la mezcla en las métricas de CloudWatch y en la IU de Spark.
Métricas de CloudWatch
Si el valor de Bytes de mezcla escritos es alto en comparación con Bytes de mezcla leídos, su trabajo de Spark podría utilizar operaciones de mezclajoin() o groupByKey().
Interfaz de usuario de Spark
En la pestaña Etapa de la IU de Spark, puede comprobar los valores de Tamaño de lectura de la mezcla / Registros. También puede verlos en la pestaña Ejecutores.
En la siguiente captura de pantalla, cada ejecutor intercambia aproximadamente 18,6 GB/4 020 000 registros con el proceso de mezcla, lo que supone un tamaño total de lectura de mezcla de unos 75 GB.
En la columna Volcado de mezcla (disco) se muestra una gran cantidad de datos que se vuelca de la memoria al disco, lo que puede provocar que el disco se llene o que se produzca un problema de rendimiento.
Si observa estos síntomas y la etapa tarda demasiado en comparación con sus objetivos de rendimiento, o si se producen los errores Out Of Memory o No space
left on device, considere las siguientes soluciones.
Optimización de la unión
La operación join(), que une tablas, es la operación de mezcla más utilizada, pero suele provocar un cuello de botella de rendimiento. Como la unión es una operación costosa, le recomendamos no utilizarla a menos que sea esencial para los requisitos de su empresa. Haga las siguientes preguntas para comprobar que está haciendo un uso eficiente de su canalización de datos:
-
¿Va a volver a calcular una unión que también se lleva a cabo en otros trabajos que puede reutilizar?
-
¿Va a llevar a cabo una unión para resolver claves externas a valores que no utilizan los consumidores de su resultado?
Tras confirmar que las operaciones de unión son esenciales para los requisitos de su empresa, consulte las siguientes opciones para optimizar la unión de forma que se ajuste a sus requisitos.
Uso de una inserción antes de una unión
Filtre las filas y columnas innecesarias en el DataFrame antes de llevar a cabo una unión. Esto tiene las siguientes ventajas:
-
Reduce la cantidad de transferencia de datos durante la mezcla.
-
Reduce la cantidad de procesamiento en el ejecutor de Spark.
-
Reduce la cantidad de análisis de datos.
# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])
Uso de una unión de DataFrame
Intente usar una API de alto nivel de Sparkdyf.toDF(). Como se explica en la sección Temas clave de Apache Spark, estas operaciones de unión aprovechan internamente la optimización de consultas del optimizador Catalyst.
Uniones de mezcla y hash de transmisión y sugerencias
Spark admite dos tipos de unión: unión de mezcla y unión hash de transmisión. Una unión hash de transmisión no requiere mezclas y puede requerir menos procesamiento que una unión de mezcla. Sin embargo, solo se aplica al unir una tabla pequeña a una grande. Al unir una tabla que quepa en la memoria de un solo ejecutor de Spark, considere la posibilidad de usar una unión hash de transmisión.
En el siguiente diagrama se muestran la estructura de alto nivel y los pasos de una unión hash de transmisión y una unión de mezcla.
Los detalles de cada unión son los siguientes:
-
Unión de mezcla:
-
La unión hash de mezcla une dos tablas sin ordenación y distribuye la unión entre las dos tablas. Es adecuada para uniones de tablas pequeñas que se pueden almacenar en la memoria del ejecutor de Spark.
-
La unión de ordenación/combinación distribuye las dos tablas que se van a unir por clave y las ordena antes de unirlas. Es adecuada para unir tablas grandes.
-
-
Unión hash de transmisión:
-
Una unión hash de transmisión envía el RDD o tabla más pequeño a cada uno de los nodos de trabajo. Luego, hace una combinación del lado de asignación con cada partición del RDD o la tabla más grande.
Es adecuada para las uniones cuando uno de los RDD o las tablas puede caber en la memoria o se puede hacer que quepa en la memoria. Siempre que sea posible, es recomendable hacer una unión hash de transmisión, ya que no se necesita una mezcla. Puede usar una sugerencia de unión para solicitar una unión de transmisión desde Spark de la siguiente manera.
# DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;Para obtener más información sobre las sugerencias de unión, consulte Join hints
.
-
En la versión 3.0 y posteriores de AWS Glue, puede aprovechar las uniones hash de transmisión; para ello, habilite la ejecución adaptativa de consultas
En AWS Glue 3.0, puede activar la ejecución adaptativa de consultas mediante la definición de spark.sql.adaptive.enabled=true. La ejecución adaptativa de consultas está habilitada de forma predeterminada en AWS Glue 4.0.
Puede establecer parámetros adicionales relacionados con las mezclas y las uniones hash de transmisión:
-
spark.sql.adaptive.localShuffleReader.enabled -
spark.sql.adaptive.autoBroadcastJoinThreshold
Para obtener más información sobre los parámetros relacionados, consulte Conversión de uniones de ordenación/combinación en uniones de transmisión
En la versión 3.0 o posteriores de AWS Glue, puede usar otras sugerencias de unión para mezclar y así ajustar el comportamiento.
-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;
Uso de la asignación de buckets
La unión de ordenación/combinación requiere dos fases: mezcla y ordenación y, a continuación, fusión. Estas dos fases pueden sobrecargar el ejecutor de Spark y provocar problemas de OOM y rendimiento cuando algunos ejecutores se fusionan y otros se ordenan simultáneamente. En esos casos, podría ser posible llevar a cabo una unión eficiente mediante la asignación de buckets
Las tablas en buckets son útiles para lo siguiente:
-
Datos que se unen con frecuencia a través de la misma clave, como
account_id. -
Carga de tablas acumulativas diarias, como las tablas de base y delta, que podrían agruparse en un bucket en una columna común.
Puede crear una tabla en buckets mediante el siguiente código.
df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")
Cómo volver a particionar DataFrames en las claves de unión antes de la unión
Para volver a particionar los dos DataFrames en las claves de unión antes de la unión, utilice las siguientes instrucciones.
df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")
Esto particionará dos RDD (aún separados) en la clave de unión antes de iniciar la unión. Si los dos RDD se particionan en la misma clave y con el mismo código de partición, es muy probable que los registros de RDD que desee unir estén ubicados en el mismo nodo de trabajo antes de mezclarse para la unión. Esto podría mejorar el rendimiento al reducir la actividad de la red y el sesgo de datos durante la unión.
Cómo superar el sesgo de datos
El sesgo de datos es una de las causas más habituales de los cuellos de botella para los trabajos de Spark. Se produce cuando los datos no se distribuyen uniformemente en las particiones de RDD. Esto hace que las tareas de esa partición tarden mucho más que otras, lo que retrasa el tiempo total de procesamiento de la aplicación.
Para identificar el sesgo de datos, evalúe las siguientes métricas en la IU de Spark:
-
En la pestaña Etapa de la IU de Spark, examine la página Cronograma del evento. Puede ver una distribución desigual de tareas en la siguiente captura de pantalla. Las tareas que se distribuyen de forma desigual o que tardan demasiado en ejecutarse pueden indicar un sesgo de datos.
-
Otra página importante es Métricas de resumen, en la que se muestran las estadísticas de las tareas de Spark. En la siguiente captura de pantalla se muestran las métricas con los percentiles de Duración, Tiempo de GC, Volcado (memoria), Volcado (disco), etc.
Cuando las tareas estén distribuidas uniformemente, verá números similares en todos los percentiles. Cuando los datos estén sesgados, verá valores muy sesgados en cada percentil. En el ejemplo, la duración de la tarea es inferior a 13 segundos en Mínimo, Percentil 25, Mediana y Percentil 75. Si bien la tarea máxima procesó 100 veces más datos que el percentil 75, su duración de 6,4 minutos es aproximadamente 30 veces mayor. Esto significa que al menos una tarea (o hasta un 25 % de las tareas) tardó mucho más tiempo que el resto de las tareas.
Si observa un sesgo de datos, pruebe lo siguiente:
-
Si usa AWS Glue 3.0, puede activar la ejecución adaptativa de consultas mediante la definición de
spark.sql.adaptive.enabled=true. La ejecución adaptativa de consultas está habilitada de forma predeterminada en AWS Glue 4.0.También puede utilizar la ejecución adaptativa de consultas para el sesgo de datos introducido mediante las uniones; para ello, defina los siguientes parámetros relacionados:
-
spark.sql.adaptive.skewJoin.skewedPartitionFactor -
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes -
spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good) -
spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)
Para obtener más información, consulte la documentación de Apache Spark
. -
-
Utilice claves con un amplio rango de valores para las claves de unión. En una unión de mezcla, las particiones se determinan para cada valor de hash de una clave. Si la cardinalidad de una clave de unión es demasiado baja, es más probable que la función hash distribuya mal los datos entre las particiones. Por lo tanto, si su aplicación y su lógica empresarial lo admiten, considere la posibilidad de utilizar una clave compuesta o una clave de cardinalidad más alta.
# Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])
Uso de la caché
Cuando utilice DataFrames repetitivos, evite la mezcla o los cálculos adicionales mediante df.cache() o df.persist() para almacenar en caché los resultados del cálculo en la memoria de cada ejecutor de Spark y en el disco. Spark también admite la conservación de los RDD en el disco o la replicación en varios nodos (nivel de almacenamiento
Por ejemplo, puede conservar los DataFrames agregando df.persist(). Cuando la caché ya no sea necesaria, puede utilizar unpersist para descartar los datos almacenados en caché.
df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()
Eliminación de las acciones innecesarias de Spark
Evita ejecutar acciones innecesarias como count, show o collect. Como se explica en la sección Temas clave de Apache Spark, Spark es lento. Cada RDD transformado puede volver a calcularse cada vez que ejecute una acción en él. Cuando utiliza muchas acciones de Spark, se llama a varios accesos a orígenes, cálculos de tareas y ejecuciones de mezcla para cada acción.
Si no necesita collect() ni otras acciones en su entorno comercial, considere la posibilidad de eliminarlas.
nota
Evite usar collect() de Spark en entornos comerciales en la medida de lo posible. La acción collect() devuelve todos los resultados de un cálculo del ejecutor de Spark al controlador de Spark, lo que podría provocar que el controlador de Spark devuelva un error de OOM. Para evitar un error de OOM, Spark define spark.driver.maxResultSize = 1GB de forma predeterminada, lo que limita el tamaño máximo de los datos devueltos al controlador de Spark a 1 GB.