记一次Spark引擎执行Sql超时优化
前几天用spark引擎执行了一个较大的sql,涉及的表和数据量都不少,不同时间段执行了几次都超时,经过上网及分析,尝试解决了此问题,使用spark引擎测试几次大概都在半个小时左右完成,不再出现超时现象一、问题现象摘抄部分现场日志如下:2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGSched
前几天用spark引擎执行了一个较大的sql,涉及的表和数据量都不少,不同时间段执行了几次都超时,经过上网及分析,尝试解决了此问题,使用spark引擎测试几次大概都在半个小时左右完成,不再出现超时现象
一、问题现象
摘抄部分现场日志如下:
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: ShuffleMapStage 28 (run at ThreadPoolExecutor.java:1149) finished in 569.587 s
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: looking for newly runnable stages
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: running: Set(ShuffleMapStage 13, ShuffleMapStage 5, ShuffleMapStage 22, ShuffleMapStage 14)
2022-02-01 13:02:12 INFO 22/02/01 13:02:12 [dag-scheduler-event-loop] INFO DAGScheduler: failed: Set()
Caused by: org.apache.spark.SparkException: Could not execute broadcast in 600 secs. You can increase the timeout for broadcasts via spark.sql.broadcastTimeout or spark.sql.broadcastMaxRetries or disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1
2022-02-01 13:02:14 WARNING spark-sql运行异常!
2022-02-01 13:02:14 WARNING 将会使用hive重试!
二、分析问题
从上述日志中可以看出在ShuffleMapStage阶段,也就是ShuffleRead阶段,在Driver在向各个Executor广播输入数据时候,出现了超时现象,容错机制自动启动了hive引擎重新执行,hive引擎经过数较长一段时间完成了,但也出现了reduce阶段的数据倾斜。
既然出现了超时,要么增加超时时间,设置更长的超时时间,要么重试一下,也可能是当时网络原因导致的,要么在超时时间内增大吞吐处理能力
简单从以下为以下几个方面着手:
1、适当增加超时时间
2、适当增加重试次数
3、加快处理速度
4、禁用BroadcastJoin
看到日志中给出 disable broadcast join by setting spark.sql.autoBroadcastJoinThreshold to -1,spark.sql.autoBroadcastJoinThreshold参数默认值是10M,小于设置的此值小表将不能被广播,将,设置为-1是禁用了BroadcastJoin方式广播
此方式没有尝试过,至于禁用BroadcastJoin后,会使用什么方式join,这就涉及了Spark的几种join实现直接的关系,后续有时间再探讨这个
三、解决方案
根据以上简单分析,大致有以下三种解决方案
1、适当增加超时时间
spark.sql.broadcastTimeout=800
不过可能需要反复多调试几次,因为不知道多长时间合适,也就是常说的以时间换空间的方案
2、适当增加重试次数
spark.sql.broadcastMaxRetries=3
此种适合偶然现象,可能是因为集群中当时太多任务在运行,网络传输较为繁忙,数据输入较慢
3、加快处理速度(推荐)
这里通过优化调整参数来提升处理速度,开启MapJoin参数设置,在Map阶段完成join
(1)开启Mapjoin
hive.auto.convert.join = true;
该参数开启了在map侧join,避免在reduce侧join,默认值true
(2)设置Mapjoin小表的阈值
hive.mapjoin.smalltable.filesize=50000000;
该参数决定在map侧join的具体标准,默认值25000000=
25M,不超此值则开启map侧优化
,具体多少合适需要根据自己表数据量的大小设置并反复调试
(3)开启worker节点缓存
hive.auto.convert.join.noconditionaltask = true;
该参数默认值为true ,用来来将一个小表变成hashtable然后作为分布式缓存文件分发到各个worker节点,进而实现在Map侧的连接优化机制
(4)设置worker节点缓存小表阈值
hive.auto.convert.join.noconditionaltask.size = 15000000;
该参数主要用来决定worker节点缓存的具体标准,默认值10000000=10M。
如果参与连接的n个表(或分区)中的n-1 的大小总和小于这个参数的值,即不超过此值,hive将启用小表在worker缓存,实现Map侧的连接优化机制。
具体多少合适需要根据自己表数据量的大小设置并反复调试
如有不正之处,还请留言指出,感谢批评!
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)