Kafka 设计架构原理详细解析(超详细图解)
什么是Kafka?Apache Kafka是一个开放源代码的分布式事件流平台,成千上万的公司使用它来实现高性能数据管道,流分析,数据集成和关键任务等相关的应用程序。Kafka的应用场景构造实时流数据管道,它可以在系统或应用之间可靠地获取数据 (相当于message queue),特别是在集群情况下,多个服务器需要建立交流构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kaf
什么是Kafka?
Apache Kafka是一个开放源代码的分布式事件流平台,成千上万的公司使用它来实现高性
能数据管道,流分析,数据集成和关键任务等相关的应用程序。
Kafka的应用场景
- 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据 (相当于message queue),特别是在集群情况下,多个服务器需要建立交流
- 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)
Kafka架构设计
Producer:生产者可以将数据发布到所选择的topic(主题)中。生成者负责将记录分配到topic的哪一个分区(partition)中,这里可以使用对多个partition循环发送来实现多个server负载均衡
Broker:日志的分区(partition)分布在Kafka集群的服务器上。每个服务器处理数据和请求时,共享这些分区。每一个分区都会在以配置的服务器上进行备份,确保容错性。
其中,每个分区都有一台server作为leader,零台或堕胎server作为follows。leader server处理一切对分区的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers中的一台server会自动成为新的eader,每台server都会成为某些分区的leader和某些分区的follower,因此集群的负载是均衡的
Consumer:消费者使用一个group(消费组)名称来表示,发布到topic中的每条记录将被分配到订阅消费组中的其中一个消费者示例。消费者实例可以分布在多个进程中或多个机器上
这里有两个注意的地方:
- 如果所有的消费者实例在同一个消费组中,消息记录会负载均衡到消费组中的每一个消费者实例
- 如果所有的消费者实例在不同的消费组中,则会将每条消息记录广播到所有的消费组或消费者进程中
如图中所示,这个Kafka集群中有两台server,四个分区(p0-p3)和两个消费组。这时分区中的消息记录会广播到所有的消费者组中
Kafka 生产者架构
基本流程:
- 主线程Producer中会经过拦截器、序列化器、分区器,然后将处理好的消息发送到消息累加器中
- 消息累加器每个分区会对应一个队列,在收到消息后,将消息放到队列中
- 使用
ProducerBatch
批量的进行消息发送到Sender线程处理(这里为了提高发送效率,减少带宽),ProducerBatch中就是我们需要发送的消息,其中消息累加器中可以使用Buffer.memory
配置,默认为32MB - Sender线程会从队列的队头部开始读取消息,然后创建request后会经过会被缓存,然后提交到
Selector
,Selector发送消息到Kafka集群 - 对于一些还没收到Kafka集群ack响应的消息,会将未响应接收消息的请求进行缓存,当收到Kafka集群ack响应后,会将request请求在缓存中清除并同时移除消息累加器中的消息
Kafka 消费者架构
基本流程:
Consumer Group中的Consumer向各自注册的分区上进行消费消息
Consumer消费消息后会将当前标注的消费位移信息以消息的方式提交到位移主题中记录,一个Consumer Group中多个Consumer会做负载均衡,如果一个Consumer宕机,会自动切换到组内别的Consumer进行消费
关键的点:
Consumer Group:组内多个的Consumer可以公用一个Consumer Id,组内所有的Consumer只能注册到一个分区上去消费,一个Consumer Group只能到一个Topic上去消费
位移主题:
位移主题的主要作用是保存Kafka消费者的位移信息
Kafka老版本之前:
在Kafka老版本之前处理方式是自动或手动地将位移数据提交到Zookeeper进行保存,Consumer重启后,自动从Zookeeper中读取消费位移信息,从而在上次的offset地方继续消费
优点: Kafka Broker中不需要保存位移数据,减少了Broker端需要持有的状态信息,有利于动态扩展
缺点: 每一个Consumer消费后需要发送位移信息到Zookeeper,而Zooker不适用于这种高频的写操作
Kafka最新版本中位移主题的处理方式:
Consumer的位移信息offset会当作一条条普通消息提交到位移主题(_consumer_offsets)中。
Kafka 文件存储架构
window文件系统中的文件列表:
这里比较好理解:
- 一个Topic分别存储在不同的partition中
- 一个partitioin对应着多个replica备份
- 一个relica对应着一个Log
- 一个Log对应多个LogSegment
- 而在LogSegment中存储着log文件、索引文件、其它文件
Kafka 如何保证数据有序性?
一些场景需要保证多个消息的消费顺序,比如订单,但在kafka中一个消息可能被发到多个partition中多个线程处理,被多个消费者消费,无法保证消息的消费顺序
解决方案:将需要顺序消费的消息发送的时候设置将某个topic发送到指定的partition(也可以根据key的hash与分区进行运算),则在partition中的消息也是有序的,消费的时候将一组同hash的key放到同一个queue中保证同一个消费者下的同一个线程对此queue进行消费。
总结:一个producer->一个partition->一个queue->一个comsumer->一个线程
当对于需要顺序消费的消息数量大的时候,无法保证吞吐量
Kafka 如何保证数据可靠性?
AR(Assigned Replicas):分区中的所有副本统称为AR。所有消息会先发送到leader副本,然后follower副本才能从leader中拉取消息进行同步。但是在同步期间,follower对于leader而言会有一定程度的滞后,这个时候follower和leader并非完全同步状态
OSR(Out Sync Replicas):follower副本与leader副本没有完全同步或滞后的副本集合
ISR(In Sync Replicas):AR中的一个子集,ISR中的副本都是与leader保持完全同步的副本,如果某个在ISR中的follower副本落后于leader副本太多,则会被从ISR中移除,否则如果完全同步,会从OSR中移至ISR集合。
在默认情况下,当leader副本发生故障时,只有在ISR集合中的follower副本才有资格被选举为新leader,而OSR中的副本没有机会(可以通过unclean.leader.election.enable
进行配置)
HW(High Watermark):高水位,它标识了一个特定的消息偏移量(offset),消费者只能拉取到这个水位 offset 之前的消息
LEO(Log End Offset):标识当前日志文件中下一条待写入的消息的offset。在ISR集合中的每个副本都会维护自身的LEO,且HW==LEO。
图中,HW就是8,Consumer只能拉去0~7的消息,LEO就是15,代表消息还没有同步到follower
下面通过一个例子来说明下ISR、HW、LEO之间的关系:
假设由一个leader副本,它有两个follower副本,这时候producer向leader写入3、4两条消息,我们来观察下他们是如何同步的
这个时候写入两条消息到leader,这个时候LEO变为5,然后follower开始同步leader数据
由于网络或其它原因,follower2同步效率较低,还没有完成同步,这个时候HW的offset为4,在此offset之前的消息Consumer都可见
在一定的延迟后,follower2也完成了队leader副本的同步,这时HW为5,LEO为5,且两个follower副本都在ISR集合中,在leader或follower宕机后,会在ISR集合的副本中选举一个来当新的leader副本
HW高水位的弊端:
- 高水位更新需要一轮额外的拉取请求
- leader和follower之间同步会有时间差,可能导致数据不一致或数据丢失
接下来通过一个例子来进行详细说明消息1
消息丢失的过程(min.insync.replicas=1):
对于消息不一致的情况:
就是leader、follower同时宕机,然后由follower先恢复且写入消息1,HW=1,leader恢复启之后发现HW相等,则不进行同步,但实际上他们的消息1不是同一个消息,导致消息不一致
在kafka 0.11.0.0版本中引入Leader Epoch来解决使用高水位导致的数据丢失和数据不一致的问题
所谓leader epoch实际上是一对值:(epoch,offset),epoch标识leader的版本号,从0开始,每变更一次leader,epoch+1;而offset对应于该epoch版本的leader写入第一条消息(成为leader后的首条消息)的位移
(0,0)、(1,120)表示第一个leader从位移0开始写入消息,共写了120条,第二个leader版本号为1,从位移120处开始写入消息
规避数据丢失(图片来源网络):
规避数据不一致(图片来源网络):
Kafka 高性能原因分析
-
顺序写入:顺序写入与随机写入速度相差高达6000倍
-
批量处理:使用消息累加器仅多个消息批量发送,既节省带宽有提高了发送速度
-
消息压缩:kafka支持队消息压缩,支持格式有:gzip、snapply、lz4,可以使用
compression.type
配置 -
页缓存:在消息发送后,并没有等到消息写入磁盘后才返回,而是到page Cache中就返回。page Cache与文件系统的写入由操作系统自动完成
-
零拷贝(zero-copy):Kafka两个重要过程都使用了零拷贝技术,且都是操作系统层面的狭义零拷贝,一是Producer生产的数据存到broker,二是 Consumer从broker读取数据。
正常的非零拷贝的数据拷贝过程:
硬盘—>内核缓冲区—>用户缓冲区—>内核socket缓冲区—>协议引擎-
Producer生产的数据持久化到broker,采用
mmap文件映射
,实现顺序的快速写入;
硬盘—>内核缓冲区—>共享到用户空间缓存,共享而不是复制 -
Customer从broker读取数据,采用
sendfile
,将磁盘文件读到OS内核缓冲区后,直接转到socket buffer进行网络发送。 sendfile() 只是适用于应用程序地址空间不需要对所访问数据进行处理的情况
硬盘—>内核缓冲区—>内核socket缓冲区—>协议引擎
-
关键配置
Broker 配置
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
log.dir | 保存日志数据的目录(对log.dirs属性的补充) | string | /tmp/kafka-logs | |
log.dirs | 保存日志数据的目录,如果未设置将使用log.dir的配置 | string | null | log.dirs=/home/kafka1,/home/kafka2,/home/kafka3 |
zookeeper.connect | Zookeeper主机地址 | string | zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka1 | |
listeners | 监听器列表 - 使用逗号分隔URI列表和监听器名称。如果侦听器名称不是安全协议,则还必须设置listener.security.protocol.map。指定主机名为0.0.0.0来绑定到所有接口。留空则绑定到默认接口上。 | string | null | 合法监听器列表的示例:PLAINTEXT:// myhost:9092,SSL://:9091 CLIENT://0.0.0.0:9092,REPLICATION:// localhost:9093 |
listener.security.protocol.map | 侦听器名称和安全协议之间的映射。 | string | PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL | |
auto.create.topics.enable | 是否允许在服务器上自动创建topic | boolean | true | 推荐为false |
unclean.leader.election.enable | 指定副本是否能够不再ISR中被选举为leader,即使这样可能会丢失数据 | boolean | false | 推荐为true |
auto.leader.rebalance.enable | 是否允许leader平衡。后台线程会定期检查并触发leader平衡 | boolean | true | 推荐为true |
log.retention.{hours | minutes | ms} | 日志删除的时间阈值(时、分、毫秒) | int |
log.rentention.bytes | 日志删除的大小阈值 | long | -1 | -1,表示没有限制 |
message.max.bytes | kafka允许的最大的一个批次消息大小 | int | 1000012=976KB |
Topic 配置
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
retention.ms | 规定了该topic消息被保存的时长 | long | 604800000 | |
retention.bytes | 规定了要为该 Topic 预留多大的磁盘空间 | long | -1 | |
max.message.bytes Kafka | Kafka允许接收该topic最大消息大小 | int | 1000012=976KB |
Consumer 配置
名称 | 描述 | 类型 | 默认值 | 配置示例 |
---|---|---|---|---|
auto.commit.interval.ms | 消费者偏移量自动提交给Kafka的频率(以毫秒为单位) | int | 5000 |
总结
文章中分别介绍了Kafka的整体架构、架构设计的细节、Kafka实现高性能所作出的努力及一些常用的配置。同时通过比较多的图解来详细说明一些复杂逻辑。
扩展
mmap 内存映射
传统文件访问:
mmap的作用是映射文件描述符和指定文件的(off_t off)区域至调用进程的(addr,addr *len)的内存区域,如下图所示:
直接将文件映射到内存中,且不需要经过cache、分页物理存储
参考
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)