RabbitMQ - 简介 和 知识详解
RabbitMQ 是一款开源的,使用 Erlang 语言编写的,基于 AMQP 协议的消息中间件。AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP 协议对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。当然其没有kafka性能好,
一、介绍
RabbitMQ 是一款开源的,使用 Erlang 语言编写的,基于 AMQP 协议的消息中间件。
AMQP 的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
AMQP 协议对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
RabbitMQ的可靠性是非常好的,数据能够保证百分之百的不丢失。可以使用镜像队列,它的稳定性非常好。当然
其没有kafka性能好,但是要比AvtiveMQ性能要好很多。
二、常用组件 - 基本概念
-
Producer
(生产者):生产者是消息的发送方。它负责创建并发送消息到 RabbitMQ 的交换机。 -
Consumer
(消费者):消费者是消息的接收方。它订阅一个或多个队列,并从队列中接收消息进行处理。 -
Message
(消息):消息是生产者发送给 RabbitMQ 的数据包。它包含了要传递的信息和一些可选的属性。 -
Queue
(队列):队列是消息的缓冲区,用于存储待处理的消息。消费者从队列中获取消息进行处理,并且消息按照先进先出的顺序进行传递。 -
Exchange
(交换机):交换机是消息的分发中心。它接收来自生产者的消息,并根据一定的规则将消息路由到一个或多个队列中。 -
Binding
(绑定):绑定是交换机与队列之间的关系。它定义了交换机如何将消息路由到队列。绑定可以指定一个或多个队列,并可以使用路由键来过滤消息。 -
Routing Key
(路由键):在消息被发送到交换机时,生产者可以指定一个路由键。交换机根据路由键将消息路由到绑定了相同路由键的队列。 -
Exchange Type
(交换机类型):交换机类型定义了消息的路由策略。RabbitMQ 支持多种交换机类型,如直连交换机、主题交换机、广播交换机等。 -
Message Acknowledgement
(消息确认):消费者在处理完一条消息后发送一个消息确认给 RabbitMQ,告知消息已经被成功处理。如果消费者崩溃或断开连接,RabbitMQ 将重新将该消息发送给其他消费者处理。 -
Durability
(持久化):持久化是指将队列和消息保存到磁盘上,以防止数据丢失。通过将队列和消息标记为持久化,即使 RabbitMQ 服务器重启,也能保证数据的安全性。 -
Connection
(连接):连接是客户端与 RabbitMQ 服务器之间的网络连接。客户端通过连接与 RabbitMQ 进行通信,包括发送和接收消息等操作。 -
Channel
(通道):通道是在连接上创建的虚拟连接。通道允许在单个连接上进行多个并发操作,提高性能和资源利用率。通道是轻量级的,创建和销毁通道的开销较小。 -
Broker
(经纪人):经纪人是 RabbitMQ 服务器的核心组件,它负责接收、存储、转发消息,并管理交换机、队列和绑定等资源。经纪人处理来自生产者和消费者的消息,并根据规则进行路由和分发。 -
Virtual Host
(虚拟主机):虚拟主机是 RabbitMQ 的逻辑隔离机制,它将 RabbitMQ 服务器划分为多个独立的消息空间。每个虚拟主机都有自己的交换机、队列、绑定和权限控制规则,可以实现不同应用程序之间的隔离和安全性。
三、Exchange Type(交换机类型)
-
Direct Exchange
(直连交换机):直连交换机是最简单的交换机类型。它使用消息的路由键(Routing Key)与绑定的队列的路由键进行精确匹配。只有当消息的路由键与绑定的队列的路由键完全相同时,消息才会被路由到该队列。 -
Topic Exchange
(主题交换机 - 模糊匹配):主题交换机允许使用通配符匹配
的方式进行消息的路由。它使用消息的路由键与绑定的队列的路由键进行模式匹配。可以使用"*"
来匹配一个单词,或者使用"#"
匹配零个或多个单词。这种灵活的匹配方式使得主题交换机非常适合处理多种复杂的消息路由需求。 -
Fanout Exchange
(广播交换机):广播交换机将收到的消息广播到所有绑定的队列中。它忽略消息的路由键,只关注与交换机绑定的队列。当需要将消息发送给所有消费者时,广播交换机是一个非常有用的选择。 -
Headers Exchange
(头交换机 -性能较低不常用
):头交换机根据消息的头部属性(Headers)进行匹配和路由。它不使用消息的路由`键,而是根据消息的头部属性来确定消息应该被路由到哪个队列。头交换机适用于复杂的匹配规则,但在实际应用中使用较少。
四、优缺点
优点:
- 高可用性:支持集群和镜像队列机制,可以实现高可用性和负载均衡。
- 可靠性:支持持久化消息和事务机制,确保消息不会丢失和重复消费。
- 灵活性:支持多种交换机类型和路由规则,可以满足不同场景下的需求。
- 可扩展性:支持分布式架构和多种编程语言,适用于大规模和复杂的应用程序。
缺点:
- 性能瓶颈:由于需要进行消息持久化和路由计算,RabbitMQ 的吞吐量和延迟可能会受到限制。
- 配置复杂:RabbitMQ 的配置比较复杂,需要熟悉 AMQP 协议和 RabbitMQ 的组件模型。
- 不适合低延迟场景:RabbitMQ 的延迟可能会受到消息持久化和路由计算的影响,不适合对延迟要求比较高的场景。
五、重复消费
RabbitMQ消息造成重复消费时,可能是由于以下原因导致的:
-
消费者异常终止
:当消费者异常终止时,RabbitMQ 无法检测到该消费者已经处理过的消息。如果该消费者重新启动并重新订阅队列,它将再次消费相同的消息,导致重复消费。
-
消费者消费速度过慢
:如果消费者消费速度过慢,队列中积累的消息将会增加。当一个消费者正在处理一条消息时,其他消费者可能会同时从队列中取出相同的消息并进行处理,导致重复消费。
为了避免 RabbitMQ 重复消费,可以采取以下措施:
-
消费者异常终止时
:可以使用消息确认机制(ack/nack)来确保消息只被处理一次。当消费者接收到消息后,先将消息设置为“未确认”状态,然后进行消息处理。当消息处理完成后,消费者发送确认消息给 RabbitMQ,RabbitMQ 将该消息标记为“已确认”。如果消费者在处理消息期间发生异常终止,RabbitMQ 将重新将该消息发送给其他消费者进行处理。
-
消费者消费速度过慢时
:可以通过增加消费者数量或者优化消费者代码来提高消费速度。还可以设置合适的消息限流策略,控制每个消费者可以处理的消息数量,避免队列中积累过多的消息。
-
可以考虑设置
幂等性
在生产者发送消息时,为每个消息设置一个全局唯一ID + 码值 (全局ID:采用雪花算法生成的业务表主键,码值:订单号/批次号/或每条消息内的主键);
①并发量不高的情况下:可以考虑在数据库维护一张消费记录表维护处理状态,查询是否处理消费消息。
②并发量很高的情况下:使用分布式锁解决并发问题,将全局ID写入Redis 的key,Value值保存处理状态,用来判断是否消费消息。当消费者接收到消息后,可以先查询该消息的 ID 是否已经被处理过。如果已经处理过,则可以直接跳过该消息,避免重复消费。
总之,避免 RabbitMQ 重复消费需要综合考虑多个因素,包括消费者异常终止、消费速度过慢和消息确认等问题。通过合理地设置消费者数量、消息限流策略和消息确认机制等措施,可以有效地避免 RabbitMQ 重复消费问题的发生。
六、消息丢失
RabbitMQ
消息丢失通常可能是由以下几个原因导致的:
-
生产者消息未被成功发送到 RabbitMQ
当生产者发送消息时,可能由于网络故障、连接中断或生产者异常终止等原因,导致消息未能成功发送到 RabbitMQ。这种情况下,消息会丢失,消费者将无法收到该消息。
- 解决办法:
- Confirm 模式
在生产者端,设置开启
Confirm
(confirm mode
) 确认模式,来确保消息的可靠发送。此模式保证每次写的消息都会分配一个唯一的 ID,如果消息成功写入了 RabbitMQ 中,RabbitMQ 会回调生产者 Ack 接口,让生产者收到一个确认回执,告知此消息成功写入,如果 RabbitMQ 没能处理这个消息,会回调一个 nack 接口,告知此消息接受失败,需要重发。并且也可以在内存里维护每个消息的ID状态,如果超过一定时间还没有接收到此消息的回调,也可以重发。 - 事务机制(基于AMQP协议)
发送消息前,开启事务(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback()),如果发送成功则提交事务(channel.txCommit()),吞吐量下降,同步处理,不推荐
- Confirm 模式
- 解决办法:
-
RabbitMQ 服务器故障,导致消息丢失
:RabbitMQ 服务器可能会遇到各种故障,如断电、宕机、崩溃等,这可能导致消息丢失。如果消息尚未持久化并且未被传递到任何持久化的队列中,那么在服务器故障后,这些消息将不可恢复地丢失。
-
解决办法
为了防止消息丢失,可以考虑将消息设置为持久化,以确保消息在 RabbitMQ 服务器故障后仍然可恢复。
设置Queue持久化和消息持久化:
-
① 创建 Queue 时设置持久化
-
② 发送消息时设置持久化
开启 RabbitMQ 持久化
设置持久化有两个步骤:
1.创建 Queue 的时候将其设置为持久化,队列的持久化标识 durable 设置为 true,这样就可以保证持久化 Queue 的元数据,但是不会持久化 Queue 里的消息。
2.发送消息的时候将消息的 deliveryMode 设置为MessageDeliveryMode.NON_PERSISTENT
,就是将消息设置为持久化。此时 RabbitMQ 就会将消息持久化到磁盘了
注意:必须同时设置这两个才可以
这个持久化可以跟生产者的 Confirm 机制配合起来,只有消息被持久化到磁盘之后,才会回调生产者的 ack 接口,所以即使是持久化到磁盘之前 MQ 挂了,生产者收不到回调,也会重发
-
-
-
消费者消费失败或异常终止
:当消费者在处理消息时发生异常、崩溃或消费速度过慢时,可能会导致消息丢失。如果消费者未能正确处理消息并发送确认回执,RabbitMQ 将认为该消息仍然未被消费,然后将该消息重新投递给其他消费者。
- 解决办法
在消费者端,可以使用消息确认机制(ack/nack)来确保消息被正确消费。消费者在处理完消息后,发送确认回执给 RabbitMQ,以告知消息已经成功处理。如果消费者遇到异常情况,可以选择拒绝该消息(nack)并进行相应的处理。这样,RabbitMQ 将会将消息重新投递给其他消费者,确保消息不会丢失。
- 关闭消费者的自动 ack 机制,采用手动 ack 形式
- 消费者处理完消息后手动 ack 通知 MQ 删除消息
- 解决办法
-
队列溢出
:如果队列的容量限制超出,新的消息将无法进入队列,从而导致消息丢失。这通常发生在没有设置合理的队列大小限制或者没有进行及时的消息处理的情况下。
- 解决办法:
可以通过设置合理的队列大小限制,或者使用消息丢弃策略(如先进先出、最近未使用等)来控制队列溢出的情况。
- 解决办法:
总之,为了避免 RabbitMQ 消息丢失,需要在生产者和消费者端采取相应的措施。在生产者端,可以使用确认模式来确保消息的可靠发送;在消费者端,可以使用消息确认机制来确保消息被正确处理。此外,合理设置队列大小限制和消息丢弃策略也是防止消息丢失的有效方法。
七、顺序消费
在 RabbitMQ 中实现顺序消费是一个相对复杂的问题,因为 RabbitMQ 是一个多节点、分布式的消息中间件,消息的顺序性无法得到保证。
以下是几种可能的方法:
-
单队列单消费者:
在这种情况下,只有一个队列和一个消费者。由于只有一个消费者,消费者将按照消息的顺序逐个消费消息,以达到顺序消费的目的。
-
多队列单消费者:
可以通过将消息按照一定规则(例如按照某个关键字或 ID)发送到不同的队列中,然后只创建一个消费者来消费这些队列。消费者从队列中按照顺序消费消息,以达到顺序消费的效果。
-
多队列多消费者:
可以通过创建多个队列和多个消费者,每个队列对应一个消费者。然后,根据某种规则将消息发送到不同的队列中。消费者按照队列的顺序消费消息,从而实现顺序消费。
-
消息排序器:
可以在消息生产者端引入一个排序器组件,该组件负责对消息进行排序,并将排序后的消息发送到 RabbitMQ。消费者按照消息的顺序消费。
注意: 这些方法都只能实现近似的顺序消费,并不能完全保证消息的绝对顺序。因为 RabbitMQ 是一个分布式系统,消息可能会在不同节点之间进行传输和分发,无法保证单个节点上的顺序。此外,当消息在队列中堆积较多时,消费者可能会出现处理速度不一致的情况,进而导致消息的相对顺序发生变化。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)