RabbitMQ 轻松入门看我就够了~
什么是消息队列“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。RabbitMQ的特点RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一
什么是消息队列
“消息队列(Message Queue)”是在消息的传输过程中保存消息的容器。在消息队列中,通常有生产者和消费者两个角色。生产者只负责发送数据到消息队列,谁从消息队列中取出数据处理,他不管。消费者只负责从消息队列中取出数据处理,他不管这是谁发送的数据。
RabbitMQ的特点
RabbitMQ是一款使用Erlang语言开发的,实现AMQP(高级消息队列协议)的开源消息中间件。首先要知道一些RabbitMQ的特点,官网可查:
可靠性。支持持久化,传输确认,发布确认等保证了MQ的可靠性。
灵活的分发消息策略。这应该是RabbitMQ的一大特点。在消息进入MQ前由Exchange(交换机)进行路由消息。分发消息策略有:简单模式、工作队列模式、发布订阅模式、路由模式、通配符模式。
支持集群。多台RabbitMQ服务器可以组成一个集群,形成一个逻辑Broker。
多种协议。RabbitMQ支持多种消息队列协议,比如 STOMP、MQTT 等等。
支持多种语言客户端。RabbitMQ几乎支持所有常用编程语言,包括 Java、.NET、Ruby 等等。
可视化管理界面。RabbitMQ提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker。
插件机制。RabbitMQ提供了许多插件,可以通过插件进行扩展,也可以编写自己的插件。
核心概念
Server:又称Broker,接受客户端的连接,实现AMQP实体服务。需要安装rabbitmq-server.
Connection:连接。应用程序与Broker的网络连接。
Channel:网络信道。几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立多个信道,每个信道代表一个会话任务。
Message:消息。服务于应用程序之间传递的数据,由Properties与body组成,Properties可以对消息进行修饰,比如消息的优先级,延迟等高级特性;body则是消息体的内容。
Virtual Host:虚拟地址。用于建立逻辑隔离,最上层的消息路由。一个虚拟主机内可以有多个exchange和queue,但是不能有相同名称的exchange。
Exchange:交换机。接受消息,根据路由将消息发送到绑定的队列。(不具备消息存储的能力)
Bindings:Exchange和queue之间的虚拟连接,可以包含多个routing key。
Routing Key:路由规则。根据路由规则,将消息发送到指定消费者。
Queue:队列。保存消息并将消息发送至消费者。
使用场景
RabbitMQ主要用于解耦、削峰以及异步;
我们可以通过一个简单的用户下单之后发送短信及邮件的功能来简单说明RabbitMQ的作用。由下图及代码可见,如果我们在开发中直接将以下业务写成串行执行会极大的消耗时间,同时如果短信或邮件服务执行失败还会导致用户下单失败。
public void makeOrder(){
// 保存订单
orderservice.saveOrder();
// 发送短信
messageService.send();
// 发送邮件
emailService.send();
// 发送APP
appService.send();
}
这种问题首先可以采用并行执行的方式,在将订单消息保存至数据库后,然后开启线程池分别执行发送邮件及发送短信的服务;虽然这种方式可以提升响应时间,但是也存在一定的问题。首先需要手动维护线程池,同时还需要实现高可靠以及高可用等,最重要的是线程池的代码还耦合在我们的程序代码中。
所以我们可以采用异步消息队列的方式来解决这种问题,他的优势在于:
- 完全解耦、用MQ建立连接。
- 有独立的线程池以及运行模型。
- MQ具有持久化功能,确保消息的可靠性。
由下图可见,采用异步消息队列的方式可以帮助我们极大的提升处理速度,当速度提升之后便有更多的时间去处理其他业务,这就达到了削峰的功能。
同时该方式还可以对代码进行解耦:首先MQ就是一个单独的高可用服务,同时我们可以将短信服务邮件服务拆分成不同的模块、之后还可以添加其他的模块,比如微信模块等;
RabbitMQ安装
可以参考该文章安装RabbitMQ。
RabbitMQ安装
RabbitMQ支持消息的模式
我们可以从RabbitMQ官网看到RabbitMQ具有多种工作模式以及使用方法。
注意事项:以下几个模式由于没在代码中直接声明交换机以及队列,需要需要手动在rabbitmq界面中手动创建交换机以及绑定队列,手动申明交换机及队列在最后贴出。简单模式中没有声明交换机是因为如果在代码中没有声明交换机,则会采用rabbitmq中的默认交换机。
1. 简单模式
简单模式可以理解为生产者发送消息,消费者接受消息。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.通过通道创建交换机,队列,绑定关系,路由KEY,发送消息和接受消息
/**
* 队列名称、
* 是否要持久化、
* 排他性(是否独占)、
* 是否自动删除(随着最后一个消费者结束后是否将队列自动删除)、
* 携带附加参数
*/
channel.queueDeclare("queue1",false,false,false,null);
//5.准备消息内容
String msg = "Hello RabbitMQ";
//6.发送消息给队列queue
/**
* 交换机 -- 没有指定交换机会绑定服务中的默认交换机
* 队列、路由key
* 消息的状态控制
* 消息主题
*/
channel.basicPublish("",queueName,null,msg.getBytes(StandardCharsets.UTF_8));
System.out.printf("消息发送成功!");
//7.关闭连接
if (channel!=null&&channel.isOpen()){
channel.close();
}
//8.关闭通道
if (connection!=null&&connection.isOpen()){
connection.close();
}
}
}
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//所有的中間件技術都是基于tcp/ip协议;rabbitMQ遵循AMQP协议
// ip:port
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.接受消息
String queueName = "queue1";
channel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受失败了");
}
});
System.in.read();
//7.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//8.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
2.工作模式
工作模式的特点为分发机制,由此特点又分为轮询分发机制以及公平机制。
轮询分发机制使得每个消费者依次消费信息,不会由于性能问题而导致消费的数量不等。(两个消费者代码相同)
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取连接
Connection connection = connectionFactory.newConnection("生产者");
//从连接中获取通道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare("queue1", false, false, false, null);
//发送消息
for (int i = 0; i < 20; i++) {
String msg = "测试消息:" + i;
channel.basicPublish("", "queue1", null, msg.getBytes(StandardCharsets.UTF_8));
}
System.out.printf("消息发送成功!");
//9.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//10.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
public class Work1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者-work1");
channel = connection.createChannel();
//同一时刻 服务器只会推送一条消息给消费者
Channel finalChannel = channel;
//finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("work-1收到的消息是:" + new String(delivery.getBody()));
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受消息失败。");
}
});
System.in.read();
} catch (Exception e) {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
公平机制采用的是能者多劳思想,性能好的服务器消费更多的消息。需要注意的是在公平分发机制中,消费端需要采用手动应答的机制,同时需要设置qos指标,代表同一时刻服务器推送多少条消息给消费者,该指标需要根据磁盘内存、占有率等条件自行判断,不宜设置过大,否则没有意义,可以设置默认值为1。
生产者代码和轮询机制代码相同,消费者代码需要注意的是将两个消费者类的休眠时间设置不同即可。
public class Work1 {
public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
Connection connection = null;
Channel channel = null;
try {
connection = connectionFactory.newConnection("消费者-work1");
channel = connection.createChannel();
//同一时刻 服务器只会推送一条消息给消费者
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue1", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try {
System.out.println("work-1收到的消息是:" + new String(delivery.getBody()));
Thread.sleep(2000);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受消息失败。");
}
});
System.in.read();
} catch (Exception e) {
if (channel != null && channel.isOpen()) {
channel.close();
}
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
}
3.发布订阅模式 Fanout
发布订阅模式对应交换机中的Fanout模式。这种类型的交换机需要将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.通过通道创建交换机,队列,绑定关系,路由KEY,发送消息和接受消息
/**
* 队列名称、
* 是否要持久化、
* 排他性(是否独占)、
* 是否自动删除(随着最后一个消费者结束后是否将队列自动删除)、
* 携带附加参数
*/
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
//5.准备消息内容
String msg = "Hello RabbitMQ";
//6.准备交换机
String exchangeName = "fanout-exchange";
//7.定义路由key
String routKey = "";
//8.指定交换机的类型
String type = "fanout";
/**
* 交换机 -- 没有指定交换机会绑定服务中的默认交换机
* 队列、路由key
* 消息的状态控制
* 消息主题
*/
channel.basicPublish(exchangeName, routKey, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.printf("消息发送成功!");
//9.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//10.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
public class Consumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
//创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//获取连接
connection = connectionFactory.newConnection("生产者");
//通过连接获取通过Channel
channel = connection.createChannel();
//定义消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受失败了");
}
});
System.in.read();
//7.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//8.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queues1").start();
new Thread(runnable,"queues2").start();
new Thread(runnable,"queues3").start();
}
}
4.路由模式 Direct
路由模式对应交换机中的Direct类型。该类型需要绑定一个队列,要求该消息与一个特定的路由键完全匹配。当然了,可以由多个相同的路由,这样就可以发送到多个队列中。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.通过通道创建交换机,队列,绑定关系,路由KEY,发送消息和接受消息
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
//5.准备消息内容
String msg = "Hello RabbitMQ";
//6.准备交换机
String exchangeName = "direct_exchange";
//7.定义路由key
String routKey = "email";
//8.指定交换机的类型
String type = "direct";
channel.basicPublish(exchangeName, routKey, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.printf("消息发送成功!");
//9.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//10.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
public class Consumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
//创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//获取连接
connection = connectionFactory.newConnection("生产者");
//通过连接获取通过Channel
channel = connection.createChannel();
//定义消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受失败了");
}
});
System.in.read();
//7.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//8.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queues1").start();
new Thread(runnable,"queues2").start();
new Thread(runnable,"queues3").start();
}
}
5.主题模式 topic
主题模式对应交换机中的topic类型。这种交换机是使用通配符去匹配,路由到对应的队列。通配符有两种:"*" 、 “#”。需要注意的是通配符前面必须要加上".“符号。
*符号:有且只匹配一个词。比如 a.*可以匹配到"a.b”、“a.c”,但是匹配不了"a.b.c"。
#符号:匹配0个或多个词。比如"rabbit.#“既可以匹配到"rabbit.a.b”、“rabbit.a”,也可以匹配到"rabbit.a.b.c"。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.通过通道创建交换机,队列,绑定关系,路由KEY,发送消息和接受
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
//5.准备消息内容
String msg = "Hello RabbitMQ";
//6.准备交换机
String exchangeName = "topic_exchange";
//7.定义路由key
String routKey = "com.order.xxx";
//8.指定交换机的类型
String type = "topic";
channel.basicPublish(exchangeName, routKey, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.printf("消息发送成功!");
//9.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//10.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
public class Consumer {
private static Runnable runnable = new Runnable() {
@Override
public void run() {
//创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接属性
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//获取连接
connection = connectionFactory.newConnection("生产者");
//通过连接获取通过Channel
channel = connection.createChannel();
//定义消息的回调
Channel finalChannel = channel;
finalChannel.basicConsume(queueName, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(delivery.getEnvelope().getDeliveryTag());
System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("接受失败了");
}
});
System.in.read();
//7.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//8.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
};
public static void main(String[] args) {
new Thread(runnable,"queues1").start();
new Thread(runnable,"queues2").start();
new Thread(runnable,"queues3").start();
}
}
6.参数模式 Header
参数模式对应交换机中的Header类型。这种交换机用的相对没这么多。它跟上面三种有点区别,它的路由不是用routingKey进行路由匹配,而是在匹配请求头中所带的键值进行路由。如图所示:
创建队列需要设置绑定的头部信息,有两种模式:全部匹配和部分匹配。如上图所示,交换机会根据生产者发送过来的头部信息携带的键值去匹配队列绑定的键值,路由到对应的队列。
补充
虽然在rabbitMQ界面中声明交换机及队列很简单,但是我们还是要学会怎么在代码中进行声明。下方代码是在生产者中对交换机及队列进行声明。
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工程
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setPort(5672);
connectionFactory.setUsername("admin");
connectionFactory.setPassword("admin");
connectionFactory.setVirtualHost("/");
//2.创建连接Connection
Connection connection = connectionFactory.newConnection("生产者");
//3.通过连接获取通过Channel
Channel channel = connection.createChannel();
//4.通过通道创建交换机,队列,绑定关系,路由KEY,发送消息和接受消息
String queueName = "queue1";
channel.queueDeclare(queueName, false, false, false, null);
//5.准备消息内容
String msg = "Hello RabbitMQ";
//6.准备交换机
String exchangeName = "direct_message_exchange";
//7.定义路由key
String routKey = "order";
//8.指定交换机的类型
String type = "direct";
//可以直接在web界面中将queue和exchange事先绑定好,不需要在代码中声明,使代码更简洁。
//代码中声明交换机; 持久化:不会随着服务器重启而丢失;true不丢失,false丢失;
channel.exchangeDeclare(exchangeName, type, true);
//声明队列
channel.queueDeclare("queue5", true, false, false, null);
channel.queueDeclare("queue6", true, false, false, null);
channel.queueDeclare("queue7", true, false, false, null);
//消息队列绑定交换机
channel.queueBind("queue5","direct_message_exchange","order");
channel.queueBind("queue6","direct_message_exchange","order111");
channel.queueBind("queue7","direct_message_exchange","order");
channel.basicPublish(exchangeName, routKey, null, msg.getBytes(StandardCharsets.UTF_8));
System.out.printf("消息发送成功!");
//9.关闭连接
if (channel != null && channel.isOpen()) {
channel.close();
}
//10.关闭通道
if (connection != null && connection.isOpen()) {
connection.close();
}
}
}
更多推荐
所有评论(0)