前言:

上一篇我们分享了 Spring Boot 整合 RocketMQ 完成单向消息、同步消息、异步消息发送的过程,本篇我们来分享一下 RocketMQ 延时消息的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

延迟消息使用场景:

在分布式定时调度触发、任务超时处理等场景,需要实现精准、可靠的定时事件触发。使用 RocketMQ 的定时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

典型场景一:分布式定时调度

在这里插入图片描述

在分布式定时调度场景下,需要实现各类精度的定时任务,例如每天5点执行文件清理,每隔2分钟触发一次消息推送等需求。传统基于数据库的定时调度方案在分布式场景下,性能不高,实现复杂。基于 Apache RocketMQ 的定时消息可以封装出多种类型的定时触发器。

典型场景二:任务超时处理

在这里插入图片描述

以电商交易场景为例,订单下单后暂未支付,此时不可以直接关闭订单,而是需要等待一段时间后才能关闭订单。使用 Apache RocketMQ 定时消息可以实现超时任务的检查触发。

  • 精度高、开发门槛低:基于消息通知方式不存在定时阶梯间隔。可以轻松实现任意精度事件触发,无需业务去重。
  • 高性能可扩展:传统的数据库扫描方式较为复杂,需要频繁调用接口扫描,容易产生性能瓶颈。 Apache RocketMQ 的定时消息具有高并发和水平扩展的能力。

注意:定时消息和延时消息本质相同,都是服务端根据消息设置的定时时间在某一固定时刻将消息投递给消费者消费。

什么是定时消息

定时消息是 Apache RocketMQ 提供的一种高级消息类型,消息被发送至服务端后,在指定时间后才能被消费者消费。通过设置一定的定时时间可以实现分布式场景的延时调度触发效果。

  • Apache RocketMQ 定时消息设置的定时时间是一个预期触发的系统时间戳,延时时间也需要转换成当前系统时间后的某一个时间戳,而不是一段延时时长。
  • 定时时间的格式为毫秒级的Unix时间戳,您需要将要设置的时刻转换成时间戳形式。
  • 定时时间必须设置在定时时长范围内,超过范围则定时不生效,服务端会立即投递消息。
  • 定时时长最大值默认为24小时,不支持自定义修改,更多信息,请参见参数限制。
  • 定时时间必须设置为当前时间之后,若设置到当前时间之前,则定时不生效,服务端会立即投递消息。

示例如下:

  • 定时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望消息在下午19:20:00定时投递,则定时时间为2022-06-09 19:20:00,转换成时间戳格式为1654773600000。
  • 延时消息:例如,当前系统时间为2022-06-09 17:30:00,您希望延时1个小时后投递消息,则您需要根据当前时间和延时时长换算成定时时刻,即消息投递时间为2022-06-09 18:30:00,转换为时间戳格式为1654770600000。

定时消息生命周期

在这里插入图片描述

  • 初始化:消息被生产者构建并完成初始化,待发送到服务端的状态。
  • 定时中:消息被发送到服务端,和普通消息不同的是,服务端不会直接构建消息索引,而是会将定时消息单独存储在定时存储系统中,等待定时时刻到达。
  • 待消费:定时时刻到达后,服务端将消息重新写入普通存储引擎,对下游消费者可见,等待消费者消费的状态。
  • 消费中:消息被消费者获取,并按照消费者本地的业务逻辑进行处理的过程。 此时服务端会等待消费者完成消费并提交消费结果,如果一定时间后没有收到消费者的响应,Apache RocketMQ会对消息进行重试处理。
  • 消费提交:消费者完成消费处理,并向服务端提交消费结果,服务端标记当前消息已经被处理(包括消费成功和失败)。 Apache RocketMQ 默认支持保留所有消息,此时消息数据并不会立即被删除,只是逻辑标记已消费。消息在保存时间到期或存储空间不足被删除前,消费者仍然可以回溯消息重新消费。
  • 消息删除:Apache RocketMQ按照消息保存机制滚动清理最早的消息数据,将消息从物理文件中删除。

RocketMQ 实现定时消息发送

RocketMQ 支持定时的延迟消息,但不支持任意时间精度的延迟消息,免费版共支持 18 中延迟时间的消息,如下:

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

需要注意的是 RocketMQ 的延迟消息使用的是 delayLevel 延迟级别来表示的,并不是直接使用时间,例如:delayLevel = 1 延迟时间为 1秒,delayLevel = 2 延迟时间为 5秒,delayLevel = 3 延迟时间为 10秒,delayLevel = 4 延迟时间为 30秒,以此类推。。。

RocketMQ 延迟消息 API

