目录

 一:rabbitMQ的安装

二:MQ知识锦集

三:rabbitMQ的使用

<一>:直连模式

<二>. Work模型(工作队列work Queue)

<三>. fanout模型(广播队列fanout Queue)

<四>. routing模型(订阅路由队列)

<五>. routing模型(订阅动态路由topic模型)

<六>. Work模型(工作队列work Queue)

<七>. Work模型(工作队列work Queue)


 

引言

      RabbitMQ是基于AMQP协议(具有天然的跨平台性),由erlang语言开发,天生具有高并发的语言优势。是目前部署最广泛的开源消息中间件,它的优势在于erlang语言开发,适用于socket开发,其次是它天生与spring框架整合非常方便,最后它在处理消息的丢失,事务一致性方便处理的十分严密,几乎没有丢失。

生产者产生消息后,放在exchange中,exchange将消息一个一个的放在消息队列中,消费者与消息队列进行绑定,从而获取消息

目录

 一:rabbitMQ的安装

二:rabbitMQ的使用

<一>:直连模式


 一:rabbitMQ的安装

1. 官网:https://www.rabbitmq.com/

 下载不止要下载rabbit的安装包,还要下载erlang语言的支持,因为rabbitMQ是基于该语言开发的。而且erlang的版本要与rabbit的版本兼容

有三个:erlang-22.0.7-1.el7.x86_64.rpm、rabbitmq-server-3.7.18.rpm、socat-1.7.3.2.rpm

安装教程参考博客:https://blog.csdn.net/qq_20492999/article/details/81254242

2. 登录到rabbit的web管理界面,可进行添加账号,虚拟主机绑定,通道管理,交换机管理等操作

rabbitMQ的服务端口默认是5672,管理界面默认端口是15672,集群的通讯端口是25672

对于MQ来说,连接时需要设置虚拟主机(virtuaHost),默认的虚拟主机是:/,可以对虚拟主机添加用户,讲虚拟主机与用户进行绑定,在连接时,设置虚拟主机和绑定该虚拟主机的用户,完成连接。虚拟主机的概念就是用来区分不同的项目的,例如我A系统生产者和消费者统一都是连接的xxx这个虚拟主机,B系统连接的是mmm虚拟主机,这样可以进行业务模块区分,相当于数据库的不同库的概念。也可以是相同系统中的不同环境或者不同业务模块,也可以根据虚拟主机进行隔离区分。

3. 在rabbitMQ的web管理界面中,可以添加用户添加虚拟主机, 查看交换机,查看消息队列等信息

二:MQ知识锦集

  • MQ消息的自动确认机制。开启后可能会丢失消息。关闭自动确认消息机制,消费完成后手动确认消息
  • MQ消息的持久化:队列与消息均设置为持久化。在创建连接并获取通道后,在通道与队列进行绑定的过程中,需要设置队列的持久化。在通道给队列发送消息的时候,也要设置消息的持久化参数。设置持久化后,在rabbit服务重启后,消息和队列对从磁盘中完成恢复。

 

三:rabbitMQ的使用

<一>:直连模式

生产者发送消息(通过通道将消息直接放在队列)、消费者绑定消息队列,等待消息的到来

    1. 发布消息

