1 摘要

在 IoT 物联网设备通讯领域,服务端和设备端的通讯通常基于 TCP/UDP 协议,在这些协议的基础上定义自己的私有通讯协议。本文将介绍 SpringBoot 集成 Netty 在服务端常用的TCP、UDP通讯协议示例。

SpringBoot 集成 Netty 作为服务端:

SpringBoot 2.7 集成 Netty 4 模拟服务端与客户端通讯入门教程

SpringBoot 2.7 集成 Netty 4 实现 UDP 通讯

2 通讯协议

字段名字节数组下标字节长度说明
flagbyte[0]-byte[3]4byte固定包头标识,使用"9527"作为固定包头
cmdbyte[4]1byte通讯命令
operatebyte[5]1byte操作,11-发送请求,需要回复;12-发送请求,不需要回复;21-回复操作成功,22-回复操作失败
lengthbyte[6]-byte[7]2byte数据长度(从包头固定标识开始计算,总字节数)
msgNobyte[8]1byte消息编号(1-255)

传输数据格式按照 Hex 数据格式进行传输

以上为数据包头部分协议,内容数据拼接在包头后边。

示例:

只包含请求头的原始数据:

{
    "flag": "9527",
    "cmd": 20,
    "operate": 11,
    "length": 9,
    "msgNo": 85
}

编码后数据:

39353237140b000955

包含内容的原始数据:

{
    "flag": "9527",
    "cmd": 20,
    "operate": 11,
    "length": 12,
    "msgNo": 85,
    "data": "123"
}

编码后数据:

39353237140b000c55313233

3 核心代码

3.1 Netty 服务器

./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/UdpRadarNettyServer.java
package com.ljq.demo.springboot.netty.server.init;

import com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.net.InetSocketAddress;

/**
 * @Description: 初始化雷达 udp 服务
 * @Author: Mr.lu
 * @Date: 2024/8/15
 */
@Slf4j
@Component
public class UdpRadarNettyServer implements ApplicationRunner {

    @Value("${netty.portUdpRadar:9135}")
    private Integer nettyPort;

    @Resource
    private UdpRadarNettyServerHandler udpRadarNettyServerHandler;

    /**
     * 通道
     */
    private Channel channel;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        this.start();
    }

    /**
     * 启动服务
     *
     * @throws InterruptedException
     */
    public void start() throws InterruptedException {
        // 连接管理线程池
        EventLoopGroup mainGroup = new NioEventLoopGroup(2);
        EventLoopGroup workGroup = new NioEventLoopGroup(8);
        // 工作线程池
        Bootstrap bootstrap = new Bootstrap();
        bootstrap.group(mainGroup)
                // 指定 nio 通道,支持 UDP
                .channel(NioDatagramChannel.class)
                // 广播模式
                .option(ChannelOption.SO_BROADCAST, true)
                // 设置读取缓冲区大小为 10M
                .option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
                // 设置发送缓冲区大小为 10M
                .option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
                // 线程池复用缓冲区
                .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                // 指定 socket 地址和端口
                .localAddress(new InetSocketAddress(nettyPort))
                // 添加通道 handler
                .handler(new ChannelInitializer<NioDatagramChannel>() {
                    @Override
                    protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
                        nioDatagramChannel.pipeline()
                                // 指定工作线程,提高并发性能
                                .addLast(workGroup, udpRadarNettyServerHandler);
                    }
                });
        // 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
        ChannelFuture future = bootstrap.bind().sync();
        this.channel = future.channel();
        log.info("---------- [init] UDP Radar netty server start, port:{} ----------", nettyPort);
    }

    public Channel getChannel() {
        return this.channel;
    }

}

3.2 Netty 服务端工具类

一些基础共用的操作方法

./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/util/UdpRadarServerUtil.java
package com.ljq.demo.springboot.netty.server.util;

import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.ljq.demo.springboot.netty.server.init.UdpRadarNettyServer;
import com.ljq.demo.springboot.netty.server.model.response.UdpRadarHeader;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;

