springboot进阶学习(十七)springboot集成ActiveMQ
springboot集成ActiveMQActiveMQ简介ActiveMQ是Apache提供的一个开源的消息系统,完全采用Java来实现,因此它能很好地支持JMS(Java Message Service,即Java消息服务)。SpringBoot提供了对JMS的支持,并且对主流的消息中间件如 RabbitMQ、Apache Kafka、Apache ActiveMQ 等都提供了集成。特点:支持
springboot集成ActiveMQ
ActiveMQ简介
ActiveMQ是Apache提供的一个开源的消息系统,完全采用Java来实现,因此它能很好地支持JMS(Java Message Service,即Java消息服务)。
SpringBoot提供了对JMS的支持,并且对主流的消息中间件如 RabbitMQ、Apache Kafka、Apache ActiveMQ 等都提供了集成。
特点:
- 支持Java消息服务(JMS) 1.1 版本
- Spring Framework
- 集群 (Clustering)
- 支持的编程语言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby
- 协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
消息形式:
- 点对点(queue) :一次生产,只能有一个消费者消费
- 一对多(topic) :一次生产,可以有多个消费者消费
springboot集成ActiveMQ
- 添加依赖
<!-- spring-boot-starter-activemq 不用写版本号,会根据springboot的版本号自己选择-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
- 配置
application.yml
server:
port: 8088 # 配置端口
servlet:
context-path: /moyundong # 配置项目名称
# 配置queue和topic
myTopicTest: springboot2-test17-topicTest01
myQueueTest: springboot2-test17-queueTest01
配置文件里面配置了一个myTopicTest和一个myQueueTest
::: warning 注意
这里我们并没有配置ActiveMQ的相关配置信息,比如url,用户名和密码等
:::
- 创ActiveMQ建配置类
@Configuration
@EnableJms //自动配置jms
public class ActiveMQConfig {
@Value("${myTopicTest}")
public String topicName;
@Value("${myQueueTest}")
public String queueName;
/**
* 创建一个queue消息模型的bean
* @return
*/
@Bean
public Queue queue() {
return new ActiveMQQueue(queueName);
}
/**
* topic模式的ListenerContainer,springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
* @param connectionFactory
* @return
*/
@Bean
public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
//这里必须设置为true,false则表示是queue类型
factory.setPubSubDomain(true);
return factory;
}
/**
* 创建一个topic消息模型的bean
* @return
*/
@Bean
public Topic topic() {
return new ActiveMQTopic(topicName) ;
}
}
配置类里面主要创建了3个bean,JmsListenerContainerFactory是为了开启topic,queue不用单独开启。然后再根据topicName和queueName创建了两个消息bean
- 创建消费者类1
@Component
public class Consumer01 {
/**
* 接收queue类型消息
* destination对应配置类中ActiveMQQueue("springboot2-test17-queueTest01")设置的名字
* queue类型不用加containerFactory
* @param msg
*/
@JmsListener(destination="springboot2-test17-queueTest01")
public void ListenQueue(String msg){
System.out.println("Consumer01接收到queue消息:" + msg);
}
/**
* 接收topic类型消息
* destination对应配置类中ActiveMQTopic("springboot2-test17-topicTest01")设置的名字
* containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
* @param msg
*/
@JmsListener(destination="springboot2-test17-topicTest01", containerFactory = "jmsTopicListenerContainerFactory")
public void ListenTopic(String msg){
System.out.println("Consumer01接收到topic消息:" + msg);
}
}
- 创建消费者类2
@Component
public class Consumer02 {
/**
* 接收queue类型消息
* destination对应配置类中ActiveMQQueue("springboot2-test17-queueTest01")设置的名字
* queue类型不用加containerFactory
* @param msg
*/
@JmsListener(destination="springboot2-test17-queueTest01")
public void ListenQueue(String msg){
System.out.println("Consumer02接收到queue消息:" + msg);
}
/**
* 接收topic类型消息
* destination对应配置类中ActiveMQTopic("springboot2-test17-topicTest01")设置的名字
* containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
* @param msg
*/
@JmsListener(destination="springboot2-test17-topicTest01", containerFactory = "jmsTopicListenerContainerFactory")
public void ListenTopic(String msg){
System.out.println("Consumer02接收到topic消息:" + msg);
}
}
- 创建测试类,也是生产者
@RestController
@RequestMapping("test")
public class ActiveMQTestController {
@Autowired
private JmsMessagingTemplate jmsTemplate;
/**
* 这个bean是在ActiveMQConfig配置文件中定义的
*/
@Autowired
private Queue queue;
/**
* 这个bean是在ActiveMQConfig配置文件中定义的
*/
@Autowired
private Topic topic;
/**
* 发送queue类型消息
* @param msg
*/
@GetMapping("/queue")
public void sendQueueMsg(String msg){
//生产一个queue类型的消息
jmsTemplate.convertAndSend(queue, msg);
}
/**
* 发送topic类型消息
* @param msg
*/
@GetMapping("/topic")
public void sendTopicMsg(String msg){
//生产一个topic类型的消息
jmsTemplate.convertAndSend(topic, msg);
}
}
- 创建定时任务,也是生产者
@Component
@Slf4j
public class ScheduleTaskTest {
@Autowired
private JmsMessagingTemplate jmsTemplate;
/**
* 这个bean是在ActiveMQConfig配置文件中定义的
*/
@Autowired
private Queue queue;
/**
* 这个bean是在ActiveMQConfig配置文件中定义的
*/
@Autowired
private Topic topic;
/**
* 5秒钟生产一次
*/
@Scheduled(cron="0/5 * * * * ?")
private void task2(){
log.info("生产一次消息");
jmsTemplate.convertAndSend(queue,"hello queue!");
jmsTemplate.convertAndSend(topic, "hello topic!");
}
}
这个定时任务也是一个生产者,每隔5秒生产一个queue和topic类型的消息
- 启动测试,
启动成功后,我们会在控制台看到下面的信息,证明springboot默认开启了ActiveMQ的服务器,springboot2.1版本对应的ActiveMQ是5.15.7
2020-07-06 12:08:06.817 INFO 17436 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.7 (localhost, ID:LAPTOP-2G32U4LI-566-159439-0:1) is starting
2020-07-06 12:08:06.819 INFO 17436 --- [ main] o.apache.activemq.broker.BrokerService : Apache ActiveMQ 5.15.7 (localhost, ID:LAPTOP-2G32U4LI-566-15940639-0:1) started
系统启动成功并且运行一段时间后,我们看到控制台打印如下信息,证明topic类型的消息,生产一次只要订阅的消费者都可以进行消费,但是queue类型的消息生产一次,只能是一个消费者进行消费
2020-07-06 12:08:10.002 INFO 17436 --- [ scheduling-1] com.moyundong.task.ScheduleTaskTest : 生产一次消息
Consumer02接收到topic消息:hello topic!
Consumer02接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
2020-07-06 12:08:15.002 INFO 17436 --- [ scheduling-1] com.moyundong.task.ScheduleTaskTest : 生产一次消息
Consumer01接收到queue消息:hello queue!
Consumer02接收到topic消息:hello topic!
Consumer01接收到topic消息:hello topic!
2020-07-06 12:08:20.002 INFO 17436 --- [ scheduling-1] com.moyundong.task.ScheduleTaskTest : 生产一次消息
Consumer02接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
Consumer02接收到topic消息:hello topic!
2020-07-06 12:08:25.001 INFO 17436 --- [ scheduling-1] com.moyundong.task.ScheduleTaskTest : 生产一次消息
Consumer01接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
Consumer02接收到topic消息:hello topic!
还可以通过http://localhost:8088/moyundong/test/topic?msg=hello topic
与http://localhost:8088/moyundong/test/queue?msg=hello queue
分别进行topic和queue类型的测试
自定义ActiveMQ服务器
上面我们使用的是springboot默认自带的ActiveMQ服务器,但是实际应用中,我们大部分都是使用独立的ActiveMQ服务器。
下载ActiveMQ
官网下载地址:ActiveMQ下载
下载界面如下:
安装
我下载的是windows版本,直接解压就可以了,不用安装
启动
在解压后的文件里找到apache-activemq-5.15.13\bin\win64
目录下的activemq.bat文件,双击即可【jdk安装自己解决】
在浏览器http://localhost:8161就可以登录了,默认用户名密码是admin和admin
修改application.yml
并测试
server:
port: 8088 # 配置端口
servlet:
context-path: /moyundong # 配置项目名称
spring:
activemq:
#ActiveMQ通讯地址
broker-url: tcp://localhost:61616
#用户名
user: admin
#密码
password: admin
packages:
#信任所有的包
trust-all: true
pool:
#是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
enabled: false
# 配置queue和topic
myTopicTest: springboot2-test17-topicTest01
myQueueTest: springboot2-test17-queueTest01
我们再配置文件里面新增了ActiveMQ的配置信息,这时候再启动服务,就看不到控制台启动ActiveMQ的5.15.7版本了,这时候使用的是我们下载并且启动的5.15.13版本了。
这时候我们可以通过ActiveMQ的管理器看到queue和topic底下有我们的消息了。
queue
topic
本节示例下载地址:java相关demo下载列表
1介绍
2springboot定时任务
3springboot定时任务配置详解
4springboot动态定时任务
5springboot集成websocket
6springboot多数据源
7springboot配置druid监听
8springboot自定义注解
9springboot常见注解详解
10springboot接收参数详解
11springboot验证机制@Valid和@Validated
12springboot集成Swagger2
13springboot集成swagger-bootstrap-ui
14springboot集成shiro
15springboot集成shiro(二)
16springboot集成jwt
17springboot集成ActiveMQ
18springboot缓存机制
🍉🍉🍉 欢迎大家来博客了解更多内容:java乐园 🍉🍉🍉
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)