优化随机排序 -

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

优化随机排序

某些操作(例如 join()groupByKey())需要 Spark 执行随机排序。随机排序是 Spark 的机制,用于重新分配数据以跨 RDD 分区对数据进行不同的分组。随机排序有助于修复性能瓶颈。但是,由于随机排序通常涉及在 Spark 执行程序之间复制数据,因此随机排序是一项复杂且成本高昂的操作。例如,随机排序会产生以下成本:

  • 磁盘 I/O:

    • 在磁盘上生成大量中间文件。

  • 网络 I/O:

    • 需要许多网络连接(连接数 = Mapper × Reducer)。

    • 由于记录会聚合到可能在不同 Spark 执行程序上托管的新 RDD 分区,因此数据集的很大一部分可能会通过网络在 Spark 执行程序之间移动。

  • CPU 和内存负载:

    • 对值进行排序以及合并数据集。这些操作是在执行程序上计划的,这给执行程序带来了沉重的负担。

随机排序是导致 Spark 应用程序性能下降的最重要因素之一。在存储中间数据时,它可能会耗尽执行程序本地磁盘上的空间,从而导致 Spark 作业失败。

你可以通过 CloudWatch 指标和 Spark UI 来评估你的 shuffle 表现。

CloudWatch 指标

如果随机写入字节数的值高于随机读取字节数,则您的 Spark 作业可能会使用随机排序操作,例如 join()groupByKey()

数据跨执行程序随机排序(字节数)图,显示写入随机字节数的峰值。

火花用户界面

在 Spark UI 的阶段选项卡上,您可以查看随机读取大小/记录数值。您也可以在执行程序选项卡中看到。

在以下屏幕截图中,每个执行程序与随机排序流程交换了大约 18.6 GB/4020000 条记录,总随机读取大小约为 75 GB。

随机溢出(磁盘)列显示大量数据溢出内存到磁盘,这可能会导致磁盘已满或性能问题。

""

如果您发现这些迹象,并且该阶段与您的性能目标相比耗时过长,或者出现 Out Of MemoryNo space left on device 错误,请考虑以下解决方案。

优化联接

联接表的 join() 操作是最常用的随机排序操作,但其常常成为性能瓶颈。由于联接是一项昂贵的操作,因此除非其对您的业务需求至关重要,否则我们建议不要使用该操作。通过询问以下问题,仔细核查您的数据管线是否得到高效利用:

  • 您是否正在重新计算在其他可重复使用作业中也执行的联接?

  • 您是否联接以将外键解析为输出使用者未使用的值?

确认您的联接操作对您的业务需求至关重要后,请查看以下选项,以满足要求的方式来优化您的联接。

联接前使用下推

在执行联接 DataFrame 之前,过滤掉中不必要的行和列。此操作具有以下优势:

  • 减少随机排序期间的数据传输量

  • 减少 Spark 执行程序中的处理量

  • 减少数据扫描量

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

使用 “ DataFrame 加入”

尝试使用 SparkSQL、 DataFrame和数据集等 Spark 高级别 API 来代替 RDD API 或者加入。 DynamicFrame 您可以使用诸如 DynamicFrame 之类 DataFrame 的方法调用转换为dyf.toDF()。如 Apache Spark 中的关键主题一节所述,这些联接操作在内部利用 Catalyst 优化器的查询优化。

随机排序联接、广播哈希联接和提示

Spark 支持两种类型的联接:随机排序联接和广播哈希联接。广播哈希联接不需要随机排序,与随机排序联接相比需要的处理更少。但是,它仅在将小表联接到大表时适用。在联接可容纳单个 Spark 执行程序内存的表时,可以考虑使用广播哈希联接。

下图显示广播哈希联接和随机排序联接的高级别结构和步骤。

广播联接是表与联接表之间的直接连接,而随机排序联接是表与联接表之间有两个随机排序阶段。

每个联接的详细信息如下:

  • 随机排序联接:

    • 随机排序哈希联接在不进行排序的情况下联接两个表,并在两个表之间分配联接。它适用于可以存储在 Spark 执行程序内存中的小型表联接。

    • 排序合并联接按键分配要联接的两个表,并在联接之前对其进行排序。其适用于大型表的联接。

  • 广播哈希联接:

    • 广播哈希联接会将较小的 RDD 或表推送到每个 Worker 节点。然后其对较大的 RDD 或表的每个分区进行映射端组合。

      当您的一个或一个表可以容纳在内存中 RDDs 或可以放入内存中时,它适用于联接。尽可能进行广播哈希联接的优势在于它不需要随机排序。您可以使用联接提示从 Spark 请求广播联接,如下所示。

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      有关联接提示的更多信息,请参阅 Join hints

在 AWS Glue 3.0 及更高版本中,您可以通过启用自适应查询执行和其他参数来自动利用广播哈希联接。当任一联接端的运行时统计信息小于自适应广播哈希联接阈值时,自适应查询执行会将排序合并联接转换为广播哈希联接。

