RabbitMQ基础

1.简介

​ RabbitMQ是Erlang语言开发的基于AMQP的开源的消息系统。支持Linux、Windows、MacOX等主流操作系统。支持Java、Python等多种语言。

​ AMQP是消息队列的一个协议。

ConnectionFactory、Connection、Channel都是RabbitMQ对外提供的API中最基本的对象。

Connection是RabbitMQ的socket链接,它封装了socket协议相关部分逻辑。ConnectionFactory为Connection的制造工厂。
Channel是我们与RabbitMQ打交道的最重要的一个接口,我们大部分的业务操作是在Channel这个接口中完成的,包括定义Queue、定义Exchange、绑定Queue与Exchange、发布消息等。

2.RabbitMQ的5种队列

1.简单队列

img

P:消息的生产者
C:消息的消费者
红色:队列

生产者将消息发送到队列,消费者从队列中获取消息。

2.工作队列模式(Work Queues)

img

特点:

​ 1条消息只能被1个消费者消费。

轮询分发(Round-Robin):优点是队列出现积压,只需要增加消费者即可解决这一问题。系统伸缩性更佳。缺点是不会考虑机器性能,只保证消息均匀分发出去,性能较高的机器利用率不高。

可以使用basicQoc(prefetchCount = 1),限制RabbitMQ只发不超过1条消息给同一个消费者。消息有了反馈后才会进行第二次发送。同时关闭自动应答,改为手动应答。即可时性能高的机器处理更多的消息。

3.发布订阅模式(Publish/Subscribe)

img

特点:

​ 1.一个生产者,多个消费者

​ 2.每个消费者都有自己的一个队列

​ 3.生产者将消息发送至交换机,由交换机分发消息至队列。

​ 4.每个队列必须要绑定交换机。

​ 5.生产者发送的消息,经过交换机进入多个队列,可以保证一个消息被多个消费者获取。

注意:

​ 消息发送到没有队列绑定的交换机时,消息将丢失。因为交换机不存储消息。

4.路由模式(Routing)

img

特点:

​ 生产者根据交换机类型和路由key确定其发往哪一个队列。

​ routing key设定的最大长度为255bytes

5.主题模式(Topic)

img

特点:

​ 由于路由模式的routing key都是固定的,不能满足扩展性,所以主题模式模糊匹配会更加实用。

6.RPC模式

img

特点:

  • 客户端发送请求(消息)时,在消息的属性(MessageProperties,在AMQP协议中定义了14中properties,这些属性会随着消息一起发送)中设置两个值replyTo(一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中)和correlationId(此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败)
  • 服务器端收到消息并处理
  • 服务器端处理完消息后,将生成一条应答消息到replyTo指定的Queue,同时带上correlationId属性
  • 客户端之前已订阅replyTo指定的Queue,从中收到服务器的应答消息后,根据其中的correlationId属性分析哪条请求被执行了,根据执行结果进行后续业务处理

RabbitMQ高可用

Rabbit有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

​ 自己本地跑demo做调研和演示用。

普通集群模式(无高可用)

​ 简单说就是在多台机器上启动RabbitMQ实例,将消息分发到多台机器上。 queue,只会放在一个 RabbitMQ 实例上,但是每个实例都会同步queue的元数据,消费时如果获取的消息不在当前实例上,会自动从存在此消息的实例上拉取数据。

在这里插入图片描述

缺点:

​ 1.没有做到分布式,只能提高一下吞吐量,做不到高可用。

​ 2.消费者每次随机连接一个实例去获取数据,如果不在当前实例,数据拉取会有一定的时间开销。

​ 3.存放queue的实例一旦宕机,如果没有配置消息持久化,数据将会丢失;配置了的话,需要等实例恢复后才可以正常提供服务。

镜像集群模式(高可用)

创建的queue,无论是元数据还是queue里面的消息都会存在多个实例上,每个Rabbit实例都有着完整的数据,相当于多了几个备份。当把消息写入queue中,会自动将消息同步至其他Rabbit实例。

mq-8

操作方式:在Rabbit控制台增加一个镜像集群策略。

优点:

​ 1.操作简单,在Rabbit提供的控制台即可操作。

