springboot集成ActiveMQ

ActiveMQ简介

ActiveMQ是Apache提供的一个开源的消息系统,完全采用Java来实现,因此它能很好地支持JMS(Java Message Service,即Java消息服务)。
SpringBoot提供了对JMS的支持,并且对主流的消息中间件如 RabbitMQ、Apache Kafka、Apache ActiveMQ 等都提供了集成。

特点:
  • 支持Java消息服务(JMS) 1.1 版本
  • Spring Framework
  • 集群 (Clustering)
  • 支持的编程语言包括:C、C++、C#、Delphi、Erlang、Adobe Flash、Haskell、Java、JavaScript、Perl、PHP、Pike、Python和Ruby
  • 协议支持包括:OpenWire、REST、STOMP、WS-Notification、MQTT、XMPP以及AMQP
消息形式:
  • 点对点(queue) :一次生产,只能有一个消费者消费
  • 一对多(topic) :一次生产,可以有多个消费者消费

springboot集成ActiveMQ

  • 添加依赖
<!-- spring-boot-starter-activemq 不用写版本号,会根据springboot的版本号自己选择-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
  • 配置application.yml
server:
  port: 8088 # 配置端口
  servlet:
      context-path: /moyundong # 配置项目名称

# 配置queue和topic
myTopicTest: springboot2-test17-topicTest01
myQueueTest: springboot2-test17-queueTest01

配置文件里面配置了一个myTopicTest和一个myQueueTest
::: warning 注意
这里我们并没有配置ActiveMQ的相关配置信息,比如url,用户名和密码等
:::

  • 创ActiveMQ建配置类
@Configuration
@EnableJms //自动配置jms
public class ActiveMQConfig {
    @Value("${myTopicTest}")
    public String topicName;
    @Value("${myQueueTest}")
    public String queueName;

    /**
     * 创建一个queue消息模型的bean
     * @return
     */
    @Bean
    public Queue queue() {
        return new ActiveMQQueue(queueName);
    }

    /**
     * topic模式的ListenerContainer,springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
     * @param connectionFactory
     * @return
     */
    @Bean
    public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //这里必须设置为true,false则表示是queue类型
        factory.setPubSubDomain(true);
        return factory;
    }

    /**
     * 创建一个topic消息模型的bean
     * @return
     */
    @Bean
    public Topic topic() {
        return new ActiveMQTopic(topicName) ;
    }
}

配置类里面主要创建了3个bean,JmsListenerContainerFactory是为了开启topic,queue不用单独开启。然后再根据topicName和queueName创建了两个消息bean

  • 创建消费者类1
@Component
public class Consumer01 {

    /**
     * 接收queue类型消息
     * destination对应配置类中ActiveMQQueue("springboot2-test17-queueTest01")设置的名字
     * queue类型不用加containerFactory
     * @param msg
     */
    @JmsListener(destination="springboot2-test17-queueTest01")
    public void ListenQueue(String msg){
        System.out.println("Consumer01接收到queue消息:" + msg);
    }

    /**
     * 接收topic类型消息
     * destination对应配置类中ActiveMQTopic("springboot2-test17-topicTest01")设置的名字
     * containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
     * @param msg
     */
    @JmsListener(destination="springboot2-test17-topicTest01", containerFactory = "jmsTopicListenerContainerFactory")
    public void ListenTopic(String msg){
        System.out.println("Consumer01接收到topic消息:" + msg);
    }
}
  • 创建消费者类2
@Component
public class Consumer02 {

    /**
     * 接收queue类型消息
     * destination对应配置类中ActiveMQQueue("springboot2-test17-queueTest01")设置的名字
     * queue类型不用加containerFactory
     * @param msg
     */
    @JmsListener(destination="springboot2-test17-queueTest01")
    public void ListenQueue(String msg){
        System.out.println("Consumer02接收到queue消息:" + msg);
    }

    /**
     * 接收topic类型消息
     * destination对应配置类中ActiveMQTopic("springboot2-test17-topicTest01")设置的名字
     * containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
     * @param msg
     */
    @JmsListener(destination="springboot2-test17-topicTest01", containerFactory = "jmsTopicListenerContainerFactory")
    public void ListenTopic(String msg){
        System.out.println("Consumer02接收到topic消息:" + msg);
    }
}
  • 创建测试类,也是生产者
@RestController
@RequestMapping("test")
public class ActiveMQTestController {
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 这个bean是在ActiveMQConfig配置文件中定义的
     */
    @Autowired
    private Queue queue;

    /**
     * 这个bean是在ActiveMQConfig配置文件中定义的
     */
    @Autowired
    private Topic topic;

    /**
     * 发送queue类型消息
     * @param msg
     */
    @GetMapping("/queue")
    public void sendQueueMsg(String msg){
        //生产一个queue类型的消息
        jmsTemplate.convertAndSend(queue, msg);
    }

    /**
     * 发送topic类型消息
     * @param msg
     */
    @GetMapping("/topic")
    public void sendTopicMsg(String msg){
        //生产一个topic类型的消息
        jmsTemplate.convertAndSend(topic, msg);
    }
}
  • 创建定时任务,也是生产者
