一,MQTT 及其在物联网中的应用

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,设计用于低带宽、延迟高、不稳定的网络环境,特别适合物联网(IoT)应用。它采用了发布/订阅(Pub/Sub)模型,以简化设备之间的消息交换,是物联网领域广泛采用的通信协议之一。

MQTT 的核心概念:

  1. Broker(消息代理):MQTT Broker 是 MQTT 通信的核心,负责接收、存储并分发消息。所有客户端通过 Broker 进行通信,客户端之间不会直接相互通信。
  2. Publisher(发布者):客户端可以作为发布者,向某个主题(Topic)发布消息。
  3. Subscriber(订阅者):客户端可以订阅一个或多个主题,Broker 会将对应主题的消息推送给订阅者。
  4. Topic(主题):消息通过主题组织,Publisher 发布消息时会指定主题,Subscriber 订阅相应主题来接收消息。
  5. QoS(服务质量):MQTT 提供三种消息传递服务质量:
    QoS 0:消息最多传递一次,不保证消息会被成功接收。
    QoS 1:消息至少传递一次,保证消息至少会被接收到一次。
    QoS 2:消息传递一次且仅一次,保证消息不会重复或丢失。

MQTT 专为物联网(IoT)设备和低带宽、高延迟或不可靠的网络设计。它的主要特点包括:

  • 轻量级: 协议简单,消息开销小,适合资源受限的设备。
  • 发布/订阅模式: 允许一对多的消息分发和应用程序解耦。
  • 可靠性: 提供三种服务质量级别(QoS),确保消息传递。
  • 安全性: 支持TLS加密和用户名/密码认证。
  • 保留消息: 可以存储最后一条消息,新订阅者可立即获得最新状态。

由于其轻量级的特性,MQTT 广泛应用于物联网中,适用于以下场景:

  • 远程监控与管理:通过 MQTT 协议,物联网设备(如传感器、智能家居设备等)可以定期向服务器上传数据,实现远程监控。服务器也可以通过 MQTT 向设备发送控制命令,进行远程管理。
  • 实时数据传输:MQTT 支持低延迟的消息传输,适合需要实时数据更新的场景,如工业自动化、智能电网、车联网等。
  • 低功耗设备通信:由于 MQTT 协议的低带宽和低开销,适合电池供电的物联网设备,通过节省功耗延长设备的使用寿命。
  • 智能家居:MQTT 协议广泛应用于智能家居系统中,例如控制灯光、恒温器、安防设备等,用户可以通过手机或其他终端远程控制家中的设备。
  • 车联网(V2X):在车联网中,MQTT 可以用于车辆与后台服务器之间的数据传输,如状态监控、远程控制、紧急事件处理等。

在典型的物联网架构中,MQTT 作为一种协议桥接了物联网设备与云平台之间的数据传输,架构通常包括以下主要部分:

  • 物联网设备(IoT Devices):如传感器、智能家居设备等,设备通过 MQTT 协议将数据发布到 MQTT Broker,也可以订阅主题接收命令。
  • MQTT Broker:位于物联网系统的核心位置,负责管理客户端的连接和消息的传输。所有设备和服务器的通信都通过 Broker 进行,Broker 根据主题分发消息。
  • 云平台(Cloud Platform):云平台通常订阅设备的数据主题,接收到设备上传的数据进行存储和处理。云平台也可以通过 Broker 向设备发布控制命令。
  • 用户终端(User Interface):通过手机应用、网页等形式,用户可以远程查看设备状态和数据,并发送控制命令,控制物联网设备的运行。
         +--------------------+         +--------------------+
         |                    |         |                    |
         |  User Interface    |         |    Cloud Platform  |
         |      (Mobile)      |         |(Web App, Analytics)|
         +--------------------+         +--------------------+
                   ^                                ^
                   |                                |
                   |        Subscribe/Publish       |
                   |                                |
                   v                                v
         +----------------------------------------------+
         |               MQTT Broker                    |
         |    (Mosquitto, EMQX, HiveMQ, etc.)           |
         +----------------------------------------------+
                   ^                                ^
                   |                                |
            Publish|                                |Subscribe
                   |                                |
                   v                                v
       +--------------------+              +--------------------+
       |  IoT Device 1       |              |  IoT Device 2       |
       | (Sensors, Actuators)|              | (Sensors, Actuators)|
       +--------------------+              +--------------------+
  1. 物联网设备(如 IoT Device 1 和 IoT Device 2)通过 MQTT 协议连接到 Broker,并且分别发布数据或订阅控制命令的主题。
  2. MQTT Broker 是中心节点,负责接收物联网设备发布的消息,并将消息推送给订阅者(如云平台或用户终端)。
  3. 云平台 订阅设备的数据,并通过处理这些数据实现监控、分析或控制操作。云平台也可以通过发布控制命令,控制物联网设备的行为。
  4. 用户终端(如移动应用或 Web 界面)允许用户查看设备的状态和数据,并发送控制命令到云平台,云平台再通过 MQTT Broker 传递给相应的物联网设备。

