简介

Rabbitmq是一个开源的AMQP实现, 服务器端使用Erlang语言编写的, 支持Ajax, 用于分布式系统中存储转发消息. 其中, AMQP, 全拼: Advanced Message Queuing Protocol ,高级消息队列, 应用层协议测一个开放标准, 为面向消息的中间件设计, 中间件主要作用是解决组件之间的耦合问题(解耦), 消息发送者无需知道消息消费者是否存在.

五种工作模式

  1. Simple简单模式
    一个生产者, 一个消费者 ,生产者将消息放到队列中, 消费者监听消息队列, 如果队列中有消息, 立即进行消费, 消费后, 队列中消息自动删除
  2. Work工作模式
    一个生产者, 多个消费者, 每个消费者获取的消息唯一, 多个消费者之间是竞争关系, 同时争抢队列中的消息, 谁拿到谁负责消费
  3. Publish/Subscribe发布订阅
    一个生产者发送消息会被多个消费者获取
    生产者将消息放入交换机, 交换机发布订阅把消息发送到所有消息队列中, 对应队列的消费者获取消息进行消费, 只有匹配对应的key的消息队列, 消费者才可以消费
  4. Routing路由模式
    发送消息到交换机并且要指定路由key, 消费者将队列绑定到交换机时需要指定路由key, 当前消费者携带key进行消费,
  5. Topic主题模式(路由模式的一种)
    将路由键进行匹配, 此时队列需要绑定在一个模式上, “#*”, 代表通配符 "#"匹配一个词或多个词, "*"只能匹配一个词. 路由功能添加模糊匹配, 交换机根据key的规则模糊匹配到对应的队列, 由队列的监听者接受消息消费.

相关术语

  1. Exchange: 交换机, 消息路由, 有的模式生产者发送的消息不会直接发送到队列中的, 而是先到指定的路由中, 然后由路由key绑定的队列发送到指定队列中. 常用交换机有三种: fanout、direct、topic,其中,发布订阅模式用的是fanout类型交换机,路由模式用的是direct类型交换机,主题模式用的是topic类型的交换机。
  2. Binding: 绑定, 建立路由交换机和队列容器的绑定关系.
  3. Routing: 路由key, 用于消费者和队列, 转换机和队列之间的认定

相关问题

  1. 问什么用消息队列?
    解耦、异步、削峰
    1.1 传统模式是每个系统直接直接访问,这样缺点和明显:耦合性太强,如果系统A调用B和C系统的接口,此时要加入一个新系统D,系统A要调用D的话还要修改A的代码,如果使用中间件的话,只需要将D的消息发送到消息队列中,A直接从队列中消费消息即可,这样A就不必修改代码了。
    1.2 传统模式中如果业务比较繁琐,需要访问多个其他系统,才可以的响应,这样会导致响应时间过长,影响系统性能,对于一些没必要的业务逻辑,一些不必立即得到结果的逻辑,可以通过中间件的方式实现,只要将需要处理的业务信息发送到消息队列中,发送成功即可返回响应(发送成功),即可,服务器继续执行下面的业务,而不必等待所有的消息消费完,从而提高效率。
    1.3 传统模式系统直接与数据库进行操作,然而当并发量大的时候,直接将所有请求推给数据库,很容易导致数据库不堪重负,通过使用中间件,系统可以根据数据库能承担的并发量进行发送请求慢慢的从消息队列中获取消息。

  2. 系统使用消息会出现什么问题?
    2.1 系统可用性降低,在没有用消息队列之前系统直接访问,现在添加一个中间件,如果消息队列停止工作,那么相关系统也就不能正常工作了。
    2.2 系统复杂性增加:需要面对消息队列传输一致性问题、消息不被重复消费的问题、数据不丢失的问题、消息有序消费…

  3. 关于消息一致性问题

  4. 关于重复消费

  5. 关于数据丢失(可靠传输)
    消息丢失分三种情况: 消息在传输过程中丢失 / MQ收到消息暂存在内存中, 还没有消费, 服务器挂掉, 导致消息丢失 / 消费者收到消息, 但是还没处理, 服务器挂掉, MQ认为消息已被消费
    问题方案:
    情况一:
    方案一: 利用RabbitMq提供的事务功能, 在生产者发送数据之前开启RabbitMq事务channel.txSelect, 然后发送消息, 如果没有成功被RabbitMq接收, 那么生产者会收到异常报错, 此时会执行回滚事务channel.txRollback, 然后重新发送消息 ; 如果RabbitMq成功接收消息, 那么就可以提交事务
    // 开启事务
    channel.txSelect
    try{ //这里发送消息
    } catch (Exception e){
    channel.txRollback; // 事务回滚
    // 再次发送这条消息;
    }
    缺点: RabbitMq事务机制是同步的, 所以这回增加服务器的性能消耗, 将服务器的吞吐量降低

    方案二:
    开启RabbitMq的confirm模式, 在生产者一端开启confirm模式, 每次写的消息都会分配一个唯一的id, 如果RabbitMq成功接收, RabbitMq会回传一个ask消息, 表示消息发送成功; 如果RabbitMq没有接受到消息, 会回调一个生产者的nack接口, 表示消息接收失败, 同时可以在生产者处, 维护一个消息的id, 如果超过一定时间没有收到该消息的回调, 可以重新发送
    注意:
    事务是同步的 所以如果你提交了一个事务只有会阻塞在那里, 而confirm是异步的, 消息的发送不受消息接收结果的影响, 无论接收是否成功, 分别都会回调一个生产者的接口, 表示该消息的接受结果
    情况二:
    待续…

  6. 关于有序消费

