读懂消息队列:Kafka与RocketMQ
3月份学完了极客时间的《消息列队高手课》专栏,专栏讲解了许多消息队列的基础知识并且对Kafka与RocketMQ两种主流消息队列有精彩的对比分析。学完专栏后将所有要点整理为笔记记录下来。
3月份学完了极客时间的《消息列队高手课》专栏,专栏讲解了许多消息队列的基础知识并且对Kafka与RocketMQ两种主流消息队列有精彩的对比分析。学完专栏后将所有要点整理为笔记记录下来,其他相关知识也搜索了大量资料,博文写得比较凌乱,分为两部分,第一部分是消息队列的基础知识,不涉及具体的消息队列产品。在了解了基础知识后,第二部分着重比较两款消息队列的明星产品——RocketMQ与Kafka,在比较的过程中理解消息队列产品的设计与架构。
消息队列基础知识
1、消息模型:主题与队列
最初的消息队列,就是一个严格意义上的队列。在计算机领域,“队列(Queue)”是一种数据结构,有完整而严格的定义。
早期的消息队列,就是按照“队列”的数据结构来设计的。我们一起看下这个图,生产者(Producer)发消息就是入队操作,消费者(Consumer)收消息就是出队也就是删除操作,服务端存放消息的容器自然就称为“队列”。
如果有多个生产者往同一个队列里面发送消息,这个队列中可以消费到的消息,就是这些生产者生产的所有消息的合集。消息的顺序就是这些生产者发送消息的自然顺序。如果有多个消费者接收同一个队列的消息,这些消费者之间实际上是竞争的关系,每个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到。如果需要将一份消息数据分发给多个消费者,要求每个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都需要接收消息。这个时候,单个队列就满足不了需求,一个可行的解决方式是,为每个消费者创建一个单独的队列,让生产者发送多份。显然这是个比较蠢的做法,同样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。
在发布—订阅模型中,消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。发布者将消息发送到主题中,订阅者在接收消息之前需要先“订阅主题”。“订阅”在这里既是一个动作,同时还可以认为是主题在消费时的一个逻辑副本,每份订阅中,订阅者都可以接收到主题的所有消息。在消息领域的历史上很长的一段时间,队列模式和发布—订阅模式是并存的,有些消息队列同时支持这两种消息模型,比如ActiveMQ。我们仔细对比一下这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并没有本质的区别。它们最大的区别其实就是,一份消息数据能不能被消费多次的问题。实际上,在这种发布—订阅模型中,如果只有一个订阅者,那它和队列模型就基本是一样的了。也就是说,发布—订阅模型在功能层面上是可以兼容队列模型的。
2、如何确保消息可靠传递(消息不丢失)
我们一先来看一下,整个消息从生产到消费的过程中,哪些地方可能会导致丢消息,以及应该如何避免消息丢失。你可以看下这个图,一条消息从生产到消费完成这个过程,可以划分三个阶段,为了方便描述,我给每个阶段分别起了个名字。
生产阶段
消息队列通过最常用的请求确认机制,来保证消息的可靠传递:当你的代码调用发消息方法时,消息队列的客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,表明消息已经收到了。客户端收到响应后,完成了一次正常消息的发送。只要Producer收到了Broker的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
在编写发送消息代码时,需要注意,正确处理返回值或者捕获异常,就可以保证这个阶段的消息不会丢失。以 Kafka 为例,我们看一下如何可靠地发送消息。
同步发送时,只要注意捕获异常即可。
try {
RecordMetadata metadata = producer.send(record).get();
System.out.println("消息发送成功。");
} catch (Throwable e) {
System.out.println("消息发送失败!");
System.out.println(e);
}
异步发送时,则需要在回调方法里进行检查,发送失败时要重试。这个地方是需要特别注意的,很多丢消息的原因就是,我们使用了异步发送,却没有在回调中检查发送结果。
producer.send(record, (metadata, exception) -> {
if (metadata != null) {
System.out.println("消息发送成功。");
} else {
System.out.println("消息发送失败!");
System.out.println(exception);
}
});
另外这里推荐为Producer的retries (重试次数)设置一个比较合理的值,一般是3 ,但是为了保证消息不丢失的话一般会设置比较大一点。设置完成之后,当出现网络问题之后能够自动重试消息发送,避免消息丢失。另外,建议还要设置重试间隔,因为间隔太小的话重试的效果就不明显了,网络波动一次你3次一下子就重试完了。
存储阶段
在存储阶段正常情况下,只要Broker在正常运行,就不会出现丢失消息的问题,但是如果Broker出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。如果对消息的存储可靠性要求非常高,可以通过配置Broker参数来避免因为宕机丢消息。
同步异步刷盘及同步异步复制
对于单个节点的Broker,需要配置Broker参数,在收到消息后,将消息写入磁盘后再给Producer返回确认响应,这样即使发生宕机,由于消息已经被写入磁盘,就不会丢失消息,恢复后还可以继续消费。
例如,在RocketMQ中,需要将刷盘方式flushDiskType配置为SYNC_FLUSH同步刷盘。如果Broker是由多个节点组成的集群,需要将Broker集群配置成:至少将消息发送到2个以上的节点,再给客户端回复发送确认响应。这样当某个Broker宕机时,其他的Broker可以替代宕机的Broker,也不会发生消息丢失。
RocketMQ支持异步刷盘,同步刷盘,同步Replication,异步Replication;
与RocketMQ不同,Kafka使用异步刷盘,异步Replication。
Kafka为分区(Partition)引入了多副本(Replica)机制。分区(Partition)中的多个副本之间会有一个叫做leader的家伙,其他副本是follower。我们发送的消息会被发送到leader,然后follower才能从leader中拉取消息进行同步。生产者和消费者只与leader交互。你可以理解为其他副本只是leader的拷贝,它们的存在只是为了保证消息存储的安全性。
试想一种情况:假如leader所在的broker突然挂掉,那么就要从follower副本重新选出一个leader,但是leader的数据如果还有一些没有被follower副本的同步的话,就会造成消息丢失,解决办法就是我们设置acks=all。acks是Kafka生产者(Producer) 很重要的一个参数。acks的默认值即为1,代表我们的消息被leader副本接收之后就算被成功发送。当我们配置acks=all代表所有副本都接收到该消息之后该消息才算真正成功被发送。
设置replication.factor >= 3,为了保证leader能有多个follower副本,我们一般会为topic设置replication.factor >= 3。这样就可以保证每个分区(partition) 至少有3个副本。虽然造成了数据冗余,但是带来了数据的安全性。
设置min.insync.replicas > 1,这样配置代表消息至少要被写入到2个副本才算是被成功发送。min.insync.replicas的默认值为1,在实际生产中应尽量避免默认值1。
消费阶段
对于Kafka,消息在被追加到Partition(分区)的时候都会分配一个特定的偏移量(offset)。偏移量(offset)表示Consumer当前消费到的 Partition(分区)的所在的位置。Kafka通过偏移量(offset)可以保证消息在分区内的顺序性。
消费阶段采用和生产阶段类似的确认机制来保证消息的可靠传递,客户端从Broker拉取消息后,执行用户的消费业务逻辑,成功后,才会给Broker发送消费确认响应。如果Broker没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程中丢失,也不会因为客户端在执行消费逻辑中出错导致丢失。
在编写消费代码时需要注意的是,不要在收到消息后就立即发送消费确认,而是应该在执行完所有消费业务逻辑之后,再发送消费确认。对于Kafka应手动关闭消费者拉取消息后自动提交offset的功能,每次在真正消费完消息之后之后再由消费者手动提交offset。
3、如何处理消息重复
首先需要明确的一点是:消息重复的情况必然存在。
在MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)中,给出了三种传递消息时能够提供的服务质量标准,这三种服务质量从低到高依次是:
- At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,允许丢消息。一般都是一些对消息可靠性要求不太高的监控场景使用。
- At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
- Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
这个服务质量标准不仅适用于MQTT,对所有的消息队列都是适用的。我们现在常用的绝大部分消息队列提供的服务质量都是At least once,包括RocketMQ、RabbitMQ和Kafka都是这样。
既然消息队列无法保证消息不重复,就需要我们的消费代码能够接受“消息是可能会重复的”这一现状,然后,通过一些方法来消除重复消息对业务的影响。一般解决重复消息的办法是,在消费端,让我们消费消息的操作具备幂等性。
At least once + 幂等消费 = Exactly once,以下是几种常用的设计幂等操作的方法:
- 数据库唯一约束实现幂等,以消息体中某个具备唯一特性的字段为唯一索引建立消息流水表,每消费一条消息就插入一行记录。
- 乐观锁实现幂等,消息体中传递数据的version,获取数据时根据version来查,消费成功后更新version
- 分布式锁幂等
4、事务消息保障业务处理与消息投递的原子性
事务消息需要消息队列提供相应的功能才能实现,Kafka和RocketMQ都提供了事务相关功能。以在电商平台上下单购物的场景为例。
首先,订单系统在消息队列上开启一个事务。然后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的唯一区别是,在事务提交之前,对于消费者来说,这个消息是不可见的。
半消息发送成功后,订单系统就可以执行本地事务了,在订单库中创建一条订单记录,并提交订单库的数据库事务。然后根据本地事务的执行结果决定提交或者回滚事务消息。如果订单创建成功,那就提交事务消息,购物车系统就可以消费到这条消息继续后续的流程。如果订单创建失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了最终一致性事务。
如果你足够细心,可能已经发现了,这个实现过程中,有一个问题是没有解决的。如果在第四步提交事务消息时失败了怎么办?对于这个问题,Kafka和RocketMQ给出了两种不同的解决方案。Kafka的解决方案比较简单粗暴,直接抛出异常,让用户自行处理。我们可以在业务代码中反复重试提交,直到提交成功,或者删除之前创建的订单进行补偿。RocketMQ则给出了另外一种解决方案,如下图所示。
在RocketMQ中的事务实现中,增加了事务反查的机制来解决事务消息提交失败的问题。如果Producer也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ的Broker没有收到提交或者回滚的请求,Broker会定期去Producer上反查这个事务对应的本地事务的状态,然后根据反查结果决定提交或者回滚这个事务。
为了支撑这个事务反查机制,我们的业务代码需要实现一个反查本地事务状态的接口,告知RocketMQ本地事务是成功还是失败。
通过上面这幅图,我们可以看出,事务不一致的两种情况中,永远不会发生“订单创建失败,购物车处理成功(从购物车中移除商品)”这样的情况。因为如果A服务本地事务都失败了,那B服务永远不会执行任何操作,因为消息压根就不会传到B服务。
那么订单创建成功,购物车处理失败这种情况会不会发生呢。答案是会的,因为A服务只负责当我消息执行成功了,保证消息能够送达到B,至于B服务接到消息后最终的执行结果成不成功A并不管。那B服务失败了怎么办?如果B最终执行失败,几乎可以断定就是代码有问题所以才引起的异常,因为消费端RocketMQ有重试机制,如果不是代码问题一般重试几次就能成功。如果是代码的原因引起多次重试失败后,也没有关系,将该异常记录下来,由人工处理,人工兜底处理后,就可以让事务达到最终的一致性。
RocketMQ是一种最终一致性的分布式事务,就是说它保证的是消息最终一致性,而不是像2PC、3PC、TCC那样实现强一致的分布式事务。
5、如何保证顺序消费消息
哪些场景要求顺序消费?
最常见的场景就是数据同步(从一个库同步到另一个库),当数据量大的时候数据同步压力也是很大的。这种情况我们都是怼到队列里面去,然后慢慢消费的,那问题就来了呀,我们在数据库同时对一个Id的数据进行了增、改、删三个操作,但是你消息发过去消费的时候变成了改,删、增,这样数据就不对了。
本来一条数据应该删掉了,结果在你那儿却还在,这不是出大问题!
通常我们所说的顺序消费消息指的是生产者按照顺序发送,消费者按照顺序进行消费,听起来简单,但做起来却非常困难。典型的要求消息顺序消费的场景就是基于binlog作数据同步。
我们都知道无论是Kafka还是RocketMQ,每个主题下面都有若干分区(RocketMQ叫队列),如果消息被分配到不同的分区中,那么Kafka是不能保证消息的消费顺序的,因为每个分区都分配到一个消费者,此时无法保证消费者的消费先后,因此如果需要进行消息具有消费顺序性,可以在生产端指定这一类消息的key,这类消息都用相同的key进行消息发送,kafka就会根据key哈希取模选取其中一个分区进行存储,由于一个分区只能由一个消费者进行监听消费,因此这时候一个分区中的消息就具有消费的顺序性了。
发送端
但以上情况只是在正常情况下可以保证顺序消息,但发生故障后,就没办法保证消息的顺序了,我总结以下两点:
- 当生产端是异步发送时,此时有消息发送失败,比如你异步发送了1、2、3消息,2消息发送异常要重发,这时候顺序就乱了;
- 当部分Broker宕机,会触发Reblance,导致同一Topic下的分区数量有变化,此时生产端有可能会把顺序消息发送到不同的分区,这时会发生短暂消息顺序不一致的现象,如果生产端指定分区发送,则该分区所在的Broker宕机后将直接不可用;
针对以上两点,生产端必须保证单线程同步发送,这还好解决,针对第二点,想要做到严格的消息顺序,就要保证当集群出现故障后集群立马不可用,或者主题做成单分区,但这么做大大牺牲了集群的高可用,单分区也会令集群性能大大降低。
存储端
对于存储端,要保证消息顺序,会有以下几个问题:
- 要保证顺序的消息不能多分区存储,也就是只能放置在同一个分区中,在Kafka中,它叫做partition;在RocketMQ中,它叫做queue。 如果消息分散到多个分区里面,自然不能保证顺序;
- 即使顺序消息都存储在一个分区中,还会有第2个问题。Broker挂了之后,能否切换到其他副本机器?也就是高可用问题;
比如你当前的Broker挂了,上面还有消息没有消费完。此时切换到副本机器,可用性保证了,但消息顺序就乱掉了。
要想保证,一方面要同步复制不能异步复制;另1方面得保证,切机器之前,挂掉的机器上面,所有消息必须消费完了,不能有残留,很明显,这个很难!!!
消费端
对于消费端,不能并行消费,即不能开多线程或者多个客户端消费同1个队列。RocketMQ会为每个队列分配一个PullRequest,并将其放入pullRequestQueue,PullMessageService线程会不断轮询从pullRequestQueue中取出PullRequest去拉取消息,接着将拉取到的消息给到ConsumeMessageService处理,ConsumeMessageService有两个子接口:
// 并发消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageConcurrentlyService;
// 顺序消息消费逻辑实现类
org.apache.rocketmq.client.impl.consumer.ConsumeMessageOrderlyService;
ConsumeMessageConcurrentlyService内部有一个线程池,用于并发消费,同样地,如果需要顺序消费,那么RocketMQ提供了 ConsumeMessageOrderlyService类进行顺序消息消费处理。从ConsumeMessageOrderlyService源码中能够看出RocketMQ能够实现局部消费顺序,主要有以下两点:
- RocketMQ会为每个队列建一个对象锁,在消费某一个消息消费队列时先加锁,意味着一个消费者内消费线程池中的并发度是消费队列级别,同一个消费队列在同一时刻只会被一个线程消费,其他线程排队消费。保证了当前Consumer内,同一队列的消息进行串行消费。
- 向Broker端请求锁定当前顺序消费的队列,防止在消费过程中被分配给其它消费者处理从而打乱消费顺序。
从上面的分析可以看出,要保证消息的严格有序,有多么困难!发送端和接收端的问题,还好解决一点,限制异步发送,限制并行消费。但对于存储端,机器挂了之后,切换的问题,就很难解决了。你切换了,可能消息就会乱;你不切换,那就暂时不可用。这2者之间,就需要权衡了。结论是:无论RocketMQ还是Kafka,都不保证消息的严格有序消费!
7、消费模式
消费模式分为推(push)和拉(pull)两种方式,两种方式各有优劣。其中推模式是队列将消息推送给客户端,实时性高,但容易导致客户端流量过高。而拉模式是客户端主动拉取队列中的消息,实时性较差,但是可以控制拉取流量。
Kafka是pull模式,RocketMQ和RabbitMQ均支持选择push或pull模式。
RocketMQ与Kafka对比
1、Kafka的架构及消息存储
在所有的存储系统中,消息队列的存储可能是最简单的。每个主题(Topic)包含若干个分区(Partition),每个分区其实就是一个WAL(WriteAheadLog),写入的时候只能在尾部追加,不允许修改。读取的时候,根据一个索引序号进行查询,然后连续顺序往下读。
如下图,一个Kafka架构包括若干个Producer(服务器日志、业务数据、web前端产生的page view等),若干个Broker(Kafka支持水平扩展,一般broker数量越多集群的吞吐量越大),若干个Consumer group,一个Zookeeper集群,Kafka通过Zookeeper管理集群配置、选举Leader、Consumer group发生变化时进行Rebalance。
Topic & Partition的关系是什么
一个Topic为一类消息,每条消息必须指定一个Topic。物理上,一个Topic分成一个或多个partition,每个partition有多个副本分布在不同的broker中,如下图所示,一个机器可能既是topicA_partition_1的leader又是topicB_partition_2的follower。
每个partition在存储层面是一个append log文件,发布到此partition的消息会追加到log文件的尾部,为顺序写人磁盘(顺序写磁盘比随机写内存的效率还要高)。每条消息在log文件中的位置成为offset(偏移量),offset为一个long型数字,唯一标记一条消息。写入过程如下图所示,Kafka只能保证单个partition中消息是有序的,而不保证topic下不同partition内消息的顺序。
这种存储方式,对于每个文件来说是顺序IO,但是当并发的读写多个partition的时候,就对应多个文件的顺序IO,只要partition的数量足够大,表现在文件系统的磁盘层面还是随机IO。因此当出现了多个partition或者topic个数过多时,Kafka的性能会急剧下降。
另外,Kafka没有重试机制不支持消息重试,也没有死信队列,因此使用Kafka做消息队列时,如果遇到了消息者收到消息进行业务处理时抛异常,就会很难进行下一步处理。应对这种场景,需要自己定制实现消息重投的功能。
2、RocketMQ的架构及消息存储
RocketMQ的部署架构如下图所示,在早期的RocketMQ版本中,是有依赖ZK的。而现在的版本中已经去掉了对ZK的依赖,转而使用自己开发的NameServer来实现元数据(Topic路由信息)的管理,并且这个NameServer是无状态的,可以随意的部署多台,其代码也非常简单,非常轻量。
NameServer内部维护了topic和broker之间的对应关系,并且和所有broker保持心跳连接,producer和consumer需要发布或者消费消息的时候,向NameServer发出请求来获取连接的broker的信息。
RocketMQ的启动流程可以描述为:
- Broker消息服务器启动,向所有NameServer注册,NameServer与每台Broker服务器保持长连接,并定时检测 Broker是否存活,如果检测到broker宕机,则从路由注册表中将其移除。
- 消息生产者(Producer)在发送消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息发送。
- 消息消费者(Consumer)在拉取消息之前先从NameServer获取Broker服务器地址列表,然后根据负载算法从列表中选择一台消息服务器进行消息拉取。
不同于Kafka里面,一台机器可以同时是Master和Slave。在RocketMQ里面,1台机器只能要么是Master,要么是Slave。这个在初始的机器配置里面,就定死了,其架构拓扑图如下。
在这里,RocketMQ里面queue这个概念,就对应Kafka里面partition。图中有3个Master, 6个Slave,那对应到物理上面,就是9台机器3个broker。
通过对比可以看出,Kafka和RocketMQ在Master/Slave/Broker这个3个概念上的差异。这个差异,也就影响到topic&partition这种逻辑概念和Master/Slave/Broker这些物理概念上的映射关系。
具体来讲就是:在Kafka里面,Maser/Slave是选举出来的,而RocketMQ不需要选举!在Kafka里面,每个partition的Master是谁Slave是谁要通过选举决定。Master/Slave是动态的,当Master挂了之后,会有1个Slave切换成Master。
而在RocketMQ中,不需要选举,Master/Slave的角色也是固定的。当一个Master挂了之后,你可以写到其他Master上,但不会说一个Slave切换成Master。这种简化,使得RocketMQ可以不依赖ZooKeeper就很好的管理Topic&queue和物理机器的映射关系了,也实现了高可用。
为了解决Kafka的设计中当topic或partition过多,顺序IO变随机IO的问题,RocketMQ采用了单一的日志文件,即把同1台机器上面所有topic的所有queue的消息,存放在一个文件里面,从而避免了随机的磁盘写入。其存储结构如下图所示。
所有消息都存在一个单一的CommitLog文件里面(完全的顺序写),然后有后台线程异步将消息在CommitLog的位置存储到ConsumeQueue(消息消费队列)文件中,再由Consumer进行消费。
需要说明的是:Kafka针对Producer和Consumer使用了同1份存储结构,而RocketMQ却为Producer和Consumer分别设计了不同的存储结构,Producer对应CommitLog文件,Consumer对应ConsumeQueue文件。ConsumeQueue文件中并不需要存储消息的内容,而存储的是消息在CommitLog中的offset。也就是说,ConsumeQueue其实是CommitLog的一个索引文件。
CommitLog文件默认大小为1G,也可以通过配置属性来改变默认大小。
这里之所以可以用“异步线程”,也是因为消息队列天生就是用来“缓冲消息”的。只要消息到了CommitLog,发送的消息也就不会丢。只要消息不丢,那就有了充足的回旋余地,用一个后台线程慢慢同步到ConsumeQueue,再由Consumer消费。可以说,这也是在消息队列内部的一个典型的“最终一致性”的案例。Producer发了消息,进了CommitLog,此时Consumer并不可见。但没关系,只要消息不丢,消息的offset最终肯定会写入ConsumeQueue,让Consumer可以消费。很显然,Consumer消费消息的时候,要读2次:先读ConsumeQueue得到offset,再读CommitLog得到消息内容(随机读)。
3、Kafka的消息查找
Kafka的存储以Partition为单位,每个Partition包含一组消息文件(Segment file)和一组索引文件(Index),并且消息文件和索引文件一一对应,具有相同的文件名(但文件扩展名不一样),文件名就是这个文件中第一条消息的索引序号。
每个索引中保存索引序号(也就是这条消息是这个分区中的第几条消息)和对应的消息在消息文件中的绝对位置。在索引的设计上,Kafka采用的是稀疏索引,为了节省存储空间,它不会为每一条消息都创建索引,而是每隔几条消息创建一条索引。
写入消息的时候非常简单,就是在消息文件尾部连续追加写入,一个文件写满了再写下一个文件。查找消息时,首先根据文件名找到所在的索引文件,然后用二分法遍历索引文件内的索引,在里面找到离目标消息最近的索引,再去消息文件中,找到这条最近的索引指向的消息位置,从这个位置开始顺序遍历消息文件,找到目标消息。
可以看到,寻址过程还是需要一定时间的。一旦找到消息位置后,就可以批量顺序读取,不必每条消息都要进行一次寻址。
4、RocketMQ的消息查找
RocketMQ的存储以Broker为单位。它的存储也是分为消息文件和索引文件,但是在RocketMQ中,每个Broker只有一组commitLog文件,它把在这个 Broker上的所有主题的消息都存在这一组文件中。索引文件和Kafka一样,是按照主题和队列分别建立的,每个队列对应一组索引文件,这组索引文件在RocketMQ中称为ConsumerQueue。
ConsumeQueue是RocketMQ用来存储消息的物理offset、size和tagscode的数据结构。
RocketMQ引入Hash索引机制为消息建立定长稠密索引,它为每一条消息都建立索引,每个索引的长度(注意不是消息长度)是固定的20个字节。
写入消息的时候,Broker上所有主题、所有队列的消息按照自然顺序追加写入到同一个消息文件中,一个文件写满了再写下一个文件。查找消息的时候,可以直接根据队列的消息序号,计算出索引的全局位置(索引序号 x 索引固定长度20),然后直接读取这条索引,再根据索引中记录的消息的全局位置,找到消息。可以看到,这里两次寻址都是绝对位置寻址,比Kafka的查找是要快的。
两种存储结构的对比如上图所示,可以看到它们有很多共通的地方,都是采用消息文件 + 索引文件的存储方式,索引文件的名字都是第1条消息的索引序号,索引中记录了消息的位置等等。
在消息文件的存储粒度上,Kafka以分区为单位,粒度更细,优点是更加灵活,很容易进行数据迁移和扩容。RocketMQ以Broker为单位,较粗的粒度牺牲了灵活性,带来的好处是,在写入的时候,同时写入的文件更少,有更好的批量写入性能(不同主题和分区的数据可以组成一批一起写入),更多的顺序写入,尤其是在Broker上有很多主题和分区的情况下,有更好的写入性能。
索引设计上,RocketMQ和Kafka分别采用了稠密和稀疏索引,稠密索引需要更多的存储空间,但查找性能更好,稀疏索引能节省一些存储空间,代价是牺牲了查找性能。
5、高效的文件读写及传输
RocketMQ和Kafka都基于磁盘做持久化,使用文件系统存储消息,两者都使用了PageCache,且都利用「零拷贝」技术的方式来提高I/O 的吞吐率,这也是Kafka在处理海量数据为什么这么快的原因之一。不同的是Kafka的零拷贝使用的是sendFile,而RocketMQ使用的是mmap虚拟内存映射的方式。
PageCache是OS对文件的缓存,用于加速对文件的读写。对于数据文件的写入,OS会先写入至PageCache内,随后通过异步的方式由pdflush内核线程将Cache内的数据刷盘至物理磁盘上。对于数据文件的读取,如果一次读取文件时出现未命中PageCache的情况,OS从物理磁盘上访问读取文件的同时,会顺序对其他相邻块的数据文件进行预读取。对于文件的顺序读写操作来说,读和写的区域都在OS的PageCache内,此时读写性能接近于内存。
同步刷盘与异步刷盘唯一的区别是,异步刷盘写完PageCache就直接返回,同步刷盘需要等待刷盘完成后才返回。
传统的文件传输低效在哪儿
传统的文件传输过程如下图,数据读取和写入是从用户空间到内核空间来回复制,而内核空间的数据是通过操作系统层面的I/O接口从磁盘读取或写入。总结下来就是,一次操作中有2次上下文切换和2次数据copy(DMA Copy + CPU copy)。
- 系统调用read导致了从用户空间到内核空间的上下文切换。DMA模块从磁盘中读取文件内容,并将其存储在内核空间的缓冲区内,完成了第1次复制。
- 数据从内核空间缓冲区复制到用户空间缓冲区,之后系统调用read返回,这导致了从内核空间向用户空间的上下文切换。此时,需要的数据已存放在指定的用户空间缓冲区内。
- 系统调用write导致从用户空间到内核空间的上下文切换。数据从用户空间缓冲区被再次复制到内核空间缓冲区,完成了第3次复制。不过,这次数据存放在内核空间中与使用的socket相关的特定缓冲区中,而不是步骤1中的缓冲区。
- 系统调用返回,导致了第4次上下文切换。
回过头看这个文件传输的过程,我们只是搬运一份数据,结果却搬运了4次,过多的数据拷贝无疑会消耗CPU资源,大大降低了系统性能。要想提高文件传输的性能,就需要减少「用户态与内核态的上下文切换」和「内存拷贝」的次数。用户空间没有权限操作磁盘或网卡,内核的权限最高,这些操作设备的过程都需要交由操作系统内核来完成,所以一般要通过内核去完成某些任务的时候,就需要使用操作系统提供的系统调用函数。而一次系统调用必然会发生2次上下文切换:首先从用户态切换到内核态,当内核执行完任务后,再切换回用户态交由进程代码执行。所以,要想减少上下文切换到次数,就要减少系统调用的次数。
零拷贝如何实现
零拷贝技术实现的方式通常有 2 种:
- mmap + write
- sendfile
传统文件传输过程中存在很多的数据冗余。某些冗余可以被消除,以减少开销、提高性能。
OS的mmap内存映射技术,通过MMU(内存管理单元)映射文件,将文件直接映射到用户态的内存地址,使得对文件的操作不再是write/read,而转化为直接对内存地址的操作,使随机读写文件和读写内存相似的速度。mmap把文件映射到用户空间里的虚拟内存,省去了从内核缓冲区复制到用户空间的过程,文件的位置在虚拟内存中有了对应的地址,可以像操作内存一样操作这个文件,这样的文件读写少了数据从内核缓存到用户空间的拷贝,效率很高,过程如下图所示:
- mmap系统调用导致文件的内容通过DMA模块被复制到内核缓冲区中,该缓冲区之后与用户进程共享,这样就内核缓冲区与用户缓冲区之间的复制就不会发生。
- write系统调用导致内核将数据从内核缓冲区复制到与socket相关联的内核缓冲区中。
- DMA模块将数据由socket的缓冲区传递给协议引擎时,第3次复制发生。
通过使用mmap()来代替read()系统调用, 可以减少一次数据拷贝的过程。但这还不是最理想的零拷贝,因为仍然需要通过CPU把内核缓冲区的数据拷贝到socket 缓冲区里,而且仍然需要4次上下文切换,因为系统调用还是2 次。
在Linux内核版本2.1中,提供了一个专门用于传输文件的系统调用函数 sendfile(),可以替代read()和write()这两个系统调用,这样就可以减少一次系统调用,也就减少了2次上下文切换的开销。该系统调用,可以直接把内核缓冲区里的数据拷贝到socket缓冲区里,不再拷贝到用户态,这样就只有2次上下文切换,和3次数据拷贝。
但是这还不是真正的零拷贝技术,如果网卡支持SG-DMA(The Scatter-Gather Direct Memory Access)技术(和普通的DMA有所不同),我们可以进一步减少通过CPU 把内核缓冲区里的数据拷贝到socket缓冲区的过程。从Linux内核2.4版本开始起,对于支持网卡支持SG-DMA 技术的情况下,sendfile()系统调用的过程有所变化,网卡的SG-DM 控制器直接将内核缓存中的数据拷贝到网卡的缓冲区里,此过程不需要将数据从操作系统内核缓冲区拷贝到socket缓冲区中,这样就减少了一次数据拷贝。所以,整个过程之中,只进行了2次数据拷贝。
这就是所谓的零拷贝(Zero-copy)技术,因为我们没有在内存层面去拷贝数据,也就是说全程没有通过CPU来搬运数据,所有的数据都是通过DMA 来进行传输的。
零拷贝技术的文件传输方式相比传统文件传输的方式,减少了2次上下文切换和数据拷贝次数,只需要2次上下文切换和数据拷贝次数,就可以完成文件的传输,而且2次的数据拷贝过程,都不需要通过CPU,2次都是由DMA 来搬运。总体来看,零拷贝技术可以把文件传输的性能提高至少一倍以上。
6、负载均衡
Kafka生产者负载均衡
分区器(Partitioner接口)是生产者层面的负载均衡。Kafka生产者生产消息时,根据分区器将消息投递到指定的分区中,所以Kafka的生产者负载均衡很大程度上依赖于分区器的实现。
Kafka提供了分区器实现类—org.apache.kafka.clients.producer.internals.DefaultPartitioner类。它的分区策略是:
- 如果消息中指定了分区,则发送到指定分区;
- 如果未指定分区但存在key,则根据序列化key使用murmur2哈希算法对分区数取模计算出一个分区号;拥有相同key值的消息被写入同一个分区;
- 如果不存在分区或key,默认分区器会基于Round-Robin轮询算法为每条消息分配分区;
如果不想使用Kafka默认的分区器,用户可以自行实现Partitioner接口,自行实现分区方法。
Kafka消费者负载均衡
Kafka具有消费分组的概念,某个Topic的某个partition只能由一个Consumer group中的一个Consmer消费。但如果两个Consmer不在同一个Consumer group,那么他们是可以同时消费某Topic的同一个partition的。
既然Kafka允许多个Consumer对多个partition同时消费且producer投递的消息也落于不同的partition中,那么在这种情况下,无论是存储消息还是消费消息,全局的顺序肯定是不可控的。但是要知道Kafka的partition是只能被一个Consumer(同一Consumer group下)消费的,那么如果能让消息全部都落入同一个partition就可以保证顺序,我们投递消息时通过设定消息的key就能让Kafka的producer根据key进行hash选择要写入的partition,就能保证消息写入的顺序。
Kafka目前主流的partition分配策略可以通过partition.assignment.strategy参数指定,主流的有2种。
-
Range(默认): Range策略是针对topic而言的,在进行分区分配时,为了尽可能保证所有consumer均匀的消费分区,会对同一个topic中的partition按照序号排序,并对consumer按照字典顺序排序。然后为每个consumer划分固定的分区范围,如果不够平均分配,那么排序靠前的消费者会被多分配分区。
-
Round-Robin轮询:比如一个topic下有3个分区,那么第一条消息被发送到分区0,第二条被发送到分区1,第三条被发送到分区2,以此类推,当生产第四条消息时又会重新开始。
在Kafka中,当有新consumer加入、旧consumer宕机(或者缩容)、Topic的partition数量发生变化时都会触发Reblance,为了保证大体上partition和consumer的均衡性,提升topic的并发消费能力,所以会有Rebalance机制。
RocketMQ 中的负载均衡都在Client端完成,具体来说,主要可以分为Producer端发送消息时的负载均衡和Consumer端订阅消息的负载均衡。
RocketMQ生产者负载均衡
一个broker通常是一个服务器节点,broker分为master和slave,master和slave存储的数据一样,slave从master同步数据。nameServer与每个集群成员保持心跳,保存着Topic-Broker路由信息,同一个Topic的队列会分布在不同的服务器上。
对于非顺序消息(普通消息、定时/延时消息、事务消息)场景,默认且只能使用RoundRobin模式的负载均衡策略。
Producer每个实例在发消息的时候,默认会通过轮询队列的方式发送,以达到让消息平均落在不同的队列上,即每个队列接收平均的消息量。发送时要指定消息的topic、tags、keys,消息不能指定投递到哪个队列。
对于顺序消息场景,默认且只能使用MessageGroupHash模式的负载均衡策略,该策略的原理是Hash算法。MessageGroupHash模式下,生产者发送消息时,以消息组为粒度,按照内置的Hash算法,将相同消息组的消息分配到同一队列中,保证同一消息组的消息按照发送的先后顺序存储。
如图中所示,消息G1-M1、G1-M2、G1-M3属于消息组1中的第一条消息、第二条消息和第三条消息,生产者按照Hash算法将这几条消息分配到同一队列MessageQueue1中,且在队列中保存的先后顺序和发送顺序一致。
MessageGroupHash模式的生产者负载均衡策略仅适用于顺序性的消息,可以很好地保证同消息组内消息的顺序性。但是若不同消息组的消息数量差异较大,MessageGroupHash模式将不能很好地保障消息的均衡分配和性能扩展能力。在极端场景下,可能会出现大部分消息集中在少数队列中的情况,建议设计消息组时尽量将消息离散开,不要集中在少量消息组中。
RocketMQ消费者负载均衡
注意,RocketMQ消费模式有集群消费和广播消费,因为广播模式所有的Consumer都会收到全量消息,所以RocketMQ的负载均衡只针对于Consumer集群消费的模式。
一个Topic中的Queue只能由Consumer Group中的一个Consumer进行消费,而一个Consumer可以同时消费多个Queue中的消息。那么Queue与Consumer间的配对关系是如何确定的,即Queue要分配给哪个Consumer进行消费,也是有算法策略的,常见的有四种策略,分别是:平均分配策略、环形平均分配策略、一致性hash分配策略、同机房分配策略。
- 平均分配策略(默认,AllocateMessageQueueAveragely)
该算法是根据【avg = QueueCount / ConsumerCount 】的计算结果进行分配的,如果能够整除,则按顺序将avg个Queue逐个分配,如果不能整除,则将多余出的Queue按照Consumer顺序逐个分配。 - 环形平均分配策略(AllocateMessageQueueAveragelyByCircle)
环形平均分配即轮询式分配,根据消费者的顺序,依次由Queue队列组成的环形图轮流的给一个消费者逐个分配,该方法不需要提前计算。 - 一致性哈希分配策略(AllocateMessageQueueConsistentHash)
该算法会将consumer的hash值作为Node节点存放到虚拟的hash环上,然后将queue的hash值也放到hash环上,通过顺时针方向,距离queue最近的那个consumer就是该queue要分配的consumer。一致性哈希算法可以有效减少由于消费者组扩容或缩容所带来的大量的Rebalance,所以它适合用在Consumer数量变化较频繁的场景 。 - 同机房分配策略(AllocateMessageQueueByMachineRoom)
该算法会根据queue的部署机房位置和consumer的位置,过滤出与当前consumer相同机房的queue。然后按照平均分配策略或环形平均分配策略对同机房queue进行分配,如果没有同机房queue,则按照平均分配策略或环形平均分配策略对所有queue进行分配。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)