public void test1() throws Exception{
        //创建MQ连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("127.0.0.1");
        //设置连接端口号
        connectionFactory.setPort(5672);
        //设置连接的虚拟主机
        connectionFactory.setVirtualHost("/admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        //获取连接对象
        Connection connection = connectionFactory.newConnection();

        //获取连接中通道对象
        Channel channel = connection.createChannel();
/*
        通道绑定 对应消息队列
        p1: 队列名称,如果不存在会自动创建
        p2: 是否持久化,即rabbitmq重启后,该队列是否还存在,true时,该队列会存放在磁盘。false时,重启后不复存在,仅仅是队列的持久化,不管是true还是false。队列中的消息都有可能丢失,如果想让具体的消息也完成持久化,需要在发布消息的时候对队列跟消息均进行持久化设置。
        p3: 是否独占队列,代表此队列是否只能被当前连接所使用。true:其他连接不可使用,连接ji连接对象
        p4: 是否在消费完成后,自动删除队列,true:删除,false:不删除.但是是在消费者与队列连接以及通道彻底关闭时,并且消息已经被消费完成。才会自动删除。
        p5: 额外附加参数
         */
        channel.queueDeclare("hello",false,false,false,null);

        // 此时与MQ的连接已经完成,通道已经创建,通道与队列已经完成绑定。
        // 可以进行发布消息了
        /*
        p1: 交换机名称
        p2: 队列名称
        p3: 传递消息额外设置,可以通过参数,将消息设置为持久化MessageProperties.PERSISTENT_TEXT_PLAIN,当队列与消息【均为】持久化时,消息就不会被丢失
        p4: 消息的具体内容
         */
        channel.basicPublish("","hello",MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());

        channel.close();
        connection.close();

    }


    
    2. 消费消息

public static void main(String[] args) throws Exception{
        //创建MQ连接工厂
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置连接rabbitmq主机
        connectionFactory.setHost("127.0.0.1");
        //设置连接端口号
        connectionFactory.setPort(5672);
        //设置连接的虚拟主机
        connectionFactory.setVirtualHost("/admin");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");

        Connection connection = connectionFactory.newConnection();
        //创建通道对象
        Channel channel = connection.createChannel();

        //通道绑定队列,消费者对同一个队列的定义,需要与消费者对该队列的定义一致,不要出现,消费者定义该队列为持久化,
        //生产者定义的是不持久化,否则会出现问题
        channel.queueDeclare("hello", false, false, false, null);

        //消费消息
        //方法一:
        /*
        p1: 消费消息队列的名称
        p2: 开启消息的自动确认机制,为true时:也就是消费者收到消息后,不管后续处理如何,会立即告诉rabbitmq已经完成了这些消息的处理,mq队列中就会将给该消费者的消息进行删除
        p3: 消费时的回调接口
         */
        /*channel.basicConsume("hello",true,new DefaultConsumer(channel) {
            @Override //body参数,就是我们拿到的消息
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("message is " + new String(body));
            }
        });*/


        //方法二:rabbitMQ官网给出的example:
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody());
            System.out.println("message is that:" + message);
        };
        channel.basicConsume("hello", true, deliverCallback, consumerTag -> {});
        //如果通道关闭或者连接关闭,就会导致我们的消费者还没来得及消费消息,或者还没有消费完,通道连接就关闭了,
        //从而导致消息没有被正确处理
        /*channel.close();
        connection.close();*/
    }


    3. 特性:这种方式是最简单的点对点的消息队列模型,《一个通道只能被一个消费者使用》。
       尽管简单,但是使用可以根据业务逻辑变得广泛

    4. 对于实际的项目中,可以将MQ的连接信息封装在配置文件中,通过启动bean的方式将MQ的连接工厂进行注册,后续使用时可通过注入的方式使用连接工厂获取通道

 一个通道(channel)可以向多个队列(queue)发送消息

直连模型的总结以及相关API的使用:

1. 消息的持久化设置:队列与消息均设置为持久化。在创建连接并获取通道后,在通道与队列进行绑定的过程中,需要设置队列的持久化。在通道给队列发送消息的时候,也要设置消息的持久化参数。设置持久化后,在rabbit服务重启后,消息和队列对从磁盘中完成恢复。

<二>. Work模型(工作队列work Queue)

简介:当消息处理比较耗时时,可能存在生产者生产消息的速度远远大于消息消费的速度,从而导致消息在队列中的堆积,得不到即时消费,影响业务。之前是一个队列对应一个消费者,生产者生产十个消息到队列的时间。消费者才消费一个消息,导致了队列消息堆积得不到及时的处理。从而此时就可以使用work模型,多个消费者绑定到同一个队列,共同消费队列中的消息,队列中的消息一旦被消费就会消除。所以不会重复处理消息的。

1. 创建消息生产者(参考上面代码)

2. 创建两个一样的消费者,绑定同一个队列(参考上面代码)

将某个消费者设置为一秒钟处理一条消息后发现,该消费者处理的消息数量是不会改变的。这是因为MQ给两个消费者是循环平均分配

 Connection connection = MQconnectFactory.getConnetcionStatic();
        Channel channel = connection.createChannel();

        channel.queueDeclare("work", true, false, false, null);

        channel.basicConsume("work", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //将该消费者模拟设置为一秒钟处理一个消息。
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer-1:" + new String(body));
            }
        });

 3. 此时存在两个问题,

问题一:如果A消费者在消费过程中,例如队列有100个消息,AB两个消费者,当这两个消费者连接到该队列后,会立马每人领走50个消息,并将该消息在队列中删除。但如果A被强行关闭或者dead掉了,则该消费者中的所有被分配的消息都会被丢失掉。

原因:是因为在消费者处理消息时,启动了自动确认机制。channel.basicConsume("hello",true,new DefaultConsumer(channel)),第二个参数为是否进行自动确认为true,此时消费者会告诉rabbitmq已经将这些消息处理了,rabbitmq则会立马在队列中将这些消息删除。

