Spring整合metaq
转自 https://github.com/killme2008/Metamorphosis/wiki/简单例子1 配置消息会话工厂在Sring容器内配置一个MessageSessionFactory:主要是zookeeper参数配置,需要跟服务端的zk配置保持一致。更多参数参见Ab
·
转自 https://github.com/killme2008/Metamorphosis/wiki/Spring-Supports
1 配置消息会话工厂
在Sring容器内配置一个MessageSessionFactory:
<!-- message session factory -->
<bean id="sessionFactory" class="com.taobao.metamorphosis.client.extension.spring.MetaqMessageSessionFactoryBean">
<property name="zkConnect" value="127.0.0.1:2181"/>
<property name="zkSessionTimeoutMs" value="30000"/>
<property name="zkConnectionTimeoutMs" value="30000"/>
<property name="zkSyncTimeMs" value="5000"/>
</bean>
主要是zookeeper参数配置,需要跟服务端的zk配置保持一致。更多参数参见
AbstractMetaqMessageSessionFactory
的javadoc。2 消息体转换器
MetaQ客户端提供了Java序列化实现的JavaSerializationMessageBodyConverter
给你使用,你也可以自定义自己的消息body转换器,比如采用其他序列化协议,如protobufs,hessian等。配置一个消息body转换器,比如我们就使用Java序列化
<!-- message body converter using java serialization. -->
<bean id="messageBodyConverter"
class="com.taobao.metamorphosis.client.extension.spring.JavaSerializationMessageBodyConverter"/>
3 使用MetaqTemplate发送消息
<!-- template to send messages. -->
<bean id ="metaqTemplate" class="com.taobao.metamorphosis.client.extension.spring.MetaqTemplate">
<property name="messageSessionFactory" ref="sessionFactory"/>
<property name="messageBodyConverter" ref="messageBodyConverter"/>
</bean>
metaqTemplate用到了上面配置的messageSessionFactory和messageBodyConverter,用来创建producer和转换消息体。
配置了metaqTemplate之后,你将可以在你要发送消息的JavaBean里引用这个对象,并使用它发送消息。
4 MessageBuilder创建消息并发送
final String topic = "date";
final SendResult sendResult =
template.send(MessageBuilder.withTopic(topic).withBody(new Date());
上面的例子发送topic为date的消息,消息体为java.util.Date对象,
MetaqTemplate将调用messageBodyConverter将消息体的date对象转换为byte数组,构建message对象再发送。
因为我们使用了JavaSerializationMessageBodyConverter做消息体转换,
因此任何可序列化的JavaBean其实都可以作为消息体发送,使用MessageBuilder.withBody方法设置即可。
5 订阅消息
<!-- topics to be subscribed. -->
<bean id = "dateTopic" class="com.taobao.metamorphosis.client.extension.spring.MetaqTopic">
<!-- consumer group -->
<property name="group" value="testGroup"/>
<!-- topic -->
<property name="topic" value="date"/>
<!-- max buffer size to fetch messages -->
<property name="maxBufferSize" value="16384"/>
</bean>
配置了订阅者的分组,想要订阅的topic以及maxBufferSize大小。
6 继承DefaultMessageListener
package com.taobao.metamorphosis.example.spring;
import com.taobao.metamorphosis.client.extension.spring.DefaultMessageListener;
import com.taobao.metamorphosis.client.extension.spring.MetaqMessage;
import java.util.Date;
/**
* Process date messages listener.
*
* @author dennis
*
*/
public class DateMessageListener extends DefaultMessageListener<Date> {
@Override
public void onReceiveMessages(MetaqMessage<Date> msg) {
Date date = msg.getBody();
System.out.println("receive date message:" + date);
}
}
onReceiveMessages收到的是封装Message后的MetaqMessage对象,
它会使用你配置的消息体转换器将byte数组转换成java.util.Date对象,
你可以直接通过getBody获取上文metaqTemplate例子中发送的日期对象。
编写好消息处理器之后,需要配置消息处理器:
<!-- message listener -->
<bean id= "messageListener" class="com.taobao.metamorphosis.example.spring.DateMessageListener">
<!-- threads to process these messages. -->
<property name="processThreads" value="10"/>
</bean>
7 配置MessageListenerContainer
有了MetaqTopic
和消息处理器之后,我们就可以使用MessageListenerContainer
来真正地订阅消息了:
<!-- listener container to subscribe topics -->
<bean id ="listenerContainer" class="com.taobao.metamorphosis.client.extension.spring.MessageListenerContainer">
<property name="messageSessionFactory" ref="sessionFactory"/>
<property name="messageBodyConverter" ref="messageBodyConverter"/>
<property name="subscribers">
<map>
<entry key-ref="dateTopic" value-ref="messageListener"/>
</map>
</property>
</bean>
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
已为社区贡献1条内容
所有评论(0)