引用资料,也可以直接看这两个链接即可,本文只是做了一些整合
1、 https://www.runoob.com/w3cnote/mqtt-intro.html
2、 https://blog.csdn.net/quending/article/details/85254661

正文:

1 序言

基于研究多进程设备中,主进程和其他业务进程间,消息分发处理机制 的考虑,记录如下,故文章重点在服务端消息机制的处理方面,mqtt协议本身本非重点

2 概述

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,是一个基于客户端-服务器的消息发布/订阅传输协议。

MQTT最大优点在于,以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛,在物联网、小型设备、移动应用等方面有较广泛的应用。

3 协议介绍

MQTT协议需要客户端和服务器端通讯完成,在通讯过程中,MQTT协议中有三种身份:发布者(Publish)、代理(Broker)(服务器)、订阅者(Subscribe)。其中,消息的发布者和订阅者都是客户端,消息代理是服务器,消息发布者可以同时是订阅者。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GinBs8T8-1607485570622)(file:///C:\Users\256737\AppData\Local\Temp\msohtmlclip1\01\clip_image002.jpg)]

MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

(1)Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

(2)payload,可以理解为消息的内容,是指订阅者具体要使用的内容。

MQTT会构建底层网络传输:它将建立客户端到服务器的连接,提供两者之间的一个有序的、无损的、基于字节流的双向传输。当应用数据通过MQTT网络发送时,MQTT会把与之相关的服务质量(QoS)和主题名(Topic)相关连。

3.1 客户端 Client

一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。客户端可以:

(1)发布其他客户端可能会订阅的信息;

(2)订阅其它客户端发布的消息;

(3)退订或删除应用程序的消息;

(4)断开与服务器连接。

3.2 服务端 Broker

MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间,它可以:

(1)接受来自客户的网络连接;

(2)接受客户发布的应用信息;

(3)处理来自客户端的订阅和退订请求;

(4)向订阅的客户转发应用程序消息。

3.3 订阅、主题、会话

订阅(Subscription)
包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

会话(Session)
每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

主题名(Topic Name)
连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

主题筛选器(Topic Filter)
一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

负载(Payload):消息订阅者所具体接收的内容。

3.4 服务质量QoS

MQTT支持三种消息发布服务质量(QoS):

至多一次(QoS==0),消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。

至少一次(QoS==1),确保消息到达,但消息重复可能会发生。

只有一次(QoS==2),确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。

4 消息机制

以Mosquitto-0.1为例,简要剖析mqtt-broker的消息处理机制。

一个mqtt - broker要完成哪些任务?

作为并发服务器,维护多个客户端的TCP链路
处理客户端mqtt connect、disconnect、subscribe、publish、ping等请求
处理消息持久化即消息永久保存问题、处理不同QoS消息

Mosquitto-0.1是怎样完成broker要完成的任务的?

使用pselect处理客户端并发

使用结构mqtt3_context维护每个客户端信息

使用sqlite3创建表客户端clients、订阅sbus、消息messages、持久化retain,来实现对客户端-订阅/发布/持久化-消息的管理

4.1 概述

Mosquitto V0.1版本,实现了独立、完整的MQTT V3.1协议的服务端(broker)。源码行数约3000行,使用C语言编写,.c文件13个,broker使用其中的10个文件。因为mosquitto基于sqlite3,其编译链接和运行,需要libsqlite3.so 。

文件名主要函数描述
conf.cmqtt3_config_read读取并解析配置文件
context.cmqtt3_context_initmqtt3_context_cleanup提供mqtt3_context的初始化和清理接口。mqtt3_context结构含socket fd,客户端id,最后一次收发时间,保活时间等参数。
memory.cmqtt3_callocmqtt3_free等提供内存分配和使用接口
database.cmqtt3_db_openmqtt3_db_close_mqtt3_db_tables_create_mqtt3_db_statement_preparemqtt3_db_XXX_insert等提供mqtt相关sqlite3数据库操作接口
net.cmqtt3_socket_listen提供TCP socket接口
mosquito.chandle_read入口函数所在文件
raw_send.cmqtt3_raw_publishmqtt3_raw_puback等提供mqtt原始报文发送接口
raw_send_client.cmqtt3_raw_connectmqtt3_raw_disconnectmqtt3_raw_subscribemqtt3_raw_unsubscribe提供客户端mqtt原始报文发送接口,broker不使用该文件
raw_send_server.cmqtt3_raw_connack提供connect ack发送接口
read_handle.cmqtt3_handle_publishmqtt3_handle_pubackmqtt3_handle_pingreq等提供socket上读入数据处理接口
read_handle_client.cmqtt3_handle_connackmqtt3_handle_subackmqtt3_handle_unsuback提供客户端socket上读入数据处理接口,broker不使用该文件
read_handle_server.cmqtt3_handle_connectmqtt3_handle_disconnectmqtt3_handle_subscribemqtt3_handle_unsubscribe提供mqtt conn/disconn/sub/unsub命令处理接口
util.cmqtt3_command_to_string提供工具,未使用

文件调用关系

mosquito.c   - - >  conf.c
mosquito.c 	- - >  read_handle_server.c - - > context.c
					|- - >  database.c
							|- - > raw_send.c
					|- - >  raw_send_server.c
					|- - >  net.c
mosquito.c   - - >  read_handle.c
					|- - > database.c
					| - - > net.c
mosquito.c   - - >  memory.c
mosquito.c   - - >  database.c

4.2 数据结构

4.2.1 数据库表

mosquitto启动后共创建5个表:

主要用于版本信息的config
CREATE TABLE config ( [key] TEXT PRIMARY KEY,  value TEXT);
记录客户端信息的clients
CREATE TABLE clients (
    sock         INTEGER,
    id           TEXT    PRIMARY KEY,
    clean_start  INTEGER,
    will         INTEGER,
    will_retain  INTEGER,
    will_qos     INTEGER,
    will_topic   TEXT,
    will_message TEXT,
    last_mid     INTEGER
);
订阅信息subs
CREATE TABLE subs ( client_id TEXT, sub       TEXT, qos       INTEGER);
持久化消息retain
CREATE TABLE retain (
    sub        TEXT,
    qos        INTEGER,
    payloadlen INTEGER,
    payload    BLOB
);
客户端消息messages
CREATE TABLE messages (
    client_id  TEXT,
    timestamp  INTEGER,
    direction  INTEGER,
    status     INTEGER,
    mid        INTEGER,
    dup        INTEGER,
    qos        INTEGER,
    retain     INTEGER,
    sub        TEXT,
    payloadlen INTEGER,
    payload    BLOB
);

4.2.2 mqtt3_config系统全局配置

CREATE TABLE messages (
    client_id  TEXT,
    timestamp  INTEGER,
    direction  INTEGER,
    status     INTEGER,
    mid        INTEGER,
    dup        INTEGER,
    qos        INTEGER,
    retain     INTEGER,
    sub        TEXT,
    payloadlen INTEGER,
    payload    BLOB
);

4.2.3 mqtt3_context客户端上下文

typedef struct _mqtt3_context{
    int sock;
    time_t last_msg_in;
    time_t last_msg_out;
    uint16_t keepalive;
    bool clean_start;
    char *id;
} mqtt3_context;

**mqtt3_context****方法:**mqtt3_context_init:初始化,mqtt3_context_cleanup:关闭socket;如果clean_start置上,则还需要删除subs、messages、clients中与对应客户端有关的行

4.2.4 mqtt3_msg_status消息状态

typedef enum {
    ms_invalid = 0,
    ms_publish = 1,
    ms_publish_puback = 2,
    ms_wait_puback = 3,
    ms_publish_pubrec = 4,
    ms_wait_pubrec = 5,
    ms_resend_pubrel = 6,
    ms_wait_pubrel = 7,
    ms_resend_pubcomp = 8,
    ms_wait_pubcomp = 9
} mqtt3_msg_status;

基本函数的介绍,见 https://blog.csdn.net/quending/article/details/85254661 原文

4.3 算法和处理流程

img

4.3.1 系统初始化

mqtt3_config_read

读取系统配置文件,完成mqtt3_config的初始化(未提供配置时使用默认配置,例如默认监听端口1883)

mqtt3_db_open

  1. 根据系统配置打开/创建数据库,并调用_mqtt3_db_tables_create使用CREATE TABLE IF NOT EXISTS的方式创建5个表。
  2. 调用_mqtt3_db_invalidate_sockets初始化客户端socket: UPDATE clients SET sock=-1

mqtt3_socket_listen

开启TCP监听(默认端口1883)。

系统初始化完成后,进入处理循环。

4.3.2 Broker socket新连接处理

收到新的TCP connect之后,accept并且调用mqtt3_context_init进行context初始化

4.3.3 Broker socket 读操作及处理

TCP socket读数据在handle_read()中根据mqtt报文类型进行处理。Broker可能收到的mqtt报文有connect、publish、puback、pubrec、pubrel、pubcomp、subscribe、unsubscribe、ping、disconnect共10种(不含错误的报文)。

1) mqtt connect处理(ack)

