Commonly used message queues include ActiveMQ, RabbitMQ, Kafka, and RocketMQ.

ActiveMQ is an open-source Apache project: a JMS provider that fully supports the JMS 1.1 and J2EE 1.4 specifications.

Because ActiveMQ is a pure Java program, it runs on any operating system that has a Java virtual machine.

Queues and topics in ActiveMQ

    JMS defines two messaging models:

                  Point-to-point (queue)

                  Publish/subscribe (topic)

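What is point-to-point? Think of one-on-one service at a bank: when you go to the counter, the teller serves you alone and cannot handle several customers at the same time.

That is a queue: a message is never delivered twice. Suppose you deposit 100 yuan at the bank for your parents. Only one of them can withdraw that money; they cannot both receive 100 yuan each.

A queue can still have multiple consumers. The two consumers and the producer below share one queue, test-queue.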
package com.zkb;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageConsumer consumer = session.createConsumer(destination);
            int i = 0;
            while (true) {
                Message message = consumer.receive(20000);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Message " + i++ + ": " + text);
                    }
                } else {
                    break;
                }
            }
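        } catch (Exception e) {
            // Report the failure instead of swallowing it silently.
            e.printStackTrace();
        } finally {
            // Close the connection so the broker releases this consumer and the program can exit cleanly.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeError) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }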

    }
}
package com.zkb;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer2 {
    private static final String BROKER_URL = "tcp://localhost:61616";
    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageConsumer consumer = session.createConsumer(destination);
            int i = 0;
            while (true) {
                Message message = consumer.receive(20000);
                if (message != null) {
                    if (message instanceof TextMessage) {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Message " + i++ + ": " + text);
                    }
                } else {
                    break;
                }
            }
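        } catch (Exception e) {
            // Report the failure instead of swallowing it silently.
            e.printStackTrace();
        } finally {
            // Close the connection so the broker releases this consumer and the program can exit cleanly.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException closeError) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }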

    }
}
package com.zkb;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {
    private static final String BROKER_URL = "tcp://localhost:61616";
    private static final Boolean NON_TRANSACTED = false;
    private static final int NUM_MESSAGES_TO_SEND = 100;
    private static final long DELAY = 100;

    public static void main(String[] args) {
        String url = BROKER_URL;
        if (args.length > 0) {
            url = args[0].trim();
        }
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "password", url);
        Connection connection = null;
        try {

            connection = connectionFactory.createConnection();
            connection.start();

            Session session = connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test-queue");
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {
                TextMessage message = session.createTextMessage("Message #" + i);
                System.out.println("message:  #" + i);
                producer.send(message);
                Thread.sleep(DELAY);
            }

            producer.close();
            session.close();

        } catch (Exception e) {
            // Print the cause so a failed send is not mistaken for success.
            System.out.println("Caught exception: " + e);
        } finally {
            // Always close the connection, even when sending fails.
            if (connection != null) {
                try {
                    connection.close();
                } catch (JMSException e) {
                    System.out.println("Could not close an open connection...");
                }
            }
        }
    }
}

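The output shows that the queue producer sent 100 messages in total, numbered 0 to 99.

In this run the two consumers received 50 messages each. So a queue may have several consumers, but any given message is consumed by only one of them; the others never see it.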
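The "one message, one consumer" rule can also be seen without a broker. The following is only an in-memory sketch in plain Java, not ActiveMQ code: `QueueSketch`, `run` and `drain` are made-up names, and a `LinkedBlockingQueue` stands in for test-queue. Two threads compete for the same messages; every message ends up with exactly one of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in for a JMS queue; QueueSketch is an illustrative name.
class QueueSketch {

    // Puts `count` messages on one queue, then lets two consumers compete for them.
    static List<List<String>> run(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < count; i++) {
            queue.add("Message #" + i);
        }
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        Thread c1 = new Thread(() -> drain(queue, first));
        Thread c2 = new Thread(() -> drain(queue, second));
        c1.start();
        c2.start();
        try {
            c1.join();
            c2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList(first, second);
    }

    // poll() removes the head of the queue, so a message taken here is gone for the other consumer.
    private static void drain(BlockingQueue<String> queue, List<String> received) {
        String message;
        while ((message = queue.poll()) != null) {
            received.add(message);
        }
    }

    public static void main(String[] args) {
        List<List<String>> result = run(100);
        System.out.println("consumer 1: " + result.get(0).size()
                + ", consumer 2: " + result.get(1).size());
    }
}
```

The two counts always add up to 100 and no message appears in both lists. How the messages are split depends on thread scheduling in this sketch, so it will not necessarily reproduce the even 50/50 split seen with the broker.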

 

Publish/subscribe: Topic

package com.zkb.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppProducer {
    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(topicName);
        MessageProducer producer = session.createProducer(destination);
        for (int i = 0; i < 10; i++) {
            TextMessage textMessage = session.createTextMessage("test" + i);
            producer.send(textMessage);
            System.out.println("Sent message: " + textMessage.getText());
        }
        connection.close();
    }
}
package com.zkb.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer {
    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    System.out.println("Received message = [" + ((TextMessage) message).getText() + "]");
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}
package com.zkb.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class AppConsumer2 {
    private static final String url = "tcp://127.0.0.1:61616";
    private static final String topicName = "topic-test";
    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
        Connection connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                try {
                    System.out.println("Received message = [" + ((TextMessage) message).getText() + "]");
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

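Here the producer sent 10 messages.

Both consumers received all 10, so every message was delivered to each subscriber.

    It works like a TV broadcast: once a program is on air, everyone who is tuned in at that moment sees it. For the same reason, both consumers must already be running when the producer starts, because these plain (non-durable) subscribers only receive messages published while they are connected.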
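The broadcast behavior can be modeled in a few lines of plain Java. This is only an in-memory sketch, not ActiveMQ code: `TopicSketch`, `subscribe`, `publish` and `run` are made-up names. Two subscribers are registered before publishing starts, and a third one joins halfway through to show what a late viewer misses.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// In-memory stand-in for a JMS topic with non-durable subscribers; TopicSketch is an illustrative name.
class TopicSketch {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> listener) {
        subscribers.add(listener);
    }

    // Every subscriber registered at this moment gets its own copy of the message.
    void publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
    }

    // Publishes `count` messages; a third subscriber joins halfway through.
    static List<List<String>> run(int count) {
        TopicSketch topic = new TopicSketch();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        List<String> late = new ArrayList<>();
        topic.subscribe(first::add);
        topic.subscribe(second::add);
        for (int i = 0; i < count; i++) {
            if (i == count / 2) {
                topic.subscribe(late::add);
            }
            topic.publish("test" + i);
        }
        return Arrays.asList(first, second, late);
    }

    public static void main(String[] args) {
        List<List<String>> result = run(10);
        // prints "10 10 5"
        System.out.println(result.get(0).size() + " " + result.get(1).size() + " " + result.get(2).size());
    }
}
```

The first two subscribers each see all 10 messages, while the late one only sees test5 through test9. A real broker adds networking and acknowledgement on top, but the delivery rule is the same.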

 

Now that the difference is clear, let's do what we usually do and apply it in a real project. Here we do a simple integration with Spring Boot.

 

 

// Controller endpoints: send a Mail object to the queue or to the topic
@PostMapping("/produce")
@ApiOperation(value = "Send to queue", notes = "Send to queue")
@ResponseBody
public Result produce(Mail mail) throws Exception {
    Result result = new Result();
    producer.sendMail(queue, mail);
    return result.success();
}

@PostMapping("/topic")
@ApiOperation(value = "Send to topic", notes = "Send to topic")
@ResponseBody
public Result topic(Mail mail) throws Exception {
    Result result = new Result();
    producer.sendMail(topic, mail);
    return result.success();
}

// Listeners: one on the queue, one on the topic
@JmsListener(destination = "myqueue", containerFactory = "jmsListenerContainerQueue")
public void displayMail(Mail mail) {
    System.out.println("listen1 received a message from the ActiveMQ queue myqueue:");
    System.out.println(mail.toString());
}

@JmsListener(destination = "mytopic", containerFactory = "jmsListenerContainerTopic")
public void displayTopic(Mail msg) {
    System.out.println("consumer1 received a message from the ActiveMQ topic mytopic:");
    System.out.println(msg);
}

Only the key code is shown here; see the demo for the rest.
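The listeners above refer to two container factories, jmsListenerContainerQueue and jmsListenerContainerTopic, whose definitions are not shown. The demo's real configuration may differ; the following is only a sketch of how such beans are commonly declared with Spring JMS. The class name `JmsConfig` is an assumption, the `javax.jms` import matches the plain JMS examples earlier in this post, and the `ConnectionFactory` is assumed to be provided by Spring Boot's ActiveMQ auto-configuration. Converting the `Mail` object to and from a JMS message is not covered here.

```java
import javax.jms.ConnectionFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;

// Hypothetical configuration class; the bean names match the containerFactory values used by the listeners.
@Configuration
@EnableJms
public class JmsConfig {

    // Listeners using this factory treat their destination as a queue (point-to-point).
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(false);
        return factory;
    }

    // Listeners using this factory treat their destination as a topic (publish/subscribe).
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setPubSubDomain(true);
        return factory;
    }
}
```

The only difference between the two factories is the pubSubDomain flag. Two factories are needed because a single listener container works in one domain at a time, so an application that listens on both a queue and a topic typically declares one factory for each.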

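Download the demo here: https://download.csdn.net/download/qq_14926283/13695423 (I did not intend to earn points from it, but CSDN raises the point price on its own; if you really need it, message me privately, or download it yourself)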
