结合Java代码实现RocketMQ的生产与消费消息
在前面的文章中,已经详细介绍并使用到了消息生产者,消息消费者,broker等集群相关的知识,这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点。希望这篇文章能帮助到正在学习RocketMQ知识点的小伙伴儿们!!!
前言
在前面的文章中,已经详细介绍并使用到了消息生产者,消息消费者,broker等集群相关的知识,这篇文章介绍一下其他的小组件以及使用Java代码实现生产者对消息的生成,消费者消费消息等知识点。
希望这篇文章能帮助到正在学习RocketMQ知识点的小伙伴儿们!!!
RocketMQ其他组件
在RocketMQ中,除了生产者,消费者,还有一些其他的小组件,接下来逐一介绍一下他们。
监听器(Listener)
定义:监听器是消费者用于处理消息的组件。在PushConsumer(推)模式下,消费者客户端必须设置消费监听器,以便在接收到消息时执行相应的处理逻辑。(比如一会儿下面的代码)
偏移量(Offset)
定义:偏移量是指在消费消息时,记录消费者已经消费到的消息位置的值。每个消息都有一个唯一的偏移量值,它代表了消息在消息队列中的位置。
偏移量具有很大的作用:它能够保证消费者在重启或宕机后能够从上次消费的位置继续消费消息,避免重复消费或漏消费。
简而言之就是它能告诉消费者已经消费到哪一条消息!!
- 集群模式:在集群消费模式下,消息队列的消费进度保存在Broker端。消费者每次消费完消息后,会将最新的消费进度同步到Broker,以便在消费者重启或者故障转移的时候能够从上一次消费的位置继续消费。
- 广播模式:在广播消费模式下,消息队列的消费进度保存在消费者本地。因为广播模式下每条消息都会被所有消费者消费,所以不需要在Broker端保存消费进度。
所以,偏移量的的实现方式有两种:包括存储在本地文件(OffsetStore)和存储在Broker中这两种方式。(这样一看,清晰了吧)
命名服务器(NameServer)
这个上几篇文章介绍过并且用到过,这个比较重要再介绍一下!
定义:命名服务器是RocketMQ中的轻量级路由服务,存储生产者和消费者与Broker之间的路由信息。
它的作用:提供Broker的动态注册与发现服务,生产者和消费者通过NameServer查询Broker的路由信息
,从而进行消息的投递和消费。
消息组成
- Topic:消息主题,对不同的业务消息进行分类。
- Tag:消息标签,进一步区分某个Topic下的消息分类。使用Tag可以实现对Topic中的消息进行
过滤
。消费者可以根据Tag来订阅自己感兴趣的消息,而不是接收Topic下的所有消息。 - Message Body:消息体,消息的实际内容。
- Keys:消息的键值,标识消息的唯一性。在RocketMQ中,每个消息都可以设置Keys字段,以便在需要的时候根据Keys来查询或者定位消息。
- 属性:除了上面滴,RocketMQ的消息还可以包含一系列的属性信息,比如
消息的发送时间、生产者信息
等等。这些属性信息以键值对的形式存在,随着消息一起被存储和传输。
实现生产与消费消息
按之前的步骤搭建完成RocketMQ集群后…
首先我们创建一个空的maven工程,在pom.xml文件中添加RocketMQ的依赖(RocketMQ的依赖版本需要与虚拟机中的保持一致,这里选择和之前一样的4.7.1版本):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.1</version>
</dependency>
生产消息
然后编写生产者生产消息的代码:
// 1.创建一个DefaultMQProducer实例,指定生产者组名
DefaultMQProducer producer = new DefaultMQProducer("my-producer-group1");
// 2.设置NameServer的地址
producer.setNamesrvAddr("192.168.220.135:9876");
// 3.启动生产者实例
producer.start();
// 4.使用for循环发送10条消息
for (int i = 0; i < 10; i++) {
// 创建一条消息,指定Topic为"MyTopic1",Tag标签为"TagA",消息体为"hello rocketmq"加上循环变量的值,同时把字符串转换为字节数组
Message message = new Message("MyTopic1","TagA",("hello rocketmq"+i).getBytes(StandardCharsets.UTF_8));
// 5.发送消息并接收发送结果
SendResult sendResult = producer.send(message);
// 打印发送结果,包括消息ID、发送状态等信息
System.out.println(sendResult);
}
// 6.发送完所有消息后,关闭生产者实例,释放资源
producer.shutdown();
生产者生产消息和消费者消费消息这块的代码都相对较为简单,已经在代码块中加了注释,这里就不再赘述了。
这个时候就可以访问虚拟机+端口号来搜索到发送的消息详情了!
消费消息
// 1.和生产者一样,创建一个DefaultMQPushConsumer实例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer-group1");
// 2.设置NameServer的地址,消费者通过这个地址与NameServer进行通信,来获取Broker的地址信息
consumer.setNamesrvAddr("192.168.220.135:9876");
// 订阅一个或多个Topic,以及Tag来过滤需要消费的消息。我们订阅了"MyTopic1",使用"*"来匹配此Topic下的所有Tag
consumer.subscribe("MyTopic1", "*");
// 3.注册消息监听器,用于处理从Broker接收到的消息。使用MessageListenerConcurrently接口的实现,表示并行消费
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
// 4.当收到消息时,方法会被调用。
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
// 5.遍历消息列表,并打印每条消息的内容(注意:这里直接打印msg对象不会得到预期的消息内容字符串)
for (MessageExt msg : msgs) {
// 所以我们打印msg.getBody()的内容,为了保留消息原样
System.out.println("已收到消息" + msg);
}
// 6.返回消费状态,这里表示消息已成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 7.启动消费者实例
consumer.start();
// 8.打印日志消费者已启动
System.out.println("消费者已启动");
这里需要注意,msg包含了消息的详细信息,包括消息体、标签、属性等等。如果想打印消息内容,应该使用msg.getBody()方法获取消息体的字节数组,并且把它转换为字符串(如果消息体是文本的话)。
本篇文章到这里就结束了,后续会继续分享RocketMQ相关的知识,感谢各位小伙伴们的支持!
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)