延迟消息的 API 没有什么特殊之处,就是在之前的同步、异步发送消息的 API 上加了一个 delayLevel 的参数,如下:

在这里插入图片描述

带有 delayLevel 参数的 API 就是发送延迟消息的 API,我们发现同时还带有 long 类型的 timeout 参数,这个是设置消息发送超时时间,默认是3秒,我们也可以自行设置。

延迟消息生产者代码

这里我们分别分享了同步延迟消息和异步延迟消息的生产者代码。

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


@Slf4j
@Component
public class DelayMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步延时消息发送 只有一个方法支持
     */
    public void sendSyncDelayMessage(String message) {
        //延迟消息支持的时间 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        //delayLevel 可以理解为索引 并不是时间的意思
        rocketMqTemplate.syncSend("delay-topic", MessageBuilder.withPayload("延迟1秒消息测试").build(), 3000, 1);
        rocketMqTemplate.syncSend("delay-topic", MessageBuilder.withPayload("延迟10秒消息测试").build(), 3000, 3);
        rocketMqTemplate.syncSend("delay-topic", MessageBuilder.withPayload("延迟30秒消息测试").build(), 3000, 4);
        log.info("同步延时消息发送成功,当前时间戳:{}", System.currentTimeMillis());
    }

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 异步延时消息发送 只有一个方法支持
     */
    public void sendAsyncDelayMessage(String message) {
        rocketMqTemplate.asyncSend("delay-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("异步延时消息发送成功,当前时间戳:{}", System.currentTimeMillis());
            }

            @Override
            public void onException(Throwable throwable) {
                log.error("异步延时消息发送失败");
            }
        }, 3000, 3);
    }

}

延迟消息消费者代码

延迟消息消费者代码没有什么特殊之处,和同步消息的消费者代码一样。

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "delay-group", topic = "delay-topic")
public class DelayMessageConsumer implements RocketMQListener<String> {

    @Override
    public void onMessage(String message) {
        log.info("延时消息消费成功,当前时间戳:{},消息类容:{}", System.currentTimeMillis(), message);
    }
}

触发延迟消息发送代码

@GetMapping("/send-sync-delay")
public String sendSyncDelayMessage(@RequestParam String message){
	delayMessageProducer.sendSyncDelayMessage(message);
	return "success";
}

@GetMapping("/send-async-delay")
public String sendAsyncDelayMessage(@RequestParam String message){
	delayMessageProducer.sendAsyncDelayMessage(message);
	return "success";
}

延迟消息结果验证

同步延迟消息验证:

2024-10-14 16:44:22.366  INFO 18180 --- [nio-8086-exec-2] c.o.s.r.producer.DelayMessageProducer    : 同步延时消息发送成功,当前时间戳:1728895462366
2024-10-14 16:44:23.350  INFO 18180 --- [MessageThread_1] c.o.s.r.consumer.DelayMessageConsumer    : 延时消息消费成功,当前时间戳:1728895463350,消息类容:延迟1秒消息测试
2024-10-14 16:44:32.358  INFO 18180 --- [MessageThread_2] c.o.s.r.consumer.DelayMessageConsumer    : 延时消息消费成功,当前时间戳:1728895472358,消息类容:延迟10秒消息测试
2024-10-14 16:44:52.378  INFO 18180 --- [MessageThread_3] c.o.s.r.consumer.DelayMessageConsumer    : 延时消息消费成功,当前时间戳:1728895492378,消息类容:延迟30秒消息测试

验证结果符合预期。

异步延迟消息验证:

2024-10-14 16:45:16.920  INFO 18180 --- [ublicExecutor_1] c.o.s.r.producer.DelayMessageProducer    : 异步延时消息发送成功,当前时间戳:1728895516920
2024-10-14 16:45:26.916  INFO 18180 --- [MessageThread_4] c.o.s.r.consumer.DelayMessageConsumer    : 延时消息消费成功,当前时间戳:1728895526916,消息类容:send-async-delay

验证结果符合预期。

注意事项

  • RocketMQ 定时消息的定时时长参数精确到毫秒级,但是默认精度为1000ms,即定时消息为秒级精度。
  • RocketMQ 定时消息的状态支持持久化存储,系统由于故障重启后,仍支持按照原来设置的定时时间触发消息投递,若存储系统异常重启,可能会导致定时消息投递出现一定延迟。

总结:本篇简单分享了 RokcetMQ 延迟/定时消息的使用,实际业务代码中使用只需加入自己的业务即可,希望可以帮助到有需要的小伙伴,下一篇我们继续分享 RocketMQ 的顺序消息使用。

如有不正确的地方欢迎各位指出纠正。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