ActiveMQ认识和深入(一),下载安装,编写测试用例


ActiveMQ 概述

Apache ActiveMQApache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行

什么消息中间件


消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

为什么要系统里面要引入消息中间件


系统之间直接调用可能会存在以下问题:

  1. 系统之间的接口耦合比较严重,面对大流量并发时,容易被冲垮。
  2. 面对大流量并发时,容易被冲垮。
  3. 等待同步存在性能问题。

在这里插入图片描述

消息中间件就是来解决这些问题来达到目的:解耦削峰异步

  1. 系统解耦,当新的模块进来,可以做到代码改动最小;
  2. 设置流量缓冲池,可以让后端系统按照自身吞吐量进行消费,不被冲垮;
  3. 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;

大致过程就是这样:

​ 发送者把消息发送给消息服务器,消息服务器将消息存放在若干 队列/主题 中,在合适的时候,消息服务器会将消息转发给接受者。

在这个过程中,发送和接受都是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系。

尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。

在这里插入图片描述

比较 Kafka,RabbitMQ,RocketMQ,ActiveMQ区别


  1. Kafaka:底层是编程语言是:Java/Scala;
  2. RabbitMQ:底层编程语言是 erlang
  3. RocketMQ:底层编程语言是Java,前身是阿里巴巴自己研发的Notify和Kafaka 相结合的产物,阿里巴巴发现Notity满足不了业务的所有场景之后,结合Kafka的优点而诞生的。
  4. ActiveMQ:底层编程语言是Java

在这里插入图片描述
在这里插入图片描述

技术选型: 小的企业机构,系统平台编程语言是Java,ActiveMQ足够,如果量上去可以选用 RocketMQ

总结上述:

  1. 实现高可用,高性能,可伸缩,易用,和安全;
  2. 异步消息的消费和处理,控制消息的消费顺序;
  3. 可以和 spring/springboot整合编码;
  4. 配置集群容错的MQ集群;

下载和安装