【协议定义】客户端的connect应该回复ack

【处理】mqtt3_handle_connect为处理客户端mqtt connect的函数,最重要的是两个任务:

a\设置context id域;

b\将(发起mqtt connect的)客户端信息插入表clients。

context->id = client_id;
mqtt3_db_client_insert(context, will, will_retain, will_qos, will_topic, ill_message);

【示例】查看sqlite3数据库clients表

img

2) mqtt disconnect处理(no ack)

【协议定义】disconnect消息:e0 00,不需要应答。

【处理】 mqtt3_handle_disconnect调用mqtt3_socket_close

/**
关闭socket;
重置context sock;
根据客户端重置clients表中的相应行中sock列
*/
int mqtt3_socket_close(mqtt3_context *context){
    mqtt3_db_client_invalidate_socket(context->id, context->sock);
    rc = close(context->sock);
    context->sock = -1;
}
3) mqtt ping处理(pong)

【协议定义】对客户端的ping request直接回复ping response.理论上broker收不到pong,因此即使收到,就简单丢弃。

4) mqtt subscribe处理(ack,retain pub)

【协议定义】subscribe需要ack

【处理】int mqtt3_handle_subscribe(mqtt3_context *context)是处理客户端订阅的函数:

A、调用mqtt3_db_sub_insert将该订阅信息插入subs表