import java.net.InetSocketAddress;
import java.util.Objects;

/**
 * @Description: udp 雷达服务端工具类
 * @Author: Mr.lu
 * @Date: 2024/8/15
 */
@Slf4j
public class UdpRadarServerUtil {

    /**
     * 请求头字节数
     */
    public static final int HEADER_BYTE_LENGTH = 9;

    /**
     * 请求头数据长度
     */
    public static final int HEADER_DATA_LENGTH = 18;

    /**
     * 消息最大长度
     */
    public static final int MSG_MAX_LENGTH = 1000;

    /**
     * 请求头标识
     */
    public static final String HEADER_FLAG = "39353237";

    private UdpRadarServerUtil() {
    }


    /**
     * 校验服务端接收到的数据是否合法
     *
     * @param hexMsg
     * @return
     */
    public static boolean validate(String hexMsg) {
        if (StrUtil.isBlank(hexMsg) || hexMsg.length() < HEADER_DATA_LENGTH
                || hexMsg.length() > MSG_MAX_LENGTH || !hexMsg.startsWith(HEADER_FLAG)) {
            return false;
        }
        return true;
    }

    /**
     * 解析请求头
     *
     * @param hexMsg
     * @return
     */
    public static UdpRadarHeader parseHeader(String hexMsg) {
        UdpRadarHeader header = new UdpRadarHeader();
        header.setFlag(HexUtil.decodeHexStr(hexMsg.substring(0, 8)));
        header.setCmd(HexUtil.hexToInt(hexMsg.substring(8, 10)));
        header.setOperate(HexUtil.hexToInt(hexMsg.substring(10, 12)));
        header.setLength(HexUtil.hexToInt(hexMsg.substring(12, 16)));
        header.setMsgNo(HexUtil.hexToInt(hexMsg.substring(16, 18)));
        return header;
    }


    /**
     * 生成发送数据(仅请求头)
     *
     * @param header
     * @return
     */
    public static String getHexData(UdpRadarHeader header) {
        if (Objects.isNull(header)) {
            return null;
        }
        StringBuilder dataBuilder = new StringBuilder();
        dataBuilder.append(HexUtil.encodeHexStr(header.getFlag()));
        dataBuilder.append(String.format("%02x", header.getCmd()));
        dataBuilder.append(String.format("%02x", header.getOperate()));
        dataBuilder.append(String.format("%04x", header.getLength()));
        dataBuilder.append(String.format("%02x", header.getMsgNo()));
        return dataBuilder.toString();
    }


    /**
     * 生成发送数据(包含业务数据)
     *
     * @param header
     * @param data
     * @return
     */
    public static String getHexData(UdpRadarHeader header, String data) {
        if (Objects.isNull(header)) {
            return null;
        }
        StringBuilder dataBuilder = new StringBuilder(getHexData(header));
        if (StrUtil.isNotBlank(data)) {
            dataBuilder.append(HexUtil.encodeHexStr(data));
        }
        return dataBuilder.toString();
    }


    /**
     * 向客户端发送数据
     *
     * @param clientIp
     * @param clientPort
     * @param hexMsg
     */
    public static void sendData(String clientIp, int clientPort, String hexMsg) {
        // 获取UDP服务端
        UdpRadarNettyServer udpRadarNettyServer = SpringUtil.getBean(UdpRadarNettyServer.class);
        if (Objects.isNull(udpRadarNettyServer)) {
            log.warn("UDP Radar Server is not running");
            return;
        }
        DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(Convert.hexToBytes(hexMsg)),
                new InetSocketAddress(clientIp, clientPort));
        udpRadarNettyServer.getChannel().writeAndFlush(sendPacket);
    }

    /**
     * 向客户端发送数据
     *
     * @param clientId
     * @param clientPort
     * @param header
     * @param data
     */
    public static void sendData(String clientId, int clientPort,UdpRadarHeader header, String data) {
        // 组装数据
        String hexMsg = getHexData(header, data);
        // 发送数据
        sendData(clientId, clientPort, hexMsg);
    }



}

