一、EMQX介绍

EMQX是大规模分布式MQTT消息服务器,可以高效可靠连接海量物联网设备,实时处理分发消息与事件流数据,助力构建关键业务的物联网与云应用。EMQX 作为物联网应用开发和物联网平台搭建必须用到的基础设施软件,主要在边缘和云端实现物联网设备互联与设备上云,提供物联网设备接入、协议处理、消息路由、数据存储、流数据处理等核心能力。

二、EMQX安装

访问官网下载安装包:下载 EMQX

解压zip文件得到软件目录

运行EMQX,打开cmd命令窗口,进入软件bin目录,输入emqx start命令启动软件

登录emqx控制台,访问http://127.0.0.1:18083/,默认用户名、密码是admin、public。

三、Java实现发送和订阅消息

3.1 创建客户端基础代码
  • 引入pom依赖
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
  • 订阅和发布消息相关代码
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;


public class App {
    public static void main(String[] args) {
        String subTopic = "testtopic/#";
        String pubTopic = "testtopic/1";
        String content = "Hello World";
        int qos = 2;
        String broker = "tcp://127.0.0.1:1883";
        String clientId = "emqx_test";
        MemoryPersistence persistence = new MemoryPersistence();

        try {
            MqttClient client = new MqttClient(broker, clientId, persistence);

            // MQTT 连接选项
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setUserName("用户名");
            connOpts.setPassword("密码".toCharArray());

            // 保留会话
            connOpts.setCleanSession(true);

            MqttCallback callback = new OnMessageCallback();

            // 设置回调
            client.setCallback(callback);

            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);

            System.out.println("Connected");
            System.out.println("Publishing message: " + content);

            // 订阅主题
            client.subscribe(subTopic);

            // 消息发布所需参数
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);

            // 发布消息
            client.publish(pubTopic, message);
            System.out.println("Message published");

//            client.disconnect();
//            System.out.println("Disconnected");
//            client.close();
//            System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}
  • 接收消息相关代码
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

public class OnMessageCallback implements MqttCallback {
    public void connectionLost(Throwable cause) {
        // 连接丢失后,一般在这里面进行重连
        System.out.println("连接断开,可以做重连");
    }

    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // subscribe后得到的消息会执行到这里面
        System.out.println("接收消息主题:" + topic);
        System.out.println("接收消息Qos:" + message.getQos());
        System.out.println("接收消息内容:" + new String(message.getPayload()));
    }

    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("deliveryComplete---------" + token.isComplete());
    }
}

3.2 创建客户端进阶代码
  • 引入pom依赖
<!--mqtt-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.1</version>
</dependency>

<!-- fastJSON -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.56</version>
</dependency>
  • 定义发送消息客户端的配置
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;

@Configuration
@IntegrationComponentScan
public class MqttSenderConfig {

/**
 * 发布的bean名称
 */
public static final String CHANNEL_NAME_OUT = "mqttOutboundChannel";

/**
 * 客户端与服务器之间的连接意外中断,服务器将发布客户端的"遗嘱"消息
 */
private static final byte[] WILL_DATA;
static {
    WILL_DATA = "offline".getBytes();
}
private static final String username = "admin";
private static final String password = "DCDremote@997";
private static final String url = "tcp://127.0.0.1:1883";
private static final String clientId = "honeywell-server1";
private static final String defaultTopic = "default";

//    @Value("${mqtt.username}")
//    private String username;
//
//    @Value("${mqtt.password}")
//    private String password;
//
//    @Value("${mqtt.url}")
//    private String url;
//
//    @Value("${mqtt.sender.clientId}")
//    private String clientId;
//
//    @Value("${mqtt.sender.topic}")
//    private String defaultTopic;

@Bean
public MqttConnectOptions getMqttConnectOption(){
    MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
    mqttConnectOptions.setCleanSession(true);
    mqttConnectOptions.setConnectionTimeout(10);
    mqttConnectOptions.setKeepAliveInterval(90);
    mqttConnectOptions.setAutomaticReconnect(true);
    mqttConnectOptions.setUserName(username);
    mqttConnectOptions.setPassword(password.toCharArray());
    mqttConnectOptions.setServerURIs(new String[]{url});
    mqttConnectOptions.setKeepAliveInterval(30);
    return mqttConnectOptions;
}

@Bean
public MqttPahoClientFactory mqttClientsFactory() {
    DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
    factory.setConnectionOptions(getMqttConnectOption());
    return factory;
}

@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler  mqttOutbound() {
    MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientsFactory());
    messageHandler.setAsync(true);
    messageHandler.setDefaultTopic(defaultTopic);
    messageHandler.setDefaultQos(1);
    return messageHandler;
}