二,在 Windows 10 中安装与测试 MQTT broker

  1. 下载 Mosquitto:
    - 访问 https://mosquitto.org/download/
    - 下载最新版本的 Windows 安装程序 (64-bit)

  2. 安装 Mosquitto:
    - 运行下载的安装程序
    - 按照安装向导的提示进行操作

  3. 添加 添加 Mosquitto 安装路径到系统路径。

  4. 配置 Mosquitto:
    - 打开记事本,以管理员身份运行
    - 打开安装路径的文件: mosquitto.conf
    - 添加或修改以下行:

     listener 1883 # MQTT默认端口
     allow_anonymous true # 允许匿名连接(仅用于测试)
     persistence true # 开启持久化,重启后保留消息
     persistence_location G:\mosquitto\data # 持久化文件存储路径
    
  5. 启动 Mosquitto 服务:

    • 打开命令提示符(以管理员身份运行)
    • 输入以下命令:
       net start mosquitto
      
  6. 设置 Mosquitto 为自动启动:
    - 打开 “Services” (服务)
    - 找到 “Mosquitto Broker” 服务
    - 右击并选择 “Properties”
    - 将 “Startup type” 设置为 “Automatic”
    - 点击 “Apply” 然后 “OK”

  7. 测试 Mosquitto:
    - 打开两个命令提示符窗口
    - 在第一个窗口中,输入订阅命令: mosquitto_sub -t test/topic
    - 在第二个窗口中,输入发布命令: mosquitto_pub -t test/topic -m “Hello MQTT”
    - 如果在第一个窗口中看到 “Hello MQTT”,则说明 Mosquitto 运行正常

  8. 安全配置(生产环境)——启用用户认证:
    - 添加用户: mosquitto_passwd -c G:\mosquitto\passwd username
    - 在配置文件中启用认证: password_file G:\mosquitto\password.txt

  9. 监控和日志
    - 查看Mosquitto日志文件: G:\mosquitto\log\mosquitto.log
    - 使用工具如Prometheus+Grafana对Mosquitto进行监控

三,物联网设备通过 MQTT 发布与订阅数据

(一)模拟

由于手边暂时没有设备,这里就用python写一个程序来模拟。

1,首先开一个命令行窗口启动 MQTT:

G:\mosquitto>mosquitto -v
1725282189: mosquitto version 2.0.18 starting
1725282189: Using default config.
1725282189: Starting in local only mode. Connections will only be possible from clients running on this machine.
1725282189: Create a configuration file which defines a listener to allow remote access.
1725282189: For more details see https://mosquitto.org/documentation/authentication-methods/
1725282189: Opening ipv4 listen socket on port 1883.
1725282189: Opening ipv6 listen socket on port 1883.
1725282189: mosquitto version 2.0.18 running
  • 此命令会启动 Mosquitto 并以详细模式输出日志。

2,编写模拟物联网设备的 Python 客户端。
用 paho-mqtt Python 库,它是一个流行的 MQTT 客户端库,可以方便地发布和订阅消息。

import json
import random
import time

import paho.mqtt.client as mqtt

# MQTT Broker 的地址和端口
broker_address = "localhost"  # 本地服务器
broker_port = 1883

# 设备 ID
device_id = "device_001"
data_topic = f"devices/{device_id}/data"
control_topic = f"devices/{device_id}/control"

# MQTT 客户端
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, device_id)


# 连接回调函数
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    # 连接成功后,订阅控制命令主题
    client.subscribe(control_topic)


# 消息回调函数
def on_message(client, userdata, msg):
    print(f"Message received from {msg.topic}: {msg.payload.decode()}")


