RocketMQ进击(五)集群消费模式与广播消费模式
楔子:新一天的旅程,掠过天空海湾,越过低谷高山,跃过深渊浅滩,在天南地北,走两江四岸,与日月星辰,看锦绣山河。1. 两种消费模式RocketMQ 有两种消费模式:集群消费模式:CLUSTERING,可以理解为同组公共消费。公共资源我拿了你就没有。即同一 Topic 下,一个ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这...
楔子:新一天的旅程,掠过天空海湾,越过低谷高山,跃过深渊浅滩,在天南地北,走两江四岸,与日月星辰,看锦绣山河。
1. 两种消费模式
RocketMQ 有两种消费模式:
集群消费模式:CLUSTERING,可以理解为同组公共消费。公共资源我拿了你就没有。即同一 Topic 下,一个 ConsumerGroup 下如果有多个实例(可以是多个进程,或者多个机器),那么这些实例会均摊消费这些消息,但我消费了这条消费你就不会再消费。消费者默认是集群消费方式。适用于大部分消息业务。
广播消费模式:BROADCASTING,可以理解为同组各自消费。即同一 Topic 下,同一消息会被多个实例各自都消费一次。所以,广播消费模式中的 ConsumerGroup 概念没有太大的意义。这适用于一些分发消息的场景。
1.1. 集群消费模式
1.1.1 生产者
package com.meiwei.service.mq.tcp.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class ClusteringMqProducer {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_CLUSTERING";
public static void main(String[] args) throws Exception {
// 声明并实例化一个 producer 生产者来产生消息
// 需要一个 producer group 名字作为构造方法的参数
DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-clustering");
// 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876");
// 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 循环发送MQ测试消息
String content = "";
for (int i = 0; i < 10; i++) {
// 配置容灾机制,防止当前消息异常时阻断发送流程
try {
content = "【MQ测试消息】测试消息 " + i;
// Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(message);
// 日志打印
System.out.printf("Send MQ message success! Topic: %s,Tag: %s, msgId: %s, Message: %s %n",
message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
} catch (Exception e) {
// 消息发送失败
System.out.printf("%-10d Exception %s %n", i, e);
e.printStackTrace();
}
}
// 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
producer.shutdown();
}
}
1.1.2 集群消费模式消费
package com.meiwei.service.mq.tcp.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 集群消息模式
*/
public class ClusteringMqConsumer {
// Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_CLUSTERING";
public static void main(String[] args) throws Exception {
// 声明并初始化一个 consumer
// 需要一个 consumer group 名字作为构造方法的参数
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-clustering");
// 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH);
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(mq->{
System.out.printf("Thread: %s, Host: %s, Key: %s, QueueId: %s, Topic: %s, Tags: %s, Message: %s",
Thread.currentThread().getName(),
mq.getBornHost(),
mq.getKeys(),
mq.getQueueId(),
mq.getTopic(),
mq.getTags(),
new String(mq.getBody()));
System.out.println();
});
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 调用 start() 方法启动 consumer
consumer.start();
System.out.println("Clustering Consumer Started.");
}
}
注:将此消费者拷贝一份为 ClusteringMqConsumer2,其它不动,以便测试。
1.1.3 测试及结果
启动集群消费者1、集群消费者2 和 消息生产者。
消息生产者(Producer)发送结果:
集群消费模式 消费者1 消费结果:
集群消费模式 消费者2 消费结果:
可以看到两个消费者是在多线程下的分摊消费效果。
1.2. 广播消费模式
1.2.1 生产者
package com.meiwei.service.mq.tcp.producer;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
public class BroadcastingMqProducer {
// Topic 为 Message 所属的一级分类,就像学校里面的初中、高中
// Topic 名称长度不得超过 64 字符长度限制,否则会导致无法发送或者订阅
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
// Tag 为 Message 所属的二级分类,比如初中可分为初一、初二、初三;高中可分为高一、高二、高三
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_BROADCASTING";
public static void main(String[] args) throws Exception {
// 声明并实例化一个 producer 生产者来产生消息
// 需要一个 producer group 名字作为构造方法的参数
DefaultMQProducer producer = new DefaultMQProducer("meiwei-producer-broadcasting");
// 指定 NameServer 地址列表,多个nameServer地址用半角分号隔开。此处应改为实际 NameServer 地址
// NameServer 的地址必须有,但也可以通过启动参数指定、环境变量指定的方式设置,不一定要写死在代码里
producer.setNamesrvAddr("127.0.0.1:9876");
// 在发送MQ消息前,必须调用 start 方法来启动 Producer,只需调用一次即可
producer.start();
// 循环发送MQ测试消息
String content = "";
for (int i = 0; i < 10; i++) {
// 配置容灾机制,防止当前消息异常时阻断发送流程
try {
content = "【MQ测试消息】测试消息 " + i;
// Message Body 可以是任何二进制形式的数据,消息队列不做任何干预,需要 Producer 与 Consumer 协商好一致的序列化和反序列化方式
Message message = new Message(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH, "KEY" + i, content.getBytes(RemotingHelper.DEFAULT_CHARSET));
// 发送消息
SendResult sendResult = producer.send(message);
// 日志打印
System.out.printf("Send MQ message success! Topic: %s,Tag: %s, msgId: %s, Message: %s %n",
message.getTopic(), message.getTags(), sendResult.getMsgId(), new String(message.getBody()));
} catch (Exception e) {
// 消息发送失败
System.out.printf("%-10d Exception %s %n", i, e);
e.printStackTrace();
}
}
// 在发送完消息之后,销毁 Producer 对象。如果不销毁也没有问题
producer.shutdown();
}
}
1.2.2. 广播消费模式消费
package com.meiwei.service.mq.tcp.consumer;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import java.util.List;
/**
* 广播消费模式(Broadcasting)
* 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
*/
public class BroadcastingMqConsumer {
// Message 所属的 Topic 一级分类,须要与提供者的频道保持一致才能消费到消息内容
private static final String MQ_CONFIG_TOPIC = "TOPIC_MEIWEI_SMS_NOTICE_TEST";
private static final String MQ_CONFIG_TAG_PUSH = "PID_MEIWEI_SMS_BROADCASTING";
private static final String MQ_CONFIG_TAG_PUSH_OTHER = "PID_MEIWEI_SMS_OTHER";
public static void main(String[] args) throws Exception {
// 声明并初始化一个 consumer
// 需要一个 consumer group 名字作为构造方法的参数
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("meiwei-consumer-broadcasting");
// 同样也要设置 NameServer 地址,须要与提供者的地址列表保持一致
consumer.setNamesrvAddr("127.0.0.1:9876");
// 设置广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
// 设置 consumer 所订阅的 Topic 和 Tag,*代表全部的 Tag
consumer.subscribe(MQ_CONFIG_TOPIC, MQ_CONFIG_TAG_PUSH + " || " + MQ_CONFIG_TAG_PUSH_OTHER);
// 注册消息监听者
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
list.forEach(mq->{
System.out.printf("Thread: %s, Host: %s, Key: %s, QueueId: %s, Topic: %s, Tags: %s, Message: %s",
Thread.currentThread().getName(),
mq.getBornHost(),
mq.getKeys(),
mq.getQueueId(),
mq.getTopic(),
mq.getTags(),
new String(mq.getBody()));
System.out.println();
});
// 返回消费状态
// CONSUME_SUCCESS 消费成功
// RECONSUME_LATER 消费失败,需要稍后重新消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 调用 start() 方法启动 consumer
consumer.start();
System.out.println("Broadcasting Consumer Started.");
}
}
注:
- 将此消费者拷贝一份为 BroadcastingMqConsumer2,其它不动。
- 再拷贝一份为 BroadcastingMqConsumer3,修改其 ConsumerGroup 参数为 meiwei-consumer-broadcasting3。
这样得到 Consumer 和 Consumer2 同组,又与 Consumer3 不同组的消费者实例,以便测试。
1.2.3. 测试及结果
分别启动广播消费者1、广播消费者2 和 消息生产者。
消息生产者(Producer)发送结果:
广播消费模式 消息者1(BroadcastingMqConsumer)消费结果:
广播消费模式 消息者2(BroadcastingMqConsumer2)消费结果:
可以看到两个消息消费者都收到了同样的消息。
再启动广播消费者3(BroadcastingMqConsumer3),再次执行消息生产者,广播消费者3同样消费了所有消息。
2. 源码分析
MessageModel:
package org.apache.rocketmq.common.protocol.heartbeat;
public enum MessageModel {
BROADCASTING("BROADCASTING"),
CLUSTERING("CLUSTERING");
private String modeCN;
private MessageModel(String modeCN) {
this.modeCN = modeCN;
}
public String getModeCN() {
return this.modeCN;
}
}
DefaultMQPushConsumer:
public DefaultMQPushConsumer(String consumerGroup, RPCHook rpcHook, AllocateMessageQueueStrategy allocateMessageQueueStrategy) {
this.messageModel = MessageModel.CLUSTERING;
this.consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
this.consumeTimestamp = UtilAll.timeMillisToHumanString3(System.currentTimeMillis() - 1800000L);
this.subscription = new HashMap();
this.consumeThreadMin = 20;
this.consumeThreadMax = 64;
this.adjustThreadPoolNumsThreshold = 100000L;
this.consumeConcurrentlyMaxSpan = 2000;
this.pullThresholdForQueue = 1000;
this.pullThresholdSizeForQueue = 100;
this.pullThresholdForTopic = -1;
this.pullThresholdSizeForTopic = -1;
this.pullInterval = 0L;
this.consumeMessageBatchMaxSize = 1;
this.pullBatchSize = 32;
this.postSubscriptionWhenPull = false;
this.unitMode = false;
this.maxReconsumeTimes = -1;
this.suspendCurrentQueueTimeMillis = 1000L;
this.consumeTimeout = 15L;
this.consumerGroup = consumerGroup;
this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;
this.defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);
}
默认的 DefaultMQPushConsumer 内部定义了很多默认值,比如默认为集群消费方式,线程最小默认20,最大默认64,批量下拉消息默认32,默认一次消费一条消息等等。
参考资料:
RocketMQ 官网:http://rocketmq.apache.org/docs/motivation/
阿里云消息队列 MQ:https://help.aliyun.com/document_detail/29532.html
阿里巴巴中间件团队:http://jm.taobao.org/2016/11/29/apache-rocketmq-incubation/
RocketMQ进击物语:
RocketMQ进击(零)RocketMQ这个大水池子
RocketMQ进击(一)Windows环境下安装部署Apache RocketMQ
RocketMQ进击(二)一个默认生产者,两种消费方式,三类普通消息详解分析
RocketMQ进击(三)顺序消息与高速公路收费站
RocketMQ进击(四)定时消息(延时队列)
RocketMQ进击(五)集群消费模式与广播消费模式
RocketMQ进击(六)磕一磕RocketMQ的事务消息
RocketMQ进击(七)盘一盘RocketMQ的重试机制
RocketMQ进击(八)RocketMQ的日志收集Logappender
RocketMQ异常:RocketMQ顺序消息收不到或者只能收到一部分消息
RocketMQ异常:Unrecognized VM option 'MetaspaceSize=128m'
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)