注意事项:

在使用字节传递数据的过程中通常需要进行数据转换,这里给出字符串、数字转化为16进制的Hex格式数据示例:

使用 HuTool 工具包中的 HexUtil 工具类进行转换。

字符串转换为Hex数据:

HexUtil.encodeHexStr("xxx")

转化后的Hex格式字符串长度为原始字符串长度的 2 倍。

数字转换为Hex数据:

// 转换后一个数字占2个字符,不够的高位补0
String.format("%02x", 123)
// 转换后一个数字占4个字符,不够的高位补0
String.format("%04x", 123)

将 Hex 格式的字符串转化为业务所需数据:

根据协议截取对应的字符串,然后进行转换,这里给出Hex格式字符串转化为字符串、数字

Hex 字符串转换为字符串:

HexUtil.decodeHexStr("hexString")

Hex 格式字符串转化为数字:

HexUtil.hexToInt("hexString")

3.3 Netty 服务处理器

./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/UdpRadarNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;

import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.RandomUtil;
import com.ljq.demo.springboot.netty.server.model.response.UdpRadarHeader;
import com.ljq.demo.springboot.netty.server.util.UdpRadarServerUtil;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * @Description: udp 雷达服务端处理器
 * @Author: Mr.lu
 * @Date: 2024/8/15
 */
@Slf4j
@Component
@ChannelHandler.Sharable
public class UdpRadarNettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {

    /**
     * 工作线程池
     */
    private final ExecutorService executorService = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("UDP-Radar-work-pool"),
            new ThreadPoolExecutor.CallerRunsPolicy());

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
        // 读取数据
        // 十六进制数据字符串
        String hexMsg = ByteBufUtil.hexDump(packet.content()).toUpperCase();
        // 校验数据
        if (!UdpRadarServerUtil.validate(hexMsg)) {
            log.info("invalid data");
            return;
        }
        InetSocketAddress clientAddress = packet.sender();
        // 业务处理
        executorService.execute(() -> {
            // 解析请求头
            UdpRadarHeader header = UdpRadarServerUtil.parseHeader(hexMsg);
            log.info("UDP Radar server receive msg,client: {},cmd:{},operate:{}", clientAddress.getHostString(),
                    header.getCmd(), header.getOperate());
            // 根据命令执行对应的业务
            // TODO  业务处理,返回响应客户端数据
            String responseHexMsg = demoService(hexMsg);

            // 回复客户端
            if (Objects.equals(11, header.getOperate())) {
                // 发送数据
                UdpRadarServerUtil.sendData(clientAddress.getHostString(), clientAddress.getPort(), responseHexMsg);
            }
        });

    }


    /**
     * 示例业务方法
     *
     * @param hexMsg
     * @return
     */
    private String demoService(String hexMsg) {
        // TODO 业务处理

        UdpRadarHeader header = UdpRadarServerUtil.parseHeader(hexMsg);

        // 回复内容
        UdpRadarHeader responseHeader = new UdpRadarHeader();
        responseHeader.setCmd(header.getCmd());
        responseHeader.setOperate(21);
        responseHeader.setLength(UdpRadarServerUtil.HEADER_BYTE_LENGTH + 5);
        responseHeader.setMsgNo(header.getMsgNo());
        // 编码响应数据
        StringBuilder responseHexMsgBuilder = new StringBuilder();
        responseHexMsgBuilder.append(UdpRadarServerUtil.getHexData(responseHeader));
        responseHexMsgBuilder.append(String.format("%02x", RandomUtil.randomInt(0,128)));
        responseHexMsgBuilder.append(HexUtil.encodeHexStr(RandomUtil.randomString(4)));
        return responseHexMsgBuilder.toString();
    }

}

注意事项:

这里展示了如何读取客户端上传的字节流、如何解析数据、如何获取客户端信息、如何回复客户端等问题。

