SparkSQL中控制文件输出数量
Coalesce and Repartition Hint或者spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled为true
在生产中,无论是通过SQL语句或者Scala/Java等代码的方式使用Spark SQL处理数据,在Spark SQL写数据时,往往会遇到生成的小文件过多的问题,而管理这些大量的小文件,是一件非常头疼的事情。
大量的小文件会影响Hadoop集群管理或者Spark在处理数据时的稳定性:
- Spark SQL写Hive或者直接写入HDFS,过多的小文件会对NameNode内存管理等产生巨大的压力,会影响整个集群的稳定运行。
- 容易导致task数过多,如果超过参数spark.driver.maxResultSize的配置(默认1g),会抛出类似如下的异常,影响任务的处理。
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 478 tasks (2026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
当然可以通过调大spark.driver.maxResultSize的默认配置来解决问题,但如果不能从源头上解决小文件问题,以后还可能遇到类似的问题。
下面通过一个例子,Spark SQL写数据时,导致产生分区数"剧增"的典型场景,通过分区数"剧增",以及Spark中task数和分区数的关系等,来倒推小文件过多的可能原因(这里的分区数是指生成的DataSet/RDD的分区数,不是Hive分区表的分区概念):
现象
- 对表test_tab进行写入操作
- t1的分区数是100,t2的分区数是200,union all后生成的tmp分区数是300
- test_tab产生的小文件数基本也在300左右
select * from t1 union all select * from t2 as tmp;
insert overwrite table test_tab select * from tmp;
分析
1)执行上述insert操作时的分区并行度,主要受tmp的分区数(对应一个DataSet)影响,
2)tmp的分区数主要受t1、t2以及union all的影响
3)暂且不考虑t1或t2是物理表还是经过其他处理生成的临时表,它们的分区数是确定的,这里主要看经过union all处理后,生成的tmp的分区数和t1、t2的分区数有何关系?
4)Spark SQL语句中的union all对应到DataSet中即为unionAll算子,底层调用union算子
Spark RDD中的union算子对union产生的新的RDD的分区数是如何受被union的多个RDD的影响的,这里直接给出结论:
RDD在调用union算子时,最终生成的RDD分区数分两种情况:
- union的RDD分区器已定义并且它们的分区器相同
多个符RDD具有相同额分区器,union后产生的RDD的分区器与父RDD相同且分区数也相同。比如,n个RDD的分区器相同且是defined,分区数是m个。那么这n个RDD最终union生成的一个RDD的分区数仍是m,分区器也是相同的- 不满足第一种情况,则通过union生成的RDD的分区数为父RDD的分区数之和。
同样的这种机制也可以套用到Spark SQL中的DataSet上,那么就很好解释了tmp的分区数为什么等于t1和t2的分区数的和。
最后,Spark中一个task处理一个分区从而也会影响最终生成的文件数。
当然上述只是以Spark SQL中的一个场景阐述了小文件产生过多的原因之一(分区数过多)。在数仓建设中,产生小文件过多的原因有很多种,比如:
-
流式处理中,每个批次的处理执行保存操作也会产生很多小文件
-
为了解决数据更新问题,同一份数据保存了不同的几个状态,也容易导致文件数过多
那么如何解决这种小文件的问题呢?
- 通过repartition或coalesce算子控制最后的DataSet的分区数
注意repartition和coalesce的区别。 - 将Hive风格的Coalesce and Repartition Hint 应用到Spark SQL需要注意这种方式对Spark的版本有要求,建议在Spark2.4.X及以上版本使用,示例:
--提示名称不区分大小写
INSERT ... SELECT /*+ COALESCE(numPartitions) */ ...
INSERT ... SELECT /*+ REPARTITION(numPartitions) */ ...
Coalesce Hint减少了分区数,它仅合并分区 ,因此最大程度地减少了数据移动,但须注意内存不足容易OOM。
Repartition Hint可以增加或减少分区数量,它执行数据的完全shuffle,并确保数据平均分配。
repartition增加了一个新的stage,因此它不会影响现有阶段的并行性;相反,coalesce会影响现有阶段的并行性,因为它不会添加新stage。
该写法还支持多个插入查询和命名子查询。
- 小文件定期合并
可以定时通过异步的方式针对Hive分区表的每一个分区中的小文件进行合并操作。
补充
如何在纯写SQL的场景下,如何用Spark SQL做数据导入时候,控制小文件的数量。
-
对于原始数据进行按照分区字段进行shuffle,可以规避小文件问题。但有可能引入数据倾斜的问题;
-
可以通过distribute by ss_sold_date_sk, cast(rand() * N as int),N值可以在文件数量和倾斜度之间做权衡;
-
知道倾斜键的情况下,可以将原始数据分成几个部分处理,不倾斜的按照分区键shuffle,倾斜部分可以按照rand函数来shuffle;
-
对于Spark 2.4 以上版本的用户,也可以使用HINT 详情,链接如下:
https://issues.apache.org/jira/browse/SPARK-24940 -
对于Spark 3.0 以上版本的用户,可以使用自适应查询(AQE)功能,设置spark.sql.adaptive.enabled和spark.sql.adaptive.coalescePartitions.enabled为true,Spark就会在计算过程中自动帮助用户合并小文件,更加方便和智能。
# 自适应执行框架的开关
spark.sql.adaptive.enabled = true
# reduce个数区间最小值
spark.sql.adaptive.minNumPostShufflePartitions = 1
# reduce个数区间最大值
spark.sql.adaptive.maxNumPostShufflePartitions = 500
# 动态调整reduce个数的partition大小依据,如果设置为64 MB,则reduce阶段每个task最少处理64 MB的数据
spark.sql.adaptive.shuffle.targetPostShuffleInputSize = 67108864
# 动态调整reduce个数的partition条数依据,如设置20000000则reduce阶段每个task最少处理20000000条的数据
spark.sql.adaptive.shuffle.targetPostShuffleRowCount = 20000000
spark.sql.auto.repartition = true
# 以下配置是针对join操作进行的性能优化
spark.sql.adaptive.join.enabled = true
spark.sql.adaptive.skewJoin.enabled = true
spark.shuffle.consolidateFiles = true
spark.shuffle.service.enabled = true
spark.sql.adaptive.allowAdditionalShuffle = true
必须要出发shuffle,如果任务中只有map task,需要通过group by 或者distribute 触发shuffle的执行,只有触发shuffle,才能使用adaptive解决小文件问题。
参考文章:
spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)