springboot 集成 activemq
2-消息发送应用是 e-car 项目,接收端是 tcm-chatgpt项目,当然,同一个项目也是可以发送和接收的。1-在两个不同的应用发送和接收消息。
·
一:说明
1-在两个不同的应用发送和接收消息
2-消息发送应用是 e-car 项目,接收端是 tcm-chatgpt项目,当然,同一个项目也是可以发送和接收的
二:e-car项目配置
1 引入activemq依赖
<!-- 集成 ActiveMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
2 application启动类配置消息监听
加上 @EnableJms 注解, 开启JMS
@EnableJms // 开启JMS
@SpringBootApplication(scanBasePackages="com.nrbc.ecar")
@MapperScan("com.nrbc.ecar.dao.mapper")
public class EcarAppClientApplication {
public static void main(String[] args) {
SpringApplication.run(EcarAppClientApplication.class, args);
}
}
3 application.yml配置
- 1 注意: 开启主题策略,默认是关闭 开启主题模式,要设置为true
如果要使用队列模式,pub-sub-domain 要设置为false jms:
pub-sub-domain: false - 同时使用jms的Queue(队列)和Topic(发布订阅),可查看这边文章: 文章链接
spring:
# activemq相关配置
activemq:
broker-url: tcp://localhost:61616
user: liping
password: liping
packages:
# 配置信任所有的包,这个配置为了支持发送对象消息(如果传递的是对象则需要设置为true,默认是传字符串)
trust-all: true
# 开启主题策略,默认是关闭 开启主题模式
# 注意:如果要使用队列模式,pub-sub-domain 要设置为false
jms:
pub-sub-domain: true
# 配置activemq队列的名称和主题名称
amq:
qname:
queueName-1:
queueName-1
topicName:
name-1:
topic-prot-1
name-2:
topic-prot-2
4 MQConfig.java 配置类
/**
* 专门配置mq通道的配置类
*/
@Slf4j
@Configuration
public class MQConfig {
@Value("${amq.topicName.name-1}")
private String tpName;
@Bean(name = "queueName")
Queue queueName() {
return new ActiveMQQueue("test_queue");
}
/**
* 主题(发布\订阅模式)通道
* @author kazaf
* @date 2024/4/24 16:43
*/
@Bean(name = "topic1")
Topic queueFind() {
log.info("${amq.topicName.name-1}=" + tpName);
return new ActiveMQTopic("topic-model");
}
@Bean(name = "topic2")
Topic topic2() {
return new ActiveMQTopic("topic-model2");
}
@Bean(name = "topic3")
Topic topic3() {
return new ActiveMQTopic(tpName);
}
}
5 ecar 项目中的监听
/**
* 专门配置mq通道的配置类
*/
@Slf4j
@Component
public class MQListener {
/*@JmsListener(destination = "test_queue")
public void jiant (String message) {
System.out.println("监听到消息》:" + message);
log.info("监听到消息---》:" + message);
}*/
@JmsListener(destination = "topic-model")
public void reciveTopic(String message) {
log.info("11接收主题消息》:"+message);
}
@JmsListener(destination = "topic-model2")
public void topicReceive2(String message) {
log.info("topic-2监听到消息---》:" + message);
}
/**
* 主题名称从配置文件中动态获取
*/
@JmsListener(destination = "${amq.topicName.name-1}")
public void topicReceive3(String message) {
log.info("topic-3监听到消息---》:" + message);
}
}
6 junit 发送消息
- 调用类需要注入消息模板,队列名称或者主题名称
也可以编写接口发送,demo随个人习惯
@Slf4j
@SpringBootTest(classes = EcarAppClientApplication.class)
@RunWith(SpringRunner.class)
public class ActivemqTest {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queueName;
@Autowired
private Topic topic1;
@Autowired
private Topic topic2;
@Autowired
private Topic topic3;
@Test
public void queueSender() {
String message="我是队列发出的信息";
jmsMessagingTemplate.convertAndSend(queueName, message);
log.info("queueSender发送--》:"+message);
// TimeUnit.SECONDS.sleep(60);
}
/**
* 发送 主题消息(广播)
* @author kazaf
* @date 2024/4/24 17:09
*/
@Test
public void topicSender() {
String message="我是topic-1a";
jmsMessagingTemplate.convertAndSend(topic1, message);
log.info("topicSender发送--》:"+message);
String message2="我是topic2";
jmsMessagingTemplate.convertAndSend(topic2, message2);
log.info("topic2Sender发送--》:"+message2);
String message3="我是topic3发送的";
jmsMessagingTemplate.convertAndSend(topic3, message3);
log.info("topic3Sender发送--》:"+message3);
}
}
三:tcm-chatgpt项目配置
- 1、2、3、5 跟e-car项目一样的配置
5 MQListener.java 监听消息
代码
/**
* @Description: mq监听类
* @Author: kazaf
* @Date: 2024-04-24 9:34
*/
@Slf4j
@Component
public class MQListener {
@JmsListener(destination = "test_queue")
public void onMessage(String message) {
System.out.print(message);
//获取到消息后可以干一些事情
log.info("恰恰监听到的消息》:"+message);
}
@JmsListener(destination = "topic-model")
public void reciveTopic(String message) {
log.info("11接收主题消息》:"+message);
}
@JmsListener(destination = "topic-model2")
public void reciveTopic2(String message) {
log.info("22接收主题消息》:"+message);
}
}
三 测试
启动activemq服务
-
window端直接启动 bin\win64\activemq.bat 批处理文件
-
访问 http://localhost:8161 输入 admin / admin 的默认用户名密码登录(根据自己是否修改过)
-
启动tcm-chatgpt服务
-
启动e-car服务
-
运行 ActivemqTest.java 测试类中的 queue发送消息或者Topic发送消息
队列
发布订阅模式
四:发送对象消息
配置队列名称
/**
* 专门配置mq通道的配置类
*/
@Slf4j
@Configuration
public class MQConfig {
/**
*
* @author kazaf
* @date 2024/4/25 10:51
*/
@Bean(name = "textMessageQueue")
Queue textMessageQueue() {
return new ActiveMQQueue("textMessage-queue");
}
@Bean(name = "objMessageQueue")
Queue objMessageQueue() {
return new ActiveMQQueue("objMessage-queue");
}
}
开启监听
@Slf4j
@Component
public class MQListener {
/**
* 主题名称从配置文件中动态获取
*/
@JmsListener(destination = "textMessage-queue")
public void queueTextMessageReceive(String message) throws JMSException {
log.info("queueTextMessageReceive监听到消息---》:" + message);
}
@JmsListener(destination = "objMessage-queue")
public void queueMapMessageReceive(Map<String,Object> map) {
log.info("queueObjMessageReceive-Map-监听到消息---》:"
+ map.get("name")+"--money="+map.get("money"));
}
/**
* 主题名称从配置文件中动态获取
*/
@JmsListener(destination = "objMessage-queue")
public void queueObjMessageReceive(Message message) throws JMSException {
if (message instanceof TextMessage) {
log.info("queueObjMessageReceive-Text-监听到消息---》:" + ((TextMessage) message).getText());
} else if (message instanceof MapMessage) {
MapMessage mapMessage = (MapMessage) message;
log.info("queueObjMessageReceive-Map-监听到消息---》:"
+ mapMessage.getString("name")+"--money="+mapMessage.getString("money"));
} else if (message instanceof ObjectMessage) {
ObjectMessage objectMessage = (ObjectMessage) message;
User user = (User) objectMessage.getObject();
log.info("queueObjMessageReceive-obj-监听到消息---》:" + user);
}
log.info("queueObjMessageReceive-监听到消息--");
}
测试类
@Slf4j
@SpringBootTest(classes = EcarAppClientApplication.class)
@RunWith(SpringRunner.class)
public class ActivemqTest {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private JmsTemplate jmsTemplate;
@Autowired
private Queue textMessageQueue;
@Autowired
private Queue objMessageQueue;
@Test
public void senderTextMessage() {
jmsTemplate.send(textMessageQueue, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage message = session.createTextMessage();
message.setText("发送TextMessage消息");
return message;
}
});
}
@Test
public void senderObjMessage() {
/*
User user = new User();
user.setUsername("李哥哥");
user.setPassword("mima123");
jmsMessagingTemplate.convertAndSend(objMessageQueue, user);
*/
Map<String, Object> map = new HashMap<>();
map.put("name", "掐果果");
map.put("money", 1231111111.55);
jmsMessagingTemplate.convertAndSend(objMessageQueue, map);
}
}
接收ObjectMessage的消息
Map接收的消息
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献2条内容
所有评论(0)