在 AWS Glue 3.0 中,您可以通过设置启用自适应查询执行spark.sql.adaptive.enabled=true。自适应查询执行在 AWS Glue 4.0 中默认已启用。

您可以设置与随机排序和广播哈希联接相关的其他参数:

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

有关相关参数的更多信息,请参阅 Converting sort-merge join to broadcast join

在 AWS Glue 3.0 及更高版本中,你可以使用 shuffle 的其他联接提示来调整你的行为。

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

使用分桶

排序合并联接需要两个阶段:随机排序与排序,然后合并。当一些执行程序合并而同时另一些执行程序排序时,这两个阶段可能会使 Spark 执行程序过载,并导致 OOM 和性能问题。在这类情况下,或许可通过使用分桶来高效地联接。分桶将对联接键的输入进行预随机排序和预排序,然后将排序后的数据写入中间表。通过预先定义排序后的中间表,可以降低联接大型表时随机排序和排序步骤的成本。

排序合并联接还有额外的随机排序和排序步骤。

分桶的表可用于以下用途:

  • 数据经常通过同一个键(例如 account_id)联接

  • 加载每日累积表,例如可以基于共用列分桶的基础表和增量表

您可以使用以下代码创建分桶表。

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

DataFrames 在加入之前对联接键进行重新分区

要在联接之前在联接键 DataFrames 上对两者进行重新分区,请使用以下语句。

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

在启动联接之前,这将在连接键 RDDs 上分成两个(仍然是分开的)。如果两 RDDs 者使用相同的分区代码在同一个密钥上进行分区,RDD 会记录在一起的计划很有可能在改组联接之前位于同一个工作线程上。这可能会减少联接期间的网络活动和数据偏斜,从而提升性能。

克服数据偏斜

数据偏斜是 Spark 作业瓶颈最常见的原因之一。当数据在 RDD 分区之间分布不均匀时,就会发生这种情况。这会导致该分区的任务花费的时间比其他分区长得多,从而延迟了应用程序的总处理时间。

要识别数据偏斜,请在 Spark UI 中评测以下指标:

  • 在 Spark UI 的阶段选项卡上,查看事件时间线页面。您可以在以下屏幕截图中看到任务分布不均匀。分布不均匀或运行耗时过长的任务可能表明存在数据偏斜。

    某一项任务的执行程序计算时间比其他任务长得多。
  • 另一个重要的页面是摘要指标,其显示 Spark 任务的统计信息。以下屏幕截图显示持续时间GC 时间溢出(内存)溢出(磁盘)等指标的百分位数。

    摘要指标表,突出显示“持续时间”行。

    当任务均匀分布时,您将在所有百分位数中看到类似的数字。当出现数据偏斜时,您将在每个百分位数中看到偏差很大的值。在示例中,任务持续时间在最小值第 25 个百分位数中位数第 75 个百分位数中均小于 13 秒。虽然最大值任务处理的数据量是第 75 个百分位数的 100 倍,但其 6.4 分钟的持续时间却长了约 30 倍。这意味着至少一项任务(或最多 25% 的任务)耗时比其他任务长得多。

如果您发现数据偏斜,请尝试以下操作:

  • 如果您使用 AWS Glue 3.0,请通过设置启用自适应查询执行spark.sql.adaptive.enabled=true。在 AWS Glue 4.0 中,自适应查询执行功能默认处于启用状态。

    您还可以通过设置以下相关参数,使用自适应查询执行来处理联接引入的数据偏斜:

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    有关更多信息,请参阅 Apache Spark 文档

  • 联接键使用值范围较大的键。在随机排序联接中,针对键的每个哈希值确定分区。如果联接键的基数过低,则哈希函数更有可能无法完美地将数据分布到各个分区。因此,如果您的应用程序和业务逻辑支持,请考虑使用较高基数的键或复合键。

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

使用缓存

重复使用时 DataFrames,请使用或df.persist()将计算结果缓存在每个 Spark 执行程序的内存和磁盘上,从而避免额外的随机处理df.cache()或计算。Spark 还支持 RDDs 在磁盘上持久化或跨多个节点复制(存储级别)。

例如,您可以 DataFrames 通过添加来保留df.persist()。当不再需要缓存时,您可以使用 unpersist 丢弃缓存的数据。

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

移除不需要的 Spark 操作

避免运行不必要的操作,例如 countshowcollect。正如 Apache Spark 中的关键主题一节所述,Spark 有惰性。每次对转换后的 RDD 运行操作,都可能对其进行重新计算。当您使用许多 Spark 操作时,会为每个操作调用多次数据源访问、任务计算和随机排序运行。

如果您不需要在商业环境中执行 collect() 或其他操作,请考虑将其删除。

注意

尽可能避免在商业环境中使用 Spark collect()collect() 操作将 Spark 执行程序中的所有计算结果返回给 Spark 驱动程序,这可能会导致 Spark 驱动程序返回 OOM 错误。为避免出现 OOM 错误,Spark 默认设置为 spark.driver.maxResultSize = 1GB,将返回给 Spark 驱动程序的最大数据大小限制为 1 GB。