# 连接 MQTT 服务器
client.on_connect = on_connect
client.on_message = on_message
client.connect(broker_address, broker_port, 60)

# 启动 MQTT 客户端的网络循环
client.loop_start()

# 模拟设备定期发送数据
try:
    while True:
        # 模拟传感器数据
        sensor_data = {
            "temperature": round(random.uniform(20.0, 30.0), 2),
            "humidity": round(random.uniform(30.0, 70.0), 2),
            "timestamp": time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        }

        # 将数据发布到 data_topic
        client.publish(data_topic, json.dumps(sensor_data))
        print(f"Published data: {sensor_data}")

        # 每隔 5 秒发送一次数据
        time.sleep(5)

except KeyboardInterrupt:
    print("Simulation stopped.")

# 停止 MQTT 客户端
client.loop_stop()
client.disconnect()
  • 运行 Python 脚本,模拟设备会定期向 devices/device_001/data 主题发布传感器数据

3,测试设备与 MQTT Broker 的通信。
开一个命令行窗口监听设备数据:

mosquitto_sub -h localhost -t "devices/device_001/data"

在这里插入图片描述
开一个命令行窗口发送控制命令:

mosquitto_pub -h localhost -t "devices/device_001/control" -m '{"command": "turn_on", "target": "fan"}'

在这里插入图片描述

(二)使用 ESP8266 + DHT11 + MicroPython

使用ESP8266外接一个DHT11温湿度传感器,用MicroPython编程,让ESP8266采集温湿度数据,并通过MQTT发送到部署在电脑上的MQTTbroker。

1,硬件:
- ESP8266 板子(如 NodeMCU 或 Wemos D1 Mini)
- DHT11 温湿度传感器

2,接线:
- DHT11 VCC → ESP8266 3.3V
- DHT11 GND → ESP8266 GND
- DHT11 数据引脚 → ESP8266 GPIO 引脚(如 GPIO 2/D4)

3,下载并将 MicroPython 固件刷入 ESP8266 板子上:

  • 安装 esptool.py:pip install esptool
  • 刷入固件:
    esptool.py --port /dev/ttyUSB0 erase_flash
    esptool.py --port /dev/ttyUSB0 --baud 460800 write_flash --flash_size=detect 0 esp8266-<version>.bin
    

4,连接到 ESP8266:
通过串口工具(如 PuTTY 或 Thonny),连接到 ESP8266,使用 REPL 进行调试和执行 MicroPython 代码。

5,安装 MQTT 和 DHT11 驱动:
MicroPython 中已经有内置的 umqtt 和 dht 模块,用于 MQTT 通信和 DHT11 传感器数据采集。

6,编写 MicroPython 代码:

import network
import time
import dht
import machine
from umqtt.simple import MQTTClient

# Wi-Fi 配置
SSID = 'your_ssid'
PASSWORD = 'your_password'

# MQTT Broker 配置
MQTT_BROKER = '192.168.1.100'  # 电脑上的 MQTT Broker IP
MQTT_TOPIC = 'home/temperature'
CLIENT_ID = 'esp8266_dht11'

# 连接 Wi-Fi
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to network...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            pass
    print('Network connected:', wlan.ifconfig())

# 初始化 DHT11 传感器
dht_pin = machine.Pin(2)  # DHT11 数据引脚接到 GPIO2/D4
sensor = dht.DHT11(dht_pin)

# 连接 Wi-Fi
connect_wifi(SSID, PASSWORD)

# 连接 MQTT
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.connect()

try:
    while True:
        # 采集数据
        sensor.measure()
        temperature = sensor.temperature()
        humidity = sensor.humidity()
        
        # 创建消息
        msg = f'{{"temperature": {temperature}, "humidity": {humidity}}}'
        print('Publishing:', msg)
        
        # 发布到 MQTT 服务器:消息会发布到 home/temperature 主题上。
        client.publish(MQTT_TOPIC, msg)
        
        # 每隔10秒发送一次数据
        time.sleep(10)
        
except KeyboardInterrupt:
    print("Stopping...")
    client.disconnect()
  • 连接到 Wi-Fi。
  • 使用 DHT11 传感器采集温湿度数据。
  • 将数据通过 MQTT 协议发送到本地的 MQTT Broker。

四,云平台通过 MQTT 订阅数据与发布命令

