一:普通消息

RocketMQ提供了三种方式来发送普通消息:可靠同步发送、可靠异步发送、单向发送。

1.1 可靠同步发送

同步发送是指发送消息后必须等待RocketMQ服务返回发送的结果,这里会一直同步阻塞,直到拿到RocketMQ服务返回发送的结果才继续往下执行代码。同步发送一般应用于对发送成功可靠性要求很严格的场景。

// 同步发送会有发送结果返回值
// rocketMQTemplate 使用冒号将topic和tag分割
SendResult sendResult = rocketMQTemplate.syncSend("test-topic:test-tag", "test msg");

1.2 可靠异步发送

异步发送是指发送消息后不需要等待RocketMQ服务器返回的发送结果,而是直接执行后面的逻辑。发送方通过设置回调接口来接收RocketMQ服务器异步返回的发送结果,并根据具体的发送结果进行相应处理。

rocketMQTemplate.asyncSend("test-topic:test-tag", "test message", new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        // 成功回调
        System.out.println("回调结果: " + sendResult);
    }

    @Override
    public void onException(Throwable throwable) {
        // 异常回调
        System.out.println(throwable);
    }
});

System.out.println("----------");

先打印 ------ 后打印回调函数。
在这里插入图片描述

1.3 单向发送

单向发送只负责发送消息,不等待RocketMQ服务器返回的发送结果,也不提供回调函数来接收RocketMQ服务器的响应结果,只负责发送至于发送成功还是发送失败我不关心。单向发送通常用于对可靠性要求不高的场景。

rocketMQTemplate.sendOneWay("test-topic:test-tag", "test msg");

注意:普通消息(以上三种)不保证发送消息的顺序和消费者消费的顺序一致性。

二:顺序消息

一个topic默认有4个消息队列(Message Queue),生产者在发送同一个topic的多个消息会被存储到不同的消息队列中,默认是按照轮询的模式来依次存储到每个队列中的,而消费消息首先从哪个队列中获取消息是不确定的,这导致发送的顺序和消费的消息很可能会不一致。

在这里插入图片描述

如果想要保证发送消息的顺序和消费消息的顺序要一致,只需要保证将消息都发送到同一个消息队列上即可,而不是轮询的放到每个队列上。顺序消息只需要在原来的发送方法后面增加Orderly后缀(syncSendOrderly、asyncSendOrderly、sendOneWayOrderly),并在最后传入一个值Hash Key,RocketMQ会根据这个值计算要发送到哪一个队列上。

String orderId = order.getId().toString();
for (int i = 0; i < 10; i++) {
	// Hash Key 一般是一组顺序消息的对应的一个唯一值,如 张三创建订单、张三订单付款、张三订单完成,这里hash key就用张三对应的订单id或者订单号
    rocketMQTemplate.sendOneWayOrderly("test-topic", order, orderId);
}

10个消息都发送到queueId=3的队列上了。
在这里插入图片描述

消费消息时需要指定消费者顺序(单线程)获取消息。

@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic", consumeMode = ConsumeMode.ORDERLY)
public class TestTopicConsumer1Listener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("TestTopicConsumerListener 消费消息:" + message);
    }
}

顺序消息:保证同一组内的消息按顺序消费。
在这里插入图片描述

三:延迟消息

延迟消息是指当生产者发送消息到Broker中不会被消费者立即消费,而是延迟指定的时间消费,注意RocketMQ不支持自定义延迟任意时间,只提供了18个固定时间级别供选择,级别从1开始到18,1s, 5s, 10s, 30s, 1m, 2m, 3m, 4m, 5m , 6m, 7m, 8m, 9m, 10m, 20m, 30m, 1h, 2h。注意由于网络延迟,所以延迟消息消费也会大于指定的延迟时间。延迟消息在发送时只需要指定delayLevel参数即可。

public SendResult syncSend(String destination, Message<?> message, long timeout, int delayLevel);
GenericMessage message = new GenericMessage(order.toString());
rocketMQTemplate.syncSend("test-topic", message, 3000, 3);

四:批量消息

使用Template发送批量消息时,发送的消息必须是org.springframework.messaging.Message的子类。批量发消息对消息的内容长度有限制,最大为4M,如果超过4M只能分为多批次发送。