@Bean
public MessageChannel mqttOutboundChannel() {
    return new DirectChannel();
}
}
  • 编写接收消息的客户端的相关配置
package org.jianying.emqxstudy.mqtt;


import com.alibaba.fastjson.JSON;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.util.Arrays;
import java.util.List;
import java.util.Map;



@Configuration
public class MqttReceiverConfig {

    final static Logger logger = LoggerFactory.getLogger(MqttReceiverConfig.class);


    /**
     * 订阅的bean名称
     */
    public static final String CHANNEL_NAME_IN = "mqttInboundChannel";

    // 客户端与服务器之间的连接意外中断,服务器将发布客户端的“遗嘱”消息
    private static final byte[] WILL_DATA;

    static {
        WILL_DATA = "offline".getBytes();
    }

    private static final String username = "admin";
    private static final String password = "DCDremote@997";
    private static final String url = "tcp://127.0.0.1:1883";

    // 接收消息的客户端id
    private static final String clientId = "test-server";

    // 接收的消息主题, $SYS/brokers 表示发送的是系统主题
    private static final String defaultTopic = "$SYS/brokers/+/clients/#,hello/info/faceid/#,hello/server/result/#,info_topic";

//    @Value("${mqtt.username}")
//    private String username;
//
//    @Value("${mqtt.password}")
//    private String password;
//
//    @Value("${mqtt.url}")
//    private String url;
//
//    @Value("${mqtt.receiver.clientId}")
//    private String clientId;
//
//    @Value("${mqtt.receiver.topic}")
//    private String defaultTopic;

    @Bean
    public MqttConnectOptions getMqttConnectOptions() {
        MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
        mqttConnectOptions.setCleanSession(true);
        mqttConnectOptions.setConnectionTimeout(10);
        mqttConnectOptions.setKeepAliveInterval(90);
        mqttConnectOptions.setAutomaticReconnect(true);
        mqttConnectOptions.setUserName(username);
        mqttConnectOptions.setPassword(password.toCharArray());
        mqttConnectOptions.setServerURIs(new String[]{url});
        mqttConnectOptions.setKeepAliveInterval(60);
        return mqttConnectOptions;
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        factory.setConnectionOptions(getMqttConnectOptions());
        return factory;
    }