B、调用mqtt3_db_retain_find检查retain表里是否有消息(关键字sub)需要立即推送给该订阅者,并根据结果进行消息推送操作(描述见publish处理)。

在MQTT协议里关于消息的持久化规定对于持久的、最新一条PUBLISH消息,服务器要马上推送给新的订阅者(注:仅最新的一条,不是所有)。

【示例】1:mqtt sub(-t msg)之后,查看subs有该订阅信息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-KPghnJeK-1607485570625)(file:///C:\Users\256737\AppData\Local\Temp\msohtmlclip1\01\clip_image005.png)]

【示例】2:有持久消息时,订阅者会收到推送,如下,retain存有主题为/messages/vb的消息:

img

在订阅(-t /messages/vb)之后立即收到该消息:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-p3jxxh3o-1607485570627)(file:///C:\Users\256737\AppData\Local\Temp\msohtmlclip1\01\clip_image009.jpg)]

该新订阅者除了收到sub ack之外还收到了retain消息(qos=0):ret_vb,如下:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-l7U1T9wU-1607485570628)(file:///C:\Users\256737\AppData\Local\Temp\msohtmlclip1\01\clip_image011.jpg)]

5) mqtt unsubscribe处理(ack)

【协议定义】取消订阅需要ack。

【处理】int mqtt3_handle_unsubscribe(mqtt3_context *context)调用mqtt3_db_sub_delete删除subs表相应行。

注意:订阅者单纯的TCP断开,不会发送unsubscribe消息(因为根本来不及完成这个交互)。

6) mqtt publish处理(QoS=0/1/2)

【协议定义】publish有retain标志;publish根据QoS有不同处理

QoS=0,broker应立即把消息推送给订阅者,不回ack.