高可用

RabbitMq的高可用是基于主从方式是实现的, 它的工作模式分为: 单机模式 普通集群模式 镜像集群模式
元数据: 终结数据, 算是一种电子目录, 支持指示: 存储位置, 历史数据, 资源查找, 文件记录
普通集群模式
原理在多个服务器上部署MQ实例, 每台服务器有一个实例, 但创建的每个队列queue, 只会存储在一个queue上, 其他服务器上的实例会同步主queue的元数据, 可以通过该元数据拉取主queue上的数据
缺点: 集群之间可能产生大量的数据传输/如果主MQ节点宕机了, 会直接到时队列中所有消息数据丢失, 队列服务不可用
镜像集群模式
多台服务器, 每台一个实例, 无论是queue的元数据还是消息数据, 都会同步到所有的queue上, 每个queue都可以获取消息数据.
缺点:
整个集群内部的实例的所有数据同步, 内存性能开销比较大
无法线性扩容: 以为每个服务器中都包含整个集群服务节点的所有数据, 这样如果一旦单个服务器节点的容量无法容纳了怎么办

消息路由

资源来源: https://blog.csdn.net/vipshop_fin_dev/article/details/81612935
在这里插入图片描述
在这里插入图片描述
tcp_acceptor : 接受客户端连接 , 创建rabbit_reader、 rabbit_writer 、 rabbit_channel 进程
rabbit_reader : 接受客户端连接, 解析AMQP帧;
rabbit_writer : 向客户端返回数据
rabbit_channel : 解析AMQP方法, 对消息进行路由, 然后发给相应队列进程
rabbit_amqqueue_process是队列进程, 在RabbitMQ(恢复durable类型队列)或创建队列时创建
rabbit_msg_store : 是负责消息持久化的进程
在整个系统中, 存在一个tcp_accepter进程, 一个rabbit_msg_store进程, 有多少个队列就有多少个rabbit_amqqueue_process进程, 每个客户端连接对应一个rabbit_reader和rebbit_writer进程

关于AMQP协议