将字节流转化 Hex 格式字符串。

// 十六进制数据字符串
String hexMsg = ByteBufUtil.hexDump(packet.content()).toUpperCase();

读取到客户端的 ip 地址信息:

InetSocketAddress clientAddress = packet.sender();

这里因为使用的是 io.netty.channel.socket.DatagramPacket 对象,里边包含了客户端的传输数据以及客户端的ip地址信息,因此从这里获取 ip 地址,这时是无法从 io.netty.channel.ChannelHandlerContext 对象中获取到客户端的 ip 地址信息的。

注意:使用以下方式获取的客户端 ip 地址信息为空:

InetSocketAddress clientAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();

如果服务处理类的数据对象泛型为 io.netty.buffer.ByteBuf 则可以使用以上方式获取客户端 ip 地址信息。

向客户端发送信息,需要先获取到 Netty 服务的通道(Channel),然后包装发送对象,再进行发送:

// 获取UDP服务端
UdpRadarNettyServer udpRadarNettyServer = SpringUtil.getBean(UdpRadarNettyServer.class);
// 包装发送对象,将数据先转化为 Hex 字符串,然后再转换成 byte 数组,再将 byte 数组转化为 ByteBuf 对象
DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(Convert.hexToBytes(hexMsg)),
                new InetSocketAddress(clientIp, clientPort));
// 发送给客户端
udpRadarNettyServer.getChannel().writeAndFlush(sendPacket);

3.4 协议请求头封装类

数据请求头封装为对象,方便业务处理

./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/model/response/UdpRadarHeader.java
package com.ljq.demo.springboot.netty.server.model.response;

import lombok.Data;

import java.io.Serializable;

/**
 * @Description: UDP 雷达请求头参数
 * @Author: Mr.lu
 * @Date: 2024/8/15
 */
@Data
public class UdpRadarHeader implements Serializable {

    private static final long serialVersionUID = 4765355155562316019L;


    /**
     * 固定包头标识(4byte)
     */
    private String flag = "9527";

    /**
     * 命令(1byte)
     */
    private Integer cmd;

    /**
     * 操作,11-发送请求,需要回复;12-发送请求,不需要回复;21-回复操作成功,22-回复操作失败(1byte)
     */
    private Integer operate;

    /**
     * 数据长度(从包头固定标识开始计算,总字节数)(2byte)
     */
    private Integer length;

    /**
     * 消息编号(1-255)(1byte)
     */
    private Integer msgNo;



}

4 测试

4.1 测试工具

推荐使用 NetAssist 网络调试助手来模拟设备发送 TCP/UDP 指令

下载链接: https://www.cmsoft.cn/resource/102.html

image-20240820095457058

4.2 测试数据

原始发送数据:

{
    "flag": "9527",
    "cmd": 20,
    "operate": 11,
    "length": 9,
    "msgNo": 85
}

编码后的发送数据:

39353237140b000955

后台日志:

2024-08-20 09:52:41 | INFO  | UDP-Radar-work-pool-3-1 | com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler 55| UDP Radar server receive msg,client: 192.168.15.32,cmd:20,operate:11

至此,一个常用的 TCP、UDP通讯协议的 Netty 服务端搭建完成。

手来模拟设备发送 TCP/UDP 指令

下载链接: https://www.cmsoft.cn/resource/102.html

[外链图片转存中…(img-Rm4tQJGe-1724123199430)]

4.2 测试数据

原始发送数据:

{
    "flag": "9527",
    "cmd": 20,
    "operate": 11,
    "length": 9,
    "msgNo": 85
}

编码后的发送数据:

39353237140b000955

后台日志:

2024-08-20 09:52:41 | INFO  | UDP-Radar-work-pool-3-1 | com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler 55| UDP Radar server receive msg,client: 192.168.15.32,cmd:20,operate:11

至此,一个常用的 TCP、UDP通讯协议的 Netty 服务端搭建完成。

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