@Component
@Slf4j
public class ScheduleTaskTest {
    @Autowired
    private JmsMessagingTemplate jmsTemplate;

    /**
     * 这个bean是在ActiveMQConfig配置文件中定义的
     */
    @Autowired
    private Queue queue;

    /**
     * 这个bean是在ActiveMQConfig配置文件中定义的
     */
    @Autowired
    private Topic topic;

    /**
     * 5秒钟生产一次
     */
    @Scheduled(cron="0/5 * * * * ?")
    private void task2(){
        log.info("生产一次消息");
        jmsTemplate.convertAndSend(queue,"hello queue!");
        jmsTemplate.convertAndSend(topic, "hello topic!");
    }
}

这个定时任务也是一个生产者,每隔5秒生产一个queue和topic类型的消息

  • 启动测试,
    启动成功后,我们会在控制台看到下面的信息,证明springboot默认开启了ActiveMQ的服务器,springboot2.1版本对应的ActiveMQ是5.15.7
2020-07-06 12:08:06.817  INFO 17436 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.7 (localhost, ID:LAPTOP-2G32U4LI-566-159439-0:1) is starting
2020-07-06 12:08:06.819  INFO 17436 --- [           main] o.apache.activemq.broker.BrokerService   : Apache ActiveMQ 5.15.7 (localhost, ID:LAPTOP-2G32U4LI-566-15940639-0:1) started

系统启动成功并且运行一段时间后,我们看到控制台打印如下信息,证明topic类型的消息,生产一次只要订阅的消费者都可以进行消费,但是queue类型的消息生产一次,只能是一个消费者进行消费

2020-07-06 12:08:10.002  INFO 17436 --- [   scheduling-1] com.moyundong.task.ScheduleTaskTest      : 生产一次消息
Consumer02接收到topic消息:hello topic!
Consumer02接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
2020-07-06 12:08:15.002  INFO 17436 --- [   scheduling-1] com.moyundong.task.ScheduleTaskTest      : 生产一次消息
Consumer01接收到queue消息:hello queue!
Consumer02接收到topic消息:hello topic!
Consumer01接收到topic消息:hello topic!
2020-07-06 12:08:20.002  INFO 17436 --- [   scheduling-1] com.moyundong.task.ScheduleTaskTest      : 生产一次消息
Consumer02接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
Consumer02接收到topic消息:hello topic!
2020-07-06 12:08:25.001  INFO 17436 --- [   scheduling-1] com.moyundong.task.ScheduleTaskTest      : 生产一次消息
Consumer01接收到queue消息:hello queue!
Consumer01接收到topic消息:hello topic!
Consumer02接收到topic消息:hello topic!

还可以通过http://localhost:8088/moyundong/test/topic?msg=hello topichttp://localhost:8088/moyundong/test/queue?msg=hello queue分别进行topic和queue类型的测试

自定义ActiveMQ服务器

上面我们使用的是springboot默认自带的ActiveMQ服务器,但是实际应用中,我们大部分都是使用独立的ActiveMQ服务器。

下载ActiveMQ

官网下载地址:ActiveMQ下载

下载界面如下:

安装

我下载的是windows版本,直接解压就可以了,不用安装

启动

在解压后的文件里找到apache-activemq-5.15.13\bin\win64目录下的activemq.bat文件,双击即可【jdk安装自己解决】
在浏览器http://localhost:8161就可以登录了,默认用户名密码是admin和admin

修改application.yml并测试
server:
  port: 8088 # 配置端口
  servlet:
      context-path: /moyundong # 配置项目名称
spring:
  activemq:
    #ActiveMQ通讯地址
    broker-url: tcp://localhost:61616
    #用户名
    user: admin
    #密码
    password: admin
    packages:
      #信任所有的包
      trust-all: true
    pool:
      #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
      enabled: false
# 配置queue和topic
myTopicTest: springboot2-test17-topicTest01
myQueueTest: springboot2-test17-queueTest01

我们再配置文件里面新增了ActiveMQ的配置信息,这时候再启动服务,就看不到控制台启动ActiveMQ的5.15.7版本了,这时候使用的是我们下载并且启动的5.15.13版本了。
这时候我们可以通过ActiveMQ的管理器看到queue和topic底下有我们的消息了。

queue

topic

本节示例下载地址:java相关demo下载列表

1介绍
2springboot定时任务
3springboot定时任务配置详解
4springboot动态定时任务
5springboot集成websocket
6springboot多数据源
7springboot配置druid监听
8springboot自定义注解
9springboot常见注解详解
10springboot接收参数详解
11springboot验证机制@Valid和@Validated
12springboot集成Swagger2
13springboot集成swagger-bootstrap-ui
14springboot集成shiro
15springboot集成shiro(二)
16springboot集成jwt
17springboot集成ActiveMQ
18springboot缓存机制

🍉🍉🍉 欢迎大家来博客了解更多内容:java乐园 🍉🍉🍉

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