执行数据量较大的spark任务时经常会出现MetadataFetchFailedException:

org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 10
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:867)
at org.apache.spark.MapOutputTracker$$anonfun$convertMapStatuses$2.apply(MapOutputTracker.scala:863)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
...

原因分析:

shuffle分为shuffle write和shuffle read两部分。
shuffle write的分区数由上一阶段的RDD分区数控制,shuffle read的分区数则是由Spark提供的一些参数控制。
shuffle write可以简单理解为类似于saveAsLocalDiskFile的操作,将计算的中间结果按某种规则临时放到各个executor所在的本地磁盘上。
shuffle read的时候数据的分区数则是由spark提供的一些参数控制。可以想到的是,如果这个参数值设置的很小,同时shuffle read的量很大,那么将会导致一个task需要处理的数据非常大。结果导致JVM crash,从而导致取shuffle数据失败,同时executor也丢失了,看到Failed to connect to host的错误,也就是executor lost的意思。有时候即使不会导致JVM crash也会造成长时间的gc。

解决思路:

  • 减少shuffle数据
    主要从代码层面着手,可以将不必要的数据在shuffle前进行过滤,比如原始数据有20个字段,只要选取需要的字段进行处理即可,将会减少一定的shuffle数据。

  • 修改分区
    通过spark.sql.shuffle.partitions控制分区数,默认为200,根据shuffle的量以及计算的复杂度适当提高这个值,例如500。

  • 增加失败的重试次数和重试的时间间隔
    通过spark.shuffle.io.maxRetries控制重试次数,默认是3,可适当增加,例如10。
    通过spark.shuffle.io.retryWait控制重试的时间间隔,默认是5s,可适当增加,例如10s。

  • 提高executor的内存
    在spark-submit提交任务时,适当提高executor的memory值,例如15G或者20G。

  • 考虑是否存在数据倾斜的问题

另外可参考其他的优化方式点击此文章https://blog.csdn.net/weixin_44455388/article/details/100932921

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