ActiveMQ知识介绍
1. 介绍:apache软件基金会开源的一个消息中间件,由java语言实现,基于JMS(Java Message Service)协议的实现,支持JMS1.1和J2EE 1.4规范。流行,开源,性能强劲。支持多种语言客户端,如:Java、C、C++、C#、Ruby、Perl、PHP、Python等。2. 消息模式:点对点:每一条生产者发送的消息,只能被一个消费者消费一次,其他消费者不能再次...
·
1. 介绍:
apache基金会开源的一个消息中间件,由java语言开发,基于JMS(Java Message Service)协议的实现,支持JMS1.1和J2EE 1.4规范。流行,开源,性能强劲。支持多种语言客户端,如:Java、C、C++、C#、Ruby、Perl、PHP、Python等。
2. 消息模式:
- 点对点:每一条生产者发送的消息,只能被一个消费者消费一次,其他消费者不能再次消费。
- 主题(Topic):多个消费者订阅队列,每一条生产者发送的消息,所有订阅者都会接收进行消费。
[外链图片转存失败(img-H28BVveW-1568181460859)
3. 特点:
3.1 支持的消息类型
在javax.jms包Message接口中有定义:
TextMessage //文本消息
MapMessage //键值对消息
ObjectMessage //对象消息(需要序列化)
BytesMessage //字节消息
StreamMessage //stream流消息
3.2 消息权重:
- 概述:生产者在往队列发送消息时候,可以指定消息的权重,权重越高越被优先消费。权重的范围:0~9,默认都为0,即默认消息消费顺序就是发送时的顺序,FIFO。
- 实现方式:
1.修改activemq.xml配置文件,指定某个队列是否开启权重消费:
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
<!-- 指定开启消息权重的队列 -->
<policyEntry queue="test" prioritizedMessages="true"/>
</policyEntries>
</policyMap>
</destinationPolicy>
2.producer端程序发送消息时,指定消息权重。
//设定消息权重为6,优先消费
producer.send(message, DeliveryMode.PERSISTENT, 6, 0);
3.3 消费者权重
- 概述:ActiveMQ支持消费者优先级设置,Consumer的Priority划分为0~127个级别,127为最高,0最低,0也是ActiveMQ默认的级别。Broker通过消费者的权重进行排序,优先将消息分发给最高优先级的Consumer,直到该优先的consumer的prefetch buffer满了,Broker才把消息分发给优先级次高的。
- 使用场景:通常对网络较好、机器配置较高、处理速度较快、业务较简单的消费者设置较高的权重。
- 实现方式:
只需在创建目的地时候,目的地参数上加个权重属性。
//5.创建目的地
destination = session.createQueue("test?consumer.priority=30&consumer.prefetchSize=5");
- 备注:权重可以影响负载均衡,权重大的负载也大。
3.4 消息最大消费时间
- 概述:发送消息时,可以设置最大存活时间T,消费者可以在T时间内正常的消费该消息,一旦超过时间T则无法再消费。
- 实现方式:
1.producer发送消息前设置,然后所有发送的消息都按这个存活时间。
//设置消息最大消费时间,后续发送的消息都按这个存活时间,超时未正常消费进入死信队列
producer.setTimeToLive(10000l);
producer.send(message);
2.发送单条消息时,指定该消息的最大消费时间。
//设置消息最大消费时间,超时未正常消费进入死信队列
producer.send(message, DeliveryMode.PERSISTENT, 0, 10000);
- 备注:超过最大消费时间还未被消费的消息,会进入死信队列。
3.5 死信队列
- 概述:由于某些原因一些消息未能被消费者处理,activemq会将这些消息持久化到死信队列中,不再处理这些消息。ActiveMQ默认的死信队列名为:ActiveMQ.DLQ
- 作用:死信队列是MQ服务可靠性保障手段之一,未被处理的消息放到死信队列等待特殊处理,总比消息丢失要强得多。
- 什么情况下消息会进入死信队列?
1.消费者在最大存活时间T内未消费掉该消息
2.异步接收时,默认重试6次都无法消费掉该消息
3.默认broker端重发6次该消息,消费者都无法消费掉时
4. 事务:
- 概述:事务是MQ消息投递可靠性保障的一种手段,传统的数据库事务主要包括:A(原子性)、C(一致性)、I(隔离性)、D(持久性),在MQ当中主要满足"一致性"的要求。
- 举例:比如我们有多个producer,或要发送多条message,如果在业务上要求这些消息要么同时都发送成功,要么都发送失败,此时就需要放在同一事物中。
- 实现:
/**
* 开启事务--模拟多次发送消息,中间有业务异常导致事务提交失败,这时已发送的消息也不会在MQ中看到。
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.113:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//生产者接口
MessageProducer producer = null;
//消息接口
Message message = null;
String[] msgs = new String[]{"zhangsan", "lisi", "wangwu", "james", "wade", "kobe"};
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
//当开启事务时,ack方式只能设为0,跟随事务提交ack。
session = connection.createSession(true, 0);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息生产者
producer = session.createProducer(destination);
//8.发送消息
for(String m:msgs) {
message = session.createTextMessage(m+" hit the boll!");
producer.send(message);
}
//模拟出现业务异常,未正常提交事务,消息发送全都不会成功。
if(0==0) {
int s = 100/0;
}
//9.提交事务(开启事务时执行)
session.commit();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//10.关闭资源
try {
if (producer!=null) {producer.close();}
if (session!=null) {session.close();}
if (connection!=null) {connection.close();}
System.out.printf("资源关闭...");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
5. ACK机制:
- 为何会有ACK机制?
消息投放过程中会存在以下问题:
- prudocer生产的消息,如何确保已成功发送到了MQ中?(消息发送不丢失)
- consumer消费消息后,如何及时通知MQ把已消费的消息从服务器清除掉?(消息消费不重复)
- 概述:ACK即acknowleage,应答机制,也是MQ消息投递可靠性保障的一种手段。MQ服务器通过客户端的ack消息,确定消息是否要从服务器清除。
- ack机制的策略:
在javax.jms包Session接口中有定义:
int SESSION_TRANSACTED = 0; //事务提交时自动发送ack
int AUTO_ACKNOWLEDGE = 1; //自动确认,一旦消息接收到就附带发送ack
int CLIENT_ACKNOWLEDGE = 2; //手动确认,客户端代码手动ack
int DUPS_OK_ACKNOWLEDGE = 3;//重复副本的确认
- 实现方式:
这里主要针对消费端举例ack的实现方式:
- 开启事务情况下:
a.同步消费:
/**
* 开启事务+同步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
//消息接口
Message message = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当手动确认时,事务参数需要设为true,第二个参数只能事务提交时自动ack。
// ack的时机在执行session.commit()时触发
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(同步)
message = consumer.receive();
String text = ((TextMessage)message).getText();
System.out.println(text);
//8.提交事务
//注意:开启事务时,ack在commit()时同时提交
// 如果此时未正常commit(),ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
// session.commit();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//9.关闭资源
try {
if (consumer!=null) {consumer.close();}
if (session!=null) {session.close();}
if (connection!=null) {connection.close();}
System.out.printf("资源关闭...");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
b.异步消费:
/**
* 开启事务+异步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当手动确认时,事务参数需要设为true,第二个参数只能事务提交时自动ack。
// ack的时机在执行session.commit()时触发
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(异步),需要实现MessageListener接口
Session finalSession = session;
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//消费消息
try {
String text = ((TextMessage)message).getText();
System.out.println(text);
//8.提交事务
//注意:开启事务时,ack在commit()时同时提交
// 如果此时未正常commit(),ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
// finalSession.commit();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception ex) {
ex.printStackTrace();
}
}
- 自动ack情况下:
a.同步消费:
/**
* 手动确认+同步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
//消息接口
Message message = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当手动确认时,事务参数需要设为false,第二个参数选客户端手动确认。
// ack的时机由代码调用处触发
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(同步)
message = consumer.receive();
String text = ((TextMessage)message).getText();
System.out.println(text);
// 注意:此时需要调用acknowledge()手动确认
// 如未显示调用该方法,ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
// message.acknowledge();
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//9.关闭资源
try {
if (consumer!=null) {consumer.close();}
if (session!=null) {session.close();}
if (connection!=null) {connection.close();}
System.out.printf("资源关闭...");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
b.异步消费:
/**
* 开启事务+异步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
//消息接口
Message message = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当手动确认时,事务参数需要设为false,第二个参数选客户端手动确认。
// ack的时机由代码调用处触发
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(异步),需要实现MessageListener接口
Session finalSession = session;
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
String text = ((TextMessage)message).getText();
System.out.println(text);
// 注意:此时需要调用acknowledge()手动确认
// 如未显示调用该方法,ActiveMQ会认为该消息未正常ack,此时该消息不会在MQ服务器清除,导致再次发送给其他消费者。
// message.acknowledge();
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (Exception ex) {
ex.printStackTrace();
}
}
- 手动ack情况下:
a.同步消费:
/**
* 自动确认+同步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
//消息接口
Message message = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当自动确认时,事务参数需要设为false,第二个参数选自动确认。
// ack的时机由consumer.receive()执行完时触发
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(同步)
// 注意:当执行.receive()方法时,同时就执行了ack,此时MQ会将该消息在服务器中清除。
message = consumer.receive();
String text = ((TextMessage)message).getText();
System.out.println(text);
} catch (Exception ex) {
ex.printStackTrace();
} finally {
//9.关闭资源
try {
if (consumer!=null) {consumer.close();}
if (session!=null) {session.close();}
if (connection!=null) {connection.close();}
System.out.printf("资源关闭...");
} catch (JMSException ex) {
ex.printStackTrace();
}
}
}
b.异步消费:
/**
* 自动确认+异步消费时的ack
*/
public static void main(String[] args) {
//mq服务地址
String broker = "tcp://192.168.1.111:61616";
//链接工厂接口
ConnectionFactory connectionFactory = null;
//链接接口
Connection connection = null;
//会话接口
Session session = null;
//目的地接口
Destination destination = null;
//消费者接口
MessageConsumer consumer = null;
//消息接口
Message message = null;
try {
//1.创建链接工厂
connectionFactory = new ActiveMQConnectionFactory(broker);
//2.获取链接
connection = connectionFactory.createConnection();
//3.启动连接
connection.start();
//4.获取session,第一个参数为是否开启事务,第二个为ack方式。
// 当自动确认时,事务参数需要设为false,第二个参数选自动确认。
// ack的时机由异步处理的onMessage()方法执行完时触发
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//5.创建目的地
destination = session.createQueue("test");
//6.创建消息消费者
consumer = session.createConsumer(destination);
//7.消费消息(异步),需要实现MessageListener接口
consumer.setMessageListener(new MessageListener() {
// 该方法执行完毕时会自动ack
// 注意:如果该方法内部出现异常,导致方法未执行完,那么就不会自动ack。
// 此时该消息不会在MQ服务器清除,导致该消息再次发送。
@Override
public void onMessage(Message message) {
try {
String text = ((TextMessage)message).getText();
System.out.println(text);
//以下模拟消息消费后发生异常,未正常执行ack的情景,ActiveMQ会重复推送该消息,直到超过次数进入死信队列。
int sss = 10/0;
} catch (JMSException e) {
throw new RuntimeException(e);
}
}
});
} catch (Exception ex) {
ex.printStackTrace();
}
}
6. 消息重发:
- 概述:这里指的是MQ服务器到consumer客户端的又一次消息发送,也是MQ一种保障手段。
- 说明:
- consumer调用session.recover()方法可以触发MQ消息重发
- ActiveMQ会对所有"之前已发送过但未正常ack确认"的消息进行重发
- 如果一条消息,默认被重发6次后还未正常ack,该消息会进入死信队列不再被重发
- 重发跟同步消费或异步消费没有关系,结论都一样
- 实现方式:
if("james hit the boll!".equals(s)) {
// 调用session.recover()触发消息重发
// 消息重发的范围:ActiveMQ会对所有"之前已发送过但未正常ack确认"的消息进行重发
// 如果重发6次后还未正常ack的消息,会进入死信队列不再被重发。
session.recover();
} else {
//message.acknowledge();
}
7. 持久化方式:
- LevelDB,从V5.8版本后支持
- KahaDB,从V5.3版本后支持
- JDBC数据库,从V4.X版本支持
- 集群架构及特点:
更多推荐
已为社区贡献1条内容
所有评论(0)