前言:

上一篇我们分享了 RocketMQ 消息重试的一些基本原理,本篇我们基于 Spring Boot 整合 RocketMQ 来分享一下 RocketMQ 消息基于手动提交的案例,在分享手动进行消息 ACK 中也会分享消息重试的使用。

RocketMQ 系列文章传送门

RocketMQ 的介绍及核心概念讲解

Spring Boot 整合 RocketMQ 之普通消息

Spring Boot 整合 RocketMQ 之定时/延时消息

Spring Boot 整合 RocketMQ 之顺序消息

Spring Boot 整合 RocketMQ 之事务消息

RocketMQ 之消息重试机制

同步消息手动提交 ACK 案例分享

同步消息 Producer 消息发送代码案例

同步消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下同步消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * @ClassName: OneWayMessageProducer
 * @Author: zhangyong
 * @Date: 2024/9/27 17:27
 * @Description: 同步消息发送者
 */
@Slf4j
@Component
public class SyncMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @param message:
     * @date 2024/10/10 17:47
     * @description 同步消息发送
     */
    public void sendSyncMessage(String message) {
        rocketMqTemplate.syncSend("sync-topic", MessageBuilder.withPayload(message).build());
    }

}

同步消息手动 ACK Consumer 端代码案例分享

RocketMQ 消息手动 ACK 就不能再使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式来实现了,RocketMQListener 的源码如下:

package org.apache.rocketmq.spring.core;

public interface RocketMQListener<T> {
    void onMessage(T var1);
}

我们可以看到 RocketMQListener 中只提供了一个 onMessage 方法,且返回值为 void,不接受返回值,因此没办法进行手动 ACK。

这里我们使用 DefaultMQPushConsumer 来实现消息的手动 ACK,DefaultMQPushConsumer 实现了 MQPushConsumer 接口,具体实现代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitSyncCounsumer
 * @Author: Author
 * @Date: 2024/10/16 16:23
 * @Description:
 */
@Slf4j
@Component
public class ManualCommitSyncCounsumer {


    /**
     * @date 2024/10/16 17:19
     * @description 同步消息消费成功 手动提交
     */
    @PostConstruct
    public void onSyncMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("xxx-xxx-rocketmq.xxx.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("sync-topic", "*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 同步消费端手动 ACK 结果验证:

正常消费返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS

2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:23:07.575  INFO 10820 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:23:07,当前消息id:7F0000012A4418B4AAC25EECF4F50000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了

没有模拟除 0 异常,正常消费返回,一次就消费成功,结果符合预期。

模拟除 0 异常

2024-10-19 10:19:59.026  INFO 34052 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:19:59,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:09.029  INFO 34052 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:09,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:20:39.032  INFO 34052 --- [MessageThread_4] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:20:39,当前消息id:7F000001850418B4AAC25EEA14AF0003,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

正常消费但是返回 NULL

2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:47.338  INFO 27256 --- [MessageThread_1] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:47,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【1】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:25:57.344  INFO 27256 --- [MessageThread_2] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:25:57,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【2】次消费,消息消费成功,消息内容:小明同学你妈喊你回家吃饭了
2024-10-19 10:26:27.347  INFO 27256 --- [MessageThread_3] c.o.s.r.c.ManualCommitSyncCounsumer      : 当前时间:2024-10-19 10:26:27,当前消息id:7F0000016A7818B4AAC25EEF65120000,是第【3】次消费,直接返回消费消费成功,消息内容:小明同学你妈喊你回家吃饭了

可以看到返回 NULL 和模拟除 0 异常是一样的效果,消息在不断重试消息,分别是 2024-10-19 10:19:59、2024-10-19 10:20:09、2024-10-19 10:20:39 触发了消费,时间间隔也是 10秒、30秒,结果符合预期。

上面的案例我们没有控制消息消费重试次数,我们可以设置一个消息消费重试次数,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

使用 @RocketMQMessageListener 注解 + 实现 RocketMQListener 接口的方式消费消息的时候消费失败也会自动进行重试,因此我们一定要控制好重试次数,可以 @RocketMQMessageListener 注解的 maxReconsumeTimes 来控制重试次数(高版本的 starter 才有该属性)。

不管使用哪种方式进行消费,对于达到重试次数还是消费失败的消息一般有两种处理方式,分別是:

  • 直接返回消息消费成功,使用本地库记录消息进行重新推送或者人工介入处理。
  • 不进行处理,让消息进入死信队列,然后去监听死信队列进行处理。

顺序消息 Producer 消息发送代码案例

顺序消息发送的代码前面分享过,感兴趣的朋友也可以通过上面的系列文章链接去查看,这里还是简单的分享一下顺序消息 Producer 消息发送代码,具体如下:

package com.order.service.rocketmq.producer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;


/**
 * @ClassName: OneWayMessageProducer
 * @Author: Author
 * @Date: 2024/9/27 17:27
 * @Description: 顺序消息发送者
 */
@Slf4j
@Component
public class OrderlyMessageProducer {

    @Autowired
    private RocketMQTemplate rocketMqTemplate;

    /**
     * @date 2024/10/11 15:45
     * @description 同步顺序消息
     */
    public void syncSendOrderly() {
        //hashKey 用来计算决定消息发送到哪个队列  一般是订单 ID 等信息  这里我们模拟订单 ID 发送
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 创建").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 支付").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:666666 确认收货").build(), "666666");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 创建").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 支付").build(), "888888");
        rocketMqTemplate.syncSendOrderly("orderly-topic", MessageBuilder.withPayload("同步顺序消息,订单编号:888888 确认收货").build(), "888888");
    }

}

