ActiveMQ认识和深入(一),下载安装,编写测试用例
ActiveMQ认识和深入(一)ActiveMQ 概述Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行什么消息中间件消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。消息中间件利用高效可靠的消息传递机制
ActiveMQ认识和深入(一),下载安装,编写测试用例
ActiveMQ 概述
Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行
什么消息中间件
消息中间件是基于队列与消息传递技术,在网络环境中为应用系统提供同步或异步、可靠的消息传输的支撑性软件系统 。
消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
为什么要系统里面要引入消息中间件
系统之间直接调用可能会存在以下问题:
- 系统之间的接口耦合比较严重,面对大流量并发时,容易被冲垮。
- 面对大流量并发时,容易被冲垮。
- 等待同步存在性能问题。
消息中间件就是来解决这些问题来达到目的:解耦,削峰,异步。
- 系统解耦,当新的模块进来,可以做到代码改动最小;
- 设置流量缓冲池,可以让后端系统按照自身吞吐量进行消费,不被冲垮;
- 强弱依赖梳理能将非关键调用链路的操作异步化并提升整体系统的吞吐能力;
大致过程就是这样:
发送者把消息发送给消息服务器,消息服务器将消息存放在若干 队列/主题 中,在合适的时候,消息服务器会将消息转发给接受者。
在这个过程中,发送和接受都是异步的,也就是发送无需等待,而且发送者和接受者的生命周期也没有必然关系。
尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接受者。
比较 Kafka,RabbitMQ,RocketMQ,ActiveMQ区别
- Kafaka:底层是编程语言是:Java/Scala;
- RabbitMQ:底层编程语言是
erlang
; - RocketMQ:底层编程语言是
Java
,前身是阿里巴巴自己研发的Notify
和Kafaka 相结合的产物,阿里巴巴发现Notity
满足不了业务的所有场景之后,结合Kafka的优点而诞生的。 - ActiveMQ:底层编程语言是
Java
技术选型: 小的企业机构,系统平台编程语言是Java,ActiveMQ足够,如果量上去可以选用 RocketMQ
总结上述:
- 实现高可用,高性能,可伸缩,易用,和安全;
- 异步消息的消费和处理,控制消息的消费顺序;
- 可以和 spring/springboot整合编码;
- 配置集群容错的MQ集群;
下载和安装
官网地址:(https://activemq.apache.org/)
下载地址:(https://activemq.apache.org/components/classic/download/)
windos下载解压安装
-
启动
找到路径:D:\java\apache-activemq-5.16.2\bin\win64 双击运行
-
访问控制台
登录密码: admin/admin
linux 下载安装
- 上传文件到服务器,并解压
root@moling:/usr/local# ls
aegis bin data etc games kafka logs monodb nginx sbin src
apache-activemq-5.16.2-bin.tar.gz cloudmonitor docker.txt evloution include lib man nacos redis share
root@moling:/usr/local# tar -zxvf apache-activemq-5.16.2-bin.tar.gz
root@moling:/usr/local# ls
aegis bin data etc games kafka logs monodb nginx sbin src
apache-activemq-5.16.2 cloudmonitor docker.txt evloution include lib man nacos redis share
#启动服务 ./activemq start
root@moling:/usr/local/apache-activemq-5.16.2/bin# ./activemq start
INFO: Loading '/usr/local/apache-activemq-5.16.2//bin/env'
INFO: Using java '/usr/local/lib/jdk/bin/java'
INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
INFO: pidfile created : '/usr/local/apache-activemq-5.16.2//data/activemq.pid' (pid '1113856')
#查看启动状态 ./activemq status
root@moling:/usr/local/apache-activemq-5.16.2/bin# ./activemq status
INFO: Loading '/usr/local/apache-activemq-5.16.2//bin/env'
INFO: Using java '/usr/local/lib/jdk/bin/java'
ActiveMQ is running (pid '1113856')
#停止服务
bin/activemq stop
#带运行日志启动
./activemq start >/myactiveMQ/run_activemq.log
#建立软连接
ln -s /root/apache-activemq-5.16.2/bin/activemq /etc/init.d
#设置开启启动
chkconfig activemq on
#通过系统指令
service activemq start
service activemq status
service activemq stop
- 访问控制台
登录密码: admin/admin
文件夹介绍
bin
文件夹 ,是启动停止等命令文件夹;conf
文件夹, 配置文件夹,activemq.xml 配置端口连接等参数;data
文件夹,数据存放文件夹;doc
文件夹 ,用户帮助文档文件;examples
文件夹 ,示例文件;lib
文件夹 , 放入各种依赖jar
包,假如需要将mq消息保存进入mysql数据库,这里需要将mysql 的驱动jar包放入该文件夹;webapp,webapps-demo
文件夹 ,activemq
,控制台运行程序;
JAVA编码ActiveMQ通信
编码流程图
目的地Destination (队列/主题)两大特性:
- 队列模式:在点对点的消息传递域中,目的地被称为队列(
queue
);
- 生产者将消息发布到
queue
中,每个消息只有一个消费者。属于 1:1的关系; - 当生产消息,后启动 1号消费者,再启动2号消费者,此时,1号消费者已经将队列的消息消费完,2号消费者是不会消费到消息的;
- 先启动2个消费者,再生成6条消息,两个消费者会同时得到3条消息,默认机制是轮询,你一条我一条;
- 主题模式:在发布订阅消息传递域中,目的地被称为主题(
topoic
);
- 生产者将消息发布到
topic
中,每个消息可以有多个消费者。属于 1:N 的关系; - 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息;
- 生产者生产时,
topic
不保存消息它无状态的不落地,假如无人订阅就去生产,那就是一条废消息,所以,一般先启动消费者再启动生产者; - JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,好比我们的微信公众号订阅;
创建工程,引入依赖
<!--activeMQ 依赖-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.16.2</version>
</dependency>
测试编写用例消费生产者代码(队列)
package com.kelecc.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 功能描述: 消费生产者
*
* @author: keLeCc
*/
public class JmsProduceQueue {
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//创建连接工厂,安装给定的url地址,采用默认用户名密码;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接并启动访问;
Connection factoryConnection = factory.createConnection ();
factoryConnection.start();
//创建会话, 事务不开启,自动签收
Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地 队列(queue)
Queue queue = session.createQueue(QUEUE_NAME);
//创建消息的生成者
MessageProducer messageProducer = session.createProducer(queue);
//使用消息 messageProducer 生产3条消息发送到MQ 的队列里面
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
messageProducer.send(textMessage);
}
//关闭资源
messageProducer.close();
session.close();
factoryConnection.close();
System.out.println("=====消息发布完成=====");
}
}
查看控制台
Number Of Pending Messages | 等待消费的消息 | 这个是当前未出队列的数量。公式 = 总接收数 - 总出列树 |
---|---|---|
Number Of Consumers | 消费者数量 | 消费者端的消费者数量 |
Messages Enqueued | 进队消息数 | 进入队列的总数量,包括出列的。这个数量只增不减 |
Messages Dequeued | 出队消息数 | 可以理解为是消费者消费掉的数量 |
总结:
- 当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1;
- 当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1;
- 再来一条消息时,等待消费的消息是1,进入队列的消息就是2;
测试编写用例消息消费者代码(队列)
- 调用 receive() 方法测试用例
package com.kelecc.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 功能描述:
*
* @author: keLeCc
*/
public class JmsConSumerQueue {
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
private static final String QUEUE_NAME = "queue01";
public static void main(String[] args) throws JMSException {
//创建连接工厂,安装给定的url地址,采用默认用户名密码;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接并启动访问;
Connection factoryConnection = factory.createConnection ();
factoryConnection.start();
//创建会话, 事务不开启,自动签收
Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地 队列(queue)
Queue queue = session.createQueue(QUEUE_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
/*同步阻塞方式receive(),订阅者或接收者调用 MessageConsumer 的 receive()方法来接收消息;
* receive()方法在能够接收到消息之前(或超时之前)将一直阻塞;
*/
while (true){
//如何 调用 receive() 参数为空, 它会一直等待
TextMessage textMessage = (TextMessage) messageConsumer.receive();
//如何 调用 时间超过5分钟 结束, 参数值为毫秒
//TextMessage textMessage = (TextMessage) messageConsumer.receive(300000L);
if(null != textMessage){
System.out.println("=====消费者接收到消息====="+textMessage.getText());
}else{
break;
}
}
//关闭资源
messageConsumer.close();
session.close();
factoryConnection.close();
}
}
-
调用监听器接收消息
package com.kelecc.activemq.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; import java.io.IOException; /** * 功能描述: * * @author: keLeCc */ public class JmsConSumerListenerQueue { private static final String ACTIVEMQ_URL = "tcp://localhost:61616"; private static final String QUEUE_NAME = "queue01"; public static void main(String[] args) throws JMSException, IOException { //创建连接工厂,安装给定的url地址,采用默认用户名密码; ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL); //通过连接工厂,获得连接并启动访问; Connection factoryConnection = factory.createConnection (); factoryConnection.start(); //创建会话, 事务不开启,自动签收 Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); //创建目的地 队列(queue) Queue queue = session.createQueue(QUEUE_NAME); //创建消费者 MessageConsumer messageConsumer = session.createConsumer(queue); //通过监听的方式来接收消息,异步非阻塞方式 messageConsumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { if(null != message && message instanceof Message){ try { TextMessage textMessage = (TextMessage) message; System.out.println("=====消费者接收到消息====="+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }); //保持控制台,因为监听需要时间,所以这里不写,会存在消息没有消费到 System.in.read(); messageConsumer.close(); session.close(); factoryConnection.close(); } }
消息的生成和消费有一定的顺序讲究;
- 当生产消息,后启动 1号消费者,再启动2号消费者,此时,1号消费者已经将队列的消息消费完,2号消费者是不会消费到消息的;
- 先启动2个消费者,再生成6条消息,两个消费者会同时得到3条消息,默认机制是轮询,你一条我一条;
查看控制台
等待消费的消息 0 条,消费者1个(是因为我没有关), 出队的消息有3个。
测试编写用例消费生产者代码(主题)
package com.kelecc.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* 功能描述:生产者主题
*
* @author: keLeCc
*/
public class JmsProduceTopic {
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
private static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = (Topic)session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i <= 3; i++) {
TextMessage textMessage = session.createTextMessage("msg---" + i);
messageProducer.send(textMessage);
}
messageProducer.close();
session.close();
connection.close();
}
}
查看控制台
先启动消费者,再启动生产者,多个消费者得到的消息是一样的
测试编写用例消息消费者代码(主题)
package com.kelecc.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
/**
* 功能描述:监听器消费者主题
*
* @author: keLeCc
*/
public class JmsConSumerListenerTopic {
private static final String ACTIVEMQ_URL = "tcp://localhost:61616";
private static final String TOPIC_NAME= "topic01";
public static void main(String[] args) throws JMSException, IOException {
//创建连接工厂,安装给定的url地址,采用默认用户名密码;
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
//通过连接工厂,获得连接并启动访问;
Connection factoryConnection = factory.createConnection ();
factoryConnection.start();
//创建会话, 事务不开启,自动签收
Session session = factoryConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建目的地 主题(topic )
Topic topic = (Topic) session.createTopic(TOPIC_NAME);
//创建消费者
MessageConsumer messageConsumer = session.createConsumer(topic);
//通过监听的方式来接收消息,异步非阻塞方式
messageConsumer.setMessageListener((message)-> {
if (null != message && message instanceof Message) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("=====消费者接收到消息=====" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
保持控制台,因为监听需要时间,所以这里不写,会存在消息没有消费到
System.in.read();
messageConsumer.close();
session.close();
factoryConnection.close();
}
}
查看控制台
先启动消费者,再启动生产者,多个消费者得到的消息是一样的
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)