ActiveMQ
一、什么是ActiveMQ?ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。二、ActiveMQ的下载1、主页:http://activemq.apache.org/ 目前...
一、什么是ActiveMQ?
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
二、ActiveMQ的下载
1、主页:http://activemq.apache.org/ 目前最新版本:5.15.3
2、开发包及源码下载地址:http://activemq.apache.org/activemq-5111-release.html
3、ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin
图一:启动activemq
图二:本地访问ActiveMQ 服务启动地址:http://127.0.0.1:8161/admin/ 用户名/密码 admin/admin
注:启动服务后,可以看到后台管理界面。
三、ActiveMQ的点对点使用
Session.AUTO_ACKNOWLEDGE。当客户成功的从receive 方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息。
Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。
Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝的确认消息的提交。如果 JMS provider 失败,那么可能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置为 true。
消息生产者
public class JMSProducer {
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话
Destination destination;//消息目的地
MessageProducer messageProducer;//消息生产者
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
connection = connectionFactory.createConnection();//通过工厂获取连接
connection.start();//启动连接
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("新闻队列");
messageProducer = session.createProducer(destination);//创建消息生产者
//发送消息
for(int i=0;i<10;i++){
TextMessage msg = session.createTextMessage( (i + 1) +"号");
messageProducer.send(destination,msg);
}
session.commit();
}catch (Exception e){
e.printStackTrace();
}finally {
if(connection!=null){
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
消息消费者
public class JMSConsumer {
private static final String USERNAME= ActiveMQConnection.DEFAULT_USER; // 默认的连接用户名
private static final String PASSWORD=ActiveMQConnection.DEFAULT_PASSWORD; // 默认的连接密码
private static final String BROKEURL=ActiveMQConnection.DEFAULT_BROKER_URL; // 默认的连接地址
public static void main(String[] args) {
ConnectionFactory connectionFactory;//连接工厂
Connection connection = null;//连接
Session session;//会话
Destination destination;//消息目的地
MessageConsumer consumer;//消息消费者
connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEURL);
try {
connection = connectionFactory.createConnection();//通过工厂获取连接
connection.start();//启动连接
session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);//创建session
destination = session.createQueue("新闻队列");
consumer = session.createConsumer(destination);//创建消息消费者
//发送消息
while(true){
TextMessage message = (TextMessage) consumer.receive(10000);
if(message != null){
System.out.println(message.getText());//获取消息
}
}
}catch (Exception e){
e.printStackTrace();
}
}
接收消息监听器
四、ActiveMQ的发布和订阅
更多推荐
所有评论(0)