Springboot集成RocketMQ——简单使用
Springboot集成RocketMQ,进行简单使用
目录
1.MQ选型
目前市面上的MQ选型:主要分为3个类型
- Kafka:吞吐量大,且性能好,集群高可用;会丢失数据,功能较为单一(即场景单一,适合于数据量大且频繁,如日志分析等)
- RabbitMQ:消息可靠性高,功能全面;吞吐量较低,并发性能不高,消息积累会严重影响性能(即消息消费需较快)
- RocketMQ:高吞吐、高性能、高可用;官方文档及周边生态不成熟,客户端只支持java。
简而言之,Apache RocketMQ 自诞生以来,因其架构简单、业务功能丰富、具备极强可扩展性等特点被众多企业开发者以及云厂商广泛采用。
2.RocketMQ基本架构
RocketMQ的基本架构如下图所示:
Producer,生产者:消息的生产者,一般为上游系统。
Topic,主题:消息传输和存储的顶层容器,用于标识同一类业务逻辑的消息。 主题的作用主要如下:(1)定义数据的分类隔离:将不同业务类型的数据拆分到不同的主题中管理,通过主题实现存储的隔离性和订阅隔离性。(2)定义数据的身份和权限:由于消息本身是匿名无身份的,同一分类的消息使用相同的主题来做身份识别和权限管理。
Queue,队列:队列是 RocketMQ 中消息存储和传输的实际容器,也是 RocketMQ 消息的最小存储单元。 RocketMQ 的所有主题都是由多个队列组成,以此实现队列数量的水平拆分和队列内部的流式存储。
Subscription,订阅关系:订阅关系是 RocketMQ 系统中消费者获取消息、处理消息的规则和状态配置。订阅关系由消费者分组动态注册到服务端系统,并在后续的消息传输中按照订阅关系定义的过滤规则进行消息匹配和消费进度维护。通过配置订阅关系,可控制如下传输行为:(1)消息过滤规则:用于控制消费者在消费消息时,选择主题内的哪些消息进行消费。(2)消费状态:RocketMQ 服务端默认提供订阅关系持久化的能力,即消费者分组在服务端注册订阅关系后,当消费者离线并再次上线后,可以获取离线前的消费进度并继续消费。
ConsumerGroup,消费者分组:消费者分组是 RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。同一分组下的多个消费者将按照分组内统一的消费行为和负载均衡策略消费消息。
Comsumer,消费者:消息的消费者,即对消息进行接收和处理的相关下游系统。
一般来说,在RocketMQ中,生产者生产出消息后,指定对应的Topic、订阅关系(Tags参数)、队列(hashkey参数)后,将消息发送至RocketMQ客户端;消费者对RocketMQ客户端进行监听,当监听到有自己订阅的Topic下的消息时,进行接收并进行消费。
3.Springboot集成RocketMQ
首先,引入相关依赖:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.1.0</version>
</dependency>
其次,对RocketMQ进行配置:
server:
port: 8080
spring:
application:
name: cloud-rocket-mq
rocketmq:
name-server: 127.0.0.1:9876
producer:
group: test-group #生产者组名,规定在一个应用里面必须唯一
send-message-timeout: 5000 #消息发送的超时时间,单位ms
retry-times-when-send-async-failed: 5 #异步消息发送失败重试的次数
RocketMQ支持我们异步发送普通消息。
普通消息是指:上游系统(生产者)将用户下单支付这一业务事件封装成独立的普通消息并发送至Apache RocketMQ服务端,下游按需从服务端订阅消息并按照本地消费逻辑处理下游任务。每个消息之间都是相互独立的,且不需要产生关联。
(1)生产者代码编写:
@Slf4j
@RestController
public class SendMessageController {
@Resource
private RocketMQTemplate rocketMQTemplate;
@PostMapping("/send")
public void send(@RequestParam("message") String message) throws InterruptedException {
//发送异步消息,参数:topic、消息
rocketMQTemplate.convertAndSend("topic_test:tagA",message+"tagA");
rocketMQTemplate.convertAndSend("topic_test:tagB",message+"tagB");
log.info("已发送异步消息");
}
}
(2)消费者代码编写:
@Service
@Slf4j
@RocketMQMessageListener(topic = "topic_test", consumerGroup = "consumer_topic_test",selectorExpression = "tagA || tagC")
public class MessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("收到消息:"+s);
}
}
(3)代码逻辑:
在生产者端,我们发送了一个消息到 topic_test 这一Topic下,并指定tagA订阅规则下的消费者组可以进行消费。
在消费者端,我们定义其消费者组名称,订阅关系为:订阅 topic_test 下的 tagA 或者 tagB消息,并进行消费。
可以看到,消费者成功监听到 topic_test:tagA 下的消息。
4.顺序消息
RocketMQ中可以发送顺序消息,即支持消费者按照发送消息的先后顺序获取消息,从而实现业务场景中的顺序处理。 相比其他类型消息,顺序消息在发送、存储和投递的处理过程中,更多强调多条消息间的先后顺序关系。
如上图所示,在分布式系统中,我们有多个生产者,执行同一套代码,顺序消息可以保证系统按照多个生产者发出消息的前后顺序,进行顺序消费,如:以证券、股票交易撮合场景为例,对于出价相同的交易单,坚持按照先出价先交易的原则,下游处理订单的系统需要严格按照出价顺序来处理订单。
代码:
//发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列
rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");
rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");
5.延时消息
即消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 RocketMQ 定时消息可以实现超时任务的检查触发。
代码:
//发送延时消息
rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build(), 3000, 2);
其中,第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2h
6.事务消息
分布式系统调用的特点为一个核心业务逻辑的执行,同时需要调用多个下游业务进行处理。因此,如何保证核心业务和多个下游业务的执行结果完全一致,是分布式事务需要解决的主要问题。
事务消息就是在普通消息基础上,支持二阶段的提交能力。将二阶段提交和本地事务绑定,实现全局提交结果的一致性。
简单来说就是,保证本地事务执行成功,消费者才会接受消息进行消费。
执行过程:
代码:
(1)生产者:
@RequestMapping("/send/transaction")
public void sendTransactionMessage(@RequestParam("msg") String msg){
//发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等
//参数一:topic;参数二:消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:tagA"
, MessageBuilder.withPayload(msg).build(),null);
//发送状态
String sendStatus = result.getSendStatus().name();
//本地事务执行状态
String localState = result.getLocalTransactionState().name();
log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);
}
(2)消费者端代码和上文相同,保持不变。
(3)本地事务:
/**
* 生产者消息监听器:
* 用于监听本地事务执行的状态和检查本地事务状态。
* @author qzz
*/
@RocketMQTransactionListener
@Slf4j
public class TransactionMsgConfig implements RocketMQLocalTransactionListener {
/**
* 执行本地事务(在发送消息成功时执行)
* @param message
* @param o
* @return commit or rollback or unknown
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
try {
//处理业务
String jsonStr = new String((byte[]) message.getPayload(), StandardCharsets.UTF_8);
log.info("执行本地业务,消息为:"+jsonStr);
//模拟网络波动
//Thread.sleep(35000);
//被除数为0,模拟业务出错
//int a = 10/0;
}catch (Exception e){
log.error("事务执行出错:"+e.getMessage());
//返回ROLLBACK状态,进行回滚
return RocketMQLocalTransactionState.ROLLBACK;
}
log.info("事务提交,消息正常处理");
//返回COMMIT状态的消息会立即被消费者消费到
return RocketMQLocalTransactionState.COMMIT;
}
/**
* 检查本地事务的状态
* @param message
* @return
*/
@Override
//超时、事务状态unknown等会调用该方法
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
log.info("消息回查");
return RocketMQLocalTransactionState.ROLLBACK;
}
}
我们需要编写一个本地事务执行类继承 RocketMQLocalTransactionListener 类。
在该类中我们对本地事务的异常进行捕捉,如果出现异常,则返回 ROLLBACK执行状态,顺利执行,则最终返回 COMMIT状态。
如果出现超时等网络波动或是UNKNOWN状态等情况,该类则会调用 checkLocalTransaction方法,返回方法中定义的事务状态。
(4)执行:
1.顺利执行,消费者成功消费:
可以看到,消息成功发送,消费者成功消费。
2.本地事务出现异常:
可以看到,本地事务抛出了异常,事务进行了回滚,消费者没有进行消费。
3.模拟超时
可以看到,当事务在一段时间内未返回对应事务状态 ,则会调用对应回查方法,直至事务成功返回事务执行状态。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)