1 架构变化

RocketMQ 5.0 架构上的变化主要是为了更好的走向云原生

RocketMQ 4.x 架构如下:

Broker 向 Name Server 注册 Topic 路由信息,Producer 和 Consumer 则从 Name Server 获取路由信息,然后 Producer 根据路由信息向 Broker 发送消息,Consumer 则根据路由信息从 Broker 拉取消息。

这个架构存在以下几个问题:

  1. 富客户端,客户端同时支持 Java、C++、Go 等各种语言,如果为了跟应用程序隔离,把客户端部署到 sidecar 中,这个 sidecar 会很大,部署难度高;

2.Broker 同时承担计算和存储的功能,不利于云原生环境下的资源解耦。

RocketMQ 5.0 架构如下图:

RocketMQ 5.0 在架构上主要做了两个优化:1. 通过引入无状态的代理模块,将 Broker 原来的协议适配、权限管理、消息管理等计算功能抽离到了代理模块中,Broker 则专注于存储功能。这样在云环境下可以更好地进行资源调度;

2.RocketMQ 5.0 基于 gRPC 支持多语言 SDK,各语言 SDK API 在本地语言层面对齐,API 非常轻量级,更容易被使用和集成。

2 集成事件、流处理

RocketMQ 5.0 采用事件驱动架构来支持消息流式处理和轻计算,可以实现消息的就近计算和分析。

RocketMQ 5.0 增加了 RocketMQ-EventBridge 组件,这个组件兼容标准 CloudEvents 协议标准,既可以链接社区活跃的生态,又可以跟各大云厂商的产品进行集成,对云原生的支持非常友好。下面这张图来自官网:

2.1 流式处理

为了更好地支持流失处理,RocketMQ 5.0 在原有 MessageQueue 的基础上抽象出了逻辑队列。一个逻辑队列可以包含多个物理队列,以此拼接出流式队列。如下图:

这样可以更加轻量级,做到秒级的扩缩容,即使物理节点发生变化也不需要复制迁移数据,数据存储的调度也更加灵活。

2.2 计算框架

在计算框架方面,RocketMQ 5.0 主要有两个变化:

  1. 引入流式处理框架 RSteams,这样 RocketMQ 自身就可以完成轻量级的理和计算;

  2. 引入轻量级 SQL 查询引擎 RSQLDB,RSQLDB 可以兼容了 Flink/Blink SQL 标准,实现了 RocketMQ 和 Flink/Blink 的融合。比如对于轻量级的计算,可以使用 SQL 在 RocketMQ 完成,而对于重量级的计算,RocketMQ 资源受限时,可以从 RocketMQ 迁移到 Flink 处理。

3 高可用

在 RocketMQ 5.0 之前,高可用架构有两个阶段:

1.RocketMQ 4.5 之前采用 Master-Slave 部署,这种架构 Master 发生故障后是不能自动切换的,对集群的影响会比较大;

2.RocketMQ 4.5 之后采用基于 raft 协议的 DLedger 算法来进行主从切换,架构如下图:

3.1 Master-Slave 架构优化

RocketMQ 5.0 对 Master-Slave 架构和基于 Raft 的架构都做了优化。

对于 Master-Slave 架构的升级,RocketMQ 5.0 引入了 BrokerContainer 的概念,一个 BrokerContainer 中可以部署多个 Broker,这些 Broker 拥有独立的端口,功能完全独立,可以通过 admin 来增加或减少 Broker。如下图:

这样一个 BrokerContainer 中的多个 Broker 可以共享同一个节点的资源,提高资源利用率。

同时,在一个 BrokerContainer 中可以同时部署 Broker 的 Master 和 Slave 节点,这样就可以通过 Master/Slave 交叉部署来实现节点对等,如下图两节点对等部署:

即使 Node1 挂了,Node2 节点中的 Broker1 可以提供读功能,并不会丢消息,Broker2 可以提供读写功能。

再看下面三个节点的对等部署架构图:

每个 Node 的 BrokerContainer 中都有 1 个 Master 跟 2 个 Slave 节点,如果其中一个 Node 挂了,其他两个 Node 上的 Broker 可以继续提供读写服务。

3.2 Raft 架构优化

