SpringBoot 集成 Netty 作为服务端常用 TCP/UDP 通讯协议示例
文章目录1 摘要2 通讯协议3 核心代码3.1 Netty 服务器3.2 Netty 服务端工具类3.3 Netty 服务处理器3.4 协议请求头封装类4 测试4.1 测试工具4.2 测试数据4.2 测试数据1 摘要在 IoT 物联网设备通讯领域,服务端和设备端的通讯通常基于 TCP/UDP 协议,在这些协议的基础上定义自己的私有通讯协议。本文将介绍 SpringBoot 集成 Netty 在服务
文章目录
1 摘要
在 IoT 物联网设备通讯领域,服务端和设备端的通讯通常基于 TCP/UDP 协议,在这些协议的基础上定义自己的私有通讯协议。本文将介绍 SpringBoot 集成 Netty 在服务端常用的TCP、UDP通讯协议示例。
SpringBoot 集成 Netty 作为服务端:
SpringBoot 2.7 集成 Netty 4 模拟服务端与客户端通讯入门教程
SpringBoot 2.7 集成 Netty 4 实现 UDP 通讯
2 通讯协议
字段名 | 字节数组下标 | 字节长度 | 说明 |
---|---|---|---|
flag | byte[0]-byte[3] | 4byte | 固定包头标识,使用"9527"作为固定包头 |
cmd | byte[4] | 1byte | 通讯命令 |
operate | byte[5] | 1byte | 操作,11-发送请求,需要回复;12-发送请求,不需要回复;21-回复操作成功,22-回复操作失败 |
length | byte[6]-byte[7] | 2byte | 数据长度(从包头固定标识开始计算,总字节数) |
msgNo | byte[8] | 1byte | 消息编号(1-255) |
传输数据格式按照 Hex 数据格式进行传输
以上为数据包头部分协议,内容数据拼接在包头后边。
示例:
只包含请求头的原始数据:
{
"flag": "9527",
"cmd": 20,
"operate": 11,
"length": 9,
"msgNo": 85
}
编码后数据:
39353237140b000955
包含内容的原始数据:
{
"flag": "9527",
"cmd": 20,
"operate": 11,
"length": 12,
"msgNo": 85,
"data": "123"
}
编码后数据:
39353237140b000c55313233
3 核心代码
3.1 Netty 服务器
./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/init/UdpRadarNettyServer.java
package com.ljq.demo.springboot.netty.server.init;
import com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioDatagramChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.net.InetSocketAddress;
/**
* @Description: 初始化雷达 udp 服务
* @Author: Mr.lu
* @Date: 2024/8/15
*/
@Slf4j
@Component
public class UdpRadarNettyServer implements ApplicationRunner {
@Value("${netty.portUdpRadar:9135}")
private Integer nettyPort;
@Resource
private UdpRadarNettyServerHandler udpRadarNettyServerHandler;
/**
* 通道
*/
private Channel channel;
@Override
public void run(ApplicationArguments args) throws Exception {
this.start();
}
/**
* 启动服务
*
* @throws InterruptedException
*/
public void start() throws InterruptedException {
// 连接管理线程池
EventLoopGroup mainGroup = new NioEventLoopGroup(2);
EventLoopGroup workGroup = new NioEventLoopGroup(8);
// 工作线程池
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(mainGroup)
// 指定 nio 通道,支持 UDP
.channel(NioDatagramChannel.class)
// 广播模式
.option(ChannelOption.SO_BROADCAST, true)
// 设置读取缓冲区大小为 10M
.option(ChannelOption.SO_RCVBUF, 1024 * 1024 * 10)
// 设置发送缓冲区大小为 10M
.option(ChannelOption.SO_SNDBUF, 1024 * 1024 * 10)
// 线程池复用缓冲区
.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
// 指定 socket 地址和端口
.localAddress(new InetSocketAddress(nettyPort))
// 添加通道 handler
.handler(new ChannelInitializer<NioDatagramChannel>() {
@Override
protected void initChannel(NioDatagramChannel nioDatagramChannel) throws Exception {
nioDatagramChannel.pipeline()
// 指定工作线程,提高并发性能
.addLast(workGroup, udpRadarNettyServerHandler);
}
});
// 异步绑定服务器,调用sync()方法阻塞等待直到绑定完成
ChannelFuture future = bootstrap.bind().sync();
this.channel = future.channel();
log.info("---------- [init] UDP Radar netty server start, port:{} ----------", nettyPort);
}
public Channel getChannel() {
return this.channel;
}
}
3.2 Netty 服务端工具类
一些基础共用的操作方法
./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/util/UdpRadarServerUtil.java
package com.ljq.demo.springboot.netty.server.util;
import cn.hutool.core.convert.Convert;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.extra.spring.SpringUtil;
import com.ljq.demo.springboot.netty.server.init.UdpRadarNettyServer;
import com.ljq.demo.springboot.netty.server.model.response.UdpRadarHeader;
import io.netty.buffer.Unpooled;
import io.netty.channel.socket.DatagramPacket;
import lombok.extern.slf4j.Slf4j;
import java.net.InetSocketAddress;
import java.util.Objects;
/**
* @Description: udp 雷达服务端工具类
* @Author: Mr.lu
* @Date: 2024/8/15
*/
@Slf4j
public class UdpRadarServerUtil {
/**
* 请求头字节数
*/
public static final int HEADER_BYTE_LENGTH = 9;
/**
* 请求头数据长度
*/
public static final int HEADER_DATA_LENGTH = 18;
/**
* 消息最大长度
*/
public static final int MSG_MAX_LENGTH = 1000;
/**
* 请求头标识
*/
public static final String HEADER_FLAG = "39353237";
private UdpRadarServerUtil() {
}
/**
* 校验服务端接收到的数据是否合法
*
* @param hexMsg
* @return
*/
public static boolean validate(String hexMsg) {
if (StrUtil.isBlank(hexMsg) || hexMsg.length() < HEADER_DATA_LENGTH
|| hexMsg.length() > MSG_MAX_LENGTH || !hexMsg.startsWith(HEADER_FLAG)) {
return false;
}
return true;
}
/**
* 解析请求头
*
* @param hexMsg
* @return
*/
public static UdpRadarHeader parseHeader(String hexMsg) {
UdpRadarHeader header = new UdpRadarHeader();
header.setFlag(HexUtil.decodeHexStr(hexMsg.substring(0, 8)));
header.setCmd(HexUtil.hexToInt(hexMsg.substring(8, 10)));
header.setOperate(HexUtil.hexToInt(hexMsg.substring(10, 12)));
header.setLength(HexUtil.hexToInt(hexMsg.substring(12, 16)));
header.setMsgNo(HexUtil.hexToInt(hexMsg.substring(16, 18)));
return header;
}
/**
* 生成发送数据(仅请求头)
*
* @param header
* @return
*/
public static String getHexData(UdpRadarHeader header) {
if (Objects.isNull(header)) {
return null;
}
StringBuilder dataBuilder = new StringBuilder();
dataBuilder.append(HexUtil.encodeHexStr(header.getFlag()));
dataBuilder.append(String.format("%02x", header.getCmd()));
dataBuilder.append(String.format("%02x", header.getOperate()));
dataBuilder.append(String.format("%04x", header.getLength()));
dataBuilder.append(String.format("%02x", header.getMsgNo()));
return dataBuilder.toString();
}
/**
* 生成发送数据(包含业务数据)
*
* @param header
* @param data
* @return
*/
public static String getHexData(UdpRadarHeader header, String data) {
if (Objects.isNull(header)) {
return null;
}
StringBuilder dataBuilder = new StringBuilder(getHexData(header));
if (StrUtil.isNotBlank(data)) {
dataBuilder.append(HexUtil.encodeHexStr(data));
}
return dataBuilder.toString();
}
/**
* 向客户端发送数据
*
* @param clientIp
* @param clientPort
* @param hexMsg
*/
public static void sendData(String clientIp, int clientPort, String hexMsg) {
// 获取UDP服务端
UdpRadarNettyServer udpRadarNettyServer = SpringUtil.getBean(UdpRadarNettyServer.class);
if (Objects.isNull(udpRadarNettyServer)) {
log.warn("UDP Radar Server is not running");
return;
}
DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(Convert.hexToBytes(hexMsg)),
new InetSocketAddress(clientIp, clientPort));
udpRadarNettyServer.getChannel().writeAndFlush(sendPacket);
}
/**
* 向客户端发送数据
*
* @param clientId
* @param clientPort
* @param header
* @param data
*/
public static void sendData(String clientId, int clientPort,UdpRadarHeader header, String data) {
// 组装数据
String hexMsg = getHexData(header, data);
// 发送数据
sendData(clientId, clientPort, hexMsg);
}
}
注意事项:
在使用字节传递数据的过程中通常需要进行数据转换,这里给出字符串、数字转化为16进制的Hex格式数据示例:
使用 HuTool 工具包中的 HexUtil 工具类进行转换。
字符串转换为Hex数据:
HexUtil.encodeHexStr("xxx")
转化后的Hex格式字符串长度为原始字符串长度的 2 倍。
数字转换为Hex数据:
// 转换后一个数字占2个字符,不够的高位补0
String.format("%02x", 123)
// 转换后一个数字占4个字符,不够的高位补0
String.format("%04x", 123)
将 Hex 格式的字符串转化为业务所需数据:
根据协议截取对应的字符串,然后进行转换,这里给出Hex格式字符串转化为字符串、数字
Hex 字符串转换为字符串:
HexUtil.decodeHexStr("hexString")
Hex 格式字符串转化为数字:
HexUtil.hexToInt("hexString")
3.3 Netty 服务处理器
./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/handler/UdpRadarNettyServerHandler.java
package com.ljq.demo.springboot.netty.server.handler;
import cn.hutool.core.util.HexUtil;
import cn.hutool.core.util.RandomUtil;
import com.ljq.demo.springboot.netty.server.model.response.UdpRadarHeader;
import com.ljq.demo.springboot.netty.server.util.UdpRadarServerUtil;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.socket.DatagramPacket;
import io.netty.util.concurrent.DefaultThreadFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import java.net.InetSocketAddress;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* @Description: udp 雷达服务端处理器
* @Author: Mr.lu
* @Date: 2024/8/15
*/
@Slf4j
@Component
@ChannelHandler.Sharable
public class UdpRadarNettyServerHandler extends SimpleChannelInboundHandler<DatagramPacket> {
/**
* 工作线程池
*/
private final ExecutorService executorService = new ThreadPoolExecutor(4, 8, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("UDP-Radar-work-pool"),
new ThreadPoolExecutor.CallerRunsPolicy());
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, DatagramPacket packet) throws Exception {
// 读取数据
// 十六进制数据字符串
String hexMsg = ByteBufUtil.hexDump(packet.content()).toUpperCase();
// 校验数据
if (!UdpRadarServerUtil.validate(hexMsg)) {
log.info("invalid data");
return;
}
InetSocketAddress clientAddress = packet.sender();
// 业务处理
executorService.execute(() -> {
// 解析请求头
UdpRadarHeader header = UdpRadarServerUtil.parseHeader(hexMsg);
log.info("UDP Radar server receive msg,client: {},cmd:{},operate:{}", clientAddress.getHostString(),
header.getCmd(), header.getOperate());
// 根据命令执行对应的业务
// TODO 业务处理,返回响应客户端数据
String responseHexMsg = demoService(hexMsg);
// 回复客户端
if (Objects.equals(11, header.getOperate())) {
// 发送数据
UdpRadarServerUtil.sendData(clientAddress.getHostString(), clientAddress.getPort(), responseHexMsg);
}
});
}
/**
* 示例业务方法
*
* @param hexMsg
* @return
*/
private String demoService(String hexMsg) {
// TODO 业务处理
UdpRadarHeader header = UdpRadarServerUtil.parseHeader(hexMsg);
// 回复内容
UdpRadarHeader responseHeader = new UdpRadarHeader();
responseHeader.setCmd(header.getCmd());
responseHeader.setOperate(21);
responseHeader.setLength(UdpRadarServerUtil.HEADER_BYTE_LENGTH + 5);
responseHeader.setMsgNo(header.getMsgNo());
// 编码响应数据
StringBuilder responseHexMsgBuilder = new StringBuilder();
responseHexMsgBuilder.append(UdpRadarServerUtil.getHexData(responseHeader));
responseHexMsgBuilder.append(String.format("%02x", RandomUtil.randomInt(0,128)));
responseHexMsgBuilder.append(HexUtil.encodeHexStr(RandomUtil.randomString(4)));
return responseHexMsgBuilder.toString();
}
}
注意事项:
这里展示了如何读取客户端上传的字节流、如何解析数据、如何获取客户端信息、如何回复客户端等问题。
将字节流转化 Hex 格式字符串。
// 十六进制数据字符串
String hexMsg = ByteBufUtil.hexDump(packet.content()).toUpperCase();
读取到客户端的 ip 地址信息:
InetSocketAddress clientAddress = packet.sender();
这里因为使用的是 io.netty.channel.socket.DatagramPacket
对象,里边包含了客户端的传输数据以及客户端的ip地址信息,因此从这里获取 ip 地址,这时是无法从 io.netty.channel.ChannelHandlerContext
对象中获取到客户端的 ip 地址信息的。
注意:使用以下方式获取的客户端 ip 地址信息为空:
InetSocketAddress clientAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
如果服务处理类的数据对象泛型为 io.netty.buffer.ByteBuf
则可以使用以上方式获取客户端 ip 地址信息。
向客户端发送信息,需要先获取到 Netty 服务的通道(Channel),然后包装发送对象,再进行发送:
// 获取UDP服务端
UdpRadarNettyServer udpRadarNettyServer = SpringUtil.getBean(UdpRadarNettyServer.class);
// 包装发送对象,将数据先转化为 Hex 字符串,然后再转换成 byte 数组,再将 byte 数组转化为 ByteBuf 对象
DatagramPacket sendPacket = new DatagramPacket(Unpooled.copiedBuffer(Convert.hexToBytes(hexMsg)),
new InetSocketAddress(clientIp, clientPort));
// 发送给客户端
udpRadarNettyServer.getChannel().writeAndFlush(sendPacket);
3.4 协议请求头封装类
数据请求头封装为对象,方便业务处理
./demo-netty-server/src/main/java/com/ljq/demo/springboot/netty/server/model/response/UdpRadarHeader.java
package com.ljq.demo.springboot.netty.server.model.response;
import lombok.Data;
import java.io.Serializable;
/**
* @Description: UDP 雷达请求头参数
* @Author: Mr.lu
* @Date: 2024/8/15
*/
@Data
public class UdpRadarHeader implements Serializable {
private static final long serialVersionUID = 4765355155562316019L;
/**
* 固定包头标识(4byte)
*/
private String flag = "9527";
/**
* 命令(1byte)
*/
private Integer cmd;
/**
* 操作,11-发送请求,需要回复;12-发送请求,不需要回复;21-回复操作成功,22-回复操作失败(1byte)
*/
private Integer operate;
/**
* 数据长度(从包头固定标识开始计算,总字节数)(2byte)
*/
private Integer length;
/**
* 消息编号(1-255)(1byte)
*/
private Integer msgNo;
}
4 测试
4.1 测试工具
推荐使用 NetAssist 网络调试助手来模拟设备发送 TCP/UDP 指令
下载链接: https://www.cmsoft.cn/resource/102.html
4.2 测试数据
原始发送数据:
{
"flag": "9527",
"cmd": 20,
"operate": 11,
"length": 9,
"msgNo": 85
}
编码后的发送数据:
39353237140b000955
后台日志:
2024-08-20 09:52:41 | INFO | UDP-Radar-work-pool-3-1 | com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler 55| UDP Radar server receive msg,client: 192.168.15.32,cmd:20,operate:11
至此,一个常用的 TCP、UDP通讯协议的 Netty 服务端搭建完成。
手来模拟设备发送 TCP/UDP 指令
下载链接: https://www.cmsoft.cn/resource/102.html
[外链图片转存中…(img-Rm4tQJGe-1724123199430)]
4.2 测试数据
原始发送数据:
{
"flag": "9527",
"cmd": 20,
"operate": 11,
"length": 9,
"msgNo": 85
}
编码后的发送数据:
39353237140b000955
后台日志:
2024-08-20 09:52:41 | INFO | UDP-Radar-work-pool-3-1 | com.ljq.demo.springboot.netty.server.handler.UdpRadarNettyServerHandler 55| UDP Radar server receive msg,client: 192.168.15.32,cmd:20,operate:11
至此,一个常用的 TCP、UDP通讯协议的 Netty 服务端搭建完成。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)