一、JMS概述

      JMS(Java Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性

二、ActiveMQ的概述

    1、概念

          ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,提供客户端支持跨语言和协议,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    2、特点

           1、 多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP      

       2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)     

       3、对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性      

       4、通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

       5、支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA

       6、支持通过JDBC和journal提供高速的消息持久化

       7、从设计上保证了高性能的集群,客户端-服务器,点对点

       8、支持Ajax

       9、支持与Axis的整合

      10、可以很容易得调用内嵌JMS provider,进行测试

    3、消息传输形式

         对于消息的传递有两种类型:

    (1)点对点的,即一个生产者和一个消费者一一对应;

    (2)发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。

     JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  · StreamMessage -- Java原始值的数据流

  · MapMessage--一套名称-值对

  · TextMessage--一个字符串对象(一般用这种形式

  · ObjectMessage--一个序列化的 Java对象

  · BytesMessage--一个字节的数据流

三、ActiveMQ下载安装

    1、开发环境

        System:Windows

      JDK:1.8

      IDE:eclipse

      apache ActiveMQ 5.8

    2、安装与介绍

         (1)下载activeMQ,官网地址http://activemq.apache.org/,最新版本是5.15.0,下载使用5.14.5版本,对应的apache-  activemq-5.14.5-bin.zip,Windows版本的。

      (2)直接解压apache-activemq-5.14.5-bin.zip,就完成安装。

         (3)解压后的目录结构如下

        

             +bin (windows下面的bat和unix/Linux下面的sh) 启动ActiveMQ的启动服务就在这里

             +conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

             +data (默认是空的)

             +docs (index,replease版本里面没有文档)

             +example (几个例子)

             +lib (activeMQ使用到的lib)

             +webapps (系统管理员控制台代码)

             +webapps-demo(系统示例代码)

            -activemq-all-5.8.0.jar (ActiveMQ的binary)

            -user-guide.html (部署指引)

            -LICENSE.txt

            -NOTICE.txt 

            -README.txt

           此时可以进入bin目录下,使用activemq.bat双击启动(windows用户可以选择系统位数也可以用cmd窗口进行启动,如果你是 linux的话,就用命令行的发送去启动),如果没有问题,会出现下面的页面

        

        如果出现这种异常java.NET.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind,说明activeMQ端口被占用,需要kill点占用端口61616(mq默认端口)进程,或者在/conf/activemq.xml更改端口。

     (4)启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改用户,一般不更改

   

       导航菜单中,Queues是队列方式消息。Topics是主题方式消息。Subscribers消息订阅监控查询。Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network是网络链接数监控。Send可以发送消息数据。

    (5)ActiveMQ使用图解

        

     (6)ActiveMQ发送与接收消息的基本步骤

        发送消息的基本步骤:

       (1)、创建连接使用的工厂类JMS ConnectionFactory

       (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

       (3)、使用连接Connection 建立会话Session

       (4)、使用会话Session和管理对象Destination创建消息生产者MessageSender

       (5)、使用消息生产者MessageSender发送消息

       消息接收者从JMS接受消息的步骤

       (1)、创建连接使用的工厂类JMS ConnectionFactory

       (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

       (3)、使用连接Connection 建立会话Session

       (4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver

       (5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

三、代码演示

        1、创建一个java的project,在工程下添加activemq-all-5.14.5.jar的jar包

       

      2、Queue队列方式发送点对点消息数据

       1)消息生产者代码

package com.mq;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * activemq消息生产者
 * @author cheng
 *
 */
public class MyProducer {
	
	public static void main(String[] args) throws Exception {
		//创建连接工厂ConnectionFactory  mq连接连接时tcp协议,默认端口是61616
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		//创建连接connection
		Connection connection = connectionFactory.createConnection();
		//启动连接
		connection.start();
		//创建session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用创建一个Destination目的对象,有两种:queue、topic,queue
		Destination destination = session.createQueue("myqueue");
		//使用session创建消息生成者
		MessageProducer producer = session.createProducer(destination);
		//创建TestMessage对象
		TextMessage textMessage = session.createTextMessage("hello activemq");
		//发送消息
		producer.send(textMessage);
		//关闭资源
		producer.close();
		session.close();
		connection.close();
	}

}
         运行main方法,就可以把"hello activemq"发送到ActiveMQ服务器中,下图可以看到        

      

       2)消息接受者

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * 消息信息接收者
 * @author cheng
 *
 */
public class MyReceiver {

	public static void main(String[] args) throws Exception {
		//创建一个连接工厂对象
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		//使用连接工厂对象创建一个连接
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用连接对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用Session创建一个Destination,Destination应该和消息的发送端一致。
		Queue queue = session.createQueue("myqueue");
		//使用Session创建一个Consumer对象
		MessageConsumer consumer = session.createConsumer(queue);
		//向Consumer对象中设置一个MessageListener对象,用来接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//取消息的内容
				if(message instanceof TextMessage){
					TextMessage textMessage = (TextMessage) message;
					try {
						String text = textMessage.getText();
						//打印消息内容
						System.out.println(text);
					} catch (JMSException e) {
						e.printStackTrace();
					}
					
				}
			}
		});
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}
	
}
        运行main方法,就会接收到消息生产者发送的消息           
       3、Topic主题发布和订阅消息

            1)消息生产者代码,运行main方法就可以发送消息       

package com.mq;

import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * activemq消息生产者Topic主题
 * @author cheng
 *
 */
public class MyTopic {
	
	public static void main(String[] args) throws Exception {
		//创建连接工程 ConnectionFactory
		ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		//创建连接connection
		Connection connection = connectionFactory.createConnection();
		//启动
		connection.start();
		//创建session
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//创建Destination,应该使用topic
		Topic topic = session.createTopic("mytopic");
		//使用session创建消息生成者
		MessageProducer producer = session.createProducer(topic);
		//创建TestMessage对象
		TextMessage textMessage = session.createTextMessage("hello activemq topic");
		//发送消息
		producer.send(textMessage);
		//关闭资源
		producer.close();
		session.close();
		connection.close();
	}
	
}
        2)消息生产者接受者,运行main方法就可以接收消息,运行三个接收者如图

package com.mq;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;

import org.apache.activemq.ActiveMQConnectionFactory;

/**
 * activemq消息接受者Topic主题
 * @author cheng
 *
 */
public class MyTopicReceiver {
	
	public static void main(String[] args) throws Exception {
		//创建一个连接工厂对象
		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
		//使用连接工厂对象创建一个连接
		Connection connection = connectionFactory.createConnection();
		//开启连接
		connection.start();
		//使用连接对象创建一个Session对象
		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
		//使用Session创建一个Destination,Destination应该和消息的发送端一致。
		Topic topic = session.createTopic("mytopic");
		//使用Session创建一个Consumer对象
		MessageConsumer consumer = session.createConsumer(topic);
		//向Consumer对象中设置一个MessageListener对象,用来接收消息
		consumer.setMessageListener(new MessageListener() {
			
			@Override
			public void onMessage(Message message) {
				//取消息的内容
				if(message instanceof TextMessage){
					TextMessage textMessage = (TextMessage) message;
					try {
						String text = textMessage.getText();
						//打印消息内容
						System.out.println(text);
					} catch (JMSException e) {
						e.printStackTrace();
					}
					
				}
			}
		});
		System.out.println("topic消费者3.。。。");
		System.in.read();
		//关闭资源
		consumer.close();
		session.close();
		connection.close();
	}
	
}

      

     

     

       3)运行生产者main方法,此时三个接收者都能接收到消息,如图
        

       

      

  

     


       


Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