Spark AQE(Adaptive Query Execution)机制
AQE 的全称是 Adaptive Query Execution,翻译过来是“自适应查询执行”。它包含了 3 个动态优化特性,分别是 Join 策略调整、自动分区合并和自动倾斜处理。
💐💐扫码关注公众号,回复 spark 关键字下载geekbang 原价 90 元 零基础入门 Spark 学习资料💐💐
AQE 的全称是 Adaptive Query Execution,翻译过来是“自适应查询执行”。它包含了 3 个动态优化特性,分别是 Join 策略调整、自动分区合并和自动倾斜处理。使用 AQE 特性需要先把spark.sql.adaptive.enabled 修改为 true 才行。
Join 策略调整
如果基表的存储尺寸小于广播阈值,那么无需开发者显示调用 broadcast 函数,Spark SQL 同样会选择 Broadcast Join 的策略,在基表之上创建广播变量,来完成两张表的数据关联。广播阈值由如下配置项设定,只要基表小于该配置项的设定值,Spark SQL 就会自动选择 Broadcast Join 策略。
广播阈值有了,要比较它与基表存储尺寸谁大谁小,Spark SQL 还要还得事先计算好基表的存储尺寸才行。如果基表数据来自文件系统,那么 Spark SQL 用来与广播阈值对比的基准,就是基表在磁盘中的存储大小。如果基表数据来自 DAG 计算的中间环节,那么 Spark SQL 将参考 DataFrame 执行计划中的统计值,跟广播阈值做对比,如下所示。
val df: DataFrame = _
// 先对分布式数据集加Cache
df.cache.count
// 获取执行计划
val plan = df.queryExecution.logical
// 获取执行计划对于数据集大小的精确预估
val estimated: BigInt = spark
.sessionState
.executePlan(plan)
.optimizedPlan
.stats
.sizeInBytes
Join 策略调整指的就是 Spark SQL 在运行时动态地将原本的 Shuffle Join 策略,调整为执行更加高效的 Broadcast Join。具体来说,每当 DAG 中的 Map 阶段执行完毕,Spark SQL 就会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 阶段数据表的存储大小。如果发现基表尺寸小于广播阈值,那么 Spark SQL 就把下一阶段的 Shuffle Join 调整为 Broadcast Join。通过 Shuffle 中间文件,Spark SQL 可以获得诸如文件尺寸、Map Task 数据分片大小、Reduce Task 分片大小、空文件占比之类的统计信息。正是利用这些统计信息,Spark SQL 才能在作业执行的过程中,动态地调整执行计划。
自动分区合并
Shuffle 的计算过程分为两个阶段:Map 阶段和 Reduce 阶段。Map 阶段的数据分布,往往由分布式文件系统中的源数据决定,因此数据集在这个阶段的分布,是相对均匀的。Reduce 阶段的数据分布则不同,它是由 Distribution Key 和 Reduce 阶段并行度决定的。数据的不均衡往往体现在两个方面,一方面是一部分数据分区的体量过小,而另一方面,则是少数分区的体量极其庞大。
Spark SQL 采用了一种相对朴素的方法,来实现分区合并。具体来说,Spark SQL 事先并不去判断哪些分区是不是足够小,而是按照分区的编号依次进行扫描,当扫描过的数据体量超过了“目标尺寸”时,就进行一次合并。而这个目标尺寸,由以下两个配置项来决定。
其中,开发者可以通过第一个配置项 spark.sql.adaptive.advisoryPartitionSizeInBytes 来直接指定目标尺寸。第二个参数用于限制 Reduce 阶段在合并之后的并行度,避免因为合并导致并行度过低,造成 CPU 资源利用不充分。结合数据集大小与最低并行度,我们可以反推出来每个分区的平均大小,假设我们把这个平均大小记作是 #partitionSize。那么,实际的目标尺寸,取 advisoryPartitionSizeInBytes 设定值与 #partitionSize 之间较小的那个数值。
确定了目标尺寸之后,Spark SQL 就会依序扫描数据分区,当相邻分区的尺寸之和大于目标尺寸的时候,Spark SQL 就把扫描过的分区做一次合并。然后,继续使用这种方式,依次合并剩余的分区,直到所有分区都处理完毕。
自动倾斜处理
自动倾斜处理第一步是检测并判定体量较大的倾斜分区,第二步是把这些大分区拆分为小分区。要做到这两步,Spark SQL 需要依赖如下 3 个配置项。
首先,Spark SQL 对所有数据分区按照存储大小做排序,取中位数作为基数。然后,将中位数乘以 skewedPartitionFactor 指定的比例系数,得到判定阈值。凡是存储尺寸大于判定阈值的数据分区,都有可能被判定为倾斜分区。倾斜分区的判定,还要受到 skewedPartitionThresholdInBytes 参数的限制,它是判定倾斜分区的最低阈值。也就是说,只有那些尺寸大于 skewedPartitionThresholdInBytes 设定值的“候选分区”,才会最终判定为倾斜分区。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)