大数据精选面试题160道
大数据精选面试题160道01、 Hive和数据库比较Hive 和数据库除了拥有类似的查询语言,再无类似之处。1)数据存储位置Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。2)数据更新Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,3)执行延迟Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据
大数据精选面试题160道
01、 Hive和数据库比较
Hive 和数据库除了拥有类似的查询语言,再无类似之处。
1)数据存储位置
Hive 存储在 HDFS 。数据库将数据保存在块设备或者本地文件系统中。
2)数据更新
Hive中不建议对数据的改写。而数据库中的数据通常是需要经常进行修改的,
3)执行延迟
Hive 执行延迟较高。数据库的执行延迟较低。当然,这个是有条件的,即数据规模较小,当数据规模大到超过数据库的处理能力的时候,Hive的并行计算显然能体现出优势。
4)数据规模
Hive支持很大规模的数据计算;数据库可以支持的数据规模较小。
02、 内部表和外部表
1)内部表又叫管理表:当我们删除一个管理表时,Hive也会删除这个表中数据。管理表不适合和其他工具共享数据。
2)外部表:删除该表并不会删除掉原始数据,删除的是表的元数据
03、Hive中order by,sort by,distribute by和cluster by的区别
Sort By:在同一个分区内排序
Order By:全局排序,只有一个Reducer;
Distrbute By:类似 MapReduce 中Partition,进行分区,一般结合sort by使用。
Cluster By:当 Distribute by 和 Sort by 字段相同时,可以使用Cluster by方式。Cluster by 除了具有 Distribute by 的功能外还兼具 Sort by 的功能。但是只能升序排序,不能指定排序规则为ASC或者DESC。
04、UDF、UDAF、UDTF的区别
当Hive自带的函数无法满足我们的业务处理需求时,hive允许我们自定义函数来满足需求。
根据自定义函数的类别分为以下三种:
UDF:User-Defined-Function,用户自定义函数,数据是一进一出,功能类似于大多数数学函数或者字符串处理函数;
UDAF:User-Defined Aggregation Function,用户自定义聚合函数,数据是多进一出,功能类似于 count/max/min;
UDTF:User-Defined Table-Generating Functions,用户自定义表生成函数,数据是一进多处,功能类似于lateral view explore();
05、Rank排名函数
- RANK() 排序相同时会重复,总数不会变;
- DENSE_RANK() 排序相同时会重复,总数会减少;
- ROW_NUMBER() 根据顺序计算排名。
在实际开发中,以上三个rank函数通常是和开窗函数一起使用的。
06、窗口函数(开窗函数)
OVER():用于指定分析函数工作时的数据窗口大小,这个数据窗口大小可能会随着行的变而变化;
CURRENT ROW:当前行;
n PRECEDING:往前n行数据;
n FOLLOWING:往后n行数据;
UNBOUNDED:起点,UNBOUNDED PRECEDING 表示从前面的起点, UNBOUNDED FOLLOWING表示到后面的终点;
LAG(col,n,default_val):往前第n行数据;
LEAD(col,n, default_val):往后第n行数据;
NTILE(n):把有序分区中的行分发到指定数据的组中,各个组有编号,编号从1开始,对于每一行,NTILE返回此行所属的组的编号。这个函数需要注意:n必须为int类型。
07、行转列函数
7.1、CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串。
例如: concat( aa, ‘:’, bb) 就相当于把aa列和bb列用冒号连接起来了,aa:bb。
7.2、CONCAT_WS(separator, str1, str2,…):CONCAT_WS() 代表 CONCAT With Separator ,是CONCAT()的特殊形式。第一个参数是其它参数的分隔符。分隔符的位置放在要连接的两个字符串之间。分隔符可以是一个字符串,也可以是其它参数。如果分隔符为 NULL,则结果为 NULL。函数会忽略任何分隔符参数后的 NULL 值。但是CONCAT_WS()不会忽略任何空字符串。 (然而会忽略所有的 NULL)。
7.3、COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生array类型字段。
08、列转行函数
8.1、EXPLODE(col):将hive某列中复杂的array或者map结构拆分成多行。
8.2、LATERAL VIEW:常和UDTF函数一起使用。
用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias
解释:用于和split, explode等UDTF一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
09、Hive的数据文件存储格式
texfile:默认的存储格式:普通的文本文件,数据不压缩,磁盘的开销比较大,分析开销大。
sequencefile:提供的一种二进制存储格式,可以切割,天生压缩。
rcfile:提供的是一种行列混合存储方式,该方式会把相近的行和列数据放在一块儿,存储比较耗时,查询效率高,也天生压缩。
orc:是rcfile的一种优化存储。
parquet:自定义输入输出格式
10、Hive中常用的系统函数有哪些
date_add(str,n)、date_sub(str,n) 加减时间
next_day(to_date(str),’MO’) 周指标相关,获取str下周一日期
date_format(str,’yyyy’) 根据格式整理日期
last_day(to_date(str)) 求当月最后一天日期
collect_set(col) 收集数据返回一个以逗号分割的字符串数组
get_json_object(jsondata,object) 解析json,使用object获取对象值
NVL(str,replace) 空字段赋值,str为空返回replace值;两个都为空则返回null
11、什么是消息队列?
- 消息队列就是用于当两个系统之间或者两个模块之间实现消息传递时,基于队列机制实现数据缓存的中间件
12、消息队列有什么好处?
-
实现解耦,将高耦合转换为低耦合
-
通过异步并发,提高性能,并实现最终一致性
13、Kafka是什么?
- Kafka是一个基于订阅发布模式的高性能、高吞吐的实时消息队列系统
14、Kafka在大数据中用于什么场景下?
- 用于实时架构中,实现将实时数据采集的数据进行实时存储,供于SparkStreaming或者Flink等工具实现实时数据消费处理
15、请简述Kafka的集群架构
-
Kafka是一个分布式主从架构集群
- 主节点:Kafka Controller:一种特殊的Broker,由ZK辅助实现从所有Broker中选举,负责集群管理,管理Topic及分区副本等
- 从节点:Kafka Broker:负责实现Kafka集群的数据存储
-
Kafka依赖于Zookeeper实现集群辅助管理
-
基于Zookeeper辅助选举Controller
-
基于Zookeeper存储元数据
-
16、Kafka中消费者与消费者组的关系是什么?
- 消费者组负责订阅Topic,消费者负责消费Topic分区的数据
- 消费者组中可以包含多个消费者,多个消费者共同消费数据,增加消费并行度,提高消费性能
- 消费者组的id由开发者指定,消费者的id由Kafka自动分配
17、Kafka中Topic和Partition是什么,如何保证Partition数据安全?
-
Topic:逻辑上实现数据存储的分类,类似于数据库中的表概念
-
Partition:Topic中用于实现分布式存储的物理单元,一个Topic可以有多个分区
- 每个分区可以存储在不同的节点,实现分布式存储
-
保证数据安全通过副本机制:Kafka中每个分区可以构建多个副本【副本个数 <= 机器的个数】
-
将一个分区的多个副本分为两种角色
-
leader副本:负责对外提供读写请求
-
follower副本:负责与leader同步数据,如果leader故障,follower要重新选举一个成为leader
- 选举:由Kafka Crontroller来决定谁是leader
-
18、Kafka中的Segment是什么?
-
Segment是对分区内部的数据进行更细的划分,分区段,文件段
-
规则:按照文件产生的时间或者大小
-
目的:提高写入和查询性能
- 文件名称可以用于检索数据:用offset命名的
-
组成:每个Segment由两个文件组成
- .log:存储的数据
- .index:对应.log文件的索引信息
19、Kafka中的Offset是什么?
- Offset是kafka中存储数据时给每个数据做的标记或者编号
- 分区级别的编号,每个分区从0开始编号
- 功能:消费者根据offset来进行消费,保证顺序消费以及消费数据的一次性语义
20、请简述如何使用Kafka Simple Java API 实现数据生产?描述具体的类及方法
-
step1:构建生产者连接对象:KafkaProducer
- 需要配置对象:管理配置,例如连接地址:Properties
-
step2:KafkaProducer:send:生产数据到Kafka中
- 需要构建一个生产的数据对象:ProducerRecord
- ProducerRecord(Topic,Value)
- ProducerRecord(Topic,Key,Value)
- ProducerRecord(Topic,Partition,Key,Value)
21、请简述如何使用Kafka Simple Java API 实现数据消费?描述具体的类及方法
-
step1:构建消费者连接对象:KafkaConsumer
- 需要配置对象:管理配置,例如连接地址:Properties
-
step2:消费者需要订阅Topic
- KafkaConsumer:subscribe(List)
-
step3:消费数据
-
KafkaConsumer:poll:实现拉取消费数据
-
ConsumerRecords:拉取到的所有数据集合
-
ConsumerRecord:消费到的每一条数据
- topic:获取数据中的Topic
- partition:获取数据中的分区编号
- offset:获取数据的offset
- key:获取数据中的Key
- value:获取数据中的Value
-
22、请简述Kafka生产数据时如何保证生产数据不丢失?
- acks机制:当接收方收到数据以后,就会返回一个确认的ack消息
- 生产者向Kafka生产数据,根据配置要求Kafka返回ACK
- ack=0:生产者不管Kafka有没有收到,直接发送下一条
- 优点:快
- 缺点:容易导致数据丢失,概率比较高
- ack=1:生产者将数据发送给Kafka,Kafka等待这个分区leader副本写入成功,返回ack确认,生产者发送下一条
- 优点:性能和安全上做了平衡
- 缺点:依旧存在数据丢失的概率,但是概率比较小
- ack=all/-1:生产者将数据发送给Kafka,Kafka等待这个分区所有副本全部写入,返回ack确认,生产者发送下一条
- 优点:数据安全
- 缺点:慢
- 如果使用ack=all,可以搭配min.insync.replicas参数一起使用,可以提高效率
- min.insync.replicas:表示最少同步几个副本以后,就返回ack
- ack=0:生产者不管Kafka有没有收到,直接发送下一条
- 如果生产者没有收到ack,就使用重试机制,重新发送上一条消息,直到收到ack
23、请简述Kafka生产数据时如何保证生产数据不重复?
- 数据重复的场景:Kafka写入数据,返回ack,但是ack丢失,生产者没有收到ack,重新写入数据,导致Kafka数据重复
- Kafka中使用幂等性机制来保证生产数据不重复
- step1:发送数据时,给每条数据增加一个数据id的编号,每次下一条数据的编号自增1
- step2:Kafka将数据写入,并记住写入的数据id
- step3:如果下一条数据的id与上一次的数据id一致,就不写入,直接返回ack
24、Kafka中生产者的数据分区规则是什么,如何自定义分区规则?
- 如果指定了分区:就写入指定的分区
- 如果没有指定分区,就判断是否指定了Key
- 如果指定了Key:根据Key的Hash取余分区
- 如果没有指定Key:根据黏性分区来实现
- 自定义分区
- 开发一个类实现Partitioner接口
- 实现partition方法
- 在生产者中指定分区器的配置
25、Kafka中消费者消费数据的规则是什么?
-
消费者根据Offset对Topic中的分区进行消费
-
第一次消费:根据auto.offset.reset属性进行消费
- latest:从最新的位置开始消费
- earliest:从头开始消费
-
第二次消费:根据上一次的offset+1继续消费
26、如果消费者遇到故障,Kafka怎么保证不重复不丢失?
- Kafka通过消费者commit Offset机制将每个消费者每次消费的位置存储在__consumer_offset中来保证每个消费者如果故障,依旧能从上一次的位置继续消费
27、一个消费者组中有多个消费者,消费多个Topic多个分区,分区分配给消费者的分配规则有哪些?
28、Kafka写入数据过程是什么?
- step1:生产者构建批次,提交给Kafka集群
- step2:Kafka根据分区规则,检索元数据,将请求转发给Leader副本对应Broker
- step3:先写Broker的PageCache
- step4:后台实现将PageCache中顺序写同步到磁盘中:.log文件
- step5:Follower同步Leader副本的数据
29、Kafka读取数据过程是什么?
-
step1:消费者请求读取数据:Topic+Partition+Offset
-
step2:Kafka根据元数据,找到对应分区的leader副本进行检索
-
step3:先检索PageCache
- 如果有,就通过零拷贝机制从PageCache中读取数据
- 如果没有,就读取Segment文件段
-
step4:先根据Offset找到对应的Segment的一对文件
-
step5:先读index,找到offset对应的数据在.log文件中的最近位置
-
step6:根据位置,读取.log文件
30、为什么Kafka读写会很快?
- 写很快
- 应用了PageCache的页缓存机制
- 顺序写磁盘的机制
- 读很快
- 优先基于PageCache内存的读取,使用零拷贝机制
- 按照Offset有序读取每一条
- 构建Segment文件段
- 构建index索引
31、为什么要设计Segment?
- 加快查询效率:将数据划分到多个小文件中,通过offset匹配可以定位某个文件,从小数据量中找到需要的数据
- 提高删除性能:以Segment为单位进行删除,避免以每一条数据进行删除,影响性能
32、什么是AR、ISR、OSR?
-
AR:all replicas
- 所有副本 = ISR + OSR
-
ISR:In-sync-replicas
- 表示正在同步的副本 =》 可用副本分区
- 如果Leader故障,会从ISR中选举一个新的leader
-
OSR:Out-sync-replicas
-
表示不健康的副本 =》 不可用副本
-
判断依据
#如果这个从副本在这个时间内没有与leader副本同步数据,认为这个副本是不正常的 replica.lag.time.max.ms = 10000
-
33、什么是HW、LEO?
- HW:表示当前leader副本中所有Follower都已经同步的位置 + 1,高水位线
- LEO:表示当前leader副本最新的数据位置 + 1
- 消费者能消费到的位置是HW:为了保证消费者消费分区数据的统一性
34、什么是一次性语义?
-
at-most-once:最多一次
-
at-least-once:至少一次
-
exactly-once:有且仅有一次
35、Kafka如何保证消费者消费数据不重复不丢失?
- Kafka消费者通过Offset实现数据消费,只要保证各种场景下能正常实现Offset的记录即可
- 保证消费数据不重复需要每次消费处理完成以后,将Offset存储在外部存储中,例如MySQL、Zookeeper、Redis中
- 保证以消费分区、处理分区、记录分区的offset的顺序实现消费处理
- 如果故障重启,只要从外部系统中读取上一次的Offset继续消费即可
36、Hbase的功能与应用场景?
-
功能:Hbase是一个分布式的、基于分布式内存和HDFS的按列存储的NoSQL数据库
-
应用:Hbase适合于需要实时的对大量数据进行快速、随机读写访问的场景
37、Hbase有什么特点?
-
分布式的,可以实现高并发的数据读写
-
上层构建分布式内存,可以实现高性能、随机、实时的读写
-
底层基于HDFS,可以实现大数据
-
按列存储,基于列实现数据存储,灵活性更高
38、Hbase设计思想是什么?
-
设计思想、冷热数据分离,Hbase将新数据直接写入内存中,如果内存中存储的数据过多,就将内存的数据写入HDFS
-
热数据是指刚产生的数据,先写内存,大概率的情况下,可以直接从内存中读取
-
冷数据是指先产生的数据,将内存中产生很久的数据写入HDFS中,被读取的概率较小
-
39、Hbase与HDFS的区别是什么?
-
Hbase是一个高性能实时随机读写数据的数据库存储系统,用于实现实时数据存储
-
HDFS是一个分布式离线大数据文件存储系统,用于实现离线的文件存储
40、Hbase与MySQL的区别是什么?
- Hbase是分布式NoSQL数据库,可以实现高性能的大数据存储
- MySQL是RDBMS关系型数据库,只能实现小数据量的结构化数据存储
41、Hbase与Hive的区别是什么?
- Hive是通过构建元数据,映射HDFS文件构建成表,本质还是HDFS,实现离线大数据仓库
- Hbase是通过构建上层分布式内存,底层HDFS,实现大数据实时存储的NoSQL数据库
42、Hbase的按列存储是什么?
-
Hbase按列存储的设计是指Hbase中的最小操作单元是列,可以实现对每一行的每一列进行读写
-
每一行的列都是动态的,每一行可以拥有不同的列
43、请简述Namespace、Rowkey、ColumnFamily及多版本的功能及含义
-
Namespace:命名空间,类似于数据库的设计,用于区分不同的业务表
-
Rowkey:行健,类似于主键的设计,唯一标识一条数据并且作为Hbase中的唯一索引
-
ColumnFamily:列族,用于将列进行分组,底层用于区分存储不同的列,提高查询性能
-
多版本:Hbase中允许一列存储多个版本的值,并通过数据写入的时间戳来区分不同版本
44、请简述Hbase的分布式主从架构
- 主节点:HMaster:管理节点、负责管理集群的从节点、元数据以及所有Region的分配
- 从节点:HRegionServer:存储节点,负责实现所有数据的存储,管理Region,构建分布式内存
45、请简述Table表与RegionServer的关系
- Table是Hbase中的表对象,一张表可以划分为多个Region分区
- RegionServer是Hbase中实现数据存储的节点,负责存储每个Region
46、表的Region的划分规则及数据写入分区的规则是什么?
- Region划分规则:范围划分,一张表可以在Rowkey行的方向上划分多个Region,每个Region构成一段连续的区间
- 数据划分规则:根据Rowkey属于哪个Region的范围,就将这条数据写入哪个Region分区中
47、Region的内部存储结构是什么?
- 每个RegionServer中管理多个Region
- 每个Region中根据列族划分多个Store
- 每个Store中有1个memstore和多个StoreFile文件
- 数据写入memstore中,如果达到内存阈值,memstore中的数据将写入StoreFile
48、什么是热点问题?
-
现象:在某个时间段内,大量的读写请求全部集中在某个Region中,导致这台RegionServer的负载比较高,其他的Region和RegionServer比较空闲
-
问题:这台RegionServer故障的概率就会增加,整体性能降低,效率比较差
-
原因:本质上的原因,数据分配不均衡
-
情况
-
一张表只有一个Region
-
一张表有多个Region,但是Rowkey是连续产生的
-
49、怎么解决热点问题?
- 合理的设计Rowkey,构建不连续的Rowkey
- 根据Rowkey的前缀,为表划分多个Region
50、Rowkey如何设计,设计规则是什么?
- 业务原则:贴合业务,保证前缀是最常用的查询字段
- 唯一原则:每条rowkey唯一表示一条数据
- 组合原则:常用的查询条件组合作为Rowkey
- 散列原则:rowkey构建不能连续
- 长度原则:满足业务需求越短越好
51、列族设计规则是什么?
- 个数原则:如果列的个数比较多,建议2 ~ 3个,如果列的个数比较少,建议1个
- 列族个数多了,导致比较次数变多,降低性能
- 列族个数少了,导致列的比较次数变多,降低性能
- 长度原则 :能满足业务需求的情况下,越短越好
52、Hive on Hbase的实现原理是什么?
- Hive on Hbase的原理是通过MapReduce实现对Hbase数据的读写
- MapReduce中提供了TableInputFormat读取Hbase数据,TableOutputFormat写入数据到Hbase
53、Phoenix是什么?
- Phoenix是一个专门为Hbase设计的SQL on Hbase的工具
- 底层通过Hbase API和大量的协处理器实现
- 可以实现基于SQL访问Hbase以及构建维护二级索引等功能
54、什么是二级索引?为什么要构建二级索引
- 二级索引指的是基于一级索引之上再构建一层索引
- Hbase使用Rowkey作为唯一索引,只有使用Rowkey前缀进行查询,才走索引查询
- 导致大部分的查询都是不走索引,性能比较差
- 通过建立二级索引,可以通过走两次索引代替全表扫描,加快查询速度
55、Phoenix实现二级索引时,可以构建哪些索引类型?
- 全局索引
- 覆盖索引
- 本地索引
- 函数索引
56、什么是全局索引?
-
创建全局索引,会自动构建一张索引表
-
索引表结构
- Rowkey:索引字段+原表的rowkey
- 列:占位置x
-
特点:如果查询字段或者查询条件不是索引字段,就不会走索引
-
应用:适合于读多写少
57、什么是覆盖索引?
-
创建覆盖索引,会自动构建一张索引表
-
索引表结构
- Rowkey:索引字段+原表的rowkey
- 列:将include中的列放入索引表
-
特点
-
如果查询字段或者查询条件不是索引字段,就不会走索引
-
如果查询的字段在索引表中,直接从索引表返回结果
-
58、什么是本地索引?
-
创建覆盖索引,会自动基于原表构建一个列族来实现索引存储
-
原表的数据中:多了一个索引列族
-
特点
- 不论查询字段是否是索引字段,都会走索引
- 将索引与数据存储在同一台RegionServer,提高索引读写性能
-
注意
- 本地索引会修改原数据表,对于本地索引只能使用Phoenix来操作表的数据
- 盐表不能使用本地索引
59、请简述Hbase写入数据的流程
-
step1:获取元数据
- 客户端请求Zookeeper,获取meta表所在的regionserver的地址
- 读取meta表的数据:获取所有表的元数据
-
step2:找到对应的Region
- 根据meta表中的元数据,找到表对应的所有的region
- 根据region的范围和写入的Rowkey,判断需要写入具体哪一个Region
- 根据region的Regionserver的地址,请求对应的RegionServer
-
step3:写入数据
-
请求RegionServer写入对应Region:根据Region的名称来指定写入哪个Region
-
根据列族判断写入哪一个具体的Store
- 先写入WAL:Hlog预写日志中
-
写入对应Store的MemStore中
- MemStore
-
60、请简述Hbase读取数据的流程
- step1:获取元数据
- 客户端请求Zookeeper,获取meta表所在的regionserver的地址
- 读取meta表的数据
- 注意:客户端会缓存meta表的数据,只有第一次会连接ZK,读取meta表的数据,缓存会定期失效,要重新缓存
- 避免每次请求都要先连接zk,再读取meta表
- step2:找到对应的Region
- 根据meta表中的元数据,找到表对应的region
- 根据region的范围和写入的Rowkey,判断需要写入具体哪一个Region
- 根据region的Regionserver的地址,请求对应的RegionServer
- step3:读取数据
- 先查询memstore
- 如果查询的列族开启了缓存机制,就读取BlockCache
- 如果没有,就读取StoreFile,并将结果放入BlockCache中
61、请简述LSM模型的设计思想
- step1:数据写入的时候,只写入内存
- step2:将数据在内存构建有序,当数据量大的时候,将有序的数据写入磁盘,变成一个有序的数据文件
- step3:基于所有有序的小文件进行合并,合并为一个整体有序的大文件
62、什么是Flush,什么时候会触发Flush?
-
Flush是指将memstore中的数据写入HDFS,变成StoreFile
-
2.0之前:判断memstore存储大小,单个memstore达到128M就会触发Flush,或者整个memstore达到95%就会触发
-
2.0之后:根据平均每个memstore的存储大小与16M取最大值计算水位线,高于水位线就Flush,不高于就不Flush,都不高于全部Flush
63、什么是Compaction,什么时候会触发Compaction?
- Compaction的功能是将多个单独有序StoreFile文件进行合并,合并为整体有序的大文件并且删除过期数据,加快读取速度
- 2.0之前:通过minor compaction和major compaction来实现
- minor compaction:用于合并最早生成的几个小文件,不清理过期数据
- major compaction:用于将所有storefile合并为一个StoreFile,并清理过期数据
- 2.0之后:除了minor compaction和major compaction,添加了in-memory-compaction
- In-memory compaction:在内存中进行合并,合并以后的结果再进行flush,有四种配置
- none:不开启
- basic:开启,但是合并时不删除过期数据
- eager:开启,合并时并清理删除过期数据
- adaptive:开启,并在合并时根据数据量来自动判断是否清理过期数据
- In-memory compaction:在内存中进行合并,合并以后的结果再进行flush,有四种配置
64、什么是Spit,什么时候会触发Split?
-
Split是指当一个Region存储的数据过多,导致这个Region的负载比较高,Hbase中设定了一个Region最多存储的数据量的阈值,一旦达到阈值,允许Region分裂为两个region,老的region会下线,新的两个region对外提供服务
-
0.94之前:ConstantSizeRegionSplitPolicy
- 只要region中的storefile达到10G,就分裂
-
2.0之前:IncreasingToUpperBoundRegionSplitPolicy
- 根据Region个数来实现计算,当达到4个region以后,也是按照10GB来分裂
-
2.0之后:SteppingSplitPolicy
- Region个数等于1个:按照256M来分裂
- Region个数超过1个:按照10GB来分裂
65、MapReduce读取Hbase数据的原理及返回值是什么?
-
MapReduce读取Hbase原理:封装了一个TableInputFormat来实现读取Hbase的数据
-
返回值
-
每个Region对应一个分片,每个分片启动一个MapTask进行处理
-
每个Rowkey的数据变成一个KV对
-
K是Rowkey的字节对象:ImmutableBytesWriable类型
-
V是Rowkey的数据对象:Result类型
-
66、MapReduce写入Hbase的原理和要求是什么?
- MapReduce写入Hbase原理:封装了一个TableOutputFormat来实现写入Hbase的数据
- 要求
- 写入Hbase的数据的V的类型必须为Put类型
67、什么是BulkLoad,用于什么场景?
- BulkLoad是指将数据直接转换为StoreFile文件,放入Hbase中,不经过Hbase的内存,避免大量数据进入内存,又从内存进入HDFS
- 应用:大数据量批量写入Hbase
68、协处理器是什么?Hbase中提供了几种协处理器?
- 协处理器指的是Hbase提供了一些开发接口,可以自定义开发一些功能集成到Hbase中
- 类似于Hive中的UDF,当没有这个功能时,可以使用协处理器来自定义开发,让Hbase支持对应的功能
- 协处理器分为两类
- Observer:观察者类,类似于监听器的实现
- Endpoint:终端者类,类似于存储过程的实现
69、Hbase常见优化有哪些?
- 内存优化:针对于不同的读写场景,合理的调整Memstore和BlockCache的比例大小
- 压缩优化:对列族配置压缩存储,减少IO消耗
- 布隆过滤:基于数据文件构建布隆索引,加快数据查询
- 提高客户端缓存空间、指定每次扫描的行数、设置合适的GC算法等
- 其他优化
- Linux句柄数优化,提高Linux线程、文件通道等资源句柄数
- HDFS句柄数优化:提高文件打开线程数、提高连接超时时间
- Zookeeper优化:优化连接超时时间
70、为什么Kafka不支持读写分离?
在 Kafka 中,生产者写入消息、消费者读取消息的操作都是与 leader 副本进行交互的,从 而实现的是一种主写主读的生产消费模型。
Kafka 并不支持主写从读,因为主写从读有 2 个很明 显的缺点:
(1)数据一致性问题。数据从主节点转到从节点必然会有一个延时的时间窗口,这个时间 窗口会导致主从节点之间的数据不一致。某一时刻,在主节点和从节点中 A 数据的值都为 X, 之后将主节点中 A 的值修改为 Y,那么在这个变更通知到从节点之前,应用读取从节点中的 A 数据的值并不为最新的 Y,由此便产生了数据不一致的问题。
(2)延时问题。类似 Redis 这种组件,数据从写入主节点到同步至从节点中的过程需要经 历网络→主节点内存→网络→从节点内存这几个阶段,整个过程会耗费一定的时间。而在 Kafka 中,主从同步会比 Redis 更加耗时,它需要经历网络→主节点内存→主节点磁盘→网络→从节 点内存→从节点磁盘这几个阶段。对延时敏感的应用而言,主写从读的功能并不太适用。
71、Spark为什么要持久化,一般什么场景下要进行persist操作?
为什么要进行持久化?
spark所有复杂一点的算法都会有persist身影,spark默认数据放在内存,spark很多内容都是放在内存的,非常适合高速迭代,1000个步骤只有第一个输入数据,中间不产生临时数据,但分布式系统风险很高,所以容易出错,就要容错,rdd出错或者分片可以根据血统算出来,如果没有对父rdd进行persist 或者cache的化,就需要重头做。
以下场景会使用persist
1)某个步骤计算非常耗时,需要进行persist持久化
2)计算链条非常长,重新恢复要算很多步骤,很好使,persist
3)checkpoint所在的rdd要持久化persist。checkpoint前,要持久化,写个rdd.cache或者rdd.persist,将结果保存起来,再写checkpoint操作,这样执行起来会非常快,不需要重新计算rdd链条了。checkpoint之前一定会进行persist。
4)shuffle之后要persist,shuffle要进性网络传输,风险很大,数据丢失重来,恢复代价很大
5)shuffle之前进行persist,框架默认将数据持久化到磁盘,这个是框架自动做的。
72、介绍一下join操作优化经验?
join其实常见的就分为两类: map-side join 和 reduce-side join。当大表和小表join时,用map-side join能显著提高效率。将多份数据进行关联是数据处理过程中非常普遍的用法,不过在分布式计算系统中,这个问题往往会变的非常麻烦,因为框架提供的 join 操作一般会将所有数据根据 key 发送到所有的 reduce 分区中去,也就是 shuffle 的过程。造成大量的网络以及磁盘IO消耗,运行效率极其低下,这个过程一般被称为 reduce-side-join。如果其中有张表较小的话,我们则可以自己实现在 map 端实现数据关联,跳过大量数据进行 shuffle 的过程,运行时间得到大量缩短,根据不同数据可能会有几倍到数十倍的性能提升。
备注:这个题目面试中非常非常大概率见到,务必搜索相关资料掌握,这里抛砖引玉。
73、描述Yarn执行一个任务的过程?
1)客户端client向ResouceManager提交Application,ResouceManager接受Application并根据集群资源状况选取一个node来启动Application的任务调度器driver(ApplicationMaster)。
2)ResouceManager找到那个node,命令其该node上的nodeManager来启动一个新的 JVM进程运行程序的driver(ApplicationMaster)部分,driver(ApplicationMaster)启动时会首先向ResourceManager注册,说明由自己来负责当前程序的运行。
3)driver(ApplicationMaster)开始下载相关jar包等各种资源,基于下载的jar等信息决定向ResourceManager申请具体的资源内容。
4)ResouceManager接受到driver(ApplicationMaster)提出的申请后,会最大化的满足 资源分配请求,并发送资源的元数据信息给driver(ApplicationMaster)。
5)driver(ApplicationMaster)收到发过来的资源元数据信息后会根据元数据信息发指令给具体机器上的NodeManager,让其启动具体的container。
6)NodeManager收到driver发来的指令,启动container,container启动后必须向driver(ApplicationMaster)注册。
7)driver(ApplicationMaster)收到container的注册,开始进行任务的调度和计算,直到 任务完成。
注意:如果ResourceManager第一次没有能够满足driver(ApplicationMaster)的资源请求 ,后续发现有空闲的资源,会主动向driver(ApplicationMaster)发送可用资源的元数据信息以提供更多的资源用于当前程序的运行。
74、Spark on Yarn 模式有哪些优点?
1)与其他计算框架共享集群资源(Spark框架与MapReduce框架同时运行,如果不用Yarn进行资源分配,MapReduce分到的内存资源会很少,效率低下);资源按需分配,进而提高集群资源利用等。
2)相较于Spark自带的Standalone模式,Yarn的资源分配更加细致。
3)Application部署简化,例如Spark,Storm等多种框架的应用由客户端提交后,由Yarn负责资源的管理和调度,利用Container作为资源隔离的单位,以它为单位去使用内存,cpu等。
4)Yarn通过队列的方式,管理同时运行在Yarn集群中的多个服务,可根据不同类型的应用程序负载情况,调整对应的资源使用量,实现资源弹性管理。
75、谈谈你对container的理解?
1)Container作为资源分配和调度的基本单位,其中封装了的资源如内存,CPU,磁盘,网络带宽等。 目前yarn仅仅封装内存和CPU
2)Container由ApplicationMaster向ResourceManager申请的,由ResouceManager中的资源调度器异步分配给ApplicationMaster
3)Container的运行是由ApplicationMaster向资源所在的NodeManager发起的,Container运行时需提供内部执行的任务命令
76、Spark使用parquet文件存储格式能带来哪些好处?
1)如果说HDFS是大数据时代分布式文件系统首选标准,那么parquet则是整个大数据时代文件存储格式实时首选标准。
2)速度更快:从使用spark sql操作普通文件CSV和parquet文件速度对比上看,绝大多数情况会比使用csv等普通文件速度提升10倍左右,在一些普通文件系统无法在spark上成功运行的情况下,使用parquet很多时候可以成功运行。
3)parquet的压缩技术非常稳定出色,在spark sql中对压缩技术的处理可能无法正常的完成工作(例如会导致lost task,lost executor)但是此时如果使用parquet就可以正常的完成。
4)极大的减少磁盘I/o,通常情况下能够减少75%的存储空间,由此可以极大的减少spark sql处理数据的时候的数据输入内容,尤其是在spark1.6x中有个下推过滤器在一些情况下可以极大的减少磁盘的IO和内存的占用,(下推过滤器)。
5)spark 1.6x parquet方式极大的提升了扫描的吞吐量,极大提高了数据的查找速度spark1.6和spark1.5x相比而言,提升了大约1倍的速度,在spark1.6X中,操作parquet时候cpu也进行了极大的优化,有效的降低了cpu消耗。
6)采用parquet可以极大的优化spark的调度和执行。我们测试spark如果用parquet可以有效的减少stage的执行消耗,同时可以优化执行路径。
77、介绍parition和block有什么关联关系?
1)hdfs中的block是分布式存储的最小单元,等分,可设置冗余,这样设计有一部分磁盘空间的浪费,但是整齐的block大小,便于快速找到、读取对应的内容;
2)Spark中的partion是弹性分布式数据集RDD的最小单元,RDD是由分布在各个节点上的partion组成的。partion是指的spark在计算过程中,生成的数据在计算空间内最小单元,同一份数据(RDD)的partion大小不一,数量不定,是根据application里的算子和最初读入的数据分块数量决定;
3)block位于存储空间、partion位于计算空间,block的大小是固定的、partion大小是不固定的,是从2个不同的角度去看数据。
78、Spark应用程序的执行过程是什么?
1)构建Spark Application的运行环境(启动SparkContext),SparkContext向资源管理器(可以是Standalone、Mesos或YARN)注册并申请运行Executor资源;
2)资源管理器分配Executor资源并启动StandaloneExecutorBackend,Executor运行情况将随着心跳发送到资源管理器上;
3)SparkContext构建成DAG图,将DAG图分解成Stage,并把Taskset发送给Task Scheduler。Executor向SparkContext申请Task,Task Scheduler将Task发放给Executor运行同时SparkContext将应用程序代码发放给Executor;
4)Task在Executor上运行,运行完毕释放所有资源。
79、不需要排序的hash shuffle是否一定比需要排序的sort shuffle速度快?
不一定,当数据规模小,Hash shuffle快于Sorted Shuffle数据规模大的时候;当数据量大,sorted Shuffle会比Hash shuffle快很多,因为数量大的有很多小文件,不均匀,甚至出现数据倾斜,消耗内存大,1.x之前spark使用hash,适合处理中小规模,1.x之后,增加了Sorted shuffle,Spark更能胜任大规模处理了。
80、Sort-based shuffle的缺陷?
1)如果mapper中task的数量过大,依旧会产生很多小文件,此时在shuffle传递数据的过程中reducer段,reduce会需要同时大量的记录进行反序列化,导致大量的内存消耗和GC的巨大负担,造成系统缓慢甚至崩溃。
2)如果需要在分片内也进行排序,此时需要进行mapper段和reducer段的两次排序。
81、spark.storage.memoryFraction参数的含义,实际生产中如何调优?
1)用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6,,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘;
2)如果持久化操作比较多,可以提高spark.storage.memoryFraction参数,使得更多的持久化数据保存在内存中,提高数据的读取性能,如果shuffle的操作比较多,有很多的数据读写操作到JVM中,那么应该调小一点,节约出更多的内存给JVM,避免过多的JVM gc发生。在web ui中观察如果发现gc时间很长,可以设置spark.storage.memoryFraction更小一点。
82、Spark有哪两种算子?
Transformation(转化)算子和Action(执行)算子。
83、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?
在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。
这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。
84、如何从Kafka中获取数据?
1)基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存
中的,然后Spark Streaming启动的job会去处理那些数据。
2)基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地
查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来
获取Kafka指定offset范围的数据。
85、RDD创建有哪几种方式?
1)使用程序中的集合创建rdd
2)使用本地文件系统创建rdd
3)使用hdfs创建rdd
4)基于数据库db创建rdd
5)基于Nosql创建rdd,如hbase
6)基于s3创建rdd
7)基于数据流,如socket创建rdd
86、Spark并行度怎么设置比较合适?
spark并行度,每个core承载24个partition,如,32个core,那么64128之间的并行度,也就是设置64~128个partion,并行读和数据规模无关,
只和内存使用量和cpu使用时间有关。
87、Spark如何处理不能被序列化的对象?
将不能序列化的内容封装成object。
88、collect功能是什么,其底层是怎么实现的?
driver通过collect把集群中各个节点的内容收集过来汇总成结果,collect返回结果是Array类型的,collect把各个节点上的数据抓过来,
抓过来数据是Array型,collect对Array抓过来的结果进行合并,合并后Array中只有一个元素,是tuple类型(KV类型的)的。
89、为什么Spark Application在没有获得足够的资源,job就开始执行了,可能会导致什么什么问题发生?
会导致执行该job时候集群资源不足,导致执行job结束也没有分配足够的资源,分配了部分Executor,该job就开始执行task,应该是task的调度线程
和Executor资源申请是异步的;如果想等待申请完所有的资源再执行job的:
需要将
spark.scheduler.maxRegisteredResourcesWaitingTime设置的很大;
spark.scheduler.minRegisteredResourcesRatio 设置为1,但是应该结合实际考虑
否则很容易出现长时间分配不到资源,job一直不能运行的情况。
90、map与flatMap的区别?
map:对RDD每个元素转换,文件中的每一行数据返回一个数组对象。
flatMap:对RDD每个元素转换,然后再扁平化。
将所有的对象合并为一个对象,文件中的所有行数据仅返回一个数组对象,会抛弃值为null的值。
91、Spark on Mesos中,什么是的粗粒度分配,什么是细粒度分配,各自的优点和缺点是什么?
1)粗粒度:启动时就分配好资源, 程序启动,后续具体使用就使用分配好的资源,不需要再分配资源;优点:作业特别多时,资源复用率高,适合粗粒度;
缺点:容易资源浪费,假如一个job有1000个task,完成了999个,还有一个没完成,那么使用粗粒度,999个资源就会闲置在那里,资源浪费。
2)细粒度分配:用资源的时候分配,用完了就立即回收资源,启动会麻烦一点,启动一次分配一次,会比较麻烦。
92、driver的功能是什么?
1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的入口点;
2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,
TaskScheduler。
93、Spark技术栈有哪些组件,每个组件都有什么功能,适合什么应用场景?
可以画一个这样的技术栈图先,然后分别解释下每个组件的功能和场景
1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,
是Spark的基础。
2)SparkStreaming是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)
进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。
3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,
同时进行更复杂的数据分析。
4)BlinkDB :是一个用于在海量数据上运行交互式 SQL 查询的大规模并行查询引擎,它允许用户通过权衡数据精度来提升查询响应时间,其数据的精度
被控制在允许的误差范围内。
5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。
MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。
6)GraphX是Spark中用于图和图并行计算。
94、Spark中Worker的主要工作是什么?
主要功能:管理当前节点内存,CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,
管理分配新进程,做计算的服务,相当于process服务。
需要注意的是:
1)worker会不会汇报当前信息给master,worker心跳给master主要只有workid,它不会发送资源信息以心跳的方式给mater,master分配的时候就知道work,
只有出现故障的时候才会发送资源。
2)worker不会运行代码,具体运行的是Executor是可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。
95、Mapreduce和Spark的都是并行计算,那么他们有什么相同和区别?
两者都是用mr模型来进行并行计算:
1)hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。
2)spark用户提交的任务成为application,一个application对应一个SparkContext,app中存在多个job,每触发一次action操作就会产生一个job。
这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,
组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存
进行计算。
3)hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。
4)spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。
96、RDD机制?
rdd分布式弹性数据集,简单的理解成一种数据结构,是spark框架上的通用货币。 所有算子都是基于rdd来执行的,不同的场景会有不同的rdd实现类,
但是都可以进行互相转换。rdd执行过程中会形成dag图,然后形成lineage保证容错性等。 从物理的角度来看rdd存储的是block和node之间的映射。
97、什么是RDD宽依赖和窄依赖?
RDD和它依赖的parent RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)
1)窄依赖指的是每一个parent RDD的Partition最多被子RDD的一个Partition使用
2)宽依赖指的是多个子RDD的Partition会依赖同一个parent RDD的Partition
98、cache和pesist的区别?
cache和persist都是用于将一个RDD进行缓存的,这样在之后使用的过程中就不需要重新计算了,可以大大节省程序运行时间
1) cache只有一个默认的缓存级别MEMORY_ONLY ,cache调用了persist,而persist可以根据情况设置其它的缓存级别;
2)executor执行的时候,默认60%做cache,40%做task操作,persist是最根本的函数,最底层的函数。
99、 cache后面能不能接其他算子,它是不是action操作?
cache可以接其他算子,但是接了算子之后,起不到缓存应有的效果,因为会重新触发cache。
cache不是action操作。
100、reduceByKey是不是action?
不是,很多人都会以为是action,reduce rdd是action
101、 RDD通过Linage(记录数据更新)的方式为何很高效?
1)lazy记录了数据的来源,RDD是不可变的,且是lazy级别的,且RDD之间构成了链条,lazy是弹性的基石。由于RDD不可变,所以每次操作就产生新的rdd,
不存在全局修改的问题,控制难度下降,所有有计算链条将复杂计算链条存储下来,计算的时候从后往前回溯 900步是上一个stage的结束,要么就checkpoint。
2)记录原数据,是每次修改都记录,代价很大如果修改一个集合,代价就很小,官方说rdd是粗粒度的操作,是为了效率,为了简化,每次都是操作数据集合,
写或者修改操作,都是基于集合的rdd的写操作是粗粒度的,rdd的读操作既可以是粗粒度的也可以是细粒度,读可以读其中的一条条的记录。
3)简化复杂度,是高效率的一方面,写的粗粒度限制了使用场景如网络爬虫,现实世界中,大多数写是粗粒度的场景。
102、为什么要进行序列化序列化?
可以减少数据的体积,减少存储空间,高效存储和传输数据,不好的是使用的时候要反序列化,非常消耗CPU。
103、Yarn中的container是由谁负责销毁的,在Hadoop Mapreduce中container可以复用么?
ApplicationMaster负责销毁,在Hadoop Mapreduce不可以复用,在spark on yarn程序container可以复用。
104、不启动Spark集群Master和work服务,可不可以运行Spark程序?
可以,只要资源管理器第三方管理就可以,如由yarn管理,spark集群不启动也可以使用spark;spark集群启动的是work和master,这个其实就是资源管理框架,
yarn中的resourceManager相当于master,NodeManager相当于worker,做计算是Executor,和spark集群的work和manager可以没关系,归根接底还是JVM的运行,
只要所在的JVM上安装了spark就可以。
105、spark on yarn Cluster 模式下,ApplicationMaster和driver是在同一个进程么?
是,driver 位于ApplicationMaster进程中。该进程负责申请资源,还负责监控程序、资源的动态情况。
106、运行在yarn中Application有几种类型的container?
1)运行ApplicationMaster的Container:这是由ResourceManager(向内部的资源调度器)申请和启动的,用户提交应用程序时,
可指定唯一的ApplicationMaster所需的资源;
2)运行各类任务的Container:这是由ApplicationMaster向ResourceManager申请的,并由ApplicationMaster与NodeManager通信以启动之。
107、Executor启动时,资源通过哪几个参数指定?
1)num-executors是executor的数量
2)executor-memory 是每个executor使用的内存
3)executor-cores 是每个executor分配的CPU
108、一个task的map数量由谁来决定?
一般情况下,在输入源是文件的时候,一个task的map数量由splitSize来决定的
那么splitSize是由以下几个来决定的
goalSize = totalSize / mapred.map.tasks
inSize = max {mapred.min.split.size, minSplitSize}
splitSize = max (minSize, min(goalSize, dfs.block.size))
一个task的reduce数量,由partition决定。
109、列出你所知道的调度器,说明其工作原理?
1)FiFo schedular 默认的调度器 先进先出
2)Capacity schedular 计算能力调度器 选择占用内存小 优先级高的
3)Fair schedular 调度器 公平调度器 所有job 占用相同资源
110、导致Executor产生FULL gc 的原因,可能导致什么问题?
可能导致Executor僵死问题,海量数据的shuffle和数据倾斜等都可能导致full gc。以shuffle为例,伴随着大量的Shuffle写操作,JVM的新生代不断GC,
Eden Space写满了就往Survivor Space写,同时超过一定大小的数据会直接写到老生代,当新生代写满了之后,也会把老的数据搞到老生代,如果老生代空间不足了,
就触发FULL GC,还是空间不够,那就OOM错误了,此时线程被Blocked,导致整个Executor处理数据的进程被卡住。
111、Spark累加器有哪些特点?
1)累加器在全局唯一的,只增不减,记录全局集群的唯一状态;
2)在exe中修改它,在driver读取;
3)executor级别共享的,广播变量是task级别的共享两个application不可以共享累加器,但是同一个app不同的job可以共享。
112、spark hashParitioner的弊端是什么?
HashPartitioner分区的原理很简单,对于给定的key,计算其hashCode,并除于分区的个数取余,如果余数小于0,则用余数+分区的个数,最后返回的值就是
这个key所属的分区ID;弊端是数据不均匀,容易导致数据倾斜,极端情况下某几个分区会拥有rdd的所有数据。
113、RangePartitioner分区的原理?
RangePartitioner分区则尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,也就是说一个分区中的元素肯定都是比另一个分区内的元素小
或者大;但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。其原理是水塘抽样。
114、rangePartioner分区器特点?
rangePartioner尽量保证每个分区中数据量的均匀,而且分区与分区之间是有序的,一个分区中的元素肯定都是比另一个分区内的元素小或者大;
但是分区内的元素是不能保证顺序的。简单的说就是将一定范围内的数映射到某一个分区内。RangePartitioner作用:将一定范围内的数映射到某一个分区内,
在实现中,分界的算法尤为重要。算法对应的函数是rangeBounds。
115、如何理解Standalone模式下,Spark资源分配是粗粒度的?
spark默认情况下资源分配是粗粒度的,也就是说程序在提交时就分配好资源,后面执行的时候使用分配好的资源,除非资源出现了故障才会重新分配。
比如Spark shell启动,已提交,一注册,哪怕没有任务,worker都会分配资源给executor。
116、union操作是产生宽依赖还是窄依赖?
产生窄依赖。
117、窄依赖父RDD的partition和子RDD的parition是不是都是一对一的关系?
不一定,除了一对一的窄依赖,还包含一对固定个数的窄依赖(就是对父RDD的依赖的Partition的数量不会随着RDD数量规模的改变而改变),
比如join操作的每个partiion仅仅和已知的partition进行join,这个join操作是窄依赖,依赖固定数量的父rdd,因为是确定的partition关系。
118、Hadoop中,Mapreduce操作的mapper和reducer阶段相当于spark中的哪几个算子?
相当于spark中的map算子和reduceByKey算子,当然还是有点区别的,MR会自动进行排序的,spark要看你用的是什么partitioner。
119、Spark中的HashShufle的有哪些不足?
1)shuffle产生海量的小文件在磁盘上,此时会产生大量耗时的、低效的IO操作;
2)容易导致内存不够用,由于内存需要保存海量的文件操作句柄和临时缓存信息,如果数据处理规模比较大的话,容易出现OOM;
3)容易出现数据倾斜,导致OOM。
120、 conslidate是如何优化Hash shuffle时在map端产生的小文件?
1)conslidate为了解决Hash Shuffle同时打开过多文件导致Writer handler内存使用过大以及产生过多文件导致大量的随机读写带来的低效磁盘IO;
2)conslidate根据CPU的个数来决定每个task shuffle map端产生多少个文件,假设原来有10个task,100个reduce,每个CPU有10个CPU,那么
使用hash shuffle会产生10100=1000个文件,conslidate产生1010=100个文件
注意:conslidate部分减少了文件和文件句柄,并行读很高的情况下(task很多时)还是会很多文件。
121、spark.default.parallelism这个参数有什么意义,实际生产中如何设置?
1)参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能;
2)很多人都不会设置这个参数,会使得集群非常低效,你的cpu,内存再多,如果task始终为1,那也是浪费,
spark官网建议task个数为CPU的核数*executor的个数的2~3倍。
122、spark.shuffle.memoryFraction参数的含义,以及优化经验?
1)spark.shuffle.memoryFraction是shuffle调优中 重要参数,shuffle从上一个task拉去数据过来,要在Executor进行聚合操作,
聚合操作时使用Executor内存的比例由该参数决定,默认是20%如果聚合时数据超过了该大小,那么就会spill到磁盘,极大降低性能;
2)如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,
避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,
那么同样建议调低这个参数的值。
123、Spark中standalone模式特点,有哪些优点和缺点?
1)特点:
(1)standalone是master/slave架构,集群由Master与Worker节点组成,程序通过与Master节点交互申请资源,Worker节点启动Executor运行;
(2)standalone调度模式使用FIFO调度方式;
(3)无依赖任何其他资源管理系统,Master负责管理集群资源。
2)优点:
(1)部署简单;
(2)不依赖其他资源管理系统。
3)缺点:
(1)默认每个应用程序会独占所有可用节点的资源,当然可以通过spark.cores.max来决定一个应用可以申请的CPU cores个数;
(2)可能有单点故障,需要自己配置master HA。
124、FIFO调度模式的基本原理、优点和缺点?
基本原理:按照先后顺序决定资源的使用,资源优先满足最先来的job。第一个job优先获取所有可用的资源,接下来第二个job再获取剩余资源。
以此类推,如果第一个job没有占用所有的资源,那么第二个job还可以继续获取剩余资源,这样多个job可以并行运行,如果第一个job很大,占用所有资源,
则第二job就需要等待,等到第一个job释放所有资源。
优点和缺点:
1)适合长作业,不适合短作业;
2)适合CPU繁忙型作业(计算时间长,相当于长作业),不利于IO繁忙型作业(计算时间短,相当于短作业)。
125、FAIR调度模式的优点和缺点?
所有的任务拥有大致相当的优先级来共享集群资源,spark多以轮训的方式为任务分配资源,不管长任务还是端任务都可以获得资源,并且获得不错的响应时间,
对于短任务,不会像FIFO那样等待较长时间了,通过参数spark.scheduler.mode 为FAIR指定。
126、CAPCACITY调度模式的优点和缺点?
1)原理:
计算能力调度器支持多个队列,每个队列可配置一定的资源量,每个队列采用 FIFO 调度策略,为了防止同一个用户的作业独占队列中的资源,该调度器会对
同一用户提交的作业所占资源量进行限定。调度时,首先按以下策略选择一个合适队列:计算每个队列中正在运行的任务数与其应该分得的计算资源之间的
比值(即比较空闲的队列),选择一个该比值最小的队列;然后按以下策略选择该队列中一个作业:按照作业优先级和提交时间顺序选择,
同时考虑用户资源量限制和内存限制
2)优点:
(1)计算能力保证。支持多个队列,某个作业可被提交到某一个队列中。每个队列会配置一定比例的计算资源,且所有提交到队列中的作业
共享该队列中的资源;
(2)灵活性。空闲资源会被分配给那些未达到资源使用上限的队列,当某个未达到资源的队列需要资源时,一旦出现空闲资源资源,便会分配给他们;
(3)支持优先级。队列支持作业优先级调度(默认是FIFO);
(4)多重租赁。综合考虑多种约束防止单个作业、用户或者队列独占队列或者集群中的资源;
(5)基于资源的调度。 支持资源密集型作业,允许作业使用的资源量高于默认值,进而可容纳不同资源需求的作业。不过,当前仅支持内存资源的调度。
127、常见的数压缩方式,你们生产集群采用了什么压缩方式,提升了多少效率?
1)数据压缩,大片连续区域进行数据存储并且存储区域中数据重复性高的状况下,可以使用适当的压缩算法。数组,对象序列化后都可以使用压缩,数更紧凑,
减少空间开销。常见的压缩方式有snappy,LZO,gz等
2)Hadoop生产环境常用的是snappy压缩方式(使用压缩,实际上是CPU换IO吞吐量和磁盘空间,所以如果CPU利用率不高,不忙的情况下,
可以大大提升集群处理效率)。snappy压缩比一般20%~30%之间,并且压缩和解压缩效率也非常高(参考数据如下):
(1)GZIP的压缩率最高,但是其实CPU密集型的,对CPU的消耗比其他算法要多,压缩和解压速度也慢;
(2)LZO的压缩率居中,比GZIP要低一些,但是压缩和解压速度明显要比GZIP快很多,其中解压速度快的更多;
(3)Zippy/Snappy的压缩率最低,而压缩和解压速度要稍微比LZO要快一些。
提升了多少效率可以从2方面回答:1)数据存储节约多少存储,2)任务执行消耗时间节约了多少,可以举个实际例子展开描述。
128、使用scala代码实现WordCount?
val conf = new SparkConf()
val sc = new SparkContext(conf)
val line = sc.textFile(“xxxx.txt”) line.flatMap(.split(" ")).map((,1)).reduceByKey(+). collect().foreach(println) sc.stop()
129、Spark RDD 和 MapReduce2的区别?
1)mr2只有2个阶段,数据需要大量访问磁盘,数据来源相对单一 ,spark RDD ,可以无数个阶段进行迭代计算,数据来源非常丰富,数据落地介质也
非常丰富spark计算基于内存;
2)MapReduce2需要频繁操作磁盘IO,需要大家明确的是如果是SparkRDD的话,你要知道每一种数据来源对应的是什么,RDD从数据源加载数据,
将数据放到不同的partition针对这些partition中的数据进行迭代式计算计算完成之后,落地到不同的介质当中。
130、spark和Mapreduce快? 为什么快呢? 快在哪里呢?
Spark更加快的主要原因有几点:
1)基于内存计算,减少低效的磁盘交互;
2)高效的调度算法,基于DAG;
3)容错机制Lingage,主要是DAG和Lianage,即使spark不使用内存技术,也大大快于mapreduce。
131、RDD的数据结构是怎么样的?
一个RDD对象,包含如下5个核心属性。
1)一个分区列表,每个分区里是RDD的部分数据(或称数据块)。
2)一个依赖列表,存储依赖的其他RDD。
3)一个名为compute的计算函数,用于计算RDD各分区的值。
4)分区器(可选),用于键/值类型的RDD,比如某个RDD是按散列来分区。
5)计算各分区时优先的位置列表(可选),比如从HDFS上的文件生成RDD时,RDD分区的位置优先选择数据所在的节点,这样可以避免数据移动带来的开销。
132、简述Spark on yarn的作业提交流程
YARN Client模式
在YARN Client模式下,Driver在任务提交的本地机器上运行,Driver启动后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster的功能相当于一个ExecutorLaucher,只负责向ResourceManager申请Executor内存。
ResourceManager接到ApplicationMaster的资源申请后会分配container,然后ApplicationMaster在资源分配指定的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
YARN Cluster模式
在YARN Cluster模式下,任务提交后会和ResourceManager通讯申请启动ApplicationMaster,随后ResourceManager分配container,在合适的NodeManager上启动ApplicationMaster,此时的ApplicationMaster就是Driver。
Driver启动后向ResourceManager申请Executor内存,ResourceManager接到ApplicationMaster的资源申请后会分配container,然后在合适的NodeManager上启动Executor进程,Executor进程启动后会向Driver反向注册,Executor全部注册完成后Driver开始执行main函数,之后执行到Action算子时,触发一个job,并根据宽依赖开始划分stage,每个stage生成对应的taskSet,之后将task分发到各个Executor上执行。
133、Spark的有几种部署模式,每种模式特点?
1)本地模式
Spark不一定非要跑在hadoop集群,可以在本地,起多个线程的方式来指定。将Spark应用以多线程的方式直接运行在本地,一般都是为了方便调试,本地模式分三类
local:只启动一个executor
local[k]:启动k个executor
local[*]:启动跟cpu数目相同的 executor
2)standalone模式
分布式部署集群,自带完整的服务,资源管理和任务监控是Spark自己监控,这个模式也是其他模式的基础。
3)Spark on yarn模式
分布式部署集群,资源和任务监控交给yarn管理,但是目前仅支持粗粒度资源分配方式,包含cluster和client运行模式,cluster适合生产,driver运行在集群子节点,具有容错功能,client适合调试,dirver运行在客户端。
4)Spark On Mesos模式。
官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。用户可选择两种调度模式之一运行自己的应用程序:
(1)粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。
(2)细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。
134、Spark为什么比mapreduce快?
2.1、Spark vs MapReduce ≠ 内存 vs 磁盘
其实Spark和MapReduce的计算都发生在内存中,区别在于:
- MapReduce通常需要将计算的中间结果写入磁盘,然后还要读取磁盘,从而导致了频繁的磁盘IO。
- Spark则不需要将计算的中间结果写入磁盘,这得益于Spark的RDD(弹性分布式数据集,很强大)和DAG(有向无环图),其中DAG记录了job的stage以及在job执行过程中父RDD和子RDD之间的依赖关系。中间结果能够以RDD的形式存放在内存中,且能够从DAG中恢复,大大减少了磁盘IO。
2.2、Spark vs MapReduce Shuffle的不同
Spark和MapReduce在计算过程中通常都不可避免的会进行Shuffle,两者至少有一点不同:
- MapReduce在Shuffle时需要花费大量时间进行排序,排序在MapReduce的Shuffle中似乎是不可避免的;
- Spark在Shuffle时则只有部分场景才需要排序,支持基于Hash的分布式聚合,更加省时;
2.3、多进程模型 vs 多线程模型的区别
- MapReduce采用了多进程模型,而Spark采用了多线程模型。多进程模型的好处是便于细粒度控制每个任务占用的资源,但每次任务的启动都会消耗一定的启动时间。就是说MapReduce的Map Task和Reduce Task是进程级别的,而Spark Task则是基于线程模型的,就是说mapreduce 中的 map 和 reduce 都是 jvm 进程,每次启动都需要重新申请资源,消耗了不必要的时间(假设容器启动时间大概1s,如果有1200个block,那么单独启动map进程事件就需要20分钟)
- Spark则是通过复用线程池中的线程来减少启动、关闭task所需要的开销。(多线程模型也有缺点,由于同节点上所有任务运行在一个进程中,因此,会出现严重的资源争用,难以细粒度控制每个任务占用资源)
135、简单说一下hadoop和spark的shuffle相同和差异?
1)从 high-level 的角度来看,两者并没有大的差别。 都是将 mapper(Spark 里是 ShuffleMapTask)的输出进行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一个 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以内存作缓冲区,边 shuffle 边 aggregate 数据,等到数据 aggregate 好以后进行 reduce() (Spark 里可能是后续的一系列操作)。
2)从 low-level 的角度来看,两者差别不小。 Hadoop MapReduce 是 sort-based,进入 combine() 和 reduce() 的 records 必须先 sort。这样的好处在于 combine/reduce() 可以处理大规模的数据,因为其输入数据可以通过外排得到(mapper 对每段数据先做排序,reducer 的 shuffle 对排好序的每段数据做归并)。目前的 Spark 默认选择的是 hash-based,通常使用 HashMap 来对 shuffle 来的数据进行 aggregate,不会对数据进行提前排序。如果用户需要经过排序的数据,那么需要自己调用类似 sortByKey() 的操作;如果你是Spark 1.1的用户,可以将spark.shuffle.manager设置为sort,则会对数据进行排序。在Spark 1.2中,sort将作为默认的Shuffle实现。
3)从实现角度来看,两者也有不少差别。 Hadoop MapReduce 将处理流程划分出明显的几个阶段:map(), spill, merge, shuffle, sort, reduce() 等。每个阶段各司其职,可以按照过程式的编程思想来逐一实现每个阶段的功能。在 Spark 中,没有这样功能明确的阶段,只有不同的 stage 和一系列的 transformation(),所以 spill, merge, aggregate 等操作需要蕴含在 transformation() 中。
如果我们将 map 端划分数据、持久化数据的过程称为 shuffle write,而将 reducer 读入数据、aggregate 数据的过程称为 shuffle read。那么在 Spark 中,问题就变为怎么在 job 的逻辑或者物理执行图中加入 shuffle write 和 shuffle read的处理逻辑?以及两个处理逻辑应该怎么高效实现?
Shuffle write由于不要求数据有序,shuffle write 的任务很简单:将数据 partition 好,并持久化。之所以要持久化,一方面是要减少内存存储空间压力,另一方面也是为了 fault-tolerance。
136、spark工作机制?
① 构建Application的运行环境,Driver创建一个SparkContext
② SparkContext向资源管理器(Standalone、Mesos、Yarn)申请Executor资源,资源管理器启动StandaloneExecutorbackend(Executor)
③ Executor向SparkContext申请Task
④ SparkContext将应用程序分发给Executor
⑤ SparkContext就建成DAG图,DAGScheduler将DAG图解析成Stage,每个Stage有多个task,形成taskset发送给task Scheduler,由task Scheduler将Task发送给Executor运行
⑥ Task在Executor上运行,运行完释放所有资源
137、spark的优化怎么做?
spark调优比较复杂,但是大体可以分为三个方面来进行
1)平台层面的调优:防止不必要的jar包分发,提高数据的本地性,选择高效的存储格式如parquet
2)应用程序层面的调优:过滤操作符的优化降低过多小任务,降低单条记录的资源开销,处理数据倾斜,复用RDD进行缓存,作业并行化执行等等
3)JVM层面的调优:设置合适的资源量,设置合理的JVM,启用高效的序列化方法如kyro,增大off head内存等等
138、数据本地性是在哪个环节确定的?
具体的task运行在那他机器上,dag划分stage的时候确定的
139、RDD的弹性表现在哪几点?
1)自动的进行内存和磁盘的存储切换;
2)基于Lineage的高效容错;
3)task如果失败会自动进行特定次数的重试;
4)stage如果失败会自动进行特定次数的重试,而且只会计算失败的分片;
5)checkpoint和persist,数据计算之后持久化缓存;
6)数据调度弹性,DAG TASK调度和资源无关;
7)数据分片的高度弹性。
140、RDD有哪些缺陷?
1)不支持细粒度的写和更新操作(如网络爬虫),spark写数据是粗粒度的。所谓粗粒度,就是批量写入数据,为了提高效率。但是读数据是细粒度的也就是说可以一条条的读。
2)不支持增量迭代计算,Flink支持
141、介绍一下你对Unified Memory Management内存管理模型的理解?(了解)
Spark中的内存使用分为两部分:执行(execution)与存储(storage)。执行内存主要用于shuffles、joins、sorts和aggregations,存储内存则用于缓存或者跨节点的内部数据传输。1.6之前,对于一个Executor,内存都由以下部分构成:
1)ExecutionMemory。这片内存区域是为了解决 shuffles,joins, sorts and aggregations 过程中为了避免频繁IO需要的buffer。 通过spark.shuffle.memoryFraction(默认 0.2) 配置。
2)StorageMemory。这片内存区域是为了解决 block cache(就是你显示调用rdd.cache, rdd.persist等方法), 还有就是broadcasts,以及task results的存储。可以通过参数 spark.storage.memoryFraction(默认0.6)设置。
3)OtherMemory。给系统预留的,因为程序本身运行也是需要内存的(默认为0.2)。
传统内存管理的不足:
1)Shuffle占用内存0.2*0.8,内存分配这么少,可能会将数据spill到磁盘,频繁的磁盘IO是很大的负担,Storage内存占用0.6,主要是为了迭代处理。传统的Spark内存分配对操作人的要求非常高。(Shuffle分配内存:ShuffleMemoryManager, TaskMemoryManager, ExecutorMemoryManager)一个Task获得全部的Execution的Memory,其他Task过来就没有内存了,只能等待;
2)默认情况下,Task在线程中可能会占满整个内存,分片数据
142、 Spark的数据本地性有哪几种?
Spark中的数据本地性有三种:
1)PROCESS_LOCAL是指读取缓存在本地节点的数据
2)NODE_LOCAL是指读取本地节点硬盘数据
3)ANY是指读取非本地节点数据
通常读取数据PROCESS_LOCAL>NODE_LOCAL>ANY,尽量使数据以PROCESS_LOCAL或NODE_LOCAL方式读取。其中PROCESS_LOCAL还和cache有关,如果RDD经常用的话将该RDD cache到内存中,注意,由于cache是lazy的,所以必须通过一个action的触发,才能真正的将该RDD cache到内存中。
一、数据倾斜调优
1、数据倾斜
数据倾斜指的是,并行处理的数据集中,某一部分(如Spark或Kafka的一个Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
数据倾斜俩大直接致命后果。
1、数据倾斜直接会导致一种情况:Out Of Memory。
2、运行速度慢。
主要是发生在Shuffle阶段。同样Key的数据条数太多了。导致了某个key(下图中的80亿条)所在的Task数据量太大了。远远超过其他Task所处理的数据量。
一个经验结论是:一般情况下,OOM的原因都是数据倾斜
2、如何定位数据倾斜
数据倾斜一般会发生在shuffle过程中。很大程度上是你使用了可能会触发shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。
原因:查看任务->查看Stage->查看代码
某个task执行特别慢的情况
某个task莫名其妙内存溢出的情况
查看导致数据倾斜的key的数据分布情况
也可从以下几种情况考虑:
1、是不是有OOM情况出现,一般是少数内存溢出的问题
2、是不是应用运行时间差异很大,总体时间很长
3、需要了解你所处理的数据Key的分布情况,如果有些Key有大量的条数,那么就要小心数据倾斜的问题
4、一般需要通过Spark Web UI和其他一些监控方式出现的异常来综合判断
5、看看代码里面是否有一些导致Shuffle的算子出现
3、数据倾斜的几种典型情况
3.1 数据源中的数据分布不均匀,Spark需要频繁交互
3.2 数据集中的不同Key由于分区方式,导致数据倾斜
3.3 JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
3.4 聚合操作中,数据集中的数据分布不均匀(主要)
3.5 JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
3.6 JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
3.7 数据集中少数几个key数据量很大,不重要,其他数据均匀
注意:
1、需要处理的数据倾斜问题就是Shuffle后数据的分布是否均匀问题
2、只要保证最后的结果是正确的,可以采用任何方式来处理数据倾斜,只要保证在处理过程中不发生数据倾斜就可以
4、数据倾斜的处理方法
4.1 数据源中的数据分布不均匀,Spark需要频繁交互
解决方案
:避免数据源的数据倾斜
实现原理
:通过在Hive中对倾斜的数据进行预处理,以及在进行kafka数据分发时尽量进行平均分配。这种方案从根源上解决了数据倾斜,彻底避免了在Spark中执行shuffle类算子,那么肯定就不会有数据倾斜的问题了。
方案优点
:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。
方案缺点
:治标不治本,Hive或者Kafka中还是会发生数据倾斜。
适用情况
:在一些Java系统与Spark结合使用的项目中,会出现Java代码频繁调用Spark作业的场景,而且对Spark作业的执行性能要求很高,就比较适合使用这种方案。将数据倾斜提前到上游的Hive ETL,每天仅执行一次,只有那一次是比较慢的,而之后每次Java调用Spark作业时,执行速度都会很快,能够提供更好的用户体验。
总结:前台的Java系统和Spark有很频繁的交互,这个时候如果Spark能够在最短的时间内处理数据,往往会给前端有非常好的体验。这个时候可以将数据倾斜的问题抛给数据源端,在数据源端进行数据倾斜的处理。但是这种方案没有真正的处理数据倾斜问题。
4.2 数据集中的不同Key由于分区方式,导致数据倾斜
解决方案1
:调整并行度
实现原理
:增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。
方案优点
:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。
方案缺点
:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。
实践经验
:该方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,都无法处理。
总结:调整并行度:适合于有大量key由于分区算法或者分区数的问题,将key进行了不均匀分区,可以通过调大或者调小分区数来试试是否有效
解决方案2:
缓解数据倾斜(自定义Partitioner)
适用场景
:大量不同的Key被分配到了相同的Task造成该Task数据量过大。
解决方案
: 使用自定义的Partitioner实现类代替默认的HashPartitioner,尽量将所有不同的Key均匀分配到不同的Task中。
优势
: 不影响原有的并行度设计。如果改变并行度,后续Stage的并行度也会默认改变,可能会影响后续Stage。
劣势
: 适用场景有限,只能将不同Key分散开,对于同一Key对应数据集非常大的场景不适用。效果与调整并行度类似,只能缓解数据倾斜而不能完全消除数据倾斜。而且需要根据数据特点自定义专用的Partitioner,不够灵活。
4.3 JOIN操作中,一个数据集中的数据分布不均匀,另一个数据集较小(主要)
解决方案:Reduce side Join转变为Map side Join
方案适用场景
:在对RDD使用join类操作,或者是在Spark SQL中使用join语句时,而且join操作中的一个RDD或表的数据量比较小(比如几百M),比较适用此方案。
方案实现原理
:普通的join是会走shuffle过程的,而一旦shuffle,就相当于会将相同key的数据拉取到一个shuffle read task中再进行join,此时就是reduce join。但是如果一个RDD是比较小的,则可以采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。
方案优点
:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。
方案缺点
:适用场景较少,因为这个方案只适用于一个大表和一个小表的情况。
4.4 聚合操作中,数据集中的数据分布不均匀(主要)
解决方案
:两阶段聚合(局部聚合+全局聚合)
适用场景
:对RDD执行reduceByKey等聚合类shuffle算子或者在Spark SQL中使用group by语句进行分组聚合时,比较适用这种方案
实现原理
:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。
优点
:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。
缺点
:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案将相同key的数据分拆处理
4.5 JOIN操作中,两个数据集都比较大,其中只有几个Key的数据分布不均匀
解决方案
:为倾斜key增加随机前/后缀
适用场景
:两张表都比较大,无法使用Map侧Join。其中一个RDD有少数几个Key的数据量过大,另外一个RDD的Key分布较为均匀。
解决方案
:将有数据倾斜的RDD中倾斜Key对应的数据集单独抽取出来加上随机前缀,另外一个RDD每条数据分别与随机前缀结合形成新的RDD(笛卡尔积,相当于将其数据增到到原来的N倍,N即为随机前缀的总个数),然后将二者Join后去掉前缀。然后将不包含倾斜Key的剩余数据进行Join。最后将两次Join的结果集通过union合并,即可得到全部Join结果。
优势
:相对于Map侧Join,更能适应大数据集的Join。如果资源充足,倾斜部分数据集与非倾斜部分数据集可并行进行,效率提升明显。且只针对倾斜部分的数据做数据扩展,增加的资源消耗有限。
劣势
:如果倾斜Key非常多,则另一侧数据膨胀非常大,此方案不适用。而且此时对倾斜Key与非倾斜Key分开处理,需要扫描数据集两遍,增加了开销。
注意:具有倾斜Key的RDD数据集中,key的数量比较少
4.6 JOIN操作中,两个数据集都比较大,有很多Key的数据分布不均匀
解决方案
:随机前缀和扩容RDD进行join
适用场景
:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义。
实现思路
:将该RDD的每条数据都打上一个n以内的随机前缀。同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。最后将两个处理后的RDD进行join即可。和上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。
优点
:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。
缺点
:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。
实践经验
:曾经开发一个数据需求的时候,发现一个join导致了数据倾斜。优化之前,作业的执行时间大约是60分钟左右;使用该方案优化之后,执行时间缩短到10分钟左右,性能提升了6倍。
注意:将倾斜Key添加1-N的随机前缀,并将被Join的数据集相应的扩大N倍(需要将1-N数字添加到每一条数据上作为前缀)
4.7 数据集中少数几个key数据量很大,不重要,其他数据均匀
解决方案
:过滤少数倾斜Key
适用场景
:如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大的话,那么很适合使用这种方案。比如99%的key就对应10条数据,但是只有一个key对应了100万数据,从而导致了数据倾斜。
优点
:实现简单,而且效果也很好,可以完全规避掉数据倾斜。
缺点
:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。
实践经验
:在项目中我们也采用过这种方案解决数据倾斜。有一次发现某一天Spark作业在运行的时候突然OOM了,追查之后发现,是Hive表中的某一个key在那天数据异常,导致数据量暴增。因此就采取每次执行前先进行采样,计算出样本中数据量最大的几个key之后,直接在程序中将那些key给过滤掉。
二、Spark资源调优
一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个task,都是以每个task一条线程的方式,多线程并发运行的。
一个应用提交的时候设置多大的内存?设置多少Core?设置几个Executor?
./bin/spark-submit \
--master yarn-cluster \
--num-executors 100 \
--executor-memory 6G \
--executor-cores 4 \
--driver-memory 1G \
--conf spark.default.parallelism=1000 \
--conf spark.storage.memoryFraction=0.5 \
--conf spark.shuffle.memoryFraction=0.3 \
1 、运行资源优化配置 -num-executors
参数说明:该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。
参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
2、 运行资源优化配置 -executor-memory
参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。
参数调优建议:每个Executor进程的内存设置4G~8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors * executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同事的作业无法运行。
3 、运行资源优化配置 -executor-cores
参数说明:该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。
参数调优建议:Executor的CPU core数量设置为2~4个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/3~1/2左右比较合适,也是避免影响其他同事的作业运行。
4、 运行资源优化配置 -driver-memory
参数说明:该参数用于设置Driver进程的内存。
参数调优建议:Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理(或者是用map side join操作),那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
5 、运行资源优化配置 -spark.default.parallelism
参数说明:该参数用于设置每个stage的默认task数量,也可以认为是分区数。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。
参数调优建议:Spark作业的默认task数量为500~1000个较为合适。很多人常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的2~3倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
6、 运行资源优化配置 -spark.storage.memoryFraction
参数说明:该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘。
参数调优建议:如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
7、 运行资源优化配置 -spark.shuffle.memoryFraction
参数说明:该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。
参数调优建议:如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
总结:
1、num-executors
:应用运行时executor的数量,推荐50-100左右比较合适
2、executor-memory
:应用运行时executor的内存,推荐4-8G比较合适
3、executor-cores
:应用运行时executor的CPU核数,推荐2-4个比较合适
4、driver-memory
:应用运行时driver的内存量,主要考虑如果使用map side join或者一些类似于collect的操作,那么要相应调大内存量
5、spark.default.parallelism
:每个stage默认的task数量,推荐参数为num-executors * executor-cores的2~3倍较为合适
6、spark.storage.memoryFraction
:每一个executor中用于RDD缓存的内存比例,如果程序中有大量的数据缓存,可以考虑调大整个的比例,默认为60%
7、spark.shuffle.memoryFraction
:每一个executor中用于Shuffle操作的内存比例,默认是20%,如果程序中有大量的Shuffle类算子,那么可以考虑其它的比例
三、Spark程序开发调优
1、程序开发调优 :避免创建重复的RDD
需要对名为“hello.txt”的HDFS文件进行一次map操作,再进行一次reduce操作。也就是说,需要对一份数据执行两次算子操作。
错误的做法:
对于同一份数据执行多次算子操作时,创建多个RDD。//这里执行了两次textFile方法,针对同一个HDFS文件,创建了两个RDD出来,然后分别对每个RDD都执行了一个算子操作。
这种情况下,Spark需要从HDFS上两次加载hello.txt文件的内容,并创建两个单独的RDD;//第二次加载HDFS文件以及创建RDD的性能开销,很明显是白白浪费掉的。
val rdd1 = sc.textFile("hdfs://master:9000/hello.txt")
rdd1.map(...)
val rdd2 = sc.textFile("hdfs://master:9000/hello.txt")
rdd2.reduce(...)
正确的用法:
对于一份数据执行多次算子操作时,只使用一个RDD。
2、程序开发调优 :尽可能复用同一个RDD
错误的做法:
有一个<long , String>格式的RDD,即rdd1。
接着由于业务需要,对rdd1执行了一个map操作,创建了一个rdd2,而rdd2中的数据仅仅是rdd1中的value值而已,也就是说,rdd2是rdd1的子集。
JavaPairRDD<long , String> rdd1 = ...
JavaRDD<string> rdd2 = rdd1.map(...)
分别对rdd1和rdd2执行了不同的算子操作。
rdd1.reduceByKey(...)
rdd2.map(...)
正确的做法:
rdd2的数据完全就是rdd1的子集而已,却创建了两个rdd,并对两个rdd都执行了一次算子操作。
此时会因为对rdd1执行map算子来创建rdd2,而多执行一次算子操作,进而增加性能开销。
其实在这种情况下完全可以复用同一个RDD。
我们可以使用rdd1,既做reduceByKey操作,也做map操作。
JavaPairRDD<long , String>
rdd1 = ...rdd1.reduceByKey(...)
rdd1.map(tuple._2...)
3、程序开发调优 :对多次使用的RDD进行持久化
正确的做法:
cache()方法表示:使用非序列化的方式将RDD中的数据全部尝试持久化到内存中。
此时再对rdd1执行两次算子操作时,只有在第一次执行map算子时,才会将这个rdd1从源头处计算一次。
第二次执行reduce算子时,就会直接从内存中提取数据进行计算,不会重复计算一个rdd。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt").cache()
rdd1.map(...)
rdd1.reduce(...)
序列化的方式可以减少持久化的数据对内存/磁盘的占用量,进而避免内存被持久化数据占用过多,从而发生频繁GC。
val rdd1 = sc.textFile("hdfs://192.168.0.1:9000/hello.txt") .persist(StorageLevel.MEMORY_AND_DISK_SER)
rdd1.map(...)
rdd1.reduce(...)
注意:通常不建议使用DISK_ONLY和后缀为_2的级别:因为完全基于磁盘文件进行数据的读写,会导致性能急剧降低,导致网络较大开销
4、程序开发调优 :尽量避免使用shuffle类算子
如果有可能的话,要尽量避免使用shuffle类算子,最消耗性能的地方就是shuffle过程。
shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。
尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。
传统的join操作会导致shuffle操作。
因为两个RDD中,相同的key都需要通过网络拉取到一个节点上,由一个task进行join操作。
val rdd3 = rdd1.join(rdd2)
Broadcast+map的join操作,不会导致shuffle操作。
使用Broadcast将一个数据量较小的RDD作为广播变量。
val rdd2Data = rdd2.collect() val rdd2DataBroadcast = sc.broadcast(rdd2Data) val rdd3 = rdd1.map(rdd2DataBroadcast...)
注意:以上操作,建议仅仅在rdd2的数据量比较少(比如几百M,或者一两G)的情况下使用。因为每个Executor的内存中,都会驻留一份rdd2的全量数据。
5、程序开发调优 :使用map-side预聚合的shuffle操作
如果因为业务需要,一定要使用shuffle操作,无法用map类的算子来替代,那么尽量使用可以map-side预聚合的算子,类似于MapReduce中的本地combiner。map-side预聚合之后,每个节点本地就只会有一条相同的key,因为多条相同的key都被聚合起来了。其他节点在拉取所有节点上的相同key时,就会大大减少需要拉取的数据数量,从而也就减少了磁盘IO以及网络传输开销。
建议使用reduceByKey或者aggregateByKey算子来替代掉groupByKey算子
6、程序开发调优 :使用高性能的算子
使用reduceByKey/aggregateByKey替代groupByKey : map-side
使用mapPartitions替代普通map : 函数执行频率
使用foreachPartitions替代foreach : 函数执行频率
使用filter之后进行coalesce操作 : filter后对分区进行压缩
使用repartitionAndSortWithinPartitions替代repartition与sort类操作
repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子
7、程序开发调优 :广播大变量
有时在开发过程中,会遇到需要在算子函数中使用外部变量的场景(尤其是大变量,比如100M以上的大集合),那么此时就应该使用Spark的广播(Broadcast)功能来提升性能。
默认情况下,Spark会将该变量复制多个副本,通过网络传输到task中,此时每个task都有一个变量副本。如果变量本身比较大的话(比如100M,甚至1G),那么大量的变量副本在网络中传输的性能开销,以及在各个节点的Executor中占用过多内存导致的频繁GC,都会极大地影响性能。
广播后的变量,会保证每个Executor的内存中,只驻留一份变量副本,而Executor中的task执行时共享该Executor中的那份变量副本。
8、程序开发调优 :使用Kryo优化序列化性能
1)在算子函数中使用到外部变量时,该变量会被序列化后进行网络传输。
2)将自定义的类型作为RDD的泛型类型时(比如JavaRDD,Student是自定义类型),所有自定义类型对象,都会进行序列化。因此这种情况下,也要求自定义的类必须实现Serializable接口。
3)使用可序列化的持久化策略时(比如MEMORY_ONLY_SER),Spark会将RDD中的每个partition都序列化成一个大的字节数组。
Spark默认使用的是Java的序列化机制,你可以使用Kryo作为序列化类库,效率要比Java的序列化机制要高
// 创建SparkConf对象。val conf = new SparkConf().setMaster(...).setAppName(...)// 设置序列化器为KryoSerializer。conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")// 注册要序列化的自定义类型。conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
9、程序开发调优 :分区Shuffle优化
当遇到userData和events进行join时,userData比较大,而且join操作比较频繁,这个时候,可以先将userData调用了 partitionBy()分区,可以极大提高效率。
cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等都能够受益
总结:如果遇到一个RDD频繁和其他RDD进行Shuffle类操作,比如 cogroup()、 groupWith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、 combineByKey() 以及 lookup()等,那么最好将该RDD通过partitionBy()操作进行预分区,这些操作在Shuffle过程中会减少Shuffle的数据量
10、程序开发调优 :优化数据结构
Java中,有三种类型比较耗费内存:
1)对象,每个Java对象都有对象头、引用等额外的信息,因此比较占用内存空间。
2)字符串,每个字符串内部都有一个字符数组以及长度等额外信息。
3)集合类型,比如HashMap、LinkedList等,因为集合类型内部通常会使用一些内部类来封装集合元素,比如Map.Entry
Spark官方建议,在Spark编码实现中,特别是对于算子函数中的代码,尽量不要使用上述三种数据结构,尽量使用字符串替代对象,使用原始类型(比如Int、Long)替代字符串,使用数组替代集合类型,这样尽可能地减少内存占用,从而降低GC频率,提升性能。
四、Spark的Shuffle配置调优
1、Shuffle优化配置 -spark.shuffle.file.buffer
默认值:32k
参数说明:该参数用于设置shuffle write task的BufferedOutputStream的buffer缓冲大小。将数据写到磁盘文件之前,会先写入buffer缓冲中,待缓冲写满之后,才会溢写到磁盘。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如64k),从而减少shuffle write过程中溢写磁盘文件的次数,也就可以减少磁盘IO次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
2、Shuffle优化配置 -spark.reducer.maxSizeInFlight
默认值:48m
参数说明:该参数用于设置shuffle read task的buffer缓冲大小,而这个buffer缓冲决定了每次能够拉取多少数据。
调优建议:如果作业可用的内存资源较为充足的话,可以适当增加这个参数的大小(比如96m),从而减少拉取数据的次数,也就可以减少网络传输的次数,进而提升性能。在实践中发现,合理调节该参数,性能会有1%~5%的提升。
3、Shuffle优化配置 -spark.shuffle.io.maxRetries
默认值:3
参数说明:shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的。该参数就代表了可以重试的最大次数。如果在指定次数之内拉取还是没有成功,就可能会导致作业执行失败。
调优建议:对于那些包含了特别耗时的shuffle操作的作业,建议增加重试最大次数(比如60次),以避免由于JVM的full gc或者网络不稳定等因素导致的数据拉取失败。在实践中发现,对于针对超大数据量(数十亿~上百亿)的shuffle过程,调节该参数可以大幅度提升稳定性。
4、Shuffle优化配置 -spark.shuffle.io.retryWait
默认值:5s
参数说明: shuffle read task从shuffle write task所在节点拉取属于自己的数据时,如果因为网络异常导致拉取失败,是会自动进行重试的,该参数代表了每次重试拉取数据的等待间隔,默认是5s。
调优建议:建议加大间隔时长(比如60s),以增加shuffle操作的稳定性。
5、Shuffle优化配置 -spark.shuffle.memoryFraction
默认值:0.2
参数说明:该参数代表了Executor内存中,分配给shuffle read task进行聚合操作的内存比例,默认是20%。
调优建议:在资源参数调优中讲解过这个参数。如果内存充足,而且很少使用持久化操作,建议调高这个比例,给shuffle read的聚合操作更多内存,以避免由于内存不足导致聚合过程中频繁读写磁盘。在实践中发现,合理调节该参数可以将性能提升10%左右。
6、Shuffle优化配置 -spark.shuffle.manager
默认值:sort
参数说明:该参数用于设置ShuffleManager的类型。Spark 1.5以后,有三个可选项:hash、sort和tungsten-sort。HashShuffleManager是Spark 1.2以前的默认选项,但是Spark 1.2以及之后的版本默认都是SortShuffleManager了。tungsten-sort与sort类似,但是使用了tungsten计划中的堆外内存管理机制,内存使用效率更高。
调优建议:由于SortShuffleManager默认会对数据进行排序,因此如果你的业务逻辑中需要该排序机制的话,则使用默认的SortShuffleManager就可以;而如果你的业务逻辑不需要对数据进行排序,那么建议参考后面的几个参数调优,通过bypass机制或优化的HashShuffleManager来避免排序操作,同时提供较好的磁盘读写性能。这里要注意的是,tungsten-sort要慎用,因为之前发现了一些相应的bug。
7、Shuffle优化配置 -spark.shuffle.sort.bypassMergeThreshold
默认值:200
参数说明:当ShuffleManager为SortShuffleManager时,如果shuffle read task的数量小于这个阈值(默认是200),则shuffle write过程中不会进行排序操作,而是直接按照未经优化的HashShuffleManager的方式去写数据,但是最后会将每个task产生的所有临时磁盘文件都合并成一个文件,并会创建单独的索引文件。
调优建议:当你使用SortShuffleManager时,如果的确不需要排序操作,那么建议将这个参数调大一些,大于shuffle read task的数量。那么此时就会自动启用bypass机制,map-side就不会进行排序了,减少了排序的性能开销。但是这种方式下,依然会产生大量的磁盘文件,因此shuffle write性能有待提高。
8、Shuffle优化配置 -spark.shuffle.consolidateFiles
默认值:false
参数说明:如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。
调优建议:如果的确不需要SortShuffleManager的排序机制,那么除了使用bypass机制,还可以尝试将spark.shffle.manager参数手动指定为hash,使用HashShuffleManager,同时开启consolidate机制。在实践中尝试过,发现其性能比开启了bypass机制的SortShuffleManager要高出10%~30%。
总结:
1、spark.shuffle.file.buffer
:主要是设置的Shuffle过程中写文件的缓冲,默认32k,如果内存足够,可以适当调大,来减少写入磁盘的数量。
2、spark.reducer.maxSizeInFight
:主要是设置Shuffle过程中读文件的缓冲区,一次能够读取多少数据,如果内存足够,可以适当扩大,减少整个网络传输次数。
3、spark.shuffle.io.maxRetries
:主要是设置网络连接失败时,重试次数,适当调大能够增加稳定性。
4、spark.shuffle.io.retryWait
:主要设置每次重试之间的间隔时间,可以适当调大,增加程序稳定性。
5、spark.shuffle.memoryFraction
:Shuffle过程中的内存占用,如果程序中较多使用了Shuffle操作,那么可以适当调大该区域。
6、spark.shuffle.manager
:Hash和Sort方式,Sort是默认,Hash在reduce数量 比较少的时候,效率会很高。
7、spark.shuffle.sort. bypassMergeThreshold
:设置的是Sort方式中,启用Hash输出方式的临界值,如果你的程序数据不需要排序,而且reduce数量比较少,那推荐可以适当增大临界值。
8、spark. shuffle.cosolidateFiles
:如果你使用Hash shuffle方式,推荐打开该配置,实现更少的文件输出。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)