基于 Raft 架构虽然可以实现主节点故障后自动切换,但也存在几个问题:

  1. 消息日志副本数必须是 3 个以上,这个是 Raft 协议自动选主的要求,造成资源浪费;

  2. Raft 选主过程中必须有多数节点同意才能选主成功,副本数越多时间开销会越大,这会加大 ACK 延时;

  3. CommitLog 主从同步需要使用 DLedger 库,也就是说 CommitLog 被看作是 Raft log 进行复制,这样 RocketMQ 原生的零拷贝、堆外内存的优势无法保留了。

RocketMQ 5.0 专门增加了轻量级的 DLedgerControlller 选主组件,将选主的切换能力上移,这个组件是可拔插的,既可以部署在 NameServer 中,也可以部署在本地。如下图:

引入了 DLedgerControlller 组件后,消息主备复制还是采用 RocketMQ 原生的基于 Master-Slave 架构的复制能力,复制效率高。

4. 定时/延时消息

5.0 支持毫秒级任意时间点定时任务
参考
https://blog.csdn.net/agonie201218/article/details/133610576

 //定时/延时消息发送
        MessageBuilder messageBuilder = new MessageBuilderImpl();;
        //以下示例表示:延迟时间为10分钟之后的Unix时间戳。
        Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000;
        Message message = messageBuilder.setTopic("topic")
                //设置消息索引键,可根据关键字精确查找某条消息。
                .setKeys("messageKey")
                //设置消息Tag,用于消费端根据指定Tag过滤消息。
                .setTag("messageTag")
                .setDeliveryTimestamp(deliverTimeStamp)
                //消息体
                .setBody("messageBody".getBytes())
                .build();
        try {
            //发送消息,需要关注发送结果,并捕获失败等异常。
            SendReceipt sendReceipt = producer.send(message);
            System.out.println(sendReceipt.getMessageId());
        } catch (ClientException e) {
            e.printStackTrace();
        }
        //消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。
        MessageListener messageListener = new MessageListener() {
            @Override
            public ConsumeResult consume(MessageView messageView) {
                System.out.println(messageView.getDeliveryTimestamp());
                //根据消费结果返回状态。
                return ConsumeResult.SUCCESS;
            }
        };
        //消费示例二:使用SimpleConsumer消费定时消息,主动获取消息进行消费处理并提交消费结果。
        List<MessageView> messageViewList = null;
        try {
            messageViewList = simpleConsumer.receive(10, Duration.ofSeconds(30));
            messageViewList.forEach(messageView -> {
                System.out.println(messageView);
                //消费处理完成后,需要主动调用ACK提交消费结果。
                try {
                    simpleConsumer.ack(messageView);
                } catch (ClientException e) {
                    e.printStackTrace();
                }
            });
        } catch (ClientException e) {
            //如果遇到系统流控等原因造成拉取失败,需要重新发起获取消息请求。
            e.printStackTrace();
        }

5 消息(Message)

消息类型

  • 定义:当前消息的类型。
  • 取值:从客户端SDK接口获取。Apache RocketMQ 支持的消息类型如下:
    • Normal:普通消息,消息本身无特殊语义,消息之间也没有任何关联。
    • FIFO:顺序消息,Apache RocketMQ 通过消息分组MessageGroup标记一组特定消息的先后顺序,可以保证消息的投递顺序严格按照消息发送时的顺序。
    • Delay:定时/延时消息,通过指定延时时间控制消息生产后不要立即投递,而是在延时间隔后才对消费者可见。
    • Transaction:事务消息,Apache RocketMQ 支持分布式事务消息,支持应用数据库更新和消息调用的事务一致性保障。

6 流控机制

消息流控基本概念

消息流控指的是系统容量或水位过高, Apache RocketMQ 服务端会通过快速失败返回流控错误来避免底层资源承受过高压力。

触发条件

Apache RocketMQ 的消息流控触发条件如下:

  • 存储压力大:参考消费进度管理的原理机制,消费者分组的初始消费位点为当前队列的最大消费位点。若某些场景例如业务上新等需要回溯到指定时刻前开始消费,此时队列的存储压力会瞬间飙升,触发消息流控。
  • 服务端请求任务排队溢出:若消费者消费能力不足,导致队列中有大量堆积消息,当堆积消息超过一定数量后会触发消息流控,减少下游消费系统压力。

7.无状态代理模

https://blog.csdn.net/alisystemsoftware/article/details/127248969

RocketMQ 5.0 文档地址 https://rocketmq.apache.org/zh/docs/

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