rabbitMQ从入门到精通
引言RabbitMQ是基于AMQP协议(具有天然的跨平台性),有erlang语言开发,是目前部署最广泛的开源消息中间件,它的优势在于erlang语言开发,适用于socket开发,其次是它天生与spring框架整合非常方便,最后它在处理消息的丢失,事务一致性方便处理的十分严密,几乎没有丢失。生产者产生消息后,放在exchange中,exchange将消息一个一个的放在消...
目录
<三>. fanout模型(广播队列fanout Queue)
引言
RabbitMQ是基于AMQP协议(具有天然的跨平台性),由erlang语言开发,天生具有高并发的语言优势。是目前部署最广泛的开源消息中间件,它的优势在于erlang语言开发,适用于socket开发,其次是它天生与spring框架整合非常方便,最后它在处理消息的丢失,事务一致性方便处理的十分严密,几乎没有丢失。
生产者产生消息后,放在exchange中,exchange将消息一个一个的放在消息队列中,消费者与消息队列进行绑定,从而获取消息
目录
一:rabbitMQ的安装
1. 官网:https://www.rabbitmq.com/
下载不止要下载rabbit的安装包,还要下载erlang语言的支持,因为rabbitMQ是基于该语言开发的。而且erlang的版本要与rabbit的版本兼容
有三个:erlang-22.0.7-1.el7.x86_64.rpm、rabbitmq-server-3.7.18.rpm、socat-1.7.3.2.rpm
安装教程参考博客:https://blog.csdn.net/qq_20492999/article/details/81254242
2. 登录到rabbit的web管理界面,可进行添加账号,虚拟主机绑定,通道管理,交换机管理等操作
rabbitMQ的服务端口默认是5672,管理界面默认端口是15672,集群的通讯端口是25672
对于MQ来说,连接时需要设置虚拟主机(virtuaHost),默认的虚拟主机是:/,可以对虚拟主机添加用户,讲虚拟主机与用户进行绑定,在连接时,设置虚拟主机和绑定该虚拟主机的用户,完成连接。虚拟主机的概念就是用来区分不同的项目的,例如我A系统生产者和消费者统一都是连接的xxx这个虚拟主机,B系统连接的是mmm虚拟主机,这样可以进行业务模块区分,相当于数据库的不同库的概念。也可以是相同系统中的不同环境或者不同业务模块,也可以根据虚拟主机进行隔离区分。
3. 在rabbitMQ的web管理界面中,可以添加用户添加虚拟主机, 查看交换机,查看消息队列等信息
二:MQ知识锦集
- MQ消息的自动确认机制。开启后可能会丢失消息。关闭自动确认消息机制,消费完成后手动确认消息
- MQ消息的持久化:队列与消息均设置为持久化。在创建连接并获取通道后,在通道与队列进行绑定的过程中,需要设置队列的持久化。在通道给队列发送消息的时候,也要设置消息的持久化参数。设置持久化后,在rabbit服务重启后,消息和队列对从磁盘中完成恢复。
三:rabbitMQ的使用
<一>:直连模式
生产者发送消息(通过通道将消息直接放在队列)、消费者绑定消息队列,等待消息的到来
1. 发布消息
public void test1() throws Exception{
//创建MQ连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("127.0.0.1");
//设置连接端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/admin");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中通道对象
Channel channel = connection.createChannel();
/*
通道绑定 对应消息队列
p1: 队列名称,如果不存在会自动创建
p2: 是否持久化,即rabbitmq重启后,该队列是否还存在,true时,该队列会存放在磁盘。false时,重启后不复存在,仅仅是队列的持久化,不管是true还是false。队列中的消息都有可能丢失,如果想让具体的消息也完成持久化,需要在发布消息的时候对队列跟消息均进行持久化设置。
p3: 是否独占队列,代表此队列是否只能被当前连接所使用。true:其他连接不可使用,连接ji连接对象
p4: 是否在消费完成后,自动删除队列,true:删除,false:不删除.但是是在消费者与队列连接以及通道彻底关闭时,并且消息已经被消费完成。才会自动删除。
p5: 额外附加参数
*/
channel.queueDeclare("hello",false,false,false,null);
// 此时与MQ的连接已经完成,通道已经创建,通道与队列已经完成绑定。
// 可以进行发布消息了
/*
p1: 交换机名称
p2: 队列名称
p3: 传递消息额外设置,可以通过参数,将消息设置为持久化MessageProperties.PERSISTENT_TEXT_PLAIN,当队列与消息【均为】持久化时,消息就不会被丢失
p4: 消息的具体内容
*/
channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
2. 消费消息
public static void main(String[] args) throws Exception{
//创建MQ连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接rabbitmq主机
connectionFactory.setHost("127.0.0.1");
//设置连接端口号
connectionFactory.setPort(5672);
//设置连接的虚拟主机
connectionFactory.setVirtualHost("/admin");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = connectionFactory.newConnection();
//创建通道对象
Channel channel = connection.createChannel();
//通道绑定队列,消费者对同一个队列的定义,需要与消费者对该队列的定义一致,不要出现,消费者定义该队列为持久化,
//生产者定义的是不持久化,否则会出现问题
channel.queueDeclare("hello", false, false, false, null);
//消费消息
//方法一:
/*
p1: 消费消息队列的名称
p2: 开启消息的自动确认机制,为true时:也就是消费者收到消息后,不管后续处理如何,会立即告诉rabbitmq已经完成了这些消息的处理,mq队列中就会将给该消费者的消息进行删除
p3: 消费时的回调接口
*/
/*channel.basicConsume("hello",true,new DefaultConsumer(channel) {
@Override //body参数,就是我们拿到的消息
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("message is " + new String(body));
}
});*/
//方法二:rabbitMQ官网给出的example:
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody());
System.out.println("message is that:" + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
//如果通道关闭或者连接关闭,就会导致我们的消费者还没来得及消费消息,或者还没有消费完,通道连接就关闭了,
//从而导致消息没有被正确处理
/*channel.close();
connection.close();*/
}
3. 特性:这种方式是最简单的点对点的消息队列模型,《一个通道只能被一个消费者使用》。
尽管简单,但是使用可以根据业务逻辑变得广泛
4. 对于实际的项目中,可以将MQ的连接信息封装在配置文件中,通过启动bean的方式将MQ的连接工厂进行注册,后续使用时可通过注入的方式使用连接工厂获取通道
一个通道(channel)可以向多个队列(queue)发送消息
直连模型的总结以及相关API的使用:
1. 消息的持久化设置:队列与消息均设置为持久化。在创建连接并获取通道后,在通道与队列进行绑定的过程中,需要设置队列的持久化。在通道给队列发送消息的时候,也要设置消息的持久化参数。设置持久化后,在rabbit服务重启后,消息和队列对从磁盘中完成恢复。
<二>. Work模型(工作队列work Queue)
简介:当消息处理比较耗时时,可能存在生产者生产消息的速度远远大于消息消费的速度,从而导致消息在队列中的堆积,得不到即时消费,影响业务。之前是一个队列对应一个消费者,生产者生产十个消息到队列的时间。消费者才消费一个消息,导致了队列消息堆积得不到及时的处理。从而此时就可以使用work模型,多个消费者绑定到同一个队列,共同消费队列中的消息,队列中的消息一旦被消费就会消除。所以不会重复处理消息的。
1. 创建消息生产者(参考上面代码)
2. 创建两个一样的消费者,绑定同一个队列(参考上面代码)
将某个消费者设置为一秒钟处理一条消息后发现,该消费者处理的消息数量是不会改变的。这是因为MQ给两个消费者是循环平均分配
Connection connection = MQconnectFactory.getConnetcionStatic();
Channel channel = connection.createChannel();
channel.queueDeclare("work", true, false, false, null);
channel.basicConsume("work", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//将该消费者模拟设置为一秒钟处理一个消息。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer-1:" + new String(body));
}
});
3. 此时存在两个问题,
问题一:如果A消费者在消费过程中,例如队列有100个消息,AB两个消费者,当这两个消费者连接到该队列后,会立马每人领走50个消息,并将该消息在队列中删除。但如果A被强行关闭或者dead掉了,则该消费者中的所有被分配的消息都会被丢失掉。
原因:是因为在消费者处理消息时,启动了自动确认机制。channel.basicConsume("hello",true,new DefaultConsumer(channel)),第二个参数为是否进行自动确认为true,此时消费者会告诉rabbitmq已经将这些消息处理了,rabbitmq则会立马在队列中将这些消息删除。
解决方案:将该参数设置为false,并且在消费完成每一个消息后,进行手动确认
Connection connection = MQconnectFactory.getConnetcionStatic();
Channel channel = connection.createChannel();
//意思是消费者在消费消息时,每次在通道里传送一条消息。这样消费者就会一条消息一条消息的进行处理了。
//channel.basicQos(1);
channel.queueDeclare("work1", true, false, false, null);
channel.basicConsume("work1", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
//将该消费者模拟设置为一秒钟处理一个消息。
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("consumer-1:" + new String(body));
//手动确认消息机制
// 参数1:告诉MQ我需要确认的是哪一个消息,参数二,是否进行多消息同时确认
channel.basicAck(envelope.getDeliveryTag(), true);
}
});
问题二:A消费者处理消息很慢很慢,导致了业务不能及时得到处理。从而长此以往形成了业务阻塞。
原因:因为AB消费者是通过MQ的循环机制完成任务的平均分配
解决方案:将MQ队列中的消息不进行平均分配,也就是说,让AB两个消费者不再是一次每人直接领走50个任务了,而是让AB消费者每次处理一个任务,处理完了再领下一个任务。这样,A处理的慢领到的任务就会少,B处理快,领导的任务自然就多了。从而让业务消息AB一起处理:
Connection connection = MQconnectFactory.getConnetcionStatic();
Channel channel = connection.createChannel();
//通道中每次从队列给消费者传递一个任务去处理
channel.basicQos(1);
channel.queueDeclare("work1", true, false, false, null);
channel.basicConsume("work1", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer-2:" + new String(body));
//参数2:每次确认一个消息,不开启多消息确认机制
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
结果:A消费者处理了三条消息,B消费者处理了97条消息
总结:
work模式总结:
1. 默认情况下,rabbitMQ将按顺序将每个消息发送给下一个消费者,也就是说,在work模型中,不管有多少个消费者,不管每个消费者处理消息的速度如何。他们处理的消息数量都是一样的。
例:生产者给1队列放置了1000条消息,目前有AB两个消费者消费,A,1秒钟处理一个,B一秒钟处理十个。此时A处理的数量=B处理的数量。如果在处理过程中加入了消费者C。则此时ABC处理消息按照C加入的剩余消息来三个消费者均分处理。
2. 问题:如果消费者A在处理消息过程中阻塞或者dead机,但是A所分配的消息已经被MQ确认,此时就会导致A剩下的所有消息会被丢失。
解决:在消费者进行消费消息时,将自动确认机制设置为false,并在每消费完一个消息后,手动完成消息的确认。
<三>. fanout模型(广播队列fanout Queue)
简介:
①广播模型的意思是:交换机与多个队列进行绑定,队列与每一个消费者将进行绑定。生产者发送消息给交换机,交换机将该消息决定发送给哪些队列,然后交给消费者去消费。从而生产者只需要将消息交给队列,就可以完成消息的发布。从而实现,一个消息被多个消费者实现。该类型的交换机会忽略routingkey(路径)
②交换机在被发出消息后,他会将该消息立马发送给绑定了该交换机的队列。如果不存在与该交换机绑定的队列,消息就不会发给任何消费端,从而消息被丢失。官方文档解释:如果没有队列绑定到交换机,消息将丢失,但这对我们来说是可以的。如果没有消费者在听,我们可以安全地丢弃该消息。
③当消费者被断开连接时,所对应的队列会立马从服务端删除
场景:例如在某个订单系统:将库存系统,发货系统,通知系统分别绑定提交订单的交换机。用户提交订单后,发出提交订单的消息给交换机,交换机分发这些消息给队列,队列完成相应的库存减少,发货,以及用户通知等操作。从而完成一系列的订单操作。
1. 生产者:生产者在通道中绑定交换机,即可完成把消息发送到交换机。
//通道对象声明指定交换机,参数1:交换机名称 参数2:交换机类型固定的类型
channel.exchangeDeclare("order", "fanout");
//发布消息,将交换机的名字传进去,不用传队列了。
channel.basicPublish("order", "", null, "this is a new order".getBytes());
channel.close();
connetcionStatic.close();
2. 消费者:
①通道绑定交换机
②生成临时队列,交换机与队列完成绑定,临时队列当连接关闭会自动删除,autodelete属性为true
③读取消息完成消费
//1. 通道绑定交换机
channel.exchangeDeclare("order", "fanout");
//2. 获取临时队列。当没有消息的时候队列自动删除
String queue = channel.queueDeclare().getQueue();
//3. 在通道中将交换机和队列进行绑定
//参数1:队列名 参数2:交换机名 参数3:routingkey路径该类型不需要该参数
channel.queueBind(queue, "order", "");
//4. 进行消费消息
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("这是库存系统进行减少库存操作" + new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
3. 创建多个消费者绑定该交换机,即可完成一个消息多个消费
<四>. routing模型(订阅路由队列)
第一种订阅方式:direct通过固定直连路由完成订阅
简介:基于fanout模型的基础上,在该模式中,添加了路由(routingkey)的概念,也就是说生产者在发消息的时候,决定将该消息发送给哪个交换机的同时,再加一个条件决定给哪个绑定路由的消费者。
①生产者向交换机发送消息时,需要对该消息绑定一个路由;
②交换机将消息发送给-绑定了该交换机、-路由与生产者发出的路由一致 的完全匹配的队列
③消费者C1绑定了error的路由交换机,则发出消息并且路由的error的消息就会被C1接受处理
④消费者C2绑定了三个路由,则发出的这三种路由的消息都会被C2接受处理
⑤同理,如果生产者发送消息时,没有任何绑定和订阅,则消息就会被抛弃丢失
这种模式的好处是:生产者发送给交换机的消息,不再是广播的形式每个消费者都会受到,而是再去进行分类。每个消息设置一个路由类型,只有订阅了该类型的消费者,才会收到消息。
1. 生产者:
Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
Channel channel = connetcionStatic.createChannel();
//通道生命交换机以及交换机类型
channel.exchangeDeclare("log_direct", "direct");
//发布消息,并指定消息的路由 参数: 交换机、路由、额外参数持久化、消息
channel.basicPublish("log_direct", "error", null, "this is a error msg".getBytes());
channel.close();
connetcionStatic.close();
2. 消费者:
//消费者1:
Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
Channel channel = connetcionStatic.createChannel();
//通道生命交换机和类型
channel.exchangeDeclare("log_direct", "direct");
//生成临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue, "log_direct", "info");
channel.queueBind(queue, "log_direct", "warning");
channel.queueBind(queue, "log_direct", "error");
//读取队列的消息
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer1:" + new String(body));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
//消费者2:
Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
Channel channel = connetcionStatic.createChannel();
//通道生命交换机和类型
channel.exchangeDeclare("log_direct", "direct");
//生成临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue, "log_direct", "error");
//读取消息
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumer1:" + new String(body));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
<五>. routing模型(订阅动态路由topic模型)
简介:topic模式也叫做动态订阅路由模式,他基本与direct模式相似,只不过在它的基础上,可以让队列在绑定交换机的时候支持通配符的方式绑定。这种routingkey一般是由一个或者多个单词组成,一般为order.*,意思为order.*所有符合该模型的路由都会被该队列监听。
路由匹配规则:
mq.*: 意思是mq.xxx的都会被匹配
mq.#:意思是mq.xxx、mq.xxx.xxx.xxx 等有mq.后面一个或者多个单词组成的包括(mq)都会被匹配
1. 生产者:
//获取连接对象和通道
Connection connection = MQconnectFactory.getConnetcionStatic();
Channel channel = connection.createChannel();
//通道生命交换机以及类型
channel.exchangeDeclare("topic_test", "topic");
//发布消息,使用topic的动态路由routingKey
channel.basicPublish("topic_test", "log.info", null, "this is a info log".getBytes());
channel.close();
connection.close();
2. 消费者
//获取连接对象和通道
Connection connection = MQconnectFactory.getConnetcionStatic();
Channel channel = connection.createChannel();
//通道声明交换机
channel.exchangeDeclare("topic_test", "topic");
//创建临时队列
String queue = channel.queueDeclare().getQueue();
//队列绑定交换机
channel.queueBind(queue, "topic_test", "log.#");
//获取消息
channel.basicConsume(queue, false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("log.*获取消息:" + new String(body));
//手动确认消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
<六>. Work模型(工作队列work Queue)
<七>. Work模型(工作队列work Queue)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)