JMS应用之消息队列ActiveMQ的基本运用
一、JMS概述 JMS(Java Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。二、ActiveMQ的概述 1、概念 ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveM
一、JMS概述
JMS(Java Message Service,java消息服务)API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。
二、ActiveMQ的概述
1、概念
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,提供客户端支持跨语言和协议,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
2、特点
1、 多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP
2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
3、对spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
4、通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
5、支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
6、支持通过JDBC和journal提供高速的消息持久化
7、从设计上保证了高性能的集群,客户端-服务器,点对点
8、支持Ajax
9、支持与Axis的整合
10、可以很容易得调用内嵌JMS provider,进行测试
3、消息传输形式
对于消息的传递有两种类型:
(1)点对点的,即一个生产者和一个消费者一一对应;
(2)发布/订阅模式,即一个生产者产生消息并进行发送后,可以由多个消费者进行接收。
JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
· StreamMessage -- Java原始值的数据流
· MapMessage--一套名称-值对
· TextMessage--一个字符串对象(一般用这种形式)
· ObjectMessage--一个序列化的 Java对象
· BytesMessage--一个字节的数据流
三、ActiveMQ下载安装
1、开发环境
System:Windows
JDK:1.8
IDE:eclipse
apache ActiveMQ 5.8
2、安装与介绍
(1)下载activeMQ,官网地址http://activemq.apache.org/,最新版本是5.15.0,下载使用5.14.5版本,对应的apache- activemq-5.14.5-bin.zip,Windows版本的。
(2)直接解压apache-activemq-5.14.5-bin.zip,就完成安装。
(3)解压后的目录结构如下
+bin (windows下面的bat和unix/Linux下面的sh) 启动ActiveMQ的启动服务就在这里
+conf (activeMQ配置目录,包含最基本的activeMQ配置文件)
+data (默认是空的)
+docs (index,replease版本里面没有文档)
+example (几个例子)
+lib (activeMQ使用到的lib)
+webapps (系统管理员控制台代码)
+webapps-demo(系统示例代码)
-activemq-all-5.8.0.jar (ActiveMQ的binary)
-user-guide.html (部署指引)
-LICENSE.txt
-NOTICE.txt
-README.txt
此时可以进入bin目录下,使用activemq.bat双击启动(windows用户可以选择系统位数也可以用cmd窗口进行启动,如果你是 linux的话,就用命令行的发送去启动),如果没有问题,会出现下面的页面
如果出现这种异常java.NET.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind,说明activeMQ端口被占用,需要kill点占用端口61616(mq默认端口)进程,或者在/conf/activemq.xml更改端口。
(4)启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改用户,一般不更改
导航菜单中,Queues是队列方式消息。Topics是主题方式消息。Subscribers消息订阅监控查询。Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network是网络链接数监控。Send可以发送消息数据。
(5)ActiveMQ使用图解
(6)ActiveMQ发送与接收消息的基本步骤
发送消息的基本步骤:
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息生产者MessageSender
(5)、使用消息生产者MessageSender发送消息
消息接收者从JMS接受消息的步骤
(1)、创建连接使用的工厂类JMS ConnectionFactory
(2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动
(3)、使用连接Connection 建立会话Session
(4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver
(5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。
三、代码演示
1、创建一个java的project,在工程下添加activemq-all-5.14.5.jar的jar包
2、Queue队列方式发送点对点消息数据
1)消息生产者代码
package com.mq;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* activemq消息生产者
* @author cheng
*
*/
public class MyProducer {
public static void main(String[] args) throws Exception {
//创建连接工厂ConnectionFactory mq连接连接时tcp协议,默认端口是61616
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//创建连接connection
Connection connection = connectionFactory.createConnection();
//启动连接
connection.start();
//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用创建一个Destination目的对象,有两种:queue、topic,queue
Destination destination = session.createQueue("myqueue");
//使用session创建消息生成者
MessageProducer producer = session.createProducer(destination);
//创建TestMessage对象
TextMessage textMessage = session.createTextMessage("hello activemq");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
}
运行main方法,就可以把"hello activemq"发送到ActiveMQ服务器中,下图可以看到
2)消息接受者
package com.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* 消息信息接收者
* @author cheng
*
*/
public class MyReceiver {
public static void main(String[] args) throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Queue queue = session.createQueue("myqueue");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(queue);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
}
运行main方法,就会接收到消息生产者发送的消息
3、Topic主题发布和订阅消息
1)消息生产者代码,运行main方法就可以发送消息
package com.mq;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* activemq消息生产者Topic主题
* @author cheng
*
*/
public class MyTopic {
public static void main(String[] args) throws Exception {
//创建连接工程 ConnectionFactory
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//创建连接connection
Connection connection = connectionFactory.createConnection();
//启动
connection.start();
//创建session
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建Destination,应该使用topic
Topic topic = session.createTopic("mytopic");
//使用session创建消息生成者
MessageProducer producer = session.createProducer(topic);
//创建TestMessage对象
TextMessage textMessage = session.createTextMessage("hello activemq topic");
//发送消息
producer.send(textMessage);
//关闭资源
producer.close();
session.close();
connection.close();
}
}
2)消息生产者接受者,运行main方法就可以接收消息,运行三个接收者如图
package com.mq;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
* activemq消息接受者Topic主题
* @author cheng
*
*/
public class MyTopicReceiver {
public static void main(String[] args) throws Exception {
//创建一个连接工厂对象
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
//使用连接工厂对象创建一个连接
Connection connection = connectionFactory.createConnection();
//开启连接
connection.start();
//使用连接对象创建一个Session对象
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//使用Session创建一个Destination,Destination应该和消息的发送端一致。
Topic topic = session.createTopic("mytopic");
//使用Session创建一个Consumer对象
MessageConsumer consumer = session.createConsumer(topic);
//向Consumer对象中设置一个MessageListener对象,用来接收消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//取消息的内容
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage) message;
try {
String text = textMessage.getText();
//打印消息内容
System.out.println(text);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
});
System.out.println("topic消费者3.。。。");
System.in.read();
//关闭资源
consumer.close();
session.close();
connection.close();
}
}
3)运行生产者main方法,此时三个接收者都能接收到消息,如图
更多推荐
所有评论(0)