activemq的默认分发策略是org.apache.activemq.broker.region.policy.SimpleDispatchPolicy,不用任何配置就是它,这个类在activemq-broker子项目下。

我们看下他的源代码:

/**
 * Simple dispatch policy that sends a message to every subscription that
 * matches the message.
 * 
 * @org.apache.xbean.XBean
 * 
 */
public class SimpleDispatchPolicy implements DispatchPolicy {
	private static Logger log = LoggerFactory.getLogger(SimpleDispatchPolicy.class);

    public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
            throws Exception {
    	
        int count = 0;
        for (Subscription sub : consumers) {
            // Don't deliver to browsers
            if (sub.getConsumerInfo().isBrowser()) {
                continue;
            }
            // 是否当前订阅者对该消息感兴趣?
            if (!sub.matches(node, msgContext)) {
                sub.unmatched(node);
                continue;
            }
            //给这个订阅者发消息
        	sub.add(node);
            count++;
            
        }

        return count > 0;
    }

}

源代码非常简单,就是遍历订阅者,然后发消息就是了。

org.apache.activemq.broker.region.policy.这个包下不仅有SimpleDispatchPolicy也有其他的几个策略PriorityDispatchPolicy、StrictOrderDispatchPolicy等。

如果你想使用其他的策略,可以在activemq.xml里配置。

<policyEntry topic="PTP.>">
    <dispatchPolicy>
        <priorityDispatchPolicy/>
    </dispatchPolicy>
</policyEntry>

 上面的配置意思是:以PTP.开头的主题采用PriorityDispatchPolicy这个策略模式。

如果你想自己实现,可以继承DispatchPolicy 然后重写方法。

但是要注意几点:

1.要像其他DispatchPolicy 一样,类注释上面加@org.apache.xbean.XBean

2.写好的类放入org.apache.activemq.broker.region.policy这个包里

3.maven  package 整个项目,不是单个activemq-broker子项目,而是整个项目,这样activemq-spring项目就会加载所有@org.apache.xbean.XBean注册到其下的一个activemq.xsd里,如果第三条不实现的话,那么在启动的时候就会报错。

 把替換 activemq-spring-x.x.x.jar和activemq-broker-x.x.x-.jar俩包放%ACTIVEMQ%\lib里面替换掉。

4.activemq.xml里如上面那个配置即可,最里面是Policy的首字母小写。

 

一个例子。我想在主题模式里实现点对点,我懒省事了。修改了SimpleDispatchPolicy 类,然后重新只打activemq-broker包替换掉。修改如下:

public boolean dispatch(MessageReference node, MessageEvaluationContext msgContext, List<Subscription> consumers)
            throws Exception {
    	Message message = node.getMessage();//获取messgae
    	Object property = message.getProperty("CLIENT_ID");//获取发送时传的针对哪个clientid
    	ActiveMQDestination destination = message.getDestination();
    	
        int count = 0;
        for (Subscription sub : consumers) {
            // Don't deliver to browsers
            if (sub.getConsumerInfo().isBrowser()) {
                continue;
            }
            // Only dispatch to interested subscriptions
            if (!sub.matches(node, msgContext)) {
                sub.unmatched(node);
                continue;
            }
            //如果是P2P.myco,那么我看clientid是否相同,相同才发
            if (destination.isTopic() && destination.getPhysicalName().equals("P2P.myco")) {
            	String clientId = sub.getContext().getClientId();
            	if (!clientId.equals(property)) {
            		continue;
            	}
          
            }
            
        	sub.add(node);
            count++;
            
        }

        return count > 0;
    }

 

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