1 . AMQP帧组件
AMQP帧由五个不同的组件组成:
帧类型 | 信道编号 | 以字节为单位的帧大小 | 帧有效载荷payload | 结束字节标志(ASCII值206)
在这里插入图片描述
2 . 帧类型
AMQP规范定义了五种类型的帧: 协议头帧、方法帧、内容帧、消息帧、及心跳帧。每种帧类型都有明确的目的,有些帧的使用频率比其他的高许多。每种帧的作用如下:
协议头帧 :用于连接到rabbitmq, 仅使用一次
方法帧 : 携带方法给rabbitmq或者从rabbitmq接收到的rpc请求或者响应
内容帧 : 包含一条消息的大小和属性
消息体帧 : 包含消息的内容
心跳帧 : 在客户端与rabbitmq直接进行传递 , 作为一种校验机制, 确保连接的两端可用并且正常工作
3 . 将消息组成帧
使用方法帧、内容头帧和消息体帧组成一个完整的rabbitmq消息,方法头帧携带命令和执行所需要的参数(如交换机和路由键)、内容帧包含消息的基本属性以及消息的大小,消息体帧携带真正需要发送的消息内容。
在这里插入图片描述
4 . 方法帧结构
在这里插入图片描述
5 . 内容帧结构
在这里插入图片描述
内容头包含的具体属性如下 :
to在这里插入图片描述
content-type : 消息体的报文编码, 如application/json
expiration : 消息过期时间
reply-to : 响应消息的队列名
content-encoding : 报文压缩的编码 , 如gzip
message-id : 消息的编号
correlation-id : 链路id
deliver-mode : 告诉rabbitmq将消息写入磁盘还是内存
user-id : 投递消息的用户(发送消息时不要设置该值)
timestamp : 投递消息的时间
headers : 定义一些属性, 可用于实现rabbitmq路由(比如exchange类型是headers的时候用到)

6 . 消息体帧结构
在这里插入图片描述
7 . 几个概念
Broker : 简单来说就是消息队列服务器实体
Exchange : 消息交换机, 它指定消息按什么规则, 路由到那个队列.
Queue : 消息队列载体, 每个消息都会被投入到一个或多个队列, 队列类型又分为临时队列, 持久化队列, 排他队列.
Binding : 绑定, 它的作用就是把Exchange和Queue按照路由规则绑定起来.
Routing Key : 路由关键字, Exchange根据这个关键字进行消息投递.
VHost : 虚拟主机, 一个Broker里可以开设多个VHost, 用作不同用户的权限分离.
Producker : 消息消费者, 就是投递消息的程序.
Consumer : 消息消费者, 就是接受消息的程序.
Channel : 消息通道, 在客户端的每个链接里, 可建立多个Channel, 每个Channel代表一个会话任务

通讯过程

1 . 启动会话
在这里插入图片描述
2 . 声明交换器
在这里插入图片描述
3 . 声明队列
在这里插入图片描述
4 . 绑定队列到Exchange
在这里插入图片描述
5 . 发送消息-使用事务机制
在这里插入图片描述
对事务的支持是AMQP协议的一个重要特性。假设当生产者将一个持久化消息发送给服务器时,假如使用no_ack模式, 所以即使服务器崩溃, 没有持久化该消息, 生产者也无法获知该消息已经丢失.如使用no_ack模式, 所以即使服务器崩溃, 没有持久化该消息, 生产者也无法获知该消息已经丢失. 如果此时使用事务, 即通过txSelect()开启一个事务, 然后发送消息给服务器, 然后通过txCommit()提交该事务, 即可以保证, 如果txCommit()提交了, 则该消息一定会持久化, 如果txCommit()还未提交即服务器崩溃, 则该消息不会被服务器接收. 当然RabbitMQ也提供了txRollBack()命令用于回滚某个事务.但是使用使用, 会导致性能下降, 它使得生产者发布消息之后必须等待真正持久化之后, 服务器响应了之后, 才可以结束本次连接, 所以需要在实际应用中平衡性能与安全的问题.

6 . 发送消息-非事务方式
在这里插入图片描述
使用事务固然可以保证只有提交的事务才被服务器执行. 但是这样同时也将客户端与消息服务器同步起来, 这就背离了消息队列解耦的本质. RabbitMQ提供了一个更加轻量级的机制来保证生产者可以感知服务器消息是否已经被路由到正确的队列中- -confirm. 如果设置channel为confirm状态, 则通过该channel发送的消息都会被分配一个唯一的ID, 然后一旦该消息被正确的路由到匹配的队列中后, 服务器会返回给生产者一个confirm, 该Confirm包含该消息的ID, 这样生产者就会知道该消息已经被正确分发. 对于持久化消息, 只有该消息被持久化后, 才会返回Confirm.
Confirm机制的最大优点在于异步, 生产者在发送消息之后, 即可继续执行其他任务(也就是异步监听服务器端的ACK即可).而服务器返回Confirm后, 会触发生产者的回调函数, 生产者在回调函数中处理Confirm信息, 如果消息服务器发生异常, 导致该消息丢失, 会返回给生产者一个nack, 表示消息已经丢失, 这样生产者就可以通过重发消息, 保证消息不丢失. Confirm机制在性能上要比事务优越很多.
但是Confirm机制, 无法进行回滚, 就是一旦服务器崩溃, 生产者无法得到Confirm信息, 生产者其实本身也不知道该消息是否已经本持久化, 只有继续重发来保证消息不丢失, 但是如果原先已经持久化信息, 并不会回滚, 这样队列中就会存在两条相同的消息, 系统需要支持去重.

