SpringBoot整合ActiveMQ
SpringBoot整合ActiveMQActiveMQ的基本使用;ActiveMQ的生产者和消费者配置。
一、项目配置版本
- SpringBoot版本:3.3.0
- JDK版本:21
二、创建项目
三、依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
四、yml配置
配置代码
spring:
active-mq:
broker-url: tcp://localhost:61616
user: admin
password: admin
# jms:
# # 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)
# pub-sub-domain: true
默认目标类型是否为主题
可以通过 pub-sub-domain
,配置默认情况下目标类型;如果没有配置,默认使用队列模式发送和接收消息。
队列模式和主题模式的区别:
- 队列(使用点对点模式),一个消息,只会被一个消费者消费;
- 主题(使用发布/订阅模式),一个消息,可以被订阅该主题的多个消费者消费;
ActiveMQ支持使用Java代码,为每个生产者和消费者单独配置目标类型;
五、目标(Destination)常量
目标(Destination)常量,定义队列或主题的名字。
package com.example.activemq.constant;
public class QueueDestination {
public static final String TEST_QUEUE_1 = "Test.Queue1";
public static final String TEST_QUEUE_2 = "Test.Queue2";
}
package com.example.activemq.constant;
public class TopicDestination {
public static final String TEST_TOPIC_1 = "Test.Topic1";
public static final String TEST_TOPIC_2 = "Test.Topic2";
}
六、队列(Queue):默认目标类型
队列(Queue)生产者
package com.example.activemq.producer;
import com.example.activemq.constant.QueueDestination;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
/**
* 队列消息生产者
* <p>
* 用于发送消息到指定的JMS队列
*/
@Component
public class QueueProducer {
/**
* JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
*/
private final JmsMessagingTemplate jmsMessagingTemplate;
public QueueProducer(JmsMessagingTemplate jmsMessagingTemplate) {
this.jmsMessagingTemplate = jmsMessagingTemplate;
}
/**
* 发送消息到指定的队列
*
* @param message 要发送的消息内容
*/
public void send(String message) {
// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定队列。
jmsMessagingTemplate.convertAndSend(QueueDestination.TEST_QUEUE_1, message);
}
}
生产者发送消息
package com.example.activemq.producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 队列消息生产者测试
*/
@SpringBootTest
class QueueProducerTest {
@Autowired
private QueueProducer producer;
@Test
void testSendToQueue() {
producer.send("Hello ActiveMQ, queue message");
}
}
队列(Queue)消费者
package com.example.activemq.consumer;
import com.example.activemq.constant.QueueDestination;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class QueueConsumer {
@JmsListener(destination = QueueDestination.TEST_QUEUE_1)
private void receive(String message) {
log.info("队列消费者,接收消息: {}", message);
}
}
消息日志
2024-06-05T22:21:24.834+08:00 INFO 15640 — [active-mq-consumer] [ntContainer#0-1] c.e.activemq.consumer.QueueConsumer : 队列消费者,接收消息: Hello ActiveMQ, queue message
ActiveMQ控制台效果
七、主题(Topic):默认目标类型
配置默认目标类型
将上面配置模块中的 pub-sub-domain
设置为 true
,则默认目标类型使用发布/订阅
模式。
spring:
jms:
# 默认目标类型是否为主题:true 表示默认目标类型是主题(使用发布/订阅模式),false 表示默认目标类型是队列(使用点对点模式)
pub-sub-domain: true
主题(Topic)生产者
package com.example.activemq.producer;
import com.example.activemq.constant.TopicDestination;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
/**
* 主题消息生产者
* <p>
* 用于发送消息到指定的JMS主题
*/
@Component
public class TopicProducer {
/**
* JmsMessagingTemplate 用于消息的发送,它封装了消息转换和发送的逻辑
*/
private final JmsMessagingTemplate jmsMessagingTemplate;
public TopicProducer(JmsMessagingTemplate jmsMessagingTemplate) {
this.jmsMessagingTemplate = jmsMessagingTemplate;
}
/**
* 发送消息到指定的主题
*
* @param message 要发送的消息内容
*/
public void send(String message) {
// 调用 JmsMessagingTemplate 的 convertAndSend 方法,将消息发送到指定主题。
jmsMessagingTemplate.convertAndSend(TopicDestination.TEST_TOPIC_1, message);
}
}
生产者发送消息
package com.example.activemq.producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 主题消息生产者测试
*/
@SpringBootTest
class TopicProducerTest {
@Autowired
private TopicProducer producer;
@Test
void testSendToTopic() {
producer.send("Hello ActiveMQ, topic message");
}
}
主题(Topic)消费者
package com.example.activemq.consumer;
import com.example.activemq.constant.TopicDestination;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicConsumer {
@JmsListener(destination = TopicDestination.TEST_TOPIC_1)
private void receive(String message) {
log.info("主题消费者,接收消息: {}", message);
}
}
接收消息的日志
2024-06-05T23:51:46.026+08:00 INFO 8584 — [active-mq-consumer] [ntContainer#3-1] c.e.activemq.consumer.TopicConsumer : 主题消费者,接收消息: Hello ActiveMQ, topic message
ActiveMQ控制台效果
八、自定义配置队列和主题
SpringBoot 集成 ActiveMQ,不同的生产者
可以是不同的模式;比如一个生产者是发送队列消息,一个生产者发送主题消息。同样的,不同的消费者
也可以是不同的模式;比如一个消费者是接收队列消息,一个消费者接收主题消息。
此时,可以使用Java代码配置队列(默认模式)或主题;此时可以不在 yml 中配置 pub-sub-domain
。
生产者
配置
package com.example.activemq.config;
import com.example.activemq.constant.QueueDestination;
import com.example.activemq.constant.TopicDestination;
import jakarta.jms.Queue;
import jakarta.jms.Topic;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class JmsConfig {
@Bean
public Queue queue() {
return new ActiveMQQueue(QueueDestination.TEST_QUEUE_2);
}
@Bean
public Topic topic() {
return new ActiveMQTopic(TopicDestination.TEST_TOPIC_2);
}
}
生产者代码
package com.example.activemq.producer;
import jakarta.jms.Queue;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
/**
* 队列消息生产者
*/
@Component
public class QueueMessageProducer {
private final Queue queue;
private final JmsMessagingTemplate jmsMessagingTemplate;
public QueueMessageProducer(Queue queue, JmsMessagingTemplate jmsMessagingTemplate) {
this.queue = queue;
this.jmsMessagingTemplate = jmsMessagingTemplate;
}
public void send(String message) {
jmsMessagingTemplate.convertAndSend(queue, message);
}
}
package com.example.activemq.producer;
import jakarta.jms.Topic;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Component;
/**
* 主题消息生产者
*/
@Component
public class TopicMessageProducer {
private final Topic topic;
private final JmsMessagingTemplate jmsMessagingTemplate;
public TopicMessageProducer(Topic topic, JmsMessagingTemplate jmsMessagingTemplate) {
this.topic = topic;
this.jmsMessagingTemplate = jmsMessagingTemplate;
}
public void send(String message) {
jmsMessagingTemplate.convertAndSend(topic, message);
}
}
生产者发送消息
package com.example.activemq.producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 队列消息生产者测试
*/
@SpringBootTest
class QueueMessageProducerTest {
@Autowired
private QueueMessageProducer producer;
/**
* 发送多个队列消息
*/
@Test
void testSendToQueueMultiple() {
for (int i = 0; i < 10; i++) {
producer.send("Hello ActiveMQ, queue message " + i);
}
}
}
package com.example.activemq.producer;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
/**
* 主题消息生产者测试
*/
@SpringBootTest
class TopicMessageProducerTest {
@Autowired
private TopicMessageProducer producer;
/**
* 发送多个主题消息
*/
@Test
void testSendToTopicMultiple() {
for (int i = 0; i < 2; i++) {
producer.send("Hello ActiveMQ, topic message " + i);
}
}
}
消费者
配置
package com.example.activemq.config;
import jakarta.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
public class JmsConfig {
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(false); // 设置为队列模式(点对点模式)
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
bean.setPubSubDomain(true); // 设置为发布/订阅模式(主题模式)
bean.setConnectionFactory(activeMQConnectionFactory);
return bean;
}
}
消费者代码
package com.example.activemq.consumer;
import com.example.activemq.constant.QueueDestination;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class QueueMessageConsumer {
@SneakyThrows
@JmsListener(destination = QueueDestination.TEST_QUEUE_2, containerFactory = "jmsListenerContainerQueue")
public void receive1(String message) {
log.info("队列消费者-1,接收消息: {}", message);
Thread.sleep(1000);
}
@SneakyThrows
@JmsListener(destination = QueueDestination.TEST_QUEUE_2, containerFactory = "jmsListenerContainerQueue")
public void receive2(String message) {
log.info("队列消费者-2,接收消息: {}", message);
Thread.sleep(1000);
}
@SneakyThrows
@JmsListener(destination = QueueDestination.TEST_QUEUE_2, containerFactory = "jmsListenerContainerQueue")
public void receive3(String message) {
log.info("队列消费者-3,接收消息: {}", message);
Thread.sleep(1000);
}
}
package com.example.activemq.consumer;
import com.example.activemq.constant.TopicDestination;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class TopicMessageConsumer {
@SneakyThrows
@JmsListener(destination = TopicDestination.TEST_TOPIC_2, containerFactory = "jmsListenerContainerTopic")
public void receive1(String message) {
log.info("主题消费者-1,接收消息: {}", message);
Thread.sleep(1000);
}
@SneakyThrows
@JmsListener(destination = TopicDestination.TEST_TOPIC_2, containerFactory = "jmsListenerContainerTopic")
public void receive2(String message) {
log.info("主题消费者-2,接收消息: {}", message);
Thread.sleep(1000);
}
@SneakyThrows
@JmsListener(destination = TopicDestination.TEST_TOPIC_2, containerFactory = "jmsListenerContainerTopic")
public void receive3(String message) {
log.info("主题消费者-3,接收消息: {}", message);
Thread.sleep(1000);
}
}
接收结果
队列接收结果
2024-06-06T19:12:04.256+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 0
2024-06-06T19:12:04.259+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 1
2024-06-06T19:12:04.284+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 2
2024-06-06T19:12:05.278+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 3
2024-06-06T19:12:05.278+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 4
2024-06-06T19:12:05.291+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 5
2024-06-06T19:12:06.293+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#3-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-1,接收消息: Hello ActiveMQ, queue message 7
2024-06-06T19:12:06.293+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#2-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-3,接收消息: Hello ActiveMQ, queue message 8
2024-06-06T19:12:06.294+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 6
2024-06-06T19:12:07.310+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#1-1] c.e.a.consumer.QueueMessageConsumer : 队列消费者-2,接收消息: Hello ActiveMQ, queue message 9
主题接收结果
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:55.141+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 0
2024-06-06T19:57:56.143+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#6-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-1,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.143+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#4-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-2,接收消息: Hello ActiveMQ, topic message 1
2024-06-06T19:57:56.150+08:00 INFO 12064 --- [active-mq-consumer] [ntContainer#5-1] c.e.a.consumer.TopicMessageConsumer : 主题消费者-3,接收消息: Hello ActiveMQ, topic message 1
ActiveMQ控制台效果
队列- 控制台效果
主题-控制台效果
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)