初识RabbitMQ
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成.
RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成.
- Publisher
消息的生产者 - Consumer
消息的消费者,两种消费消息的模式,Pull模式(主动获取basicGet)/Push模式(被动接收basicConsume)
RabbitMQ 中 pull 和 push 都有实现。而 kafka 和RocketMQ只有pull。 - Exchange
交换器,用来接收生产者发出的消息并将这些消息路由给服务器中的队列。 - Binding
绑定,用于消息队列和交换器之间的关联。 - Queue
消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。
实际上 RabbitMQ 是用数据库来存储消息的,这个数据库跟RabbitMQ一样是用Erlang 开发的,名字叫 Mnesia。 - Routing-key
路由键,RabbitMQ决定消息投递到哪个队列的规则。
队列通过路由健绑定到交换器。
消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和 绑定使用的路由键进行匹配。匹配则投递到该队列,不匹配将进入私信队列。 - Connection
链接,指RabbitMQ服务器和服务建立的TCP连接。 - Channel
通道,如果所有的生产者发送消息和消费者接收消息,都直接创建和释放TCP长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接。我们把它翻译成通道,或者消息信道。这样我们就可以在保持的TCP 长连接里面去创建和释放Channel,大大了减少了资源消耗。
不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。 - Broker
中介,表示消息队列服务器实体。 RabbitMQ的服务器我们把它叫做 Broker,中文翻译是代理/中介,因为MQ服务器帮助我们做的事情就是存储、转发消息。 - Vhost
虚拟主机,每个虚拟主机对应一套交换机、队列、和他们之间的绑定关系。不同的VHOST中可以有同名的 Exchange 和 Queue,它们是完全透明的。Vhost 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。可以给不同的系统之间使用单独的Vhost,创建专属的用户,给用户分配对应的Vhost权限。RabbitMQ默认Vhost名字是“/”
消息分发机制 Exchanges and Exchange Types
我们说到 RabbitMQ 引入 Exchange 是为了实现消息的灵活路由,到底有哪些路由方式?
RabbitMQ 中一共有四种类型的交换机。
Exchange type | Default pre-declared names |
---|---|
Direct exchange (直连) | (Empty string) and amq.direct |
Fanout exchange(广播) | amq.fanout |
Topic exchange(主题) | amq.topic |
Headers exchange | amq.match (and amq.headers in RabbitMQ) |
Direct Exchange 直链模式 也是默认模式 (发布订阅模式 完全匹配)
Fanout exchange (广播模式) Fanout交换将消息路由到绑定到其的所有队列,并且忽略了路由键。体育赛事广播、游戏排行广播、分布式系统可以广播各种状态和配置更新、群聊
Topic exchange (主题,规则匹配)
一个队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:
# 代表匹配 0 个或者多个单词
* 代表匹配不多不少一个单词
Headers Exchange
不常用
持久化机制
RabbitMQ 的持久化分为消息持久化、队列持久化、交换器持久化。无论是持久化消息还是非持久化消息都可以被写入磁盘。
非持久化消息在RabbitMQ宕机时会丢失,即使已写入磁盘也会删除。
队列、交换机持久化参数boolean durable
消息持久化 参数设置
// MessageProperties.PERSISTENT_TEXT_PLAIN
public static final BasicProperties PERSISTENT_TEXT_PLAIN =
new BasicProperties("text/plain",
null,
null,
2,
0, null, null, null,
null, null, null, null,
null, null);
}
即 deliveryMode = 2
- 内存控制
RabbitMQ 中通过内存阈值参数控制内存的使用量,当内存使用超过配置的阈值 时,RabbitMQ 会阻塞客户端的连接并停止接收从客户端发来的消息,以免服务崩溃, 同时,会发出内存告警,此时客户端于与服务端的心跳检测也会失效。
内存控制可以通过命令来设置修改参数,RabbitMQ 提供 relative 与 absolute 两种配置方式
// 百分比 fraction 建议 0.4~0.66
rabbitmqctl set_vm_memory_high_watermark <fraction>
// 绝对值,固定大小,单位为 KB、MB、GB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
fraction 为内存阈值,默认是 0.4,表示 RabbitMQ 使用的内存超过系统内存的 40%时,会产生内存告警,重启会失效,永久生效需要修改配置文件。
- 内存换页
在RabbitMQ内存达到内存阀值并阻塞生产者时,会尝试将内存中的消息换页到磁盘重,以释放内存空间。内存换页由换页参数控制,默认为 0.5,表示内存使用量达到内存阀值到50%是会生效进行换页。即0.4*0.5=0.2
vm_memory_high_watermark_paging_ratio=0.5
当换页阈值大于 1 时,相当于禁用了换页功能
- 磁盘控制
RabbitMQ 通过磁盘阈值参数控制磁盘的使用量,当磁盘剩余空间小于磁盘阈值 时,RabbitMQ 同样会阻塞生产者,避免磁盘空间耗尽。默认50M,由于是定时检测磁盘空间,不能完全消除因磁盘耗尽而导致崩溃的可能性。一种相对谨慎的做法是将磁盘阈值大小设置与内存相等。
rabbitmqctl set_disk_free_limit <limit> # limit 为绝对值,KB、MB、GB
rabbitmqctl set_disk_free_limit mem_relative <fraction> # fraction 为相对值,建议 1.0~2.0 之间
# rabbitmq.conf
disk_free_limit.relative=1.5
# disk_free_limit.absolute=50MB
- 插件管理
rabbitmq-plugins list # 查看
rabbitmq-plugins enable rabbitmq_management # 开启
rabbitmq-plugins disable rabbitmq_management #关闭
- 配置
查看 rabbitmq 的有效配置
rabbitmqctl environment
- DEMO 这里提供JAVA客户端代码
生产者 MyProducer.java
消费者 MyConsumer.java
这里的3步在生产者和消费者执行都行,也可以在RabbitMQ管理后台手动新增和绑定。
一般我们本地测试先创建消费者,再等待生产者发送消息,所以这里在消费者执行生命。
//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
//声明队列
channel.queueDeclare(QUEUE_NAME, false,false,false,null);
//绑定队列和交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
import com.rabbitmq.client.*;
public class MyProducer {
private final static String QUEUE_NAME = "SIMPLE_QUEUE";
private final static String EXCHANGE_NAME = "LIAO_EXCHANGE";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("43.142.129.217");
factory.setPort(5673);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
try {
//创建连接
Connection conn = factory.newConnection();
//创建通道
Channel channel = conn.createChannel();
//声明交换机
// channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
//声明队列
//channel.queueDeclare(QUEUE_NAME, false,false,false,null);
//绑定队列和交换机
//channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
// 发送消息
String msg = "Hello world, Rabbit MQ";
channel.basicPublish( EXCHANGE_NAME,"liao.routingKey", null, msg.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者
package com.liao.edu.mq.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
private final static String QUEUE_NAME = "LIAO_QUEUE";
private final static String EXCHANGE_NAME = "LIAO_EXCHANGE";
public static void main(String[] args) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("43.142.129.217");
factory.setPort(5673);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("admin");
try {
// 建立连接
Connection conn = factory.newConnection();
// 创建消息通道
Channel channel = conn.createChannel();
//生命交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,null);
//生命队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//绑定交换机和队列
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"liao.routingKey");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag : "+consumerTag);
System.out.println("deliveryTag : "+envelope.getDeliveryTag());
System.out.println("received msg : "+new String(body,"utf-8"));
}
};
// 开始获取消息
channel.basicConsume(QUEUE_NAME,true,consumer);
} catch (Exception e) {
e.printStackTrace();
}
}
}
申请了1个月的腾讯云服务,搭建的RabbitMQ服务。
http://43.142.129.217:15673/#/
admin/admin
过期前都可以用
- 延时消息插件
GitHub下载直达
下载rabbitmq_delayed_message_exchange-3.10.2.ez到本地
将插件copy到RabbitMQ容器内
# 拷贝插件
docker cp /rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:/opt/rabbitmq/plugins/
# 进入容器内
docker exec -it rabbitmq bash
# 查看插件列表
rabbitmq-plugins list
# 开启插件支持
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
# 退出容器
ctrl + p +q
# 重启容器
docker restart rabbitmq
控制台出现x-delayed-message则安装成功
更多推荐
所有评论(0)