QoS=1,broker应立即把消息推送给订阅者,并回复ACK.

QoS=2,pub-recv-rel-comp,broker首先把消息暂存,然后经过recv-rel-comp握手之后,再把消息推送给订阅者。

【处理】mqtt3_handle_publish调用mqtt3_db_messages_queue或mqtt3_db_message_insert更新数据库,调用mqtt3_raw_XX回复客户端。

int mqtt3_handle_publish(mqtt3_context *context, uint8_t header)
{
……
    switch(qos){
        case 0:
            if(mqtt3_db_messages_queue(sub, qos, payloadlen, payload, retain)) rc = 1;
            break;
        case 1:
            if(mqtt3_db_messages_queue(sub, qos, payloadlen, payload, retain)) rc = 1;
            if(mqtt3_raw_puback(context, mid)) rc = 1;
            break;
        case 2:
            if(mqtt3_db_message_insert(context->id, mid, md_in, ms_wait_pubrec, retain, sub, qos, payloadlen, payload)) rc = 1;
            if(mqtt3_raw_pubrec(context, mid)) rc = 1;
            break;
     }
……
}

对应qos值分别:

0:调用mqtt3_db_messages_queue(),为插入/更新retain表(因为publish消息可以是retain型)和messages表,消息方向md_out。

1:同0,多一个puback回复。

2:会调用mqtt3_db_message_insert暂存消息并回复pubrec。注意消息方向md_in,状态ms_wait_pubrec。这说明这条消息是暂存的,在握手完成后,立即删除这条暂存消息并把原pub消息进行推送,见pubrel处理。

注意:发布者在发布之后,都会disconnect断连。这一点和订阅者不同,订阅者是和服务端保持长连接的!

7) mqtt puback处理(no ack)

【协议定义】什么情况下broker会收到puback并且需要处理呢?

在broker向订阅客户publish QoS=1的消息时,收到回复puback表示客户端收到该消息。

【处理】mqtt3_handle_puback调用

mqtt3_handle_puback (client_id, mid, dir)

从messages中删除相应一条消息:client_id==订阅者的id。表示推送完成不再需要重复推送,因此从数据库中删除。

【示例】1:

客户端 id_sub01订阅:-t topic01

img

客户端id_pub01发布消息主题为msg,内容为:this is msg from pub01 qos1

img

根据就低原则(参见下面描述),id_sub01只会收到broker的publish(QoS=0),不需要回复puback:

img

【示例】2:客户端 id_sub01订阅:-t topic02 –q 1

img

客户端id_sub01在id_pub01发布QoS=1的消息后收到的是QoS=1的消息

img

8) mqtt pubrec & pubcomp处理

【协议定义】什么情况下broker会收到pubrec和pubcomp?

—当broker向客户端publish QoS=2的消息时,收到客户端的回应不是puback,而是pubrec,表示客户端收到了publish消息,broker将回复pubrel,客户端收到后回复pubcomp。

补充,关于pubXXX信令

https://mcxiaoke.gitbook.io/mqtt/03-controlpackets

【处理】

mqtt3_handle_pubrec处理broker收到pubrec:
  • 调用mqtt3_db_message_update,根据参数更新messages中消息,设置消息状态为

ms_wait_pubcomp,同时更新时间戳,SQL语句可以伪码描述为:

UPDATE messages SET status=ms_wait_pubcomp, timestamp=now
WHERE client_id={client_id} AND mid={mid} AND direction=md_out
  • 调用mqtt3_raw_pubrel给客户端回复pubrel
mqtt3_handle_pubcomp处理broker收到pubrel:
  • 调用mqtt3_db_message_delete依据参数删除messages中消息。
DELETE FROM messages WHERE client_id={client_id} AND mid={mid} AND direction=md_out
9) mqtt pubrel处理

【协议定义】什么情况下broker会收到pubrel?

当客户端向broker publish QoS=2的消息时,broker首先回复pubrec,客户端收到后回复pubrel,broker应回复pubcomp。

【处理】mqtt3_handle_pubrel对收到pubrel进行处理:

调用mqtt3_db_message_release:

找到publish时暂存的md_in消息,取出其内容,并调用mqtt3_db_messages_queue将publish消息插入messages(以及retain如果是持久消息),完成后删除暂存md_in消息;

调用mqtt3_raw_pubcomp回复pubcomp

4.3.4 Broker socket 写操作及相关处理

1)mqtt3_db_outgoing_check发送数据准备,其中
SELECT sock FROM clients JOIN messages ON clients.id=messages.client_id "
				"WHERE (messages.status=1 OR messages.status=2 OR messages.status=4 OR messages.status=6 OR messages.status=8) "
				"AND messages.direction=1 AND sock<>-1

找出“有哪些messages要发送到哪些客户端”。

这里关键是clients.id=messages.client_id:

A、clients.id 是客户端(包括订阅者、发布者,其实只有订阅者会保存在数据库中)的id。如前分析:订阅者在mqtt conn之后提供client id,mosquitto broker将id保存在clients中;

B、messages.client_id,如前分析:messages表中client_id列来自于subs匹配行的client_id列(即订阅者id)

通过这个sql语句,找到了本条messages有哪些订阅者,下一步的动作是推送给这些订阅者,在mqtt3_db_outgoing_check中的处理是找到这些订阅者的socket fd,并加入FD_SET中(FD_ISSET返回真),在接下来的pselect循环会进行处理。

messages.direction=1指的是md_out,发出消息。

messages.status=1、2、4、6、8 分别是下列状态:

ms_publish:消息待推送,在mqtt3_db_message_write中即将完成推送。

ms_publish_puback:消息待推送并且需要收到puback,在mqtt3_db_message_write中即将完成推送并置消息状态为ms_wait_puback。

ms_publish_pubrec:消息待推送并且需要收到pubrec,在mqtt3_db_message_write中即将完成推送并置消息状态为ms_wait_ pubrec。

ms_resend_pubrel:ms_wait_pubrel的消息超时没收到客户端回复pubrel,在mqtt3_db_message_write中即将重发。

ms_resend_pubcomp:ms_wait_ pubcomp的消息超时没收到客户端回复pubcomp,在mqtt3_db_message_write中即将重发。

2)mqtt3_db_message_write发送消息

在经过select及FD_ISSET后,已经进入了某个客户端的发送。mqtt3_db_message_write为完成发送的执行体。

SQL语句select绑定的是client_id,根据messages表中现有待发出的消息的状态,有不同的动作:

switch(status){
    case ms_publish:
        if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){
            mqtt3_db_message_delete_by_oid(OID);
        }
        break;
        
    case ms_publish_puback:
        if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){
            mqtt3_db_message_update(context->id, mid, md_out, ms_wait_puback);
        }        
        break;
        
    case ms_publish_pubrec:
        if(!mqtt3_raw_publish(context, false, qos, retain, mid, sub, payloadlen, payload)){
            mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubrec);
        }
        break;
        
    case ms_resend_pubrel:
        if(!mqtt3_raw_pubrel(context, mid)){
            mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubrel);
        }
        break;
        
    case ms_resend_pubcomp:
        if(!mqtt3_raw_pubcomp(context, mid)){
            mqtt3_db_message_update(context->id, mid, md_out, ms_wait_pubcomp);
        }
        break;
}

ms_publish:推送后删除

ms_publish_puback:推送并置消息状态为ms_wait_puback(在客户端收到并回复puback之后,broker的接收socket处理hand_read会删除推送消息,这样完成了消息状态的闭环)

ms_publish_pubrec:推送并置消息状态为ms_wait_ pubrec

ms_resend_pubrel:ms_wait_pubrel的消息超时没收到客户端回复pubrel,重发后恢复ms_wait_pubrel状态

ms_resend_pubcomp:ms_wait_ pubcomp的消息超时没收到客户端回复pubcomp,重发后恢复ms_wait_ pubcomp状态

4.3.5 Broker socket出错处理

如前所述,客户端断开TCP,不会引起mqtt层相应操作,但broker会完成清理:

删除该订阅者的信息(clients、subs、messages),有遗言的把遗言加入messages(如果遗言持久化消息,还会保存到retain供后续订阅者使用)。

示例1:

  1. 带持久化遗言的订阅(–will-payload “it’s a will msg” --will-retain --will-topic msg)
  2. TCP断开,broker将遗言保存到retain表中:

img

  1. 新的订阅(-t msg)除收到suback外还会收到该主题的遗言

img

4.3.6 各流程 数据库动作

Cli sendclientssubs**(key****:sub****)**messagesretain
connect插入/更新
disconnect依据id和sock,重置sock = -1
TCP disconnect删除删除1、删除客户端相关联消息;2、插入客户端的遗言消息插入(订阅/发布)客户的遗言持久化消息
subscribe插入依据右边表格(retain sub查询结果)插入mqtt3_db_messages_queue(retain_msg) md_out消息(QoS决定了消息状态)1、关键字sub查询,并给左边表格使用2、带遗言持久化的订阅会插入行
unsubscribe删除
publish查询是否有订阅者,结果供右边表格使用QoS=0/1:依据左边表格结果插入的mid_out消息;QoS=2:先插入md_in, ms_wait_pubrec消息,在握手完成后删除该临时消息,并插入publish消息md_out发布的消息是retain型则插入
ping

4.3.7 各流程 协议报文方向

方向
connect上行(即客户端到服务器)
disconnect上行(即客户端到服务器)
TCP disconnect双向
subscribe上行
unsubscribe上行
publish双向。Publish可以发送消息到broker,broker也使用mqtt publish把来自其他客户端的消息推送给subscribe
ping上行

4.3.8 各流程 消息状态

Broker收到客户端pub时,应把消息暂存在messages中:

1)QoS==0/1,直接暂存

2)QoS==2,需要broker回复pubrec,消息状态:ms_publish_pubrec,方向:md_in

Broker转发客户端pub的消息给subscribe时,根据就低原则,选取客户端订阅时QoS和本pub消息的QoS两者小值:

1)QoS==0,不需要客户端回复ack,消息状态:ms_publish,方向md_out

2)QoS==1,需要客户端回复puback,消息状态:ms_publish_puback,方向md_out

3)QoS==2,需要客户端回复pubrec,消息状态:ms_publish_pubrec,方向:md_out

5.5.2 代码分析

代码层面,以mosquito-1.6 服务端代码为例简要分析:

A、程序启动后,经过前期的配置读取或生成默认参数,准备创建socket

B、根据配置的情况在net__socket_listen中逐个创建socket

C、net__socket_listen中,根据getaddrinfo返回的情况,进行socket生成

\1. rc = getaddrinfo(listener->host, service, &hints, &ainfo);

\5. listener->sock_count = 0; listener->socks = NULL;

\3. for(rp = ainfo; rp; rp = rp->ai_next){

\4. if(rp->ai_family == AF_INET){

\5. log__printf(NULL, MOSQ_LOG_INFO, “Opening ipv4 listen socket on port %d.”, ntohs(((struct sockaddr_in *)rp->ai_addr)->sin_port));

\6. }else if(rp->ai_family == AF_INET6){

\7. log__printf(NULL, MOSQ_LOG_INFO, “Opening ipv6 listen socket on port %d.”, ntohs(((struct sockaddr_in6 *)rp->ai_addr)->sin6_port));

\8. }else{

\9. continue;

\10. }

\11. sock = socket(rp->ai_family, rp->ai_socktype, rp->ai_protocol);

\15. //省略后续代码。。。

\13. }

D、虽然可以在socket创建过程中设置为unix_domain的相关参数,但在其他位置,多处负责socket失效重新创建的处理函数及调用setsockopt处,需要对原本的tcp相关标志进行逐一修改 —— 不利于修改后的程序稳定性及维护。

5.5.3 结论

在多进程通信的需求下:

1) 配置文件角度,文件不提供unix域设定方式,无法通过配置参数解决

2) 代码修改角度,并非仅在socket创建阶段修改就可以,涉及的其他代码分布较多,不利于稳定性和后期维护

5.3 资源消耗 BIN/VmSize/VmRSS

