RocketMQ 4.7.0 基础讲解
目录Rocketmq基础组件讲解Rocketmq安装Rocketmq图形化界面安装与使用Rocketmq基础组件讲解Apache RocketMQ作为阿里开源的一款高性能,高吞吐量的分布式消息中间件Rocketmq作为阿里主推的消息队列拥有以下特点:支持Broker和Consumer端消息过滤支持发布订阅和点对点支持pull拉和push推两种消息模式单一队列亿级消息堆积支持单master节点,多m
目录
- Rocketmq基础组件讲解
- Rocketmq安装
- Rocketmq图形化界面安装与使用
- Springboot与RocketMq的整合
- Rocketmq主从模式搭建
Rocketmq基础组件讲解
Apache RocketMQ作为阿里开源的一款高性能,高吞吐量的分布式消息中间件
Rocketmq作为阿里主推的消息队列拥有以下特点:
支持Broker和Consumer端消息过滤 |
---|
支持发布订阅和点对点 |
支持pull拉和push推两种消息模式 |
单一队列亿级消息堆积 |
支持单master节点,多master节点,多master和多slave节点 |
任意一个节点都是高可用,水平扩展,Producer,Consumer,Queue都可以分布式 |
消息失败重试机制,支持特定Level的定时消息 |
底层采用netty |
4.3.x以后支持分布式事务 |
适合金融业务,高可用性跟踪和审计功能 |
Rocketmq节点的说明:
Producer | 消息生产者 |
---|---|
Producer Group | 消息生产组,发送同类消息的一个消息生产组 |
Consumer | 消费者 |
Consumer Group | 消费同类消息的多个实例 |
Tag | 子主题,对topic近一步细化,用于区分同一主题下的不同业务的消息 |
Topic | 主题:订单类消息主题,新闻类消息主题,是一个逻辑管理单位·,一个topic可以有多个queue,默认创建是4个手动创建是8个 |
Message | 每个消息必须指定一个Topic |
Broker | MQ程序,接收生产者的消息,提供给消费者的程序 |
Name Server | 给生产者和消费者提供路由信息,可以理解为注册中心 |
Offset | 偏移量,可以理解为消息的进度 |
Commit Log | 消息存储会写在Commit Log文件里 |
Rocketmq安装
本文参照官网安装手册:http://rocketmq.apache.org/docs/quick-start/
下载安装包:https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
先决条件:
1.jdk安装 https://blog.csdn.net/gcxzflgl/article/details/106373435
2.maven安装 https://blog.csdn.net/qq_38270106/article/details/97764483
安装过程:
1.下载4.7.0版本的安装压缩包
wget https://archive.apache.org/dist/rocketmq/4.7.0/rocketmq-all-4.7.0-source-release.zip
2.解压包 tar -zxvf rocketmq-all-4.7.0-source-release.zip -C ../module/
3.进入解压后目录 进行 maven编译,初次编译时间较长,可以打两盘王者再回来
cd rocketmq-all-4.7.0-source-release/
mvn -Prelease-all -DskipTests clean install -U
[INFO] ------------------------------------------------------------------------
[INFO] Reactor Summary for Apache RocketMQ 4.7.0 4.7.0:
[INFO]
[INFO] Apache RocketMQ 4.7.0 .............................. SUCCESS [25:15 min]
[INFO] rocketmq-logging 4.7.0 ............................. SUCCESS [02:53 min]
[INFO] rocketmq-remoting 4.7.0 ............................ SUCCESS [04:06 min]
[INFO] rocketmq-common 4.7.0 .............................. SUCCESS [ 55.674 s]
[INFO] rocketmq-client 4.7.0 .............................. SUCCESS [01:43 min]
[INFO] rocketmq-store 4.7.0 ............................... SUCCESS [01:39 min]
[INFO] rocketmq-srvutil 4.7.0 ............................. SUCCESS [ 0.318 s]
[INFO] rocketmq-filter 4.7.0 .............................. SUCCESS [03:15 min]
[INFO] rocketmq-acl 4.7.0 ................................. SUCCESS [ 46.760 s]
[INFO] rocketmq-broker 4.7.0 .............................. SUCCESS [ 2.588 s]
[INFO] rocketmq-tools 4.7.0 ............................... SUCCESS [ 1.571 s]
[INFO] rocketmq-namesrv 4.7.0 ............................. SUCCESS [ 0.634 s]
[INFO] rocketmq-logappender 4.7.0 ......................... SUCCESS [01:03 min]
[INFO] rocketmq-openmessaging 4.7.0 ....................... SUCCESS [ 7.243 s]
[INFO] rocketmq-example 4.7.0 ............................. SUCCESS [ 0.726 s]
[INFO] rocketmq-test 4.7.0 ................................ SUCCESS [ 30.650 s]
[INFO] rocketmq-distribution 4.7.0 ........................ SUCCESS [16:25 min]
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 58:54 min
[INFO] Finished at: 2020-06-15T21:33:09+08:00
[INFO] ------------------------------------------------------------------------
看到以上信息,编译安装成功了
4.进入编译成功后的目录,启动Name Server 和 Broker
cd /opt/module/rocketmq-all-4.7.0-source-release/distribution/target/rocketmq-4.7.0/rocketmq-4.7.0
#启动Name Server
nohup sh bin/mqnamesrv &
#启动Broker
nohup sh bin/mqbroker -n 192.168.126.70:9876 &
5.如果启动broker提示如下错误需要手动调整虚拟机分配内存大小
Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000005c0000000, 8589934592, 0) failed; error='Cannot allocate memory' (errno=12)
vim bin/runbroker.sh
更改67行内存大小(根据自己实际情况调整)
JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn2g"
6.看到如下信息代表成功启动了
[root@localhost rocketmq-4.7.0]# jps
20289 Jps
10278 NamesrvStartup
20203 BrokerStartup
验证服务正常运行
[root@localhost rocketmq-4.7.0]# export NAMESRV_ADDR=192.168.126.70:9876
[root@localhost rocketmq-4.7.0]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
[root@localhost rocketmq-4.7.0]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
7.服务关闭
[root@localhost rocketmq-4.7.0]# sh bin/mqshutdown broker
The mqbroker(20203) is running...
Send shutdown request to mqbroker(20203) OK
[root@localhost rocketmq-4.7.0]# sh bin/mqshutdown namesrv
The mqnamesrv(10278) is running...
Send shutdown request to mqnamesrv(10278) OK
以上就RocketMq 源码安装过程
Rocketmq图形化界面安装与使用
github地址:https://github.com/apache/rocketmq-externals
1.下载下来 进行Maven编译,初次运行需要较长一段时间,打一盘王者的等待时间就够了
cd /opt/module/rocketmq-externals/rocketmq-console
更新项目中的application.yml
设置Name Server连接地址
rocketmq.config.namesrvAddr=192.168.126.70:9876
#进行编译
mvn clean package -Dmaven.test.skip=true
2.看到以下信息代表编译安装成功
[INFO]
[INFO] >>> maven-source-plugin:3.0.1:jar (attach-sources) > generate-sources @ rocketmq-console-ng >>>
[INFO]
[INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ rocketmq-console-ng ---
[INFO] Starting audit...
Audit done.
[INFO]
[INFO] --- jacoco-maven-plugin:0.7.9:prepare-agent (default-prepare-agent) @ rocketmq-console-ng ---
[INFO] argLine set to -javaagent:/root/.m2/repository/org/jacoco/org.jacoco.agent/0.7.9/org.jacoco.agent-0.7.9-runtime.jar=destfile=/opt/module/rocketmq-externals/rocketmq-console/target/jacoco.exec
[INFO]
[INFO] <<< maven-source-plugin:3.0.1:jar (attach-sources) < generate-sources @ rocketmq-console-ng <<<
[INFO]
[INFO]
[INFO] --- maven-source-plugin:3.0.1:jar (attach-sources) @ rocketmq-console-ng ---
[INFO] Building jar: /opt/module/rocketmq-externals/rocketmq-console/target/rocketmq-console-ng-1.0.0-sources.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 25:22 min
[INFO] Finished at: 2020-06-15T22:31:28+08:00
[INFO] ------------------------------------------------------------------------
[root@localhost rocketmq-console]#
3.进入target包中执行 Java -jar rocketmq-console-ng-1.0.0.jar
访问http://192.168.126.70:8080端口查看图形化界面
4.图形化界面上测试产生消息,shell进行消费
发送消息
消息产生成功
shell端进行消费
[root@localhost rocketmq-4.7.0]# sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
22:51:16.498 [main] DEBUG i.n.u.i.l.InternalLoggerFactory - Using SLF4J as the default logging framework
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [brokerName=localhost.localdomain, queueId=2, storeSize=188, queueOffset=250, sysFlag=0, bornTimestamp=1592232237604, bornHost=/192.168.126.70:47422, st
oreTimestamp=1592232237617, storeHost=/192.168.126.70:10911, msgId=C0A87E4600002A9F000000000002BEB2, commitLogOffset=179890, bodyCRC=691830935, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=251, KEYS=gcx, CONSUME_START_TIME=1592232676926, UNIQ_KEY=C0A87E465259681A95154CF9C6240000, WAIT=true, TAGS=tag}, body=[116, 111, 100, 97, 121, 32, 105, 115, 32, 97, 32, 104, 111, 116, 32, 100, 97, 121], transactionId='null'}]]
查看图形界面已经被消费掉,延时队列清空
首页仪表盘查看刚刚新增的消息
Springboot与RocketMq的整合
1)创建springboot工程
2)添加pom.xml依赖
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.7.0</version>
</dependency>
3)jms简易配置(只是为了演示整合功能,实际中放到工程配置文件中)
package com.gcxzflgl.rocketmq.jms;
public class JmsConfig {
public static final String NAME_SERVER = "192.168.126.70:9876";
public static final String TOPIC = "autoCreateTopic";
}
生产者:
package com.gcxzflgl.rocketmq.jms;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;
@Component
public class PayProducer {
private String producerGroup = "pay_producer_group";
private DefaultMQProducer producer;
public PayProducer(){
producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(JmsConfig.NAME_SERVER);
start();
}
public DefaultMQProducer getProducer(){
return this.producer;
}
/**
* 对象在使用之前必须要调用一次,只能初始化一次
*/
public void start(){
try {
this.producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 一般在应用上下文,使用上下文监听器,进行关闭
*/
public void shutdown(){
this.producer.shutdown();
}
}
消费者:
package com.gcxzflgl.rocketmq.jms;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
@Component
public class PayConsumer {
private DefaultMQPushConsumer consumer;
private String consumerGroup = "pay_consumer_group";
public PayConsumer() throws MQClientException {
consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setNamesrvAddr(JmsConfig.NAME_SERVER);
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
consumer.subscribe(JmsConfig.TOPIC, "*");
//当有消息来临时触发监听
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
Message msg = msgs.get(0);
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(msgs.get(0).getBody()));
String topic = msg.getTopic();
String body = new String(msg.getBody(), "utf-8");
String tags = msg.getTags();
String keys = msg.getKeys();
System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//消息成功接收告诉Broker更新消息状态
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
consumer.start();
System.out.println("consumer start ...");
}
}
rest访问层
```java
package com.gcxzflgl.rocketmq.rest;
import com.gcxzflgl.rocketmq.jms.JmsConfig;
import com.gcxzflgl.rocketmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
@RestController
public class PayController {
@Autowired
private PayProducer payProducer;
@RequestMapping("/api/v1/pay_cb")
public Object callback(String text) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
Message message = new Message(JmsConfig.TOPIC,"taga", ("hello gcx rocketmq = "+text).getBytes() );
SendResult sendResult = payProducer.getProducer().send(message);
System.out.println(sendResult);
return "消息生产成功!";
}
}
4) 启动项目 访问地址: http://localhost:8080/api/v1/pay_cb?text=瑞燊儿
进行生产生产消息
idea控制台打印:
SendResult [sendStatus=SEND_OK, msgId=AC24915B518418B4AAC251B3B31C0000, offsetMsgId=C0A87E4600002A9F000000000002C035, messageQueue=MessageQueue [topic=autoCreateTopic, brokerName=rocketmq-master, queueId=2], queueOffset=0]
ConsumeMessageThread_1 Receive New Messages: hello gcx rocketmq = 瑞燊儿
topic=autoCreateTopic, tags=taga, keys=null, msg=hello gcx rocketmq = 瑞燊儿
Rocketmq主从模式搭建
安装步骤
1 . 前面已经讲解过单机安装过程,我们在以安装Rocktmq的机器上进行克隆,克隆过程不再讲解。确保两台机器上(192.168.126.70、192.168.126.71)上都安装jdk1.8、maven 3.2+ 。
2 . 进入指定目录复制一份作为演示的主从节点配置(两台机器都要执行)
cd /opt/module/rocketmq-all-4.7.0-source-release/distribution/target/rocketmq-4.7.0/rocketmq-4.7.0/conf
cp -r 2m-2s-async/ m-s-async-disk
3 . 更改配置信息
master 节点:192.168.126.70
vim broker-a.proterties
namesrvAddr=192.168.126.70:9876;192.168.126.71:9876
brokerClusterName=GcxCluster
brokerName=broker-a #主节点和从节点名字需要相同
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER #主节点
flushDiskType=ASYNC_FLUSH #异步刷写到磁盘
slave 节点:192.168.126.71
vim broker-a-s.properties
namesrvAddr=192.168.126.70:9876;192.168.126.71:9876
brokerClusterName=GcxCluster
brokerName=broker-a #主节点和从节点名字需要相同
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE #从节点
flushDiskType=ASYNC_FLUSH #异步刷写到磁盘
4.启动服务
主节点:
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/m-s-async-disk/broker-a.properties &
从节点:
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -c conf/m-s-async-disk/broker-a-s.properties &
5.使用可视化控制台查看节点模式
至此,Rocketmq主从节点搭建完毕!
主从同步讲解
- Broker分为master和slave,一个master可以对应多个slave,反之不可以。master与slaver通过相同的Broker Name来匹配,不用的Broker id来定义是master还是slave。Broker向所有的NameServer节点建立长连接,定时注册topic和发送元数据信息。NameServer定时(120s)扫描存活的Broker链接,超时没响应断开心跳检测,对于consumer客户端不能感知。consumer定时(30s)从NameServer获取topic的信息,当Broker不可用时,consumer最多需要30s才会发现
- 只有master能进行写操作,slave不能进行写入操作,同步策略取决于master
- 客户端消费可以从master和slave消费,默认消费者都从master消费,如果master挂掉后,客户端从NameServer感知Broker宕机,就会从slave进行消费,感知并非实时,slave不能保证master 100%的消息都同步过来了,会存在少量丢失,一旦master恢复,未同步过去的消息会被消费掉。
- 如果consumer实例的数量多于message queue的数量,多出来的consumer将无法分配到queue,无法起到负载均衡的作用,所以需要控制message queue的数量大于等于consumer的数量最好是成倍关系。
至此,RocketMq的使用讲解完毕!
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)