RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成.

RabbitMQ工作模型图

  1. Publisher
    消息的生产者
  2. Consumer
    消息的消费者,两种消费消息的模式,Pull模式(主动获取basicGet)/Push模式(被动接收basicConsume)
    RabbitMQ 中 pull 和 push 都有实现。而 kafka 和RocketMQ只有pull。
  3. Exchange
    交换器,用来接收生产者发出的消息并将这些消息路由给服务器中的队列。
  4. Binding 
    绑定,用于消息队列和交换器之间的关联。
  5. Queue
    消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。
    实际上 RabbitMQ 是用数据库来存储消息的,这个数据库跟RabbitMQ一样是用Erlang 开发的,名字叫 Mnesia。
  6. Routing-key
    路由键,RabbitMQ决定消息投递到哪个队列的规则。
    队列通过路由健绑定到交换器。
    消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和 绑定使用的路由键进行匹配。匹配则投递到该队列,不匹配将进入私信队列。
  7. Connection
    链接,指RabbitMQ服务器和服务建立的TCP连接。
  8. Channel
    通道,如果所有的生产者发送消息和消费者接收消息,都直接创建和释放TCP长连接的话,对于 Broker 来说肯定会造成很大的性能损耗,也会浪费时间。所以在 AMQP 里面引入了 Channel 的概念,它是一个虚拟的连接。我们把它翻译成通道,或者消息信道。这样我们就可以在保持的TCP 长连接里面去创建和释放Channel,大大了减少了资源消耗。
    不同的 Channel 是相互隔离的,每个 Channel 都有自己的编号。
  9. Broker
    中介,表示消息队列服务器实体。 RabbitMQ的服务器我们把它叫做 Broker,中文翻译是代理/中介,因为MQ服务器帮助我们做的事情就是存储、转发消息。
  10. Vhost
    虚拟主机,每个虚拟主机对应一套交换机、队列、和他们之间的绑定关系。不同的VHOST中可以有同名的 Exchange 和 Queue,它们是完全透明的。Vhost 除了可以提高硬件资源的利用率之外,还可以实现资源的隔离和权限的控制。可以给不同的系统之间使用单独的Vhost,创建专属的用户,给用户分配对应的Vhost权限。RabbitMQ默认Vhost名字是“/”

在这里插入图片描述

消息分发机制 Exchanges and Exchange Types

我们说到 RabbitMQ 引入 Exchange 是为了实现消息的灵活路由,到底有哪些路由方式?
RabbitMQ 中一共有四种类型的交换机。

Exchange typeDefault pre-declared names
Direct exchange (直连)(Empty string) and amq.direct
Fanout exchange(广播)amq.fanout
Topic exchange(主题)amq.topic
Headers exchangeamq.match (and amq.headers in RabbitMQ)

Direct Exchange 直链模式 也是默认模式 (发布订阅模式 完全匹配)
在这里插入图片描述

Fanout exchange (广播模式) Fanout交换将消息路由到绑定到其的所有队列,并且忽略了路由键。体育赛事广播、游戏排行广播、分布式系统可以广播各种状态和配置更新、群聊
在这里插入图片描述Topic exchange (主题,规则匹配)
一个队列与主题类型的交换机绑定时,可以在绑定键中使用通配符。支持两个通配符:
# 代表匹配 0 个或者多个单词
* 代表匹配不多不少一个单词

Headers Exchange
不常用

持久化机制

RabbitMQ 的持久化分为消息持久化、队列持久化、交换器持久化。无论是持久化消息还是非持久化消息都可以被写入磁盘。
非持久化消息在RabbitMQ宕机时会丢失,即使已写入磁盘也会删除。
队列、交换机持久化参数boolean durable
消息持久化 参数设置

// MessageProperties.PERSISTENT_TEXT_PLAIN
 public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
}

即 deliveryMode = 2

  1. 内存控制
    RabbitMQ 中通过内存阈值参数控制内存的使用量,当内存使用超过配置的阈值 时,RabbitMQ 会阻塞客户端的连接并停止接收从客户端发来的消息,以免服务崩溃, 同时,会发出内存告警,此时客户端于与服务端的心跳检测也会失效。
    内存控制可以通过命令来设置修改参数,RabbitMQ 提供 relative 与 absolute 两种配置方式
// 百分比 fraction 建议 0.4~0.66
rabbitmqctl set_vm_memory_high_watermark <fraction>
// 绝对值,固定大小,单位为 KB、MB、GB
rabbitmqctl set_vm_memory_high_watermark absolute <value>

