前几天用spark引擎执行了一个较大的sql,涉及的表和数据量都不少,不同时间段执行了几次都超时,经过上网及分析,尝试解决了此问题,使用spark引擎测试几次大概都在半个小时左右完成,不再出现超时现象

一、问题现象

摘抄部分现场日志如下:


2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: ShuffleMapStage 28 (run at ThreadPoolExecutor.java:1149) finished in 569.587 s
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: looking for newly runnable stages
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: running: Set(ShuffleMapStage 13, ShuffleMapStage 5, ShuffleMapStage 22, ShuffleMapStage 14)
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: failed: Set()

Caused by: org.apache.spark.SparkException: Could not execute broadcast in 600 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or spark.sql.broadcastMaxRetries or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1

2022-02-01 13:02:14 WARNING spark-sql运行异常!
2022-02-01 13:02:14 WARNING 将会使用hive重试!


二、分析问题

从上述日志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor广播输入数据时候,出现了超时现象,容错机制自动启动了hive引擎重新执行,hive引擎经过数较长一段时间完成了,但也出现了reduce阶段的数据倾斜。

既然出现了超时,要么增加超时时间,设置更长的超时时间,要么重试一下,也可能是当时网络原因导致的,要么在超时时间内增大吞吐处理能力

简单从以下为以下几个方面着手:

1、适当增加超时时间

2、适当增加重试次数

3、加快处理速度

4、禁用BroadcastJoin

看到日志中给出 disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1,spark.sql.autoBroadcastJoinThreshold参数默认值是10M,小于设置的此值小表将不能被广播,将,设置为-1是禁用了BroadcastJoin方式广播

此方式没有尝试过,至于禁用BroadcastJoin后,会使用什么方式join,这就涉及了Spark的几种join实现直接的关系,后续有时间再探讨这个

三、解决方案

根据以上简单分析,大致有以下三种解决方案

1、适当增加超时时间

spark.sql.broadcastTimeout=800

不过可能需要反复多调试几次,因为不知道多长时间合适,也就是常说的以时间换空间的方案

2、适当增加重试次数

spark.sql.broadcastMaxRetries=3

此种适合偶然现象,可能是因为集群中当时太多任务在运行,网络传输较为繁忙,数据输入较慢

3、加快处理速度(推荐)

这里通过优化调整参数来提升处理速度,开启MapJoin参数设置,在Map阶段完成join

(1)开启Mapjoin

hive.auto.convert.join = true;

 该参数开启了在map侧join,避免在reduce侧join,默认值true

(2)设置Mapjoin小表的阈值

hive.mapjoin.smalltable.filesize=50000000;

该参数决定在map侧join的具体标准,默认值25000000=25M,不超此值则开启map侧优化 ,具体多少合适需要根据自己表数据量的大小设置并反复调试

(3)开启worker节点缓存

hive.auto.convert.join.noconditionaltask = true;

该参数默认值为true ,用来来将一个小表变成hashtable然后作为分布式缓存文件分发到各个worker节点,进而实现在Map侧的连接优化机制

(4)设置worker节点缓存小表阈值

hive.auto.convert.join.noconditionaltask.size = 15000000;

该参数主要用来决定worker节点缓存的具体标准,默认值10000000=10M。

如果参与连接的n个表(或分区)中的n-1 的大小总和小于这个参数的值,即不超过此值,hive将启用小表在worker缓存,实现Map侧的连接优化机制。

具体多少合适需要根据自己表数据量的大小设置并反复调试


如有不正之处,还请留言指出,感谢批评!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