官网地址:(https://activemq.apache.org/)

下载地址:(https://activemq.apache.org/components/classic/download/)

在这里插入图片描述

windos下载解压安装


在这里插入图片描述

  • 启动

    找到路径:D:\java\apache-activemq-5.16.2\bin\win64 双击运行

在这里插入图片描述

  • 访问控制台

    登录密码: admin/admin
    在这里插入图片描述

linux 下载安装


  • 上传文件到服务器,并解压
 root@moling:/usr/local# ls
 aegis                              bin           data        etc        games    kafka  logs  monodb  nginx  sbin   src
 apache-activemq-5.16.2-bin.tar.gz  cloudmonitor  docker.txt  evloution  include  lib    man   nacos   redis  share
 root@moling:/usr/local# tar -zxvf apache-activemq-5.16.2-bin.tar.gz 
 root@moling:/usr/local# ls
 aegis                   bin           data        etc        games    kafka  logs  monodb  nginx  sbin   src
 apache-activemq-5.16.2  cloudmonitor  docker.txt  evloution  include  lib    man   nacos   redis  share
 #启动服务 ./activemq start
 root@moling:/usr/local/apache-activemq-5.16.2/bin# ./activemq start
 INFO: Loading '/usr/local/apache-activemq-5.16.2//bin/env'
 INFO: Using java '/usr/local/lib/jdk/bin/java'
 INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
 INFO: pidfile created : '/usr/local/apache-activemq-5.16.2//data/activemq.pid' (pid '1113856')
 
 
 #查看启动状态 ./activemq status
 root@moling:/usr/local/apache-activemq-5.16.2/bin# ./activemq status
 INFO: Loading '/usr/local/apache-activemq-5.16.2//bin/env'
 INFO: Using java '/usr/local/lib/jdk/bin/java'
 ActiveMQ is running (pid '1113856')
 
 
 #停止服务
 bin/activemq stop
 
#带运行日志启动
./activemq start >/myactiveMQ/run_activemq.log
  #建立软连接
  ln -s /root/apache-activemq-5.16.2/bin/activemq /etc/init.d
   
  #设置开启启动
  chkconfig activemq on
   
  #通过系统指令
  service activemq start
  service activemq status
  service activemq stop
  
  • 访问控制台

登录密码: admin/admin

在这里插入图片描述

文件夹介绍


  1. bin 文件夹 ,是启动停止等命令文件夹;
  2. conf 文件夹, 配置文件夹,activemq.xml 配置端口连接等参数;
  3. data文件夹,数据存放文件夹;
  4. doc文件夹 ,用户帮助文档文件;
  5. examples文件夹 ,示例文件;
  6. lib文件夹 , 放入各种依赖jar包,假如需要将mq消息保存进入mysql数据库,这里需要将mysql 的驱动jar包放入该文件夹;
  7. webapp,webapps-demo文件夹 ,activemq,控制台运行程序;

JAVA编码ActiveMQ通信


编码流程图


在这里插入图片描述

目的地Destination (队列/主题)两大特性:

  • 队列模式:在点对点的消息传递域中,目的地被称为队列(queue);

在这里插入图片描述

  1. 生产者将消息发布到 queue 中,每个消息只有一个消费者。属于 1:1的关系;
  2. 当生产消息,后启动 1号消费者,再启动2号消费者,此时,1号消费者已经将队列的消息消费完,2号消费者是不会消费到消息的;
  3. 先启动2个消费者,再生成6条消息,两个消费者会同时得到3条消息,默认机制是轮询,你一条我一条;
  • 主题模式:在发布订阅消息传递域中,目的地被称为主题(topoic);

在这里插入图片描述

  1. 生产者将消息发布到 topic 中,每个消息可以有多个消费者。属于 1:N 的关系;
  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息;
  3. 生产者生产时,topic不保存消息它无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者;
  4. JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅;

在这里插入图片描述
在这里插入图片描述

创建工程,引入依赖


在这里插入图片描述

        <!--activeMQ 依赖-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.16.2</version>
        </dependency>

测试编写用例消费生产者代码(队列)


package com.kelecc.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 功能描述: 消费生产者
 *
 * @author: keLeCc
 */
public class JmsProduceQueue {

    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂,安装给定的url地址,采用默认用户名密码;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //通过连接工厂,获得连接并启动访问;
        Connection factoryConnection = factory.createConnection ();
        factoryConnection.start();

        //创建会话, 事务不开启,自动签收
        Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地 队列(queue)
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消息的生成者
        MessageProducer messageProducer = session.createProducer(queue);
        //使用消息 messageProducer 生产3条消息发送到MQ 的队列里面
        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            messageProducer.send(textMessage);
        }
        //关闭资源
        messageProducer.close();
        session.close();
        factoryConnection.close();
        System.out.println("=====消息发布完成=====");
    }
}

在这里插入图片描述

查看控制台


在这里插入图片描述

Number Of Pending Messages等待消费的消息这个是当前未出队列的数量。公式 = 总接收数 - 总出列树
Number Of Consumers消费者数量消费者端的消费者数量
Messages Enqueued进队消息数进入队列的总数量,包括出列的。这个数量只增不减
Messages Dequeued出队消息数可以理解为是消费者消费掉的数量

总结:

  1. 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1;
  2. 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1;
  3. 再来一条消息时,等待消费的消息是1,进入队列的消息就是2;

测试编写用例消息消费者代码(队列)


  • 调用 receive() 方法测试用例
package com.kelecc.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 功能描述:
 *
 * @author: keLeCc
 */
public class JmsConSumerQueue {

    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    private static final String QUEUE_NAME = "queue01";

    public static void main(String[] args) throws JMSException {
        //创建连接工厂,安装给定的url地址,采用默认用户名密码;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //通过连接工厂,获得连接并启动访问;
        Connection factoryConnection = factory.createConnection ();
        factoryConnection.start();

        //创建会话, 事务不开启,自动签收
        Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地 队列(queue)
        Queue queue = session.createQueue(QUEUE_NAME);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /*同步阻塞方式receive(),订阅者或接收者调用 MessageConsumer 的 receive()方法来接收消息;
        * receive()方法在能够接收到消息之前(或超时之前)将一直阻塞;
         */
        while (true){
            //如何 调用 receive() 参数为空, 它会一直等待
            TextMessage textMessage = (TextMessage) messageConsumer.receive();
            //如何 调用 时间超过5分钟 结束, 参数值为毫秒
            //TextMessage textMessage = (TextMessage) messageConsumer.receive(300000L);
            if(null != textMessage){
                System.out.println("=====消费者接收到消息====="+textMessage.getText());
            }else{
                break;
            }
        }
        //关闭资源
        messageConsumer.close();
        session.close();
        factoryConnection.close();


    }
}

