activemq分发策略,topic点对点
activemq的默认分发策略是org.apache.activemq.broker.region.policy.SimpleDispatchPolicy,不用任何配置就是它,这个类在activemq-broker子项目下。我们看下他的源代码:/*** Simple dispatch policy that sends a message to every subscription that* m
activemq的默认分发策略是org.apache.activemq.broker.region.policy.SimpleDispatchPolicy,不用任何配置就是它,这个类在activemq-broker子项目下。
我们看下他的源代码:
/**
* Simple dispatch policy that sends a message to every subscription that
* matches the message.
*
* @org.apache.xbean.XBean
*
*/
public class SimpleDispatchPolicy implements DispatchPolicy {
private static Logger log = LoggerFactory.getLogger(SimpleDispatchPolicy.class);
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
throws Exception {
int count = 0;
for (Subscription sub : consumers) {
// Don't deliver to browsers
if (sub.getConsumerInfo().isBrowser()) {
continue;
}
// 是否当前订阅者对该消息感兴趣?
if (!sub.matches(node, msgContext)) {
sub.unmatched(node);
continue;
}
//给这个订阅者发消息
sub.add(node);
count++;
}
return count > 0;
}
}
源代码非常简单,就是遍历订阅者,然后发消息就是了。
org.apache.activemq.broker.region.policy.这个包下不仅有SimpleDispatchPolicy也有其他的几个策略PriorityDispatchPolicy、StrictOrderDispatchPolicy等。
如果你想使用其他的策略,可以在activemq.xml里配置。
<policyEntry topic="PTP.>">
<dispatchPolicy>
<priorityDispatchPolicy/>
</dispatchPolicy>
</policyEntry>
上面的配置意思是:以PTP.开头的主题采用PriorityDispatchPolicy这个策略模式。
如果你想自己实现,可以继承DispatchPolicy 然后重写方法。
但是要注意几点:
1.要像其他DispatchPolicy 一样,类注释上面加@org.apache.xbean.XBean
2.写好的类放入org.apache.activemq.broker.region.policy这个包里
3.maven package 整个项目,不是单个activemq-broker子项目,而是整个项目,这样activemq-spring项目就会加载所有@org.apache.xbean.XBean注册到其下的一个activemq.xsd里,如果第三条不实现的话,那么在启动的时候就会报错。
把替換 activemq-spring-x.x.x.jar和activemq-broker-x.x.x-.jar俩包放%ACTIVEMQ%\lib里面替换掉。
4.activemq.xml里如上面那个配置即可,最里面是Policy的首字母小写。
一个例子。我想在主题模式里实现点对点,我懒省事了。修改了SimpleDispatchPolicy 类,然后重新只打activemq-broker包替换掉。修改如下:
public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
throws Exception {
Message message = node.getMessage();//获取messgae
Object property = message.getProperty("CLIENT_ID");//获取发送时传的针对哪个clientid
ActiveMQDestination destination = message.getDestination();
int count = 0;
for (Subscription sub : consumers) {
// Don't deliver to browsers
if (sub.getConsumerInfo().isBrowser()) {
continue;
}
// Only dispatch to interested subscriptions
if (!sub.matches(node, msgContext)) {
sub.unmatched(node);
continue;
}
//如果是P2P.myco,那么我看clientid是否相同,相同才发
if (destination.isTopic() && destination.getPhysicalName().equals("P2P.myco")) {
String clientId = sub.getContext().getClientId();
if (!clientId.equals(property)) {
continue;
}
}
sub.add(node);
count++;
}
return count > 0;
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐


所有评论(0)