// 方式一
List<GenericMessage> list = new ArrayList<>();
list.add(new GenericMessage("1"));
list.add(new GenericMessage("2"));

rocketMQTemplate.syncSend("test-topic", list);

// 方式二:使用生产者批量发送,Message是org.apache.rocketmq.common.message.Message
rocketMQTemplate.getProducer.send(Collection<Message> msgs);

五:消费消息模式

消息模式有两种,默认是负载均衡模式:

  • 负载均衡(MessageModel.CLUSTERING):多个消费者共同瓜分所有消息,一个消息只能被一个消费者消费掉。
  • 广播模式(MessageModel.BROADCASTING):每个消费者都会消费同样的消息。一个消息会被所有消费者消费掉。

消费模式使用messageModel参数来指定,注意:如果要测试多个消费者,不能写多个监听器,否则SpringBoot启动失败,只能写一个监听器,然后分别使用不同的端口分别启动Spring Boot。

@Component
@RocketMQMessageListener(consumerGroup = "testConsumerGroup", topic = "test-topic", messageModel = MessageModel.BROADCASTING)
public class TestTopicConsumer1Listener implements RocketMQListener<String> {
    @Override
    public void onMessage(String message) {
        System.out.println("TestTopicConsumerListener 消费消息:" + message);
    }
}

在这里插入图片描述
在这里插入图片描述

六:过滤消息

消费者过滤消息可以通过两种方式:

  1. Tag,生产者发送消息时可以指定Tag,消费者消费消息可以指定Tag过滤,也可以使用通配符*表示所有Tag,也可以使用||或来表示多个Tag,使用Template发送带Tag的消息是使用冒号分隔跟在主题后面。
  2. SQL 基本语法

6.1 Tag过滤

selectorType默认是Tag,selectorExpression默认是*,可以通过或符号||来选择消费多个Tag。

@Component
@RocketMQMessageListener(
        consumerGroup = "testConsumerGroup",
        topic = "test-topic",
        selectorType = SelectorType.TAG,
        selectorExpression = "test-tag || dev-tag")
public class TestTopicConsumerListener implements RocketMQListener<MessageExt> {
    @Override
    public void onMessage(MessageExt messageExt) {
        System.out.println(messageExt);
        System.out.println("body:" + new String(messageExt.getBody()));
    }
}

6.2 SQL92过滤

在发送消息的时候RocketMQ运行携带一个Map,可以通过Map中的每个值作为条件来过滤消息。SQL92过滤可以使用SQL中常用的Where条件来过滤要消费的消息,支持如下常用的SQL条件:

AND, OR
>, >=, <, <=, =
BETWEEN A AND B, equals to >=A AND <=B
NOT BETWEEN A AND B, equals to >B OR <A
IN ('a', 'b'), equals to ='a' OR ='b', this operation only support String type.
IS NULL, IS NOT NULL, check parameter whether is null, or not.
=TRUE, =FALSE, check parameter whether is true, or false.

样例:
(a > 10 AND a < 100) OR (b IS NOT NULL AND b=TRUE)

RocketMQ默认是关闭SQL92过滤方式的,如果需要需要通过配置文件配置,然后重启RocketMQ。org.apache.rocketmq.client.exception.MQClientException: CODE: 1 DESC: The broker does not support consumer to filter message by SQL92

在这里插入图片描述
启动时指定broker.conf

nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

通过控制台查看SQL92过滤方式是否启用。
在这里插入图片描述

// 方式一:使用Spring封装方式携带属性是通过setHeader来设置的
// setHeader 就是设置userProperty,注意header不能使用id,timestamp作为key
rocketMQTemplate.syncSend("test-filter-topic",
        MessageBuilder.withPayload("msg boday").setHeader("age", 2).build());

// 方式二:使用rocketmq-cliet原生的发送方法是在消息对象上通过putUserProperty(String key, String value) 来设置的
Message message = new Message();
message.setTopic("test-topic");
message.putUserProperty("age", "2");
rocketMQTemplate.getProducer().send(message1);


@RocketMQMessageListener(
        consumerGroup = "testConsumerGroup",
        topic = "test-filter-topic",
        selectorType = SelectorType.SQL92, 
        selectorExpression = "age=2"
)
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