顺序消息 Consumer 消息消费代码案例

同步消息手动 ACK 我们是注册了一个 MessageListenerConcurrently 消息监听器,顺序消息的手动 ACK 我们需要注册一个 MessageListenerOrderly 的消息监听器,具体代码如下:

package com.order.service.rocketmq.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * @ClassName: ManualCommitOrderlyMessageConsumer
 * @Author: Author
 * @Date: 2024/10/10 17:35
 * @Description: 顺序消息消费
 */
@Slf4j
@Component
public class ManualCommitOrderlyMessageConsumer {

    /**
     * @date 2024/10/16 17:19
     * @description 顺序消息消费成功 手动提交
     */
    @PostConstruct
    public void onOrderlyMessage() throws MQClientException {
        //消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("orderly-group");
        //设置最大消息重试次数
        //consumer.setMaxReconsumeTimes(2);
        //RocketMQ 地址可以 可以用配置文件
        consumer.setNamesrvAddr("dev-ztn-rocketmq.eminxing.com:19876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定 * 表示接收所有 tag 的消息
        consumer.subscribe("orderly-topic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            //存储消息id 和消费次数的关系
            final Map<String, Integer> map = new HashMap<>();
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                String dateStr = sdf.format(new Date());
                //消息处理
                MessageExt messageExt = list.get(0);
                String message = new String(messageExt.getBody());
                String msgId = messageExt.getMsgId();
                //获取消息消费次数
                Integer count = map.get(msgId);
                if (count == null) {
                    count = 0;
                }
                //次数+1
                count = count + 1;
                //覆盖map
                map.put(msgId, count);
                if (count > 2) {
                    log.info("当前时间:{},当前消息id:{},是第【{}】次消费,直接返回顺序消费消费成功,消息内容:{}", dateStr, msgId, count, message);
                    //消息消费成功后移除
                    map.remove(msgId);
                    return ConsumeOrderlyStatus.SUCCESS;
                }
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息内容:{}", dateStr, msgId, count, message);
                //模拟除 0 异常
                //int a = 1 / 0;
                log.info("当前时间:{},当前消息id:{},是第【{}】次消费,顺序消息消费成功,消息内容:{}", dateStr, msgId, count, message);
                return ConsumeOrderlyStatus.SUCCESS;
                //return null;
            }
        });
        consumer.start();
    }

}

RocketMQ 顺序消费端手动 ACK 结果验证:

正常消费返回 ConsumeOrderlyStatus.SUCCESS

2024-10-19 13:46:44.293  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.294  INFO 29348 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EBC0000,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC00001,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.295  INFO 29348 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC30002,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.298  INFO 29348 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC50003,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.300  INFO 29348 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC70004,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:46:44.302  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:46:44.303  INFO 29348 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:46:44,当前消息id:7F00000172A418B4AAC25FA75EC90005,是第【1】次消费,顺序消息消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,结果符合预期。

模拟除 0 异常

2024-10-19 13:50:45.587  INFO 27604 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:45,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:46.588  INFO 27604 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:46,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D4F0000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 13:50:47.592  INFO 27604 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:47,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:48.593  INFO 27604 --- [orderly-group_7] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:48,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D540001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 13:50:49.595  INFO 27604 --- [orderly-group_8] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:49,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:50.610  INFO 27604 --- [orderly-group_9] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:50,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D550002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 13:50:51.611  INFO 27604 --- [rderly-group_10] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:51,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:52.617  INFO 27604 --- [rderly-group_11] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:52,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D580003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 13:50:53.618  INFO 27604 --- [rderly-group_12] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:53,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:54.620  INFO 27604 --- [rderly-group_13] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:54,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D590004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 13:50:55.627  INFO 27604 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:55,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:56.629  INFO 27604 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:56,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 13:50:57.631  INFO 27604 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 13:50:57,当前消息id:7F0000016BD418B4AAC25FAB0D5B0005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,结果符合预期。

这里需要注意一点的时候上一条消息没有消费成功时,后面一条消息永远不会被消费,这个从我们的日志中也能够体现出来,因此我们在使用顺序消息的时候一定要注意消息的重试次数,消息重试次数我们可以通过自己的业务来判断消费几次,也可以使用 RocketMQ 来实现,代码如下:

//设置最大消息重试次数
consumer.setMaxReconsumeTimes(2);

模拟返回 NULL

2024-10-19 14:00:51.484  INFO 22728 --- [rderly-group_14] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:51,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:52.485  INFO 22728 --- [rderly-group_15] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:52,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C170000,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 创建
2024-10-19 14:00:53.492  INFO 22728 --- [rderly-group_16] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:53,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:54.492  INFO 22728 --- [rderly-group_17] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:54,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1B0001,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 支付
2024-10-19 14:00:55.494  INFO 22728 --- [rderly-group_18] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:55,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:56.495  INFO 22728 --- [rderly-group_19] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:56,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1D0002,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:666666 确认收货
2024-10-19 14:00:57.497  INFO 22728 --- [rderly-group_20] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:57,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:58.510  INFO 22728 --- [orderly-group_1] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:58,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C1F0003,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 创建
2024-10-19 14:00:59.520  INFO 22728 --- [orderly-group_2] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:00:59,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:00.521  INFO 22728 --- [orderly-group_3] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:00,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C200004,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 支付
2024-10-19 14:01:01.523  INFO 22728 --- [orderly-group_4] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:01,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【1】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:02.525  INFO 22728 --- [orderly-group_5] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:02,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【2】次消费,顺序消息内容:同步顺序消息,订单编号:888888 确认收货
2024-10-19 14:01:03.527  INFO 22728 --- [orderly-group_6] s.r.c.ManualCommitOrderlyMessageConsumer : 当前时间:2024-10-19 14:01:03,当前消息id:7F00000158C818B4AAC25FB44C220005,是第【3】次消费,直接返回顺序消费消费成功,消息内容:同步顺序消息,订单编号:888888 确认收货

可以看到是按发送顺序消费的,且每条消息都是经过第三次消费后才返回成功的,每次消费具体上一次消费的时间间隔是 1 秒,和模拟除 0异常 一样的结果,结果符合预期。

总结:RocketMQ 消息重试次数可以通过 @RocketMQMessageListener 注解的 maxReconsumeTimes 属性来设置,也可以通过编码来设置,不管采用何种方式来设置,我们都要在业务编码中做好处理,过多重视造成的性能问题,以及没有合理处理消费失败的消息造成的消息丢失问题,对于顺序消息我们更要慎重对待,顺序消息回到导致消息阻塞。

如有不正确的地方欢迎各位指出纠正。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