7 . 消费消息
在这里插入图片描述

delivery-mode的使用

上面提到了delivery-mode: 是一个在内容头帧中的一个属性, 通过该属性可以控制发送的消息内容存在内存还是磁盘, 下面来具体介绍该属性的使用.
delivery-mode 属性有两个属性值 : 1 - 表示非持久化 | 2 - 表示持久化
1 . 发送消息到纯内存队列中, delivery-mode = 1
在这里插入图片描述
特点 : 非持久化的消息在服务器宕机的时候会丢失数据, 但是由于不需要磁盘IO, 尽可能的降低消息投递延迟性, 性能较高.
2 . 发送消息到支持磁盘存储的队列, delivery-mode = 2
在这里插入图片描述
特点 : 持久化的消息安全性较高, 尽管服务器宕机, 数据也不会丢失, 但是在投递消息的过程中需要发生磁盘IO, 性能相对纯内存投递的方式低, 但是尽管是产生了磁盘IO, 由于日志的记录方式是直接追加到消息日志文件的末尾, 属于顺序IO, 没有随机IO, 所以性能还是可以接受的.

1 . 大概原理
所有队列中的消息都以append的方式写到一个文件中, 当这个文件的大小超过指定的限制大小后, 关闭这个文件再创建一个新的文件供消息的写入. 文件名(*.rdq)从0开始然后依次累加. 当某个消息被删除时, 并不会立即从文件中删除相关信息, 而是做一些记录, 当垃圾数据达到一定的比例时, 启动垃圾回收处理, 将逻辑相邻的文件中的数据合并到一个文件中.
2 . 消息的读写和删除
RabbitMQ在启动时会创建msg_store_persistent, msg_store_transient两个进程, 一个用于持久化的存储, 一个用于内存不够时, 将存储在内存中的非持久化数据转存到磁盘中. 所有队列的消息的写入和删除最终都由这两个进程处理, 而消息的读取则可能是队列本身直接打开文件进行读取, 也可能是发送请求由msg_store_persisteng/msg_store_transient进程进行处理.
在进行消息的存储时,rabbitmq会在ets表中记录消息在文件中的映射,以及文件的相关信息。消息读取时,根据消息ID找到该消息所存储的文件,在文件中的偏移量,然后打开文件进行读取。消息的删除只是从ets表删除指定消息的相关信息,同时更新消息对应存储的文件的相关信息(更新文件有效数据大小)。

消息路由模式

流量控制

RabbitMQ可以对内存和磁盘使用量设置阈值, 当达到阈值后, 生产者将被阻塞(block), 直到对应项恢复正常, 除了这两个阈值, RabbitMQ在正常情况下还用流控(Flow Control)机制来确保稳定性.
Erlang进程之间并不共享内存(binaries类型除外), 而是通过消息传递来通信, 每个进程都有自己的进程邮箱. Erlang默认没有对进程邮箱大小设限制, 所以当有大量消息持续发往进程时, 会导致该进程邮箱过大, 最终内存溢出并崩溃.
在RabbitMQ中, 如果生产者持续高速发送, 而消费者消费速度较低时, 如果没有流控, 很快就会使内部进程邮箱大小达到内存阈值, 阻塞生产者(得益于block机制, 并不会崩溃) .然后RabbitMQ会进行page操作, 将内存中的数据持久化达到磁盘中.
为了解决该问题, RabbitMQ是用来一种基于信用证的流控机制. 消息处理进程有一个信用组{InitialCredit , MoreCreditAfter}, 默认值为:{200,50}. 消息发送者进程A向接受者进程B发送消息, 每发送一条消息, Credit数量减一, 直到0, A被block住, 对于接受者B, 每接受MoreCreditAfter条消息, 会向A发送一条消息, 给予A MoreCreditAfter个Credit, 当A的Credit>0时 , A可以继续向B发送消息.

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