5.3.1 测试环境

1)基于x86_64的网管设备;mqtt未使用unix域****socket

2)服务端:Mosquito-1.6 编译后的独立可执行程序。

3)客户端:paho-client-1.3 静态编译后,独立可执行的,订阅程序sub_s、发布程序pub_s .

4)订阅发布:发布者向固定话题每2秒发送一次消息(除消息编号自增,其他内容固定),订阅者只订阅同一话题,异步接收。

5.3.2 运行截图

图:服务端启动运行

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1MgrMAWh-1607785111512)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image004.jpg)]

图:订阅发布双方交互

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-1seZCiND-1607785111513)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image006.jpg)]

图:三者网络连接状态

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RuevwMp0-1607785111514)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image008.jpg)]

5.3.3 资源消耗指标

主要记录观察服务端、订阅端、发布端、3个进程各自的资源消耗,每秒采样一次,结果如下

1)服务端mosquito

进程虚拟内存vmSize大小14020kB,物理内存vmRss大小1820kB

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-BScXKrwm-1607785111515)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image010.jpg)]

2)订阅端进程

进程虚拟内存vmSize大小74964kB,物理内存vmRss大小1568kB

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-kc28SQ5F-1607785111517)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image015.jpg)]

3)发布端进程

进程虚拟内存vmSize大小7376kB,物理内存vmRss大小1568kB

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-eu1ulPXr-1607785111518)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image014.jpg)]

4)汇总

虚拟内存vmSize kB物理内存vmRss kB
服务端 mosquito140201820
订阅端 sub749671568
发布端 pub73761568

5.4 报文分析

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-IrDJIKs3-1607785111519)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image016.jpg)]

5.4.1 报文格式

MQTT控制报文由三部分组成,MQTT控制报文的结构

Fixed header 固定报头所有控制报文都包含
Variable header 可变报头部分控制报文包含
Payload 有效载荷部分控制报文包含

5.4.2 固定报头

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-J3AeGxzB-1607785111519)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image018.jpg)]

1**)MQTT控制报文的类型(命令字),4:7位**

名字报文流动方向描述
Reserved0禁止保留
CONNECT1客户端到服务端客户端请求连接服务端
CONNACK2服务端到客户端连接报文确认
PUBLISH3两个方向都允许发布消息
PUBACK4两个方向都允许QoS 1消息发布收到确认
PUBREC5两个方向都允许发布收到(保证交付第一步)
PUBREL6两个方向都允许发布释放(保证交付第二步)
PUBCOMP7两个方向都允许QoS 2消息发布完成(保证交互第三步)
SUBSCRIBE8客户端到服务端客户端订阅请求
SUBACK9服务端到客户端订阅请求报文确认
UNSUBSCRIBE10客户端到服务端客户端取消订阅请求
UNSUBACK11服务端到客户端取消订阅报文确认
PINGREQ12客户端到服务端心跳请求
PINGRESP13服务端到客户端心跳响应
DISCONNECT14客户端到服务端客户端断开连接
Reserved15禁止保留

2**)标志****Flag**

固定报头第1个字节的剩余的4位 [3-0]包含每个MQTT控制报文类型特定的标志。

控制报文固定报头标志Bit 3Bit 2Bit 1Bit 0
CONNECTReserved0000
CONNACKReserved0000
PUBLISHUsed in MQTT 3.1.1DUP1QoS2QoS2RETAIN3
PUBACKReserved0000
PUBRECReserved0000
PUBRELReserved0010
PUBCOMPReserved0000
SUBSCRIBEReserved0010
SUBACKReserved0000
UNSUBSCRIBEReserved0010
UNSUBACKReserved0000
PINGREQReserved0000
PINGRESPReserved0000
DISCONNECTReserved0000

DUP1 =控制报文的重复分发标志;

QoS2 = PUBLISH报文的服务质量等级;(00,01,10)

RETAIN3 = PUBLISH报文的保留标志

3**)剩余长度**

剩余长度(Remaining Length)表示当前报文剩余部分的字节数,包括可变报头和负载的数据。剩余长度不包括用于编码剩余长度字段本身的字节数。

