基于EMQX在Vue和Nodejs中使用mqtt实现多端实时通信
在vue项目中通过WebSocket客户端连接mqtt进行:连接,订阅,发布,接收等基础操作。
简单介绍一下EMQX
1.EMQX是一个开源的分布式物联网(MQTT)消息服务器。它是一个高性能、高可靠性的消息传输中间件,用于连接和传输物联网设备之间的数据。
2.EMQX提供了MQTT协议的完整实现,并支持大规模的设备连接、消息发布/订阅、设备管理和数据路由。它具有水平可扩展性,可以轻松地处理数百万个同时连接的设备,并提供可靠且低延迟的消息传递。
3.EMQX还提供了丰富的功能,如设备身份验证、消息持久化、QoS(服务质量)控制、故障转移、集群和数据分析等。它可以用于构建各种物联网应用,如智能家居、工业自动化、智能交通等。
上面是官方的一些介绍,其实EMQX的主要作用就是用来实现实时通信,而且这个技术广泛运用在物联网项目中,大多数的硬件通信协议都是tcp,而软件大多数是http或者https,所以呢,利用EMQX这个中间件就可以实现实时通信了
通过这张图也能更好地理解,订阅者订阅的主题只要与发布者的主题一致,那订阅者就可以接收到这个消息,而且订阅者不只是一个,而是成百上千的,我下面也会演示多端订阅的效果,其实利用EMQX实现此功能不只有mqtt,像QUIC 和 WebSocke都是可以的,这里我介绍一种,其他的都是一样融汇贯通即可。
安装EMQX
首先呢,我们要在自己的电脑上面安装EMQX
这是下载网址:下载 EMQX
根据我下面的图指引下载即可,选择好自己的操作系统和版本,点击下载后解压出来就行,(我下载的是5.1版本,每个版本的客户端界面可能有些许不同)
按照它的指引,进入到下面的目录
在里面输入cmd回车
输入此命令 emqx start
然后打开任务管理器,看到有此进程就证明我们开启成功了
然后在浏览器中输入此网址:http://localhost:18083/
第一次登录的初始账号密码为:admin public
登录后他会让你更改密码,改一个容易记住的即可
这就是管理面板的界面了,在这里可以看到有多少用户连接
注意,我们这里是开启emqx服务后,就代表自己是一台服务器了,就代表我们作为其中的中转站,也就是此图片中的这样
接下来就可以去Vue项目和Nodejs中编辑代码了
在Vue中用mqtt连接EMQX
1.首先在Vue项目中安装mqtt
npm i mqtt --save
2.在组件中引入mqtt.js
import mqtt from 'mqtt/dist/mqtt'
3.在methods中写入连接mqtt的方法并在mounted生命周期中调用此方法
initMqtt() {
// 连接配置选项
let options = {
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: '111', //不填默认随机生成一个ID
username: 'admin', //用户名
password: '123456' //密码
}
this.client = mqtt.connect('ws://localhost:8083/mqtt', options) //调用连接的api
//连接成功
this.client.on('connect', (e) => {
console.log('连接成功', e)
})
//重连提醒
this.client.on('reconnect', (error) => {
console.log('正在重连', error)
})
//连接失败提醒
this.client.on('error', (error) => {
console.log('连接失败', error)
})
}
上面的代码中, options
是客户端连接选项,以下是主要参数说明,详情参数可见:mqtt - npm。
-
keepalive:心跳时间,默认 60秒,设置 0 为禁用;
-
clientId: 客户端 ID ,默认通过
'mqttjs_' + Math.random().toString(16).substr(2, 8)
随机生成; -
username:连接用户名(如果有);
-
password:连接密码(如果有);
-
clean:true,设置为 false 以在离线时接收 QoS 1 和 2 消息;
-
reconnectPeriod:默认 1000 毫秒,两次重新连接之间的间隔,客户端 ID 重复、认证失败等客户端会重新连接;
-
connectTimeout:默认 30 * 1000毫秒,收到 CONNACK 之前等待的时间,即连接超时时间。
在我们的管理面板中就可以看到id为:111的连接成功了
注意一下,此地址:ws://localhost:8083/mqtt 就是连接我们电脑中开启的EMQX服务器的默认地址,当然如果其他地方有开启作为服务器,你修改这里的地址即可
当我们连接成功之后可以看到在Vue实例身上就挂载了mqtt中的client对象,在之后我们可以直接使用this调用它身上的方法
在我们每次要更新页面时一定要及时的断开与mqtt的连接,否则就会出现重复连接的情况,我们可以选择一个组件销毁时的生命周期钩子函数来断开连接,这里我使用的是:beforeDestroy
// 在组件销毁之前与mqtt断开连接
beforeDestroy() {
//断开连接
this.client.end()
this.client = null
console.log('已经与mqtt客户端断开连接')
}
4.订阅单个消息
我们可以在methods中写一个订阅单个消息的方法,可以在mqtt连接之后调用此订阅方法
//订阅一个信息
subscribe() {
const str = 'setM3'
this.client.subscribe(str, { qos: 0 }, (err) => {
if (!err) {
console.log(`主题为:“${str}” 的消息订阅成功`)
} else {
console.log('消息订阅失败')
}
})
},
在控制台中可以看到已经订阅成功了
5.订阅多个消息
//订阅多个信息
subscribes() {
const arr = ['setM2', 'setM3']
this.client.subscribe(arr, { qos: 1 }, (err) => {
if (!err) {
console.log(`主题为:“${arr}” 的消息订阅成功`)
} else {
console.log('消息订阅失败')
}
})
},
这里我们的多个消息就订阅成功了
6.发布消息
//发布信息
publish(topic, message) {
if (!this.client.connected) {
console.log('客户端未连接')
return
}
this.client.publish(topic, message, { qos: 0 }, (err) => {
if (!err) {
console.log(`主题为:${topic},内容为:${message} 发布成功`)
}
})
},
我们可以在订阅完消息之后就发布我们的消息,下面举个例子,我们可以在订阅单个消息中嵌入此句代码:需要注意的是我们要确定发布消息的主题,和内容,只有订阅了此主题的人才会接收到此数据,还有就是我们的消息一定要满足其为JSON格式的数据
this.publish('setM2', '{"data":"123"}')
可以看到控制台中已经显示发布成功了:
这里我之所以能接收到数据,因为我在发布之前也订阅了setM2,所以我作为订阅者也接受到了此数据,接受数据的函数在下面
7.接收消息
这里需要注意的是,我们接收的消息只能是发布出去的消息,没有发布的消息我们是不能接收的
//接收消息
receive() {
this.client.on('message', (topic, message) => {
console.log(`收到来自:${topic} 的消息:${message}`)
const data = JSON.parse(`${msg}`) //接受到控制信号的数据
console.log(data)
})
}
我们利用js的JSON.parse将其转换为对象,就可以很方便的操作其中的数据了
我们可以在发布消息后调用此方法,既可以查看接收到的消息
在node.js端利用mqtt连接EMQX
这里的代码其实和vue中是类似的,所以我就不一一解析了,毕竟都是js代码嘛
这是mqtt的连接配置代码:将其导出后使用
const mqtt = require('mqtt')
//mqtt端口号:
// 1883 MQTT 协议端口
// 8883 MQTT/SSL 端口
// 8083 MQTT/WebSocket 端口
// 8080 HTTP API 端口
// 18083 Dashboard 管理控制台端口
// 连接选项
const options = {
clean: true, // true: 清除会话, false: 保留会话
connectTimeout: 4000, // 超时时间
// 认证信息
clientId: 'emqx_test',
username: 'admin',
password: '123456'
}
//ws://localhost:8083/mqtt
const connectUrl = 'ws://localhost:8083/mqtt'
const client = mqtt.connect(connectUrl, options) //建立连接
module.exports = client
这是其他的代码:
const express = require('express')
const client = require('./mqtt')
const app = express()
client.on('connect', () => {
console.log('连接成功')
})
client.on('reconnect', (error) => {
console.log('正在重连:', error)
})
client.on('error', (error) => {
console.log('连接失败:', error)
})
//订阅setM3消息
client.subscribe('setM3', (err) => {
console.log('订阅setM3成功')
})
client.on('message', (topic, message) => {
const formattedTime = dayjs().format('YYYY-MM-DD HH:mm:ss')
console.log('收到消息:', topic, message.toString())
const data = JSON.parse(message)
console.log(data)
})
app.listen(8080, () => {
console.log('api server running at http://127.0.0.1:8080')
})
然后将其运行在8080端口
可以看到我们的管理面板此时已经有二个用户了
其实在管理面板中我们也可以模拟用户连接,配置必要的选项和id等即可连接
连接好后此时就有三个用户连接了
我们的Vue客户端订阅的主题为setM2和setM3,我们的nodejs服务端订阅的主题是setM3
现在我们可以在管理面板中的此模拟用户来发送对应的数据,然后看看其他二个用户能不能接收到数据
我模拟发送的数据如下,主题为setM3,内容是一个JSON数据,点击发布
我们的客户端和服务端都接收到了此数据
我们再来试试发送主题为setM2的数据看看情况如何
可以看到客户端是可以接收到此请求的,服务端因为没有订阅是接收不到此数据的
我们再来模拟客户端发送数据试试,在这之前我们可以改写一下代码,服务端订阅setM1,模拟的用户也可以订阅setM1
这是我们客户端发送的消息
this.publish('setM1', '{"data":"都看到这里了,给个三连不过分吧"}')
我们模拟的用户能接收到数据
服务端也可以
看到这里相信你对emqx和mqtt已经有了一定的理解,会基本的使用了,如有问题可以随时评论或者私信我,我会回复,给个三连吧(疯狂暗示)
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)