aligaduo 个人博客​www.zhangxiaoshuai.funMQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的"轻量级"通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用。

MQTT是一个基于客户端-服务器的消息发布/订阅传输协议。MQTT协议是轻量、简单、开放和易于实现的,这些特点使它适用范围非常广泛。在很多情况下,包括受限的环境中,如:机器与机器(M2M)通信和物联网(IoT)。其在,通过卫星链路通信传感器、偶尔拨号的医疗设备、智能家居、及一些小型化设备中已广泛使用。

EMQ XEMQ X Broker 是基于高并发的 Erlang/OTP 语言平台开发,支持百万级连接和分布式集群架构,发布订阅模式的开源 MQTT 消息服务器。

它到底能干啥?MQTT 协议广泛应用于物联网、移动互联网、智能硬件、车联网、电力、能源等领域。

EMQ X MQTT服务器是一款由 EMQ 开源的支持分布式集群架构的 MQTT 消息服务器,可处理千万级别的并发客户端,目前广泛应用于全球物联网市场。

那既然我们要使用EMQX,那我们需要先安装它,搭建它可以运行的环境,本次给出的是Windows版本和Linux版本(官方推荐是最好将EMQX部署在Linux系统上);

Windows版本安装

下载

安装启动安装方式也是非常的银杏,开箱即用。将压缩包解压至我们想要放置的位置;

进入bin目录下,打开命令窗口,然后输入emqx start就可以启动emqx了

功能试用启动之后,我们可以使用官方提供的Dashboard进行简单测试;

打开浏览器输入地址:localhost:18083,账号:admin 密码:public

刚进去Clients中应该是没有用户端连接的;

找到Tools中的Websocket,通过设置一些参数,可以进行消息的发布和订阅;

启动客户端连接

订阅主题

发布消息

注意:我们订阅的是哪个主题的消息,在这个主题的消息发布之后,才可以收到;如果是不同主题的消息发布之后我们是接收不到的

测试

可以看到,在我们发布消息的之后,订阅端已经收到了我们发布到消息

node-red

node-red是一款可视化物联网编排工具,使用它来帮助我们理解,效果会更好一些;(安装方法很多,这里不再赘述)

Linux版本安装

本机环境:CentOS 7 64-bit

下载安装

启动emqx start

启动之后,查看一下emqx的状态是不是成功启动的

emqx_ctl status

如果显示的是

Node 'emqx@127.0.0.1' is started

emqx 4.0.0 is running

说明已经启动成功;

功能试用虽然我们是装在linux系统(非可视化版本)中,但是也可以在windows系统中达到上面的测试效果,这就需要我们开放一些emqx需要的一些端口

EMQ官网中有对命令和具体操作有很详细的文档

EMQ X支持多种编程语言的客户端

Java Demo

public class ClientMQTT {

public static final String HOST = "tcp://192.168.112.210:1883";

public static final String TOPIC1 = "test";

private static final String clientid = "JK115522";

private MqttClient client;

private MqttConnectOptions options;

//private String userName = "mqtt"; //非必须 //private String passWord = "mqtt"; //非必须 private ScheduledExecutorService scheduler;

private void start() {

try {

// host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence());

// MQTT的连接设置 options = new MqttConnectOptions();

// 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,设置为true表示每次连接到服务器都以新的身份连接 options.setCleanSession(false);

// 设置连接的用户名 // options.setUserName(userName); // 设置连接的密码 // options.setPassword(passWord.toCharArray()); // 设置超时时间 单位为秒 options.setConnectionTimeout(10);

// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 options.setKeepAliveInterval(20);

//设置断开后重新连接 options.setAutomaticReconnect(true);

// 设置回调 client.setCallback(new PushCallback());

MqttTopic topic = client.getTopic(TOPIC1);

//setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 //遗嘱 options.setWill(topic, "close".getBytes(), 1, true);

client.connect(options);

//订阅消息 int[] Qos = {1};//0:最多一次 、1:最少一次 、2:只有一次 String[] topic1 = {TOPIC1};

client.subscribe(topic1, Qos);

} catch (Exception e) {

e.printStackTrace();

}

}

public static void main(String[] args) {

ClientMQTT client = new ClientMQTT();

client.start();

}

}

public class ServerMQTT {

//tcp://MQTT安装的服务器地址:MQTT定义的端口号 public static final String HOST = "tcp://192.168.112.210:1883";

//定义一个主题 public static final String TOPIC = "test";

//定义MQTT的ID,可以在MQTT服务配置中指定 private static final String clientid = "JK115522";

private MqttClient client;

private static MqttTopic topic11;

// private String userName = "mqtt"; //非必须// private String passWord = "mqtt"; //非必须

private static MqttMessage message;

/*** 构造函数* @throws MqttException*/

public ServerMQTT() throws MqttException {

// MemoryPersistence设置clientid的保存形式,默认为以内存保存 client = new MqttClient(HOST, clientid, new MemoryPersistence());

connect();

}

/*** 用来连接服务器*/

private void connect() {

MqttConnectOptions options = new MqttConnectOptions();

options.setCleanSession(false);

// options.setUserName(userName);// options.setPassword(passWord.toCharArray()); // 设置超时时间 options.setConnectionTimeout(10);

// 设置会话心跳时间 options.setKeepAliveInterval(20);

try {

client.setCallback(new PushCallback());

client.connect(options);

topic11 = client.getTopic(TOPIC);

} catch (Exception e) {

e.printStackTrace();

}

}

/**** @param topic* @param message* @throws MqttPersistenceException* @throws MqttException*/

public static void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException,

MqttException {

MqttDeliveryToken token = topic.publish(message);

token.waitForCompletion();

System.out.println("message is published completely! "

+ token.isComplete());

}

public static void sendMessage(String clieId,String msg)throws Exception{

ServerMQTT server = new ServerMQTT();

server.message = new MqttMessage();

server.message.setQos(1); //保证消息能到达一次 server.message.setRetained(true);

String str ="{\"clieId\":\""+clieId+"\",\"mag\":\""+msg+"\"}";

server.message.setPayload(str.getBytes());

try{

publish(server.topic11 , server.message);

//断开连接// server.client.disconnect(); }catch (Exception e){

e.printStackTrace();

}

}

/*** 启动入口* @param args* @throws MqttException*/

public static void main(String[] args) throws Exception {

for(int i = 0;i < 100;i ++){

sendMessage("123444","哈哈");

}

}

}

public class PushCallback implements MqttCallback {

public void connectionLost(Throwable cause) {

// 连接丢失后,一般在这里面进行重连 System.out.println("连接断开,可以做重连");

}

public void deliveryComplete(IMqttDeliveryToken token) {

System.out.println("deliveryComplete---------" + token.isComplete());

}

public void messageArrived(String topic, MqttMessage message) throws Exception {

// subscribe后得到的消息会执行到这里面 System.out.println("接收消息主题 : " + topic);

System.out.println("接收消息Qos : " + message.getQos());

System.out.println("接收消息内容 : " + new String(message.getPayload()));

}

}

微信公众号:

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