fraction 为内存阈值,默认是 0.4,表示 RabbitMQ 使用的内存超过系统内存的 40%时,会产生内存告警,重启会失效,永久生效需要修改配置文件。

  1. 内存换页
    在RabbitMQ内存达到内存阀值并阻塞生产者时,会尝试将内存中的消息换页到磁盘重,以释放内存空间。内存换页由换页参数控制,默认为 0.5,表示内存使用量达到内存阀值到50%是会生效进行换页。即0.4*0.5=0.2
vm_memory_high_watermark_paging_ratio=0.5

当换页阈值大于 1 时,相当于禁用了换页功能

  1. 磁盘控制
    RabbitMQ 通过磁盘阈值参数控制磁盘的使用量,当磁盘剩余空间小于磁盘阈值 时,RabbitMQ 同样会阻塞生产者,避免磁盘空间耗尽。默认50M,由于是定时检测磁盘空间,不能完全消除因磁盘耗尽而导致崩溃的可能性。一种相对谨慎的做法是将磁盘阈值大小设置与内存相等。
rabbitmqctl set_disk_free_limit <limit> # limit 为绝对值,KB、MB、GB
rabbitmqctl set_disk_free_limit mem_relative <fraction> # fraction 为相对值,建议 1.0~2.0 之间
# rabbitmq.conf
disk_free_limit.relative=1.5
# disk_free_limit.absolute=50MB
  1. 插件管理
rabbitmq-plugins list # 查看
rabbitmq-plugins enable rabbitmq_management # 开启
rabbitmq-plugins disable rabbitmq_management #关闭
  1. 配置
    查看 rabbitmq 的有效配置
rabbitmqctl environment
  1. DEMO 这里提供JAVA客户端代码

生产者 MyProducer.java
消费者 MyConsumer.java
这里的3步在生产者和消费者执行都行,也可以在RabbitMQ管理后台手动新增和绑定。
一般我们本地测试先创建消费者,再等待生产者发送消息,所以这里在消费者执行生命。

            //声明交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
			//声明队列
            channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列和交换机
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
import com.rabbitmq.client.*;
public class MyProducer {
    private final static String QUEUE_NAME = "SIMPLE_QUEUE";
    private final static String  EXCHANGE_NAME = "LIAO_EXCHANGE";

    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.142.129.217");
        factory.setPort(5673);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
        try {
            //创建连接
            Connection conn = factory.newConnection();
            //创建通道
            Channel channel = conn.createChannel();
            //声明交换机
           // channel.exchangeDeclare(EXCHANGE_NAME,"direct",true);
			//声明队列
            //channel.queueDeclare(QUEUE_NAME, false,false,false,null);
            //绑定队列和交换机
           //channel.queueBind(QUEUE_NAME, EXCHANGE_NAME,"");
            // 发送消息
            String msg = "Hello world, Rabbit MQ";
            channel.basicPublish( EXCHANGE_NAME,"liao.routingKey", null, msg.getBytes());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消费者

package com.liao.edu.mq.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
    private final static String QUEUE_NAME = "LIAO_QUEUE";
    private final static String  EXCHANGE_NAME = "LIAO_EXCHANGE";
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("43.142.129.217");
        factory.setPort(5673);
        factory.setVirtualHost("/");
        factory.setUsername("admin");
        factory.setPassword("admin");
            try {
                // 建立连接
                Connection conn = factory.newConnection();
                // 创建消息通道
                Channel channel = conn.createChannel();
                //生命交换机
             channel.exchangeDeclare(EXCHANGE_NAME,"direct",false,false,null);
                //生命队列
                channel.queueDeclare(QUEUE_NAME,false,false,false,null);
                //绑定交换机和队列
                channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"liao.routingKey");

                Consumer consumer = new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("consumerTag : "+consumerTag);
                        System.out.println("deliveryTag : "+envelope.getDeliveryTag());
                        System.out.println("received msg : "+new String(body,"utf-8"));
                    }
                };
                // 开始获取消息
                channel.basicConsume(QUEUE_NAME,true,consumer);
            } catch (Exception e) {
                e.printStackTrace();
            }

    }
}

申请了1个月的腾讯云服务,搭建的RabbitMQ服务。
http://43.142.129.217:15673/#/
admin/admin
过期前都可以用

  1. 延时消息插件
    GitHub下载直达
    下载rabbitmq_delayed_message_exchange-3.10.2.ez到本地
    将插件copy到RabbitMQ容器内
# 拷贝插件
docker cp /rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:/opt/rabbitmq/plugins/

# 进入容器内
docker exec -it rabbitmq bash

# 查看插件列表
rabbitmq-plugins list 

# 开启插件支持 
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

# 退出容器
ctrl + p +q

# 重启容器
docker restart rabbitmq

控制台出现x-delayed-message则安装成功
在这里插入图片描述

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