剩余长度字段使用一个变长度编码方案,对小于128的值它使用单字节编码。更大的值按下面的方式处理。低7位有效位用于编码数据,最高有效位用于指示是否有更多的字节。因此每个字节可以编码128个数值和一个延续位(continuation bit)。剩余长度字段最大4个字节。例:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vr5PJ6Zz-1607785111520)(file:///C:\AppData\Local\Temp\msohtmlclip1\01\clip_image020.jpg)]

5.4.3 可变报头

1)报文标识符

Bit7 - 0
byte 1报文标识符 MSB
byte 2报文标识符 LSB

​ 很多控制报文的可变报头部分包含一个两字节的报文标识符字段。这些报文是PUBLISH(QoS > 0时), PUBACK,PUBREC,PUBREL,PUBCOMP,SUBSCRIBE, SUBACK,UNSUBSCRIBE,UNSUBACK。

SUBSCRIBE,UNSUBSCRIBE和PUBLISH(QoS大于0)控制报文必须包含一个非零的16位报文标识符(Packet Identifier)[MQTT-5.3.1-1]。客户端每次发送一个新的这些类型的报文时都必须分配一个当前未使用的报文标识符 [MQTT-5.3.1-2]。如果一个客户端要重发这个特殊的控制报文,在随后重发那个报文时,它必须使用相同的标识符。当客户端处理完这个报文对应的确认后,这个报文标识符就释放可重用。QoS 1的PUBLISH对应的是PUBACK,QoS 2的PUBLISH对应的是PUBCOMP,与SUBSCRIBE或UNSUBSCRIBE对应的分别是SUBACK或UNSUBACK [MQTT-5.3.1-3]。发送一个QoS 0的PUBLISH报文时,相同的条件也适用于服务端 [MQTT-5.3.1-4]。

QoS等于0的PUBLISH报文不能包含报文标识符 [MQTT-5.3.1-5]。

PUBACK, PUBREC, PUBREL报文必须包含与最初发送的PUBLISH报文相同的报文标识符 [MQTT-5.3.1-6]。类似地,SUBACK和UNSUBACK必须包含在对应的SUBSCRIBE和UNSUBSCRIBE报文中使用的报文标识符 [MQTT-5.3.1-7]。

需要报文标识符的控制报文在 表格 5.5 -包含报文标识符的控制报文 中列出。

控制报文报文标识符字段控制报文报文标识符字段
CONNECT不需要SUBSCRIBE需要
CONNACK不需要SUBACK需要
PUBLISH需要(如果QoS > 0)UNSUBSCRIBE需要
PUBACK需要UNSUBACK需要
PUBREC需要PINGREQ不需要
PUBREL需要PINGRESP不需要
PUBCOMP需要DISCONNECT不需要

客户端和服务端彼此独立地分配报文标识符。因此,客户端服务端组合使用相同的报文标识符可以实现并发的消息交换

参考:https://github.com/mcxiaoke/mqtt/blob/master/mqtt/02-ControlPacketFormat.md

5.4.4 有效载荷

某些MQTT控制报文在报文的最后部分包含一个有效载荷。

控制报文有效载荷
CONNECT需要
CONNACK不需要
PUBLISH可选
PUBACK不需要
PUBREC不需要
PUBREL不需要
PUBCOMP不需要
SUBSCRIBE需要
SUBACK需要
UNSUBSCRIBE需要
UNSUBACK不需要
PINGREQ不需要
PINGRESP不需要
DISCONNECT不需要

6 总结

1) mqtt协议,基于订阅发布模式,主要在网络环境下提供轻量级的协议沟通机制。

2) mqtt库原生不支持unix domain socket,如多进程通信角度,考虑到可维护性不建议对于代码的socket连接相关环节进行对应的修改;

3) 在上文的测试环境下,x86-64设备端运行服务端、订阅端、发布端,资源消耗较低。

7 参考文献

1、 https://www.runoob.com/w3cnote/mqtt-intro.html

2、 https://blog.csdn.net/quending/article/details/85254661

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