云平台要做的就是:

  • 接收 MQTT 消息:Django 应用需要订阅 MQTT Broker 的温度数据。
  • 分析数据并作出决策:当接收到的温度数据超过 50°C 时,发送命令打开风扇,否则关闭风扇。
  • 通过 MQTT 发送控制命令:使用 Django 应用中的 MQTT 客户端发送控制命令(如 fan/on 或 fan/off)到 MQTT Broker,控制风扇。

1,接收温度数据并分析。
创建一个管理命令(management command)来启动 MQTT 客户端并订阅主题。

# myapp/
#  management/
#    commands/
#      mqtt_subscriber.py


import json
import paho.mqtt.client as mqtt
from django.core.management.base import BaseCommand

MQTT_BROKER = 'localhost'  # MQTT Broker 地址
TEMPERATURE_TOPIC = 'home/temperature'  # 订阅的温度主题
FAN_CONTROL_TOPIC = 'home/fan/control'  # 风扇控制命令主题
CLIENT_ID = 'django_subscriber'

# 温度阈值
TEMP_THRESHOLD = 50  # 摄氏度

# 定义 MQTT 客户端回调函数
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")
    client.subscribe(TEMPERATURE_TOPIC)

def on_message(client, userdata, msg):
    print(f"Message received from {msg.topic}: {msg.payload.decode()}")
    try:
        # 解析消息并提取温度数据
        data = json.loads(msg.payload.decode())
        temperature = data.get('temperature')

        # 检查温度是否超过阈值,并发送控制命令
        if temperature is not None:
            if temperature > TEMP_THRESHOLD:
                print("Temperature is above 50°C. Turning on the fan.")
                client.publish(FAN_CONTROL_TOPIC, 'on')
            else:
                print("Temperature is below 50°C. Turning off the fan.")
                client.publish(FAN_CONTROL_TOPIC, 'off')
        else:
            print("No temperature data found.")
    except Exception as e:
        print(f"Error processing message: {e}")

class Command(BaseCommand):
    help = 'Start MQTT subscriber and control fan based on temperature'

    def handle(self, *args, **kwargs):
        # 创建 MQTT 客户端并设置回调
        client = mqtt.Client(CLIENT_ID)
        client.on_connect = on_connect
        client.on_message = on_message

        # 连接 MQTT Broker
        client.connect(MQTT_BROKER, 1883, 60)

        # 开始 MQTT 客户端的循环
        try:
            client.loop_forever()
        except KeyboardInterrupt:
            client.disconnect()
            print("MQTT client disconnected.")

在项目根目录下运行以下命令启动 MQTT 订阅服务:

python manage.py mqtt_subscriber

2,ESP8266 接收风扇控制命令并控制风扇。
在 ESP8266 上编写代码,订阅 home/fan/control 主题,并根据接收到的消息来控制电机(模拟风扇)。

import machine
import network
import time
from umqtt.simple import MQTTClient

# Wi-Fi 配置
SSID = 'your_ssid'
PASSWORD = 'your_password'

# MQTT 配置
MQTT_BROKER = '192.168.1.100'  # 电脑上的 MQTT Broker IP
FAN_CONTROL_TOPIC = 'home/fan/control'
CLIENT_ID = 'esp8266_fan_controller'

# 定义 GPIO 引脚来控制电机(风扇)
fan_pin = machine.Pin(5, machine.Pin.OUT)  # GPIO5/D1

# 连接 Wi-Fi
def connect_wifi(ssid, password):
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    if not wlan.isconnected():
        print('Connecting to network...')
        wlan.connect(ssid, password)
        while not wlan.isconnected():
            pass
    print('Network connected:', wlan.ifconfig())

# MQTT 回调函数
def on_message(topic, msg):
    print(f"Message received on topic {topic}: {msg}")
    if msg == b'on':
        fan_pin.on()  # 打开风扇
        print("Fan turned on")
    elif msg == b'off':
        fan_pin.off()  # 关闭风扇
        print("Fan turned off")

# 连接 Wi-Fi
connect_wifi(SSID, PASSWORD)

# 连接 MQTT Broker 并订阅主题
client = MQTTClient(CLIENT_ID, MQTT_BROKER)
client.set_callback(on_message)
client.connect()
client.subscribe(FAN_CONTROL_TOPIC)

try:
    while True:
        client.check_msg()  # 检查是否有新消息
        time.sleep(1)
        
except KeyboardInterrupt:
    print("Disconnecting from broker.")
    client.disconnect()
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