转自 https://github.com/killme2008/Metamorphosis/wiki/Spring-Supports


1 配置消息会话工厂

在Sring容器内配置一个MessageSessionFactory:

<!--  message session factory -->
    <bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
        <property name="zkConnect" value="127.0.0.1:2181"/>
        <property name="zkSessionTimeoutMs" value="30000"/>
        <property name="zkConnectionTimeoutMs" value="30000"/>
        <property name="zkSyncTimeMs" value="5000"/>
    </bean>

主要是zookeeper参数配置,需要跟服务端的zk配置保持一致。更多参数参见AbstractMetaqMessageSessionFactory的javadoc。

2 消息体转换器

MetaQ客户端提供了Java序列化实现的JavaSerializationMessageBodyConverter给你使用,你也可以自定义自己的消息body转换器,比如采用其他序列化协议,如protobufs,hessian等。配置一个消息body转换器,比如我们就使用Java序列化

 <!--  message body converter using java serialization. -->
    <bean id="messageBodyConverter"  
     class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>

3 使用MetaqTemplate发送消息

 <!--  template to send messages. -->
    <bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">    
        <property name="messageSessionFactory" ref="sessionFactory"/>
        <property name="messageBodyConverter" ref="messageBodyConverter"/>
    </bean>

metaqTemplate用到了上面配置的messageSessionFactory和messageBodyConverter,用来创建producer和转换消息体。
配置了metaqTemplate之后,你将可以在你要发送消息的JavaBean里引用这个对象,并使用它发送消息。

4 MessageBuilder创建消息并发送

 final String topic = "date";
 final SendResult sendResult =
                    template.send(MessageBuilder.withTopic(topic).withBody(new Date());

上面的例子发送topic为date的消息,消息体为java.util.Date对象,
MetaqTemplate将调用messageBodyConverter将消息体的date对象转换为byte数组,构建message对象再发送。
因为我们使用了JavaSerializationMessageBodyConverter做消息体转换,
因此任何可序列化的JavaBean其实都可以作为消息体发送,使用MessageBuilder.withBody方法设置即可。


5 订阅消息

<!--  topics to be subscribed. -->
    <bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">
        <!-- consumer group -->
        <property name="group" value="testGroup"/>
        <!--  topic -->
        <property name="topic" value="date"/>
        <!--  max buffer size to fetch messages -->
        <property name="maxBufferSize" value="16384"/>
    </bean>
配置了订阅者的分组,想要订阅的topic以及maxBufferSize大小。


6 继承DefaultMessageListener

package com.taobao.metamorphosis.example.spring;

import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;
import java.util.Date;


/**
 * Process date messages listener.
 * 
 * @author dennis
 * 
 */
public class DateMessageListener extends DefaultMessageListener<Date> {

    @Override
    public void onReceiveMessages(MetaqMessage<Date> msg) {
        Date date = msg.getBody();
        System.out.println("receive date message:" + date);
    }

}
onReceiveMessages收到的是封装Message后的MetaqMessage对象,
它会使用你配置的消息体转换器将byte数组转换成java.util.Date对象,
你可以直接通过getBody获取上文metaqTemplate例子中发送的日期对象。


编写好消息处理器之后,需要配置消息处理器:

 <!--  message listener -->
    <bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">
        <!--  threads to process these messages. -->
        <property name="processThreads" value="10"/>
    </bean>

7 配置MessageListenerContainer

有了MetaqTopic和消息处理器之后,我们就可以使用MessageListenerContainer来真正地订阅消息了:

  <!--  listener container to subscribe topics -->
    <bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer"> 
         <property name="messageSessionFactory" ref="sessionFactory"/>
         <property name="messageBodyConverter" ref="messageBodyConverter"/>
         <property name="subscribers">
             <map>
                 <entry key-ref="dateTopic" value-ref="messageListener"/>
             </map>
         </property>
    </bean>




Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