在这里插入图片描述

  • 调用监听器接收消息

    package com.kelecc.activemq.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.io.IOException;
    
    /**
     * 功能描述:
     *
     * @author: keLeCc
     */
    public class JmsConSumerListenerQueue {
        private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
        private static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException, IOException {
            //创建连接工厂,安装给定的url地址,采用默认用户名密码;
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            //通过连接工厂,获得连接并启动访问;
            Connection factoryConnection = factory.createConnection ();
            factoryConnection.start();
    
            //创建会话, 事务不开启,自动签收
            Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            //创建目的地 队列(queue)
            Queue queue = session.createQueue(QUEUE_NAME);
            //创建消费者
            MessageConsumer messageConsumer = session.createConsumer(queue);
    
            //通过监听的方式来接收消息,异步非阻塞方式
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(null != message && message instanceof  Message){
                        try {
                            TextMessage textMessage = (TextMessage) message;
                            System.out.println("=====消费者接收到消息====="+textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //保持控制台,因为监听需要时间,所以这里不写,会存在消息没有消费到
            System.in.read();
            messageConsumer.close();
            session.close();
            factoryConnection.close();
        }
    }
    
    

消息的生成和消费有一定的顺序讲究;

  1. 当生产消息,后启动 1号消费者,再启动2号消费者,此时,1号消费者已经将队列的消息消费完,2号消费者是不会消费到消息的;
  2. 先启动2个消费者,再生成6条消息,两个消费者会同时得到3条消息,默认机制是轮询,你一条我一条;

查看控制台


在这里插入图片描述

等待消费的消息 0 条,消费者1个(是因为我没有关), 出队的消息有3个。

测试编写用例消费生产者代码(主题)


package com.kelecc.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * 功能描述:生产者主题
 *
 * @author: keLeCc
 */
public class JmsProduceTopic {
    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = factory.createConnection();
        connection.start();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = (Topic)session.createTopic(TOPIC_NAME);

        MessageProducer messageProducer = session.createProducer(topic);

        for (int i = 1; i <= 3; i++) {
            TextMessage textMessage = session.createTextMessage("msg---" + i);
            messageProducer.send(textMessage);
        }

        messageProducer.close();
        session.close();
        connection.close();

    }
}

查看控制台


在这里插入图片描述
先启动消费者,再启动生产者,多个消费者得到的消息是一样的

测试编写用例消息消费者代码(主题)


package com.kelecc.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
import java.io.IOException;

/**
 * 功能描述:监听器消费者主题
 *
 * @author: keLeCc
 */
public class JmsConSumerListenerTopic {
    private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
    private static final String TOPIC_NAME= "topic01";

    public static void main(String[] args) throws JMSException, IOException {
        //创建连接工厂,安装给定的url地址,采用默认用户名密码;
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        //通过连接工厂,获得连接并启动访问;
        Connection factoryConnection = factory.createConnection ();
        factoryConnection.start();

        //创建会话, 事务不开启,自动签收
        Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地 主题(topic )
       Topic topic = (Topic) session.createTopic(TOPIC_NAME);
        //创建消费者
        MessageConsumer messageConsumer = session.createConsumer(topic);

        //通过监听的方式来接收消息,异步非阻塞方式
        messageConsumer.setMessageListener((message)-> {
            if (null != message && message instanceof Message) {
                try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("=====消费者接收到消息=====" + textMessage.getText());
                } catch (JMSException e) {
                        e.printStackTrace();
                }
            }
        });
        保持控制台,因为监听需要时间,所以这里不写,会存在消息没有消费到
        System.in.read();
        messageConsumer.close();
        session.close();
        factoryConnection.close();
    }

}

查看控制台


在这里插入图片描述

在这里插入图片描述

先启动消费者,再启动生产者,多个消费者得到的消息是一样的

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