    //接收通道
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageProducer inbound() {

        List<String> topicList = Arrays.asList(defaultTopic.trim().split(","));
        String[] topics = new String[topicList.size()];
        topicList.toArray(topics);

        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(clientId, mqttClientFactory(),
                        topics);
        adapter.setCompletionTimeout(10000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }

    //通过通道获取数据
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                logger.info(("收到消息" + message.getHeaders().get("mqtt_receivedTopic") + message.getPayload()));
                // 主题
                String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
                // 消息体
                Map maps = (Map) JSON.parse(message.getPayload().toString().trim());
                // 判断设备状态
                if (topic.contains("$SYS/brokers") && !topic.contains("faceid-server") && !topic.contains("faceid-mqtt-server")) {
                    if (maps.get("clientid").toString().contains("uniwin-mqtt-client")) {

                    }
                } else if (topic.contains("uniwin/server/result/faceid")) { //结果返回
                    if (maps.get("type") != null && !maps.get("type").equals("")) {

                    }
                } else {
                    System.out.println("info...");
                    if (maps.get("type") != null && !maps.get("type").equals("")) {
                        String type = maps.get("type").toString();
                        // 设备心跳检测
                        if (type.equals("heart")) {

                        }
                        // 上传打卡记录
                        if (type.equals("note")) {
                        }
                        // 上传设备参数
                        if (type.equals("param_upload")) {
                        }
                    }
                }

            }
        };
    }

}

  • 编写发送消息的工具类
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 发送信息到MQTT服务器
     *
     * @param data 发送的文本
     */
    void sendToMqtt(String data);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    String payload);

    /**
     * 发送信息到MQTT服务器
     *
     * @param topic 主题
     * @param qos 对消息处理的几种机制。
     * 0 表示的是订阅者没收到消息不会再次发送,消息会丢失。
     * 1 表示的是会尝试重试,一直到接收到消息,但这种情况可能导致订阅者收到多次重复消息。
     * 2 多了一次去重的动作,确保订阅者收到的消息有一次。
     * @param payload 消息主体
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic,
                    @Header(MqttHeaders.QOS) int qos,
                    String payload);

}
3.3 EMQX API使用示例
  • EMQX 提供了 HTTP API 以实现与外部系统的集成,例如查询客户端信息、发布消息和创建规则等。
  • EMQX 的 HTTP API 服务默认监听 8081 端口,可通过 etc/plugins/emqx_management.conf 配置文件修改监听端口,或启用 HTTPS 监听。EMQX 4.0.0 (opens new window)以后的所有 API 调用均以 api/v4 开头。

详细介绍可以看官方API文档:HTTP API | EMQX 4.3 文档

下面的代码是以 v4.3 版本为例,以API调用的方式操作EMQX服务。

3.3.1 消息发布
  • 请求方式

POST http://localhost:8081/api/v4/mqtt/publish

  • 请求参数(json数据)

Name

Type

Required

Default

Description

topic

String

Optional

主题,与 topics 至少指定其中之一

topics

String

Optional

以 , 分割的多个主题,使用此字段能够同时发布消息到多个主题

clientid

String

Required

客户端标识符

payload

String

Required

消息正文

encoding

String

Optional

plain

消息正文使用的编码方式,目前仅支持 plain 与 base64 两种

qos

Integer

Optional

0

QoS 等级

retain

Boolean

Optional

false

是否为保留消息

  • 请求成功返回结果

Name

Type

Description

code

Integer

0

  • 代码示例
public void publiceMessage() {
    // 封装请求体参数
    Map<String, Object> map = new HashMap<>();
    map.put("clientid", "clientId");
    map.put("topic", "pubTopic");
    map.put("payload", "{\"iot_type\":10}");
    map.put("qos", 1);
    // json字符串请求体(注意:不转为json字符串,直接拼接字符串容易报错)
    String requestBody = JsonUtil.obj2string(map);
    // emqx路径
    String emqxApiBaseUrl = "http://127.0.0.1:8081"; // EMQ X API 地址
    try {
        // 构建发布消息的 URL
        String publicConnectionUrl = emqxApiBaseUrl + "/api/v4/mqtt/publish";
        // 创建 URL 对象
        URL url = new URL(publicConnectionUrl);
        // 创建 HttpURLConnection
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        // 设置访问权限(使用Authorization方式)
        connection.setRequestProperty("Authorization", "Basic YWRtaW46cHVibGlj");
        connection.setRequestProperty("Content-Type", "application/json");
        connection.setRequestMethod("POST");
        connection.setDoOutput(true);
        // 将请求体写入连接
        try (OutputStream os = connection.getOutputStream()) {
            byte[] input = requestBody.getBytes(StandardCharsets.UTF_8);
            os.write(input, 0, input.length);
        }
        // 获取响应码
        int responseCode = connection.getResponseCode();
        if (responseCode == HttpURLConnection.HTTP_OK) {
            System.out.println("Successfully public: " + clientId);
        } else {
            System.out.println("Failed to public. Response Code: " + responseCode);
        }
        // 关闭连接
        connection.disconnect();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
3.3.2 根据clientid断开并剔除客户端

踢除指定客户端。注意踢除客户端操作会将连接与会话一并终结。

  • 请求方式

DELETE http://localhost:8081/api/v4/clients/{clientid}

  • 请求参数(Path Parameters)

Name

Type

Required

Description

clientid

String

True

ClientID

  • 请求成功结果

Name

Type

Description

code

Integer

0

  • 代码示例
/**
 * 根据客户端id关闭客户端
 * @param clientId
 */
private void closeClientByClientId(String clientId) {
    String emqxApiBaseUrl = "http://localhost:8081";  // EMQ X API 地址
    try {
        // 对clientId进行编码(避免clientId有特殊字符,比如/)
        String encodedClientId = URLEncoder.encode(clientId, StandardCharsets.UTF_8.toString());

        // 构建关闭连接的 URL
        String closeConnectionUrl = emqxApiBaseUrl + "/api/v4/clients/" + encodedClientId;

        // 创建 URL 对象
        URL url = new URL(closeConnectionUrl);

        // 创建 HttpURLConnection
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();

        // 设置访问权限
        connection.setRequestProperty("authorization", "Basic YWRtaW46cHVibGlj");

        // 设置请求方法为 DELETE
        connection.setRequestMethod("DELETE");

        // 获取响应码
        int responseCode = connection.getResponseCode();

        if (responseCode == HttpURLConnection.HTTP_NO_CONTENT) {
            System.out.println("Successfully closed client connection with clientId: " + clientId);
        } else {
            System.out.println("Failed to close client connection. Response Code: " + responseCode);
        }
        
        // 关闭连接
        connection.disconnect();
    } catch (Exception e) {
        e.printStackTrace();
    }
}
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