解决方案:将该参数设置为false,并且在消费完成每一个消息后,进行手动确认

Connection connection = MQconnectFactory.getConnetcionStatic();
        Channel channel = connection.createChannel();

        //意思是消费者在消费消息时,每次在通道里传送一条消息。这样消费者就会一条消息一条消息的进行处理了。
        //channel.basicQos(1);

        channel.queueDeclare("work1", true, false, false, null);

        channel.basicConsume("work1", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    //将该消费者模拟设置为一秒钟处理一个消息。
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("consumer-1:" + new String(body));
                //手动确认消息机制
                // 参数1:告诉MQ我需要确认的是哪一个消息,参数二,是否进行多消息同时确认
                channel.basicAck(envelope.getDeliveryTag(), true);
            }
        });

问题二:A消费者处理消息很慢很慢,导致了业务不能及时得到处理。从而长此以往形成了业务阻塞。

原因:因为AB消费者是通过MQ的循环机制完成任务的平均分配

解决方案:将MQ队列中的消息不进行平均分配,也就是说,让AB两个消费者不再是一次每人直接领走50个任务了,而是让AB消费者每次处理一个任务,处理完了再领下一个任务。这样,A处理的慢领到的任务就会少,B处理快,领导的任务自然就多了。从而让业务消息AB一起处理:

Connection connection = MQconnectFactory.getConnetcionStatic();
        Channel channel = connection.createChannel();

        //通道中每次从队列给消费者传递一个任务去处理
        channel.basicQos(1);

        channel.queueDeclare("work1", true, false, false, null);

        channel.basicConsume("work1", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer-2:" + new String(body));
                //参数2:每次确认一个消息,不开启多消息确认机制
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

 

结果:A消费者处理了三条消息,B消费者处理了97条消息 

总结:

work模式总结:

1. 默认情况下,rabbitMQ将按顺序将每个消息发送给下一个消费者,也就是说,在work模型中,不管有多少个消费者,不管每个消费者处理消息的速度如何。他们处理的消息数量都是一样的。

例:生产者给1队列放置了1000条消息,目前有AB两个消费者消费,A,1秒钟处理一个,B一秒钟处理十个。此时A处理的数量=B处理的数量。如果在处理过程中加入了消费者C。则此时ABC处理消息按照C加入的剩余消息来三个消费者均分处理。

2. 问题:如果消费者A在处理消息过程中阻塞或者dead机,但是A所分配的消息已经被MQ确认,此时就会导致A剩下的所有消息会被丢失。

解决:在消费者进行消费消息时,将自动确认机制设置为false,并在每消费完一个消息后,手动完成消息的确认。

 

<三>. fanout模型(广播队列fanout Queue)

简介:

①广播模型的意思是:交换机与多个队列进行绑定,队列与每一个消费者将进行绑定。生产者发送消息给交换机,交换机将该消息决定发送给哪些队列,然后交给消费者去消费。从而生产者只需要将消息交给队列,就可以完成消息的发布。从而实现,一个消息被多个消费者实现。该类型的交换机会忽略routingkey(路径)

②交换机在被发出消息后,他会将该消息立马发送给绑定了该交换机的队列。如果不存在与该交换机绑定的队列,消息就不会发给任何消费端,从而消息被丢失。官方文档解释:如果没有队列绑定到交换机,消息将丢失,但这对我们来说是可以的。如果没有消费者在听,我们可以安全地丢弃该消息。

③当消费者被断开连接时,所对应的队列会立马从服务端删除

场景:例如在某个订单系统:将库存系统,发货系统,通知系统分别绑定提交订单的交换机。用户提交订单后,发出提交订单的消息给交换机,交换机分发这些消息给队列,队列完成相应的库存减少,发货,以及用户通知等操作。从而完成一系列的订单操作。

1. 生产者:生产者在通道中绑定交换机,即可完成把消息发送到交换机。

//通道对象声明指定交换机,参数1:交换机名称  参数2:交换机类型固定的类型
        channel.exchangeDeclare("order", "fanout");

        //发布消息,将交换机的名字传进去,不用传队列了。
        channel.basicPublish("order", "", null, "this is a new order".getBytes());

        channel.close();
        connetcionStatic.close();

2. 消费者:

①通道绑定交换机

②生成临时队列,交换机与队列完成绑定,临时队列当连接关闭会自动删除,autodelete属性为true

③读取消息完成消费

 //1. 通道绑定交换机
        channel.exchangeDeclare("order", "fanout");

        //2. 获取临时队列。当没有消息的时候队列自动删除
        String queue = channel.queueDeclare().getQueue();

        //3. 在通道中将交换机和队列进行绑定
        //参数1:队列名 参数2:交换机名  参数3:routingkey路径该类型不需要该参数
        channel.queueBind(queue, "order", "");

        //4. 进行消费消息
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("这是库存系统进行减少库存操作" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

3. 创建多个消费者绑定该交换机,即可完成一个消息多个消费

 

 

 

<四>. routing模型(订阅路由队列)

第一种订阅方式:direct通过固定直连路由完成订阅

简介:基于fanout模型的基础上,在该模式中,添加了路由(routingkey)的概念,也就是说生产者在发消息的时候,决定将该消息发送给哪个交换机的同时,再加一个条件决定给哪个绑定路由的消费者。

①生产者向交换机发送消息时,需要对该消息绑定一个路由;

②交换机将消息发送给-绑定了该交换机、-路由与生产者发出的路由一致  的完全匹配的队列

③消费者C1绑定了error的路由交换机,则发出消息并且路由的error的消息就会被C1接受处理

④消费者C2绑定了三个路由,则发出的这三种路由的消息都会被C2接受处理

⑤同理,如果生产者发送消息时,没有任何绑定和订阅,则消息就会被抛弃丢失

这种模式的好处是:生产者发送给交换机的消息,不再是广播的形式每个消费者都会受到,而是再去进行分类。每个消息设置一个路由类型,只有订阅了该类型的消费者,才会收到消息。

1. 生产者:

 Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
        Channel channel = connetcionStatic.createChannel();

        //通道生命交换机以及交换机类型
        channel.exchangeDeclare("log_direct", "direct");

        //发布消息,并指定消息的路由  参数: 交换机、路由、额外参数持久化、消息
        channel.basicPublish("log_direct", "error", null, "this is a error msg".getBytes());

        channel.close();
        connetcionStatic.close();

2. 消费者:

//消费者1:
Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
        Channel channel = connetcionStatic.createChannel();

        //通道生命交换机和类型
        channel.exchangeDeclare("log_direct", "direct");

        //生成临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queue, "log_direct", "info");
        channel.queueBind(queue, "log_direct", "warning");
        channel.queueBind(queue, "log_direct", "error");

        //读取队列的消息

        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1:" + new String(body));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });


//消费者2:
Connection connetcionStatic = MQconnectFactory.getConnetcionStatic();
        Channel channel = connetcionStatic.createChannel();

        //通道生命交换机和类型
        channel.exchangeDeclare("log_direct", "direct");

        //生成临时队列
        String queue = channel.queueDeclare().getQueue();

        //绑定交换机和队列
        channel.queueBind(queue, "log_direct", "error");

        //读取消息

        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumer1:" + new String(body));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

 

<五>. routing模型(订阅动态路由topic模型)

简介:topic模式也叫做动态订阅路由模式,他基本与direct模式相似,只不过在它的基础上,可以让队列在绑定交换机的时候支持通配符的方式绑定。这种routingkey一般是由一个或者多个单词组成,一般为order.*,意思为order.*所有符合该模型的路由都会被该队列监听。

路由匹配规则:

mq.*: 意思是mq.xxx的都会被匹配

mq.#:意思是mq.xxx、mq.xxx.xxx.xxx 等有mq.后面一个或者多个单词组成的包括(mq)都会被匹配

1. 生产者:

//获取连接对象和通道
        Connection connection = MQconnectFactory.getConnetcionStatic();
        Channel channel = connection.createChannel();

        //通道生命交换机以及类型
        channel.exchangeDeclare("topic_test", "topic");

        //发布消息,使用topic的动态路由routingKey
        channel.basicPublish("topic_test", "log.info", null, "this is a info log".getBytes());

        channel.close();
        connection.close();

2. 消费者

//获取连接对象和通道
        Connection connection = MQconnectFactory.getConnetcionStatic();
        Channel channel = connection.createChannel();

        //通道声明交换机
        channel.exchangeDeclare("topic_test", "topic");

        //创建临时队列
        String queue = channel.queueDeclare().getQueue();

        //队列绑定交换机
        channel.queueBind(queue, "topic_test", "log.#");

        //获取消息
        channel.basicConsume(queue, false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("log.*获取消息:" + new String(body));
                //手动确认消息
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

 

<六>. Work模型(工作队列work Queue)

<七>. Work模型(工作队列work Queue)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