​ 2.一个机器宕机了,其他实例还包含这所有的数据,不会因为RabbitMQ的不可以导致系统的不可用。

缺点:

​ 1.每条消息都需要同步到每个实例上,同步消息会有一定的性能开销。

​ 2.不支持横向扩展,要想横向扩展只能够给每台机器增配。

RabbitMQ如何保证消息可靠性传输、顺序性

RabbitMQ如何保证消息的可靠性传输?

消息丢失的三种情况及解决方案

1.生产者弄丢了数据。

​ 生产者在发送数据时由于网络、系统等原因导致消息没有发送成功。

​ 可以使用RabbitMQ提供的事务功能。就是生产者发送数据之前开启 RabbitMQ 事务channel.txSelect,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务channel.txRollback,然后重试发送消息;如果收到了消息,那么可以提交事务channel.txCommit

// 开启事务
channel.txSelect
try {
    // 这里发送消息
} catch (Exception e) {
    channel.txRollback

    // 这里再次重发这条消息
}

// 提交事务
channel.txCommit

**缺点:**吞吐量太低,严重影响性能。

常用方式:confirm机制。

​ 在生产者那里设置开启 confirm 模式之后,你每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给你回传一个 ack 消息,告诉你说这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调你的一个 nack 接口,告诉你这个消息接收失败,你可以重试。而且你可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,那么你可以重发。

​ 事务机制和 confirm 机制最大的不同在于,事务机制是同步的,你提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,你发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调你的一个接口通知你这个消息接收到了。

2.RabbitMQ弄丢了数据。

就是 RabbitMQ 自己弄丢了数据,这个你必须开启 RabbitMQ 的持久化,就是消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

设置持久化有两个步骤

  • 创建 queue 的时候将其设置为持久化
    这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2
    就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

必须要同时设置这两个持久化才行,RabbitMQ 哪怕是挂了,再次重启,也会从磁盘上重启恢复 queue,恢复这个 queue 里的数据。

注意,哪怕是你给 RabbitMQ 开启了持久化机制,也有一种可能,就是这个消息写到了 RabbitMQ 中,但是还没来得及持久化到磁盘上,结果不巧,此时 RabbitMQ 挂了,就会导致内存里的一点点数据丢失。

所以,持久化可以跟生产者那边的 confirm 机制配合起来,只有消息被持久化到磁盘之后,才会通知生产者 ack 了,所以哪怕是在持久化到磁盘之前,RabbitMQ 挂了,数据丢了,生产者收不到 ack,你也是可以自己重发的。

3.消费端弄丢了数据。

RabbitMQ 如果丢失了数据,主要是因为你消费的时候,刚消费到,还没处理,结果进程挂了,比如重启了,那么就尴尬了,RabbitMQ 认为你都消费了,这数据就丢了。

这个时候得用 RabbitMQ 提供的 ack 机制,简单来说,就是你必须关闭 RabbitMQ 的自动 ack,可以通过一个 api 来调用就行,然后每次你自己代码里确保处理完的时候,再在程序里 ack 一把。这样的话,如果你还没处理完,不就没有 ack 了?那 RabbitMQ 就认为你还没处理完,这个时候 RabbitMQ 会把这个消费分配给别的 consumer 去处理,消息是不会丢的。

rabbitmq-message-lose-solution

RabbitMQ如何保证消息的顺序性?

场景:

  • 一个 queue,多个 consumer。比如,生产者向 RabbitMQ 里发送了三条数据,顺序依次是 data1/data2/data3,压入的是 RabbitMQ 的一个内存队列。有三个消费者分别从 MQ 中消费这三条数据中的一条,结果消费者2先执行完操作,把 data2 存入数据库,然后是 data1/data3。这不明显乱了。

rabbitmq-order-01

解决方案:

拆分多个 queue,每个 queue 一个 consumer,就是多一些 queue 而已,确实是麻烦点;或者就一个 queue 但是对应一个 consumer,然后这个 consumer 内部用内存队列做排队,然后分发给底层不同的 worker 来处理。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-i2RVzzce-1585320256418)(https://doocs.github.io/advanced-java/docs/high-concurrency/images/rabbitmq-order-02.png)]

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