前言

RocketMQ中的顺序消息是一种对消息发送和消费顺序有严格要求的消息类型,主要用于需要保持消息顺序处理的业务场景。

顺序消息分为局部顺序(小名叫做分区顺序)和全局顺序两种类型,下面我们来一起学习一下!

在这里插入图片描述

局部顺序(分区顺序)

局部顺序,也称为分区顺序,是指在同一个Topic下,通过Sharding Key(分片键)将消息分配到不同的队列中,每个队列内的消息按照严格的先进先出原则进行发布和消费,但不同队列之间的消息顺序不做要求。注意是同队列遵循,不同队列不做要求,这也是局部的规则。

在RocketMQ里,局部消息顺序的实现依赖于消息的分发机制和消费者的消费策略。

  • 消息生产者:在把消息发送到Broker之前,生产者会根据消息的业务类型(比如订单ID)进行分区,确保同一个业务类型的消息发送到同一个队列中。
  • Broker:Broker接收到消息后,会把消息存储在相应的队列中。由于同一个队列中的消息是有序的,因此可以确保局部消息的顺序性。
  • 消息消费者:消费者在消费消息时,根据消息的业务类型(比如订单ID)选择相应的消息队列进行消费,从而确保同一业务类型的消息按顺序被处理。

比如我们以一个案例举个栗子:

生产者生产消息代码(流程已注释):

// 创建一个消息生产者实例,指定生产者组名"example_group_name"  
MQProducer producer = new DefaultMQProducer("example_group_name");  
//NameServer的地址,已经在环境变量中配置好了:NAMESRV_ADDR=192.168.220.135:9876  
// RocketMQ客户端库会自动从环境变量中读取这个地址  
// 启动消息生产者  
producer.start();  
// 发送10个订单,每个订单包含6个步骤的消息  
for (int i = 0; i < 10; i++) {  
    int orderId = i; // 设置订单ID  
  
    // 对于每个订单,发送6个步骤的消息  
    for(int j = 0 ; j <= 5 ; j ++){  
        // 创建一个消息实例,指定Topic、Tag、Key和消息体  
        // Topic为"OrderTopicTest",Tag为"order_"+orderId,Key为"KEY"+orderId,消息体为字符串"order_"+orderId+" step "+j的字节数组  
        Message msg = new Message("OrderTopicTest", "order_"+orderId, "KEY" + orderId,  
                        ("order_"+orderId+" step " + j).getBytes(RemotingHelper.DEFAULT_CHARSET));  
        // 这里通过orderId对队列列表的大小取模来决定消息发送到哪个队列,实现简单的负载均衡  
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {  
            @Override  
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {  
                Integer id = (Integer) arg; // arg传递的是orderId  
                int index = id % mqs.size(); // 根据orderId和队列数量计算索引  
                return mqs.get(index); // 返回计算得到的队列  
            }  
        }, orderId); // 传递orderId作为arg  
        // 打印发送结果  
        System.out.printf("%s%n", sendResult);  
    }  
}  
// 关闭生产者实例,释放资源  
producer.shutdown();

配置NAMESRV_ADDR环境变量:

在这里插入图片描述
填入NAMESRV_ADDR=192.168.220.135:9876:
在这里插入图片描述

运行OrderProducer:

在这里插入图片描述
消费者者消费消息代码(流程已注释):

// 创建默认的消息推送消费者实例,消费者组名"example_group_name"  
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("example_group_name");  
// 设置消费者从哪个位置开始消费消息,这里我们设置为从第一个偏移量开始消费  
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);  
// 订阅"OrderTopicTest" Topic,使用"*"订阅此Topic下的所有Tag  
consumer.subscribe("OrderTopicTest", "*");  
// 这里使用有序消息监听器,确保消息按发送顺序被消费  
consumer.registerMessageListener(new MessageListenerOrderly() {  
    @Override  
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  
                                               ConsumeOrderlyContext context) {  
        // 自动提交消费位移  
        context.setAutoCommit(true);    
        // 遍历消费到的消息列表  
        for(MessageExt msg:msgs){  
            // 打印消息内容  
            System.out.println("消息内容:"+new String(msg.getBody()));  
        }  
        // 消息消费成功,返回SUCCESS状态  
        return ConsumeOrderlyStatus.SUCCESS;  
    }  
});  
// 启动消费者实例  
consumer.start();  
// 输出消费者启动的提示信息  
System.out.printf("Consumer Started.%n");

同样在OrderConsumer类中配置NAMESRV_ADDR地址:

在这里插入图片描述
配置完成后,运行这个类:

在这里插入图片描述
以order_5为例可以看出结果,局部消息就是可以保证队列内的消息有序,但是无法保证不同队列中的消息有序!!!

在使用RocketMQ的局部消息时也需要注意:

  1. 局部顺序消息只保证同一队列内的消息顺序,不同队列之间的消息顺序不做要求
  2. 生产者需要保证同一业务单元的消息使用相同的Sharding Key
  3. 消费者需要确保同一队列内的消息由同一个消费线程或消费实例处理。

全局顺序

全局顺序是指在同一个Topic下,所有的消息都按照严格的先进先出原则进行发布和消费,也就是说整个Topic内的消息顺序都是一致的。

全局顺序适用于性能要求不高,并且业务上需要严格保证所有消息顺序的场景。所以,由于全局顺序消息的性能较低(因为只有一个队列处理所有消息),所以实际应用中较少使用。

全局顺序消息实际上是一种特殊的分区顺序消息,Topic中只有一个队列。因此,全局顺序消息的实现原理与分区顺序消息相同,只是将分区数量设置为1。

使用时需要注意:

  • 全局顺序消息的性能较低,因为所有的消息都在一个队列中处理。
  • 如果业务场景对性能要求较高,建议使用局部顺序消息代替全局顺序消息。

乱序消息

当多个消息队列并行消费,消费者部署在多台机器上,组成Consumer Group时,每台机器从可能不同的消息队列中消费消息,从而导致乱序。

或者Broker的宕机或网络延迟可能导致消息重试到其他消息队列,从而破坏原有顺序。

消费者消费消息不需要关注消息的顺序。消费者使用MessageListenerConcurrently类做消息监听。

比如我们将局部消息中消费者的代码修改一下:

//这里使用并发消息监听器,允许并发地消费消息  
consumer.registerMessageListener(new MessageListenerConcurrently() {  
    @Override  
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {  
        // 遍历消费到的消息列表  
        for(MessageExt msg:msgs){  
            // 打印消息内容  
            System.out.println("消息内容:"+new String(msg.getBody()));  
        }   
        // 消息消费成功,返回CONSUME_SUCCESS状态  
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;  
    }  
}); 

运行代码,如下图:

在这里插入图片描述

消费者在消费消息时并不是有序的!!!

文章总结

RocketMQ中的顺序消息分为局部顺序和全局顺序,乱序消息等等。

局部顺序消息通过Sharding Key将消息分配到不同的队列中,每个队列内的消息保持顺序性;而全局顺序消息则整个Topic内的消息都保持顺序性,但性能较低。在选择顺序消息类型时,需要根据实际业务需求和性能要求进行权衡。

本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!

在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