为什么

大家的项目一定有这种场景,系统发布公告,消息更更新,商家和客户私聊,这种场景,为了保证实时性总不能是http一直长轮询,所以就要用到今天说的websocket

WebSocket 是一种网络通信协议,提供了一个在单个长时间连接上进行全双工、双向交互的通道。WebSocket 设计用来在浏览器和服务器之间进行交互式通信会话。用户可以在网页上发送消息到服务器并接收即时响应,而不必每次都重新加载页面。以下是有关 WebSocket 的详细笔记,包括其业务场景、为何出现、HTTP的不足,以及不同的消息推送方式。

为什么出现

这部分是计算机网络中的内容 建议看<<图解http>>

WebSocket 的出现主要是为了解决 HTTP 协议在实时通信方面的一些局限性:

  • 连接重用:HTTP 协议在每次请求时都需要重新建立连接(HTTP/1.1 之前),这在需要频繁通信的场景中效率很低。
  • 非实时性:传统的 HTTP 请求-响应模型不能满足实时互动的需求,因为服务器无法主动向客户端推送信息。
  • 开销较大:每次 HTTP 请求都会携带完整的头信息,增加了不必要的网络负载。

HTTP的不足

  • 单工通信:HTTP 是单工的,客户端发送请求后服务器才能响应,服务器不能主动发送消息。
  • 频繁的连接开销:每个 HTTP 连接在传输完毕后通常都需要关闭,再次通信需要重新建立连接,这在需要频繁实时交互的应用中显得尤为低效。
  • 头部开销:HTTP 请求和响应都包含大量的头部信息,这对于小数据包的传输非常不利。

并且由于http是单向的,必须有客户端发起请求,我们开发的服务端才会接收返回响应

常见的消息推送方式

轮询方式

在这里插入图片描述
SSe
在这里插入图片描述

以及现在说的websocket
在这里插入图片描述

执行过程

因为websocket 也是从http升级而来,更改协议

先了解http的执行过程
  1. 建立连接:浏览器(客户端)通过网络向服务器发起一个 TCP 连接。其中包含3次握手

    • 第一次握手:客户端向服务器发送一个SYN包,告诉服务器我要跟你建立连接。这个SYN包里面包含了客户端的初始序列号。

    • 第二次握手:服务器收到SYN包后,会回复一个SYN+ACK包给客户端。这个ACK是确认客户端的SYN包的,表示服务器已经收到了。同时,服务器也会发送一个自己的SYN包给客户端,告诉客户端我也要跟你建立连接。

    • 第三次握手:客户端收到服务器的SYN+ACK包后,会再回复一个ACK包给服务器。这个ACK是确认服务器的SYN包的,表示客户端也收到了服务器的建立连接请求。

  2. 发送 HTTP 请求:客户端构建 HTTP 请求,包括方法(GET、POST、PUT、DELETE 等)、URI、协议版本,以及必要的请求头和请求体请求发送到服务器。
    服务器处理请求:服务器接收到请求后,解析请求内容,并根据请求的资源和方法执行相应的动作(如从数据库检索数据、处理提交的表单等)。
    发送 HTTP 响应:

  3. 服务器构建 HTTP 响应,包括状态码(如 200 OK、404 Not Found)、响应头和响应体。
    响应发回到客户端。

  4. 关闭连接:在 HTTP/1.0 中,默认情况下,服务器在发送响应后关闭 TCP 连接。HTTP/1.1 支持持久连接(Connection: keep-alive),允许多个请求和响应在同一个连接中传输,从而减少了建立和关闭连接的频率和成本。(这里包含四次挥手)

    • 第一次挥手:客户端向服务器发送一个FIN包,通知将要断开连接了。

    • 第二次挥手:服务器收到FIN包后,会回复一个ACK回调包给客户端,表示已经收到了客户端的断开连接请求。

    • 第三次挥手:服务器在发送完所有数据后,会向客户端发送一个FIN包,告诉客户端我也要断开连接了。

    • 第四次挥手:客户端收到服务器的FIN包后,会回复一个ACK包给服务器,表示已经收到了服务器的断开连接请求。

而websocket的过程可以通下面案列,出现请求状态101的就表示升级为web socket

比如gpt的回复页面
在这里插入图片描述

可以查看请求的消息,一般是初次响应的响应体
在这里插入图片描述

连接过程

客户端发起请求:客户端发送一个特殊的 HTTP 请求,请求升级到 WebSocket。这个请求看起来像一个标准的 HTTP 请求,但包含一些特定的头部字段来指示这是一个 WebSocket 升级请求:

  • Upgrade: websocket:明确请求升级到 WebSocket。

  • Connection: Upgrade:指示这是一个升级请求。

  • Sec-WebSocket-Key:一个 Base64 编码的随机值,服务器将用它来构造一个响应头,以确认连接的有效性。

  • Sec-WebSocket-Version:指示 WebSocket 协议的版本,通常是 13。
    服务器响应:如果服务器支持 WebSocket,并接受升级请求,则它会返回一个 HTTP 101 Switching Protocols 响应,包含以下头部:

  • Upgrade: websocket 和 Connection: Upgrade:确认升级到 WebSocket。

  • Sec-WebSocket-Accept:使用客户端的 Sec-WebSocket-Key 计算得出的一个值,用于验证连接。
    建立 WebSocket 连接:一旦握手成功,原始的 HTTP 连接就升级到 WebSocket 连接。此时,客户端和服务器可以开始在这个长连接上双向发送数据。

数据传输:与 HTTP 不同,WebSocket 允许服务器直接发送消息给客户端,而不需要客户端先发送请求,这对于需要实时数据更新的应用非常有用(例如在线游戏、交易平台等)。
接下来就是实现了,模拟消息聊天

代码实现

代码地址
前端页面(vue+vuetify ui) 有兴趣可以看看
在这里插入图片描述

实现步骤

先讲一下原理,之前说得websocket是双向通道,客户端连接服务端的端点,也就是没有一个连接就有一个端点实例
再java中是通过会话管理通道
建立过程可以知道,再建立连接之前会先进行握手,那么我们就可以再握手的时候对该线程用户进行验证,然后具体端点会有几个对应的生命周期 建立成功 接收到消息 连接关闭 连接异常我们就可以写对应事件,然后再建立成功时候,可以把对应建立成功的会话,再把对应的通道和用户ida进行保存,这样就可以根据id找到具体通道,进行发送消息。

代码逻辑客户端像服务端发起连接,服务端就要有一个路由对应这个连接进行处理,这个路由称为端点endpoint
所以核心就是写端点 完成端点生命周期

值得注意:我的boot 是3,jdk 17,虽然过程一样,但是springboot2关于网络编程这一块的包都是再javax ,3开始就是 jakarta包了

1.添加依赖

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
  1. 配置类
    作用是对端点进行扫描

在Spring Boot中,ServerEndpointExporter是一个非常重要的Bean,
* 因为它负责扫描和注册所有带有@ServerEndpoint注解的WebSocket端点。
*
* 以下是该Bean的作用及其配置的详细说明:
*
* 作用
* 扫描带有@ServerEndpoint注解的类:ServerEndpointExporter会自动扫描应用程序中的所有@ServerEndpoint注解的类,并将它们注册为WebSocket端点。
* 注册WebSocket端点:它将找到的WebSocket端点注册到默认的容器(如Tomcat、Jetty、Undertow等),使这些端点能够接受和处理WebSocket连接和消息。


@Configuration
public class WebSocketConfig {
 
    @Bean
    public ServerEndpointExporter serverEndpointExporter(){
        return new ServerEndpointExporter();
    }


}

3.写端点配置类 (每个端点的作用不一样所以对端点的配置类也不一样)
集成该配置类进行重写 可以见名知意的发现:检查跨域 获取容器默认配置,获取端点实列都是一些等,而主要这里修改的就是 modifyHandshake(修改握手) 这里说的是还没有建立的时候,这里可以进行处理(比如保存用户信息,解析token,生成唯一info等等)
在这里插入图片描述

@Configuration
public class GetHttpSessionConfig  extends ServerEndpointConfig.Configurator {
    /**
     * 注意:没有一个客户端发起握手,端点就有一个新的实列 那么引用的这个配置也是新的实列 所以内存地址不一样 这里sec的用胡属性也不同就不会产生冲突
     * 修改握手机制  就是第一次http发送过来的握手
     * @param sec   服务器websocket端点的配置
     * @param request
     * @param response
     */
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
//        super.modifyHandshake(sec, request, response);
        HttpSession httpSession =(HttpSession) request.getHttpSession();
//        将从握手的请求中获取httpsession

        /**
         * 一般会在请求头中添加token 解析出来id作为键值对
         */
        Map<String, Object> properties = sec.getUserProperties();
        /**
         * 一个客户端和和服务器发起一次请求交互 就有一个唯一session
           *存储session 是为了能够从其中用户用户info 到时候作为wssession的key 但是session 不共享 为此redis改进 或者token也是可以的
         * 这里使用UUID作为标识
         */
//        properties.put(HttpSession.class.getName(),httpSession);
        String sessionKey = UUID.randomUUID().toString().replaceAll("-", "");
        properties.put("Connected",sessionKey);
    }
}


代码解析:该端点配置是再模拟新用户访问一个网站,网站主动给用户推送广告的情况,所以用户没有登录,就无法从httpsession,请求头,token,redis中获取个人信息了,所以这里使用的uuid,作为游客的websocketid
因为每有一个客户端建立websocket连接就有一个端点实列,一个端点配置,所以这里的 properties.put(“Connected”,sessionKey);key值相同无妨

  1. 编写端点引用编写的端点配置类 注意每个周期的参数顺序不能错否则会报错
@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/chat",configurator = GetHttpSessionConfig.class)//协议升级路由
public class ChatEndpoint {//只要和该路由建立连接 就new 一个新的实列 对应一个该endpoint对象
    //模拟储存当前用户的朋友全信息
    private  static final     Map<String, Session> Friendgroup=new ConcurrentHashMap<String,Session>();//线程安全的银蛇
   private HttpSession httpSession;//存放当前用户信息
    /**
     * 定义的当前用户
     */
    private String userId;

    /**
     * 第一个参数必须是session
     * @param session
     * @param sec   不能是Server
     */
    @OnOpen
    public void onOpen(Session session,EndpointConfig sec){
//        1.保存当前连接用户状态
//每个端点获取该端点携带的httpsession数据
//    this.httpSession = (HttpSession) sec.getUserProperties().get(HttpSession.class.getName());
//    this.httpSession.getAttribute("user");
        String sessionKey =(String) sec.getUserProperties().get("Connected");
        this.userId=sessionKey;//用户上下文填充
//2.把成功建立升级的会话让放入会话组
        Friendgroup.put(sessionKey,session);
//之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
        System.out.println("websocket建立成功");
//        2.广播消息(如果是好咧别表上下) 模拟放房间提示
        String content="用户id"+sessionKey+"已经上线 愉快玩耍吧";
        Message message = Message.builder()
                .content(content)
                .isSystem(true).build();
        broadcast(message);
        System.out.println("WebSocket 连接建立成功: " + sessionKey);
//        3.

    }
    /**
     * 当断开连接
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        // 找到关闭会话对应的用户 ID 并从 Friendgroup 中移除
      String sessionKey =(String) sec.getUserProperties().get("Connected");

        if (sessionKey != null) {
            Friendgroup.remove(sessionKey);

            // 广播消息给所有好友
            String content = "用户ID " + sessionKey + " 已经下线";
            Message message = Message.builder()
                    .content(content)
                    .isSystem(true)
                    .build();
            broadcast(message);
        }
    }

    /**
     * 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
     * @param //前端可以携带来自forname 但是我在这个实列化内部做了一个上下文
     *
     * 如果接收的消息的是对象 需要解码器,@PathParam("roomId") String roomId, 如果参数写在了第一位 那么就需要使用该注解获取路由的参数信息
     */
    @OnMessage
    public void onMessage(Session session,String message) throws IOException {
        System.out.println("接收到消息"+message);
        Message o = (Message) JSON.parse(message);
        
         String userId =(String) sec.getUserProperties().get("Connected");
        Message message1 = Message.builder().sender(userId)
                .toReceiver(o.getToReceiver())
                .content(o.getContent())
                .build();
        Session session1 = Friendgroup.get(o.ReceiveruserId);
        session1.getBasicRemote().sendText(JSON.toJSONString(message1));
        // 你的其他逻辑
    }

    /**
     * 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
     */
    @OnError
    public void onError(Throwable error) {
        System.out.println("onError......"+error.getMessage());

    }


    /**
     * 将系统的公告等需要推送的消息发布给所有已经建立连接的用户
     * 用于系统更细发布公告之类的 或者用户上线通知其他
     * @param message
     */
    private  void broadcast(Message message) throws RuntimeException {
        if (message.isSystem()){
        Friendgroup.entrySet()
                .forEach(item->{
//                    遍历每一个键值对
                    Session userSession = item.getValue();
                    try {
                        userSession
                                .getBasicRemote() //同步消息发送器
                                .sendText(JSON.toJSONString(message));
                    } catch (IOException e) {
//                        记录日志 保存文件便于查看
                        throw new RuntimeException(e);
                    }
                    ;
                });
        return;
    }
        else{
            try {
                Friendgroup.get(message.getSender())
                        .getBasicRemote()
                        .sendText(JSON.toJSONString(message));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

发消息的核心代码: Friendgroup.get(message.getSender())
.getBasicRemote()
.sendText(JSON.toJSONString(message));

端点中引用的消息对象:


/**
 * 定义的消息发送对象
 */
@Data
@Builder
public class Message {
    //没有toname toname 是发送请求时候携带的

    //是否系统消息
    private boolean isSystem;
    //私聊情况 :a->服务器->b显示      弹幕:a->服务器->广播 ->前端消息
     private String sender;//来自哪一位用户发的 如果是私聊
    private String content;  //消息内容
    //A->服务器 指定的发送私聊人 这里id
    private String toReceiver;
}

代码解读;
各个注解标识代表对应的生命周期
onopen建立成功:这里的逻辑就是读取配置类中的上下文,得到用户信息存入自身的上下文私有对象,然后用这个作为key,把管理socket的会话存入该map(线程安全:多个websocket会话 必须选这个),然后发布广公告
onclose :关闭前触发,移除会话组,然后提示
onerror:发生错误时候触发,一般记录日志
onMessage:接收到消息时候触发

注意!!!有些初学者会把环境隔离和websocket处理器这个单实例bean 搞混

说是多个连接时候,类中的局部变量id会不断覆盖!,这是肯定的!默认就是单实列bean,这个类中的局部变量肯定是同一个!这个会话map的作用是为了私聊或者群发的?所以是类变量,每个连接的会话session是独立隔离的!,所以需要是对单独连接有保存属性的要求是保存在这这个socket的会话中,不是websocket这个单实例类的局部变量中!

在次说一下,有些人问的这个问题,学好基础,springboot 默认都是但实列,用的变量都是同一个!!!所以如果多个连接保证上下文隔离都是放在会话中,或者是bean改为多实例

因为我这里的演示数据就是一对一的,所以覆盖局部变量id的问题的无所谓,但是实际上应该把用户属性保存在会话中,每个websocket 连接的会话是独立的,局部变量单实例用的都是同一个

@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/chat", configurator = GetHttpSessionConfig.class) // 协议升级路由
public class ChatEndpoint {

    // 模拟储存当前所有连接的会话信息(线程安全)
    private static final Map<String, Session> Friendgroup = new ConcurrentHashMap<>();

    // 用户的会话信息
    private HttpSession httpSession;
    private String userId;

    /**
     * 连接建立时调用,保存当前用户的会话信息。
     * @param session
     * @param sec
     */
    @OnOpen
    public void onOpen(Session session, EndpointConfig sec) {
        // 获取连接时携带的 HttpSession 数据
        this.httpSession = (HttpSession) sec.getUserProperties().get(HttpSession.class.getName());
        //从会话取id 
        this.userId = (String) sec.getUserProperties().get("Connected"); // 获取用户 ID

        // 将该会话放入会话组中
        Friendgroup.put(userId, session);
        log.info("WebSocket 连接建立成功: {}", userId);

        // 广播消息:用户上线
        String content = "用户 " + userId + " 已经上线";
        Message message = new Message(content, true);
        broadcast(message);
    }

    /**
     * 连接断开时调用,移除用户的会话信息。
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        // 获取用户 ID 从会话取id 避免多个连接 导致当先局部变量id 是其他连接的值 
        String sessionKey = (String) session.getUserProperties().get("Connected");
        
        if (sessionKey != null) {
            Friendgroup.remove(sessionKey);
            log.info("用户 {} 已下线", sessionKey);

            // 广播消息:用户下线
            String content = "用户 " + sessionKey + " 已经下线";
            Message message = new Message(content, true);
            broadcast(message);
        }
    }

    /**
     * 接收来自用户的消息并转发给目标用户。
     * @param session
     * @param message
     * @throws IOException
     */
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        log.info("接收到消息: {}", message);

        // 解析消息并发送给接收者
        Message o = JSON.parseObject(message, Message.class);
        String senderId = (String) session.getUserProperties().get("Connected");

        // 目标接收者的 session
        Session receiverSession = Friendgroup.get(o.getReceiverUserId());
        if (receiverSession != null) {
            Message messageToSend = new Message(senderId, o.getReceiverUserId(), o.getContent());
            receiverSession.getBasicRemote().sendText(JSON.toJSONString(messageToSend));
        } else {
            log.warn("接收者 {} 不在线", o.getReceiverUserId());
        }
    }

    /**
     * 处理 WebSocket 发生的错误。
     * @param error
     */
    @OnError
    public void onError(Throwable error) {
        log.error("WebSocket 错误: {}", error.getMessage(), error);
    }

    /**
     * 将系统公告或消息推送给所有在线用户。
     * @param message
     */
    private void broadcast(Message message) {
        Friendgroup.forEach((userId, userSession) -> {
            try {
                userSession.getBasicRemote().sendText(JSON.toJSONString(message));
            } catch (IOException e) {
                log.error("消息推送失败", e);
            }
        });
    }
}

很方便记忆的是前端也是这几个并且命名也一样
在这里插入图片描述
进行测试:通过alert打印出来
在这里插入图片描述
确实接收到服务器消息
在这里插入图片描述
日志也输出成功
在这里插入图片描述
当然这样就实现了上号时候服务器之前的群发,当然一对一私发也可以但是需要再请求头或者哪里携带身份凭证,握手测试demo 所以就是用的无携带凭证后端生成随机uuid作为会话id,所以为了演示点对点的私聊,这里就不做jwt了选择发送握手的时候携带参数

优化演示demo

前端演示
在这里插入图片描述

前端这里不想再做多用户了,直接修改钩子函数多创建几个窗口,让这些角色都再后台注册会话,写了俩个页面,一个明日香,一个真嗣

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

并且用字体颜色来表明俩人
修改后端端点代码

@Slf4j
@Component
@CrossOrigin(origins = "*")
@ServerEndpoint(value = "/chat/{userName}",configurator = GetHttpSessionConfig.class)//协议升级路由
public class ChatEndpoint {//只要和该路由建立连接 就new 一个新的实列 对应一个该endpoint对象
    //模拟储存当前用户的朋友全信息
    private  static final     Map<String, Session> Friendgroup=new ConcurrentHashMap<String,Session>();//线程安全的银蛇
   private HttpSession httpSession;
    /**
     * 定义的当前用户
     */
    private String userId;

    /**
     * 第一个参数必须是session
     * @param session
     * @param sec   不能是Server
     */
    @OnOpen
    public void onOpen(Session session,EndpointConfig sec,@PathParam("userName") String userName){

        this.userId=userName;//用户上下文填充
//2.把成功建立升级的会话让放入会话组
        String sessionKey=userName;
        Friendgroup.put(userName,session);
//之所以获取http session 是为了获取获取httpsession中的数据 (用户名 /账号/信息)
        System.out.println("websocket建立成功");
//        2.广播消息(如果是好咧别表上下) 模拟放房间提示
        String content="用户id"+sessionKey+"已经上线 愉快玩耍吧";
        Message message = Message.builder()
                .content(content)
                .isSystem(true).build();
        broadcast(message);
        System.out.println("WebSocket 连接建立成功: " + sessionKey);
//        3.

    }
    /**
     * 当断开连接
     * @param session
     */
    @OnClose
    public void onClose(Session session) {
        // 找到关闭会话对应的用户 ID 并从 Friendgroup 中移除
        String sessionKey = this.userId;

        if (sessionKey != null) {
            Friendgroup.remove(sessionKey);

            // 广播消息给所有好友
            String content = "用户ID " + sessionKey + " 已经下线";
            Message message = Message.builder()
                    .content(content)
                    .isSystem(true)
                    .build();
            broadcast(message);
        }
    }

    /**
     * 这是接收和处理来自用户的消息的地方。我们需要在这里处理消息逻辑,可能包括广播消息给所有连接的用户。
     * @param //前端可以携带来自forname 但是我在这个实列化内部做了一个上下文
     *
     * 如果接收的消息的是对象 需要解码器,@PathParam("roomId") String roomId, 如果参数写在了第一位 那么就需要使用该注解获取路由的参数信息
     */
    @OnMessage
    public void onMessage(Session session,String message) throws IOException {
        System.out.println("接收到消息"+message);
        JSONObject json = JSON.parseObject(message);
        // 从JSONObject中提取必要的字段
        String sender = json.getString("sender");
        String content = json.getString("content");
        String toReceiver = json.getString("toReceiver");

        // 创建Message对象
        Message message1 = Message.builder()
                .sender(sender)
//                .toReceiver(toReceiver) //发给谁这个信息无需填写
                .content(content)
                .build();
//调用发送方的会话 发送给他的客户端显示
        Session session1 = Friendgroup.get(toReceiver);
        session1.getBasicRemote().sendText(JSON.toJSONString(message1));
        // 你的其他逻辑

        }

    /**
     * 处理WebSocket中发生的任何异常。可以记录这些错误或尝试恢复。
     */
    @OnError
    public void onError(Throwable error) {
        System.out.println("onError......"+error.getMessage());

    }


    /**
     * 将系统的公告等需要推送的消息发布给所有已经建立连接的用户
     * 用于系统更细发布公告之类的 或者用户上线通知其他
     * @param message
     */
    private  void broadcast(Message message) throws RuntimeException {
        if (message.isSystem()){
        Friendgroup.entrySet()
                .forEach(item->{
//                    遍历每一个键值对
                    Session userSession = item.getValue();
                    try {
                        userSession
                                .getBasicRemote() //同步消息发送器
                                .sendText(JSON.toJSONString(message));
                    } catch (IOException e) {
//                        记录日志 保存文件便于查看
                        throw new RuntimeException(e);
                    }
                    ;
                });
        return;
    }
        else{
            try {
                Friendgroup.get(message.getSender())
                        .getBasicRemote()
                        .sendText(JSON.toJSONString(message));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }
}

主要修改是 请求时候直接再路由携带了用户名字,那么这里演示就不再使用uuid随机生成的key来管理会话了,使用用户名字来管理会话模拟好友,实际开发还是需要再握手的时候解析http请求数据来存储关键信息哈

端点携带参数的形式很想restful风格

/路由/{参数名}
@PathParam("userName") String userName

私聊逻辑
发送私聊的一对一实现原理 发送消息时候携带需要发送的对象key,通过该健获取该用户的会话然后发送信息,这里的key是用户名

  @OnMessage
    public void onMessage(Session session,String message) throws IOException {
        System.out.println("接收到消息"+message);
        JSONObject json = JSON.parseObject(message);
        // 从JSONObject中提取必要的字段
        String sender = json.getString("sender");
        String content = json.getString("content");
        String toReceiver = json.getString("toReceiver");

        // 创建Message对象
        Message message1 = Message.builder()
                .sender(sender)
//                .toReceiver(toReceiver) //发给谁这个信息无需填写
                .content(content)
                .build();
//调用发送方的会话 发送给他的客户端显示
        Session session1 = Friendgroup.get(toReceiver);
        session1.getBasicRemote().sendText(JSON.toJSONString(message1));
        // 你的其他逻辑

        }

前端代码:这里写了发送方数据是为了前端渲染页面,如果再其他里面做了处理,可以不用写发送方数据,减少负载,主要是发送消息对容和对方的会话存储的key
在这里插入图片描述

异步优化

当出现高并发等高性能需求时候,可以采用异步发发送器,让线程不在这里堵塞
替换为AsycRemote
在这里插入图片描述

测试:
当明日香发送你好时候
在这里插入图片描述

真嗣用户可以成功收到,并且回复也可以收到
在这里插入图片描述
就此完成实现,

总结步骤

添加依赖:确保在pom.xml中添加Spring WebSocket和WebSocket依赖。

创建WebSocket处理器(端点):编写一个处理WebSocket消息的处理器。
完成对应的生命周期
如果需要传递http第一次握手时候处理信息 需要添加对应的处理配置

配置WebSocket:配置WebSocket相关的Bean和端点(值得注意的是每一个端点对象对一个用户线程 所以spring的单实列bean和异步处理再这里无法生效 具体会在踩坑笔记中提及)整合的一些细节

实践细节

websocket 中无法依赖注入?

我在socket中收到的消息 想要使用redis 或者mq进行存储和转发 但是却发现@Auwered字段和@Resouce都无法注入依赖

@ServerEndpoint注解的类在WebSocket服务器端点中无法直接使用Spring的依赖注入机制,如字段注入(@Autowired和@Resource)或构造器注入。这是因为@ServerEndpoint注解的类是由Java WebSocket API(JSR 356)管理的,而不是由Spring容器直接管理的。

为了在@ServerEndpoint类中实现依赖注入,可以使用SpringConfigurator或其他类似的配置器来辅助实现。以下是一个示例,展示了如何使用SpringConfigurator来实现依赖注入:

所以需要修稿配置类

端点配置
@Component
public class SpringConfigurator extends ServerEndpointConfig.Configurator {

    private static ApplicationContext applicationContext;

    /**、
     * 这段代码的作用是将Spring的ApplicationContext注入到SpringConfigurator类中,以便在WebSocket端点中实现依赖注入。
     * 具体来说,这段代码通过Spring的@Autowired注解将Spring上下文(ApplicationContext)注入到SpringConfigurator类的静态字段中。
     * @param applicationContext
     */
    @Autowired
    public void setApplicationContext(ApplicationContext applicationContext) {
        SpringConfigurator.applicationContext = applicationContext;
    }

    /**
     * @doc
     * 为什么写了这个类就可以在 WebSocket 端点中进行注入数据?
     * 在 WebSocket 端点中,您可以使用 Spring 的 @Autowired 注解注入 Spring 管理的 Bean,因为 SpringConfigurator 类配置了 getEndpointInstance
     * 方法来从 Spring 上下文中获取 WebSocket 端点实例。这允许您在 WebSocket 端点中利用 Spring 的依赖注入和管理特性。
     * @param clazz
     * @return
     * @param <T>
     * @throws InstantiationException
     */
    @Override
    public <T> T getEndpointInstance(Class<T> clazz) throws InstantiationException {
        return applicationContext.getBean(clazz);
    }
    @Override
    public void modifyHandshake(ServerEndpointConfig sec, HandshakeRequest request, HandshakeResponse response) {
//        super.modifyHandshake(sec, request, response);

//        将从握手的请求中获取httpsession
        String device_id = request.getParameterMap().get("device_id").get(0);


        /**
         * 一般会在请求头中添加token 解析出来id作为键值对
         */
        java.util.Map<String, Object> properties =  sec.getUserProperties();
        /**
         */
        properties.put("connection",Device_PREFIX+device_id);
    }
}

SOCKET 端点
@Slf4j
@Component
@ServerEndpoint(value = "/pda", configurator = SpringConfigurator.class)
public class DeviceEndpoint {

    private final MqService mqService;
    @Autowired
    public DeviceEndpoint(MqService mqService) {
        this.mqService = mqService;
    }
    public static final ConcurrentHashMap<String, Session> deviceMap = new ConcurrentHashMap<>();
    private String deviceId="";


    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
         deviceId = (String) config.getUserProperties().get("connection");
        deviceMap.put(deviceId, session);
        log.info("设备连接:{}", deviceId);
    }
    @OnMessage
    public void onMessage(Session session, String message) {
            log.info("接收到设备数据:{}", message);
            mqService.sysSend(message,  deviceId  );
//            session.getBasicRemote().sendText("数据处理成功");
    }
    @OnClose
    public void onClose(Session session) {
        String deviceId = (String) session.getUserProperties().get("connection");
            if (deviceId != null && !deviceId.isEmpty()) {
                deviceMap.remove(deviceId);
                log.info("设备断开连接:{}", deviceId);
            }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("WebSocket error for session {}: {}", session.getId(), error.getMessage(), error);
    }
}

为什么不同的连接访问websocket端点的局部变量是同一个

在Springboot整合的这个websocket中,使用compoent注解 默认就会把端点注册为单实例 ,;这样就有个问题 多台用户访问一个端点的时候,按照websocket的设计思想应该是一个线程一个连接 ,环境之间隔离 ,但是单实例不隔离了 因为如果把连接携带的参数放在当前端点类的局部变量,就导致但实列下 这些变量相互覆盖,但是socket一般连接穿的比较少的化 握手阶段就可以直接把参数设置 sec.getUserProperties()配置清单中; 不推荐多实列端点 如果是需要多实例 每个端点中的变量都需要存储当前连接时候开起多实例

我的现在的业务场景

每s接近100个消息送到socket,这种极端环境下,这样就产生问题了,**设备是有序发送消息是有序的,但是由于网络原因到达的时序就不同 **.
所以我的解决方案是存入的时候就采用有序队列
并且每个消息采用字符串前面+序列号的形式(不能采用json 无论是goson还是fastjson 数据量大了以后会解析为空),然后每次从队列取值取出有序队列的第一个值 进行比较 这个顺序是否大于我服务器的时间令牌
问题二:当出现我这里 俩三分钟 要处理的消息有几十万的情况下,socket的处理线程会变成多线程(我代码中大概如上 没有显示起线程 但是还是有默认线程),那么就出现了线程问题(我个人是觉得我接收到同时接收到消息的并发 多了 spring容器避免消息丢失 默认采用多线程处理 ,就不再是一个线程一个会话了 是多个线程·一个会话 更何况我这里还有mq发消息 的处理 ,即使是同步发送,但是这个没有发完处理逻辑 ,马上socket端点又接受到消息了)
好 目前的解决思路就是多实列 然后维护一个类属性的时间令牌和会话对象,并且使用锁和原子类保证令牌的改变原子和有序,多实列的化

服务器场景 多个线程处理消息 导致 比对的令牌不是原子的
在这里插入图片描述


/**
 * 原因分析
 * @Component 注解的作用:Spring 默认会将标注了 @Component 的类作为单例(singleton)管理,也就是说,
 * 只有一个 DeviceEndpoint 实例在应用中共享。如果多个客户端连接,都会共享同一个实例,这会导致 orderToken 被多个连接共享。
 * WebSocket 的实例化方式:@ServerEndpoint 注解通常由 Web 容器\
 * (如 Tomcat)管理,而不是 Spring,因此每次客户端连接时,应该有一个新的 DeviceEndpoint 实例创建。但如果使用 @Component 注解,Spring 容器会把这个类变成单例,这可能破坏 Web 容器原本的多实例管理模式。
 */
@Component
@Scope("prototype")
  // 确保每个设备只有一个 DeviceEndpoint 实例
@AllArgsConstructor
@Slf4j
@ServerEndpoint(value = "/pda", configurator = SpringConfigurator.class)
public class DeviceEndpoint {

    private final MqService mqService;

    // 使用线程安全的ConcurrentHashMap来管理设备会话
   public static final ConcurrentHashMap<String, Session> deviceMap = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, AtomicInteger> orderMap = new ConcurrentHashMap<>();
    private static final ConcurrentHashMap<String, PriorityBlockingQueue<String>> messageQueues = new ConcurrentHashMap<>();
    private Session session;
    private String deviceId = "";
    private int orderToken = 0;

    @Autowired
    public DeviceEndpoint(MqService mqService) {
        this.mqService = mqService;
    }

    // 当设备连接时触发
    @OnOpen
    public void onOpen(Session session, EndpointConfig config) throws IOException {
        deviceId = (String) config.getUserProperties().get("connection");
        session.getBasicRemote().sendText("连接成功");
        deviceMap.put(deviceId, session);
        messageQueues.put(deviceId, new PriorityBlockingQueue<>(11,
                (msg1, msg2) -> Integer.compare(extractOrderFromMessage(msg1), extractOrderFromMessage(msg2))));
        orderMap.put(deviceId, new AtomicInteger(0));
        log.info("设备连接成功:{}", deviceId);
    }


    @OnMessage
    public void onMessage(String message, Session session) throws IOException {
        log.info("接收到设备消息:{}", message);
        if (message.equals("id")) {
            session.getBasicRemote().sendText(deviceId);
            return;
        }
        if (message == null || message.trim().isEmpty()) {
            log.warn("接收到空或无效的消息");
            return;
        }

        // 将消息添加到设备的消息队列
        PriorityBlockingQueue<String> queue = messageQueues.get(deviceId);
        if (queue != null) {
            queue.add(message);
            processOutOfOrderMessages(deviceId);
        }
    }

    @OnClose
    public void onClose(Session session) {
        String deviceId = (String) session.getUserProperties().get("connection");
        if (deviceId != null && !deviceId.isEmpty()) {
            deviceMap.remove(deviceId);
            log.info("设备断开连接:{}", deviceId);
        }
    }

    @OnError
    public void onError(Session session, Throwable error) throws IOException {
//        session.getBasicRemote().sendText(" server error");
        log.error("WebSocket error for session {}: {}", session.getId(), error.getMessage(), error);
        session.close();
    }
    // 处理乱序消息的方法
    private void processOutOfOrderMessages(String deviceId) {
        PriorityBlockingQueue<String> queue = messageQueues.get(deviceId);
        AtomicInteger orderToken = orderMap.get(deviceId);

        if (queue != null && orderToken != null) {
            while (!queue.isEmpty()) {
                String queuedMessage = queue.peek();
                if (queuedMessage == null) {
                    log.warn("取出的消息为空,队列内容:{}", queue);
                    queue.poll();
                    continue;
                }

                int queuedOrder = extractOrderFromMessage(queuedMessage);
                if (queuedOrder == 0) {
                    orderToken.set(0);
                }

                // 如果消息的顺序号大于等于当前的顺序号,则处理消息
                if (queuedOrder >= orderToken.get()) {
                    queue.poll();  // 从队列中移除消息
                    String processedMessage = queuedMessage.substring(queuedMessage.indexOf('{'));
                    mqService.sysSend(processedMessage, deviceId);
                    log.info("处理消息已经处理:{}, 当前服务器顺序值{}", processedMessage, queuedOrder);
                    orderToken.set(queuedOrder);  // 更新顺序号
                } else {
                    log.warn("丢弃顺序不正确的消息: {} 当前服务器顺序值{}", queuedMessage, orderToken.get());
                    queue.poll();  // 丢弃不符合顺序的消息
                }
            }
        }
    }


    private static int extractOrderFromMessage(String message) {
        try {
            return Integer.parseInt(message.substring(0, message.indexOf('{')));
        } catch (Exception e) {
            log.error("提取消息顺序号时发生错误,消息格式不正确: {}", message, e);
            return -1;
        }
    }
}

还是不够极限 由于mq的耗时 导致每次消息的消费时间就有很多websocket的消息推送到了,除开mq异步发送的优化外,websocket 也需要加一个重试的任务队列

@Slf4j
@Component
//@Scope("prototype") 如果是需要多实例 每个端点中的变量都需要存储当前连接时候开起多实例
@ServerEndpoint(value = "/pda", configurator = SpringConfigurator.class)
public class DeviceEndpoint {
    private  final MqService mqService;
    public static final ConcurrentHashMap<String, Session> deviceMap = new ConcurrentHashMap<>();
    private String deviceId="";
    private static final int CORE_POOL_SIZE = 20; // 核心线程数
    private static final int MAX_POOL_SIZE = 100; // 最大线程数
    private static final long KEEP_ALIVE_TIME = 60L; // 线程空闲时间
//    累计1000个消息还有没有处理就新建线程
    private static final int QUEUE_CAPACITY = 1000; // 队列容量
//    private static final ExecutorService executorService = Executors.newFixedThreadPool(3);
    private static final ExecutorService executorService = new ThreadPoolExecutor(
            CORE_POOL_SIZE,
            MAX_POOL_SIZE,
            KEEP_ALIVE_TIME,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(QUEUE_CAPACITY),
            new CustomThreadFactory(),
            new CustomRejectedExecutionHandler()
    );

    private static class CustomRejectedExecutionHandler implements RejectedExecutionHandler {
        
//        等待任务队列
        private final BlockingQueue<Runnable> retryQueue = new LinkedBlockingQueue<>();

        public CustomRejectedExecutionHandler() {
            // 启动处理被拒绝任务的线程
            ExecutorService retryExecutor = Executors.newSingleThreadExecutor();
            retryExecutor.submit(this::retryRejectedTasks);
        }

        /**
         * 被拒绝触发
         * @param r the runnable task requested to be executed
         * @param executor the executor attempting to execute this task
         */
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            log.warn("任务被拒绝,当前线程池已满,将任务放入等待队列");
            retryQueue.offer(r); // 将被拒绝的任务放入等待队列
        }

        // 尝试重新执行等待队列中的任务
        private void retryRejectedTasks() {
            while (true) {
                try {
                    Runnable task = retryQueue.take(); // 从等待队列中取出任务
                    executorService.execute(task); // 重新提交到线程池执行
                } catch (Exception e) {
                    log.error("重新执行被拒绝任务失败", e);
                }
            }
        }
    }

//
    private  final LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<>();
    @Autowired
    public DeviceEndpoint(MqService mqService) {
        this.mqService = mqService;
    }

    @OnOpen
    public void onOpen(Session session, EndpointConfig config) {
       this.deviceId = (String) config.getUserProperties().get("connection");
        deviceMap.put(deviceId, session);
        log.info("设备连接成功:{}", deviceId);
    }
    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        if (Strings.isBlank(message)) {
            String deviceId = (String) session.getUserProperties().get("connection");
            log.warn("设备会话{}发送消息为空", deviceId);
            return;
        }

        // 提交到线程池直接处理
        executorService.submit(() -> {
            try {
                log.info("接收到设备数据:{}", message);
                //不使用封装的同步发送
//                mqService.sysSend(message, deviceId);
                mqService.sendMessageAsync(message, deviceId);
            } catch (Exception e) {
                log.info("线程池活跃线程数: {}", ((ThreadPoolExecutor) executorService).getActiveCount());
                log.error("发送消息到MQ失败", e);
            } finally {

            }
        });
    }
    @OnClose
    public void onClose(Session session) {
        try {
            String deviceId = (String) session.getUserProperties().get("connection");
            if (deviceId != null && !deviceId.isEmpty()) {
                deviceMap.remove(deviceId);
                log.info("设备断开连接:{}", deviceId);
            }
        } finally {
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭会话时发生错误", e);
            }
        }
    }
    @OnError
    public void onError(Session session, Throwable error) {
        try {
            log.error("websocket 产生错误当前会话id {}:错误信息 {}", session.getId(), error.getMessage(), error);
        } finally {
            try {
                session.close();
            } catch (IOException e) {
                log.error("关闭会话时发生错误", e);
            }
        }
    }
}

注意这个改成单实例也是一样的,因为设置的存储数据是类变量,多实列策略是之前的场景当为了隔离大量消息的客户端,来避免多个线程的消息脏读问题,但是这样资源消耗也很大

netty +web socket 进行优化

由于 spring-boot-starter-websocket 是采用的tomcat 作为服务器 ,在这种长连接io的场景并发量不如bio模型的netty,所以采用Springboot服务在嵌套一个netty 服务器

关于netty 入门(黑马的)

springBoot + netty搭建高性能 websocket 服务 & 性能测试(包含python 测试脚本)
但是我自己跟着码的时候是有问题的 第一次握手携带query参数的时候要么无法握手要么这个连接发送消息即报错
依赖安装

	<dependencies>
		<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
		<dependency>
			<groupId>io.netty</groupId>
			<artifactId>netty-all</artifactId>
			<version>4.1.36.Final</version>
		</dependency>

	</dependencies>

看完上述笔记可知,netty是一个bio的服务器重要组件有事件循环组,EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理 Channel 上源源不断的 io 事件。而我们的需要做的websocket处理器添加到管道

因为原生 WebSocket 缺少类似 Session 的连接管理,所以我们实现了一个自定义的连接池 NioWebSocketChannelPool 来管理客户端的连接通道。
由于原生的websocket 并没有像starter 一样采用session 管理连接,而是通道,所以这里也建立一个保存连接的容器池

客户端连接容器

因为原生 WebSocket 缺少类似 Session 的连接管理,所以我们实现了一个自定义的连接池 NioWebSocketChannelPool 来管理客户端的连接通道。


@Slf4j
@Component
@Data
public class NioWebSocketChannelPool {

    // 使用 DefaultChannelGroup 管理通道组,使用 ConcurrentMap 高效查找通道
    private final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private final ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * 新增一个客户端通道
     *如果觉得channl的id的asLongText()方法过于冗长可以推荐使用业务:id:id的形式作为key
     * @param channel 新的通道
     */
    public void addChannel(Channel channel) {
        channels.add(channel);
        channelMap.put(channel.id().asLongText(), channel);  // 使用 map 高效查找
    }

    /**
     * 移除一个客户端连接通道
     *
     * @param channel 要移除的通道
     */
    public void removeChannel(Channel channel) {
        channels.remove(channel);
        channelMap.remove(channel.id().asLongText());  // 从 map 中移除
    }

    /**
     * 获取所有活跃的客户端连接
     *
     * @return 活跃通道组
     */
    public ChannelGroup getChannels() {
        return channels;
    }

    /**
     * 通过通道 ID 获取通道
     *
     * @param channelId 通道 ID
     * @return 通道,若未找到则返回 null
     */
    public Channel getChannelById(String channelId) {
        return channelMap.get(channelId);  // 使用 map 的 O(1) 查找时间
    }

    /**
     * 向所有连接的客户端广播消息
     *
     * @param message 要广播的消息
     */
    public void broadcastMessage(String message) {
        channels.forEach(channel -> {
            if (channel.isActive()) {
                channel.writeAndFlush(message).addListener(future -> {
                    if (!future.isSuccess()) {
                        log.error("发送消息到客户端失败,目标客户端:{}", channel.id());
                    }
                });
            }
        });
    }
}

配置

将 WebSocket 的配置项封装到一个配置类 WebSocketProperties 中,方便通过 @ConfigurationProperties 统一管理。
整合到ioc ,配置信息

@Data
@Component
@ConfigurationProperties(prefix = "chat.websocket")

public class WebSocketProperties {
    @Value("${chat.websocket.port}")
    private Integer port ; // 监听端口
    @Value("${chat.websocket.path}")
    private String path ; // 请求路径
    /**
     * Boss Group:
     * 一般设置为 1 到 2 个线程。由于 boss 组负责接受连接请求,通常不需要太多线程。
     * Work Group:
     * 线程数量建议设置为 CPU 核心数的 2 倍到 4 倍。例如,如果服务器有 8 个 CPU 核心,可以设置为 16 到 32 个线程。可以使用以下公式:
     *
     *     Work Threads = Core CPU Count × (2 到 4)
     */
    @Value("${chat.websocket.boss}")
    private Integer boss ; // bossGroup线程数
    @Value("${chat.websocket.work}")
    private Integer work ; // workGroup线程数

}


对应的yaml 配置

chat:
  websocket:
    port: 4400
    path: /ws
    boss: 2
    work: 20

websocket处理器

NioWebSocketHandler 负责处理 WebSocket 事件,包括连接、断开、消息接收、握手处理等。


/**
 * 通道初始化的时候注释掉 WebSocketServerProtocolHandler(webSocketProperties.getPath(), null, true, 65536))的逻辑
 * 因为在客户端请求的时候,会先发送一个 HTTP 请求,然后再发送 WebSocket 请求,导致 WebSocketServerProtocolHandler 无法正确处理 WebSocket 请求
 * 所以注释掉 WebSocketServerProtocolHandler 相关逻辑,由 NioWebSocketHandler 自己处理 WebSocket 请求
 * 如果不注释掉 WebSocketServerProtocolHandler 相关逻辑,则会导致客户端无法正常连接到 WebSocket 服务端
 */
@Slf4j
@ChannelHandler.Sharable
@Component
@AllArgsConstructor
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final NioWebSocketChannelPool webSocketChannelPool;
    private final WebSocketProperties webSocketProperties;

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        log.info("客户端连接成功:{}", ctx.channel().id().asLongText());
        webSocketChannelPool.addChannel(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        log.info("客户端断开连接:{}", ctx.channel().id().asLongText());
        // 移除连接并释放资源
        webSocketChannelPool.removeChannel(ctx.channel());
        ctx.close();
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            if (msg instanceof FullHttpRequest) { // HTTP 请求处理 携带参数的
                handleHttpRequest(ctx, (FullHttpRequest) msg);
            } else if (msg instanceof WebSocketFrame) { // WebSocket 帧处理
                super.channelRead(ctx, msg); // 转交给 `channelRead0` 处理
            }
        } catch (Exception e) {
            log.error("处理客户端请求时发生异常:", e);
            ctx.close();
        }
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) {
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
        } else if (frame instanceof TextWebSocketFrame) {
            handleTextFrame(ctx, (TextWebSocketFrame) frame);
        } else if (frame instanceof CloseWebSocketFrame) {
            ctx.close();
        } else {
            log.warn("收到未知类型的 WebSocket 帧:{}", frame.getClass().getName());
        }
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) {
        try {
            String uri = request.uri();
            log.info("客户端请求路径:{}", uri);

            // 解析查询参数
            String path = uri.split("\\?", 2)[0];
            Map<String, String> params = RequestUriUtils.getParams(uri);
            log.info("解析出的参数:{}", params);
            //解析请求头
            System.out.println("携带的token:" + request.headers().get("Authorization"));
//            遍历携带参数
            params.forEach((k, v) -> {
                log.info("参数:{}={}", k, v);
            });
            if (webSocketProperties.getPath().equals(RequestUriUtils.getBasePath(path))) {
                WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
                        getWebSocketLocation(request), null, true, 5 * 1024 * 1024);
                WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(request);
                if (handshaker == null) {
                    WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
                } else {
                    handshaker.handshake(ctx.channel(), request);
                }
            } else {
                log.warn("路径不匹配:{}", path);
                ctx.close();
            }
        } finally {
            request.release(); // 确保 FullHttpRequest 被正确释放
        }
    }

    private void handleTextFrame(ChannelHandlerContext ctx, TextWebSocketFrame frame) {
        String message = frame.text();
        log.info("客户端发送消息:{}", message);

        if (message.startsWith("BROADCAST:")) {
            broadcastMessageToAll(message.substring("BROADCAST:".length()));
        } else if (message.startsWith("TO:")) {
            String[] parts = message.split(":", 3);
            if (parts.length == 3) {
                sendMessageToOne(parts[1], parts[2]);
            } else {
                log.warn("私信格式错误:{}", message);
            }
        } else {
            ctx.channel().writeAndFlush(new TextWebSocketFrame("收到消息:" + message));
        }
    }

    private String getWebSocketLocation(FullHttpRequest request) {
        String location = request.headers().get(HttpHeaderNames.HOST) + webSocketProperties.getPath();
        return "ws://" + location;
    }

    private void broadcastMessageToAll(String message) {
        // 使用异步任务避免阻塞主线程
        webSocketChannelPool.getChannels().forEach(channel -> {
            if (channel.isActive()) {
                channel.writeAndFlush(new TextWebSocketFrame(message)).addListener(future -> {
                    if (!future.isSuccess()) {
                        log.error("广播消息失败,目标客户端:{}", channel.id(), future.cause());
                    }
                });
            }
        });
    }


    private void sendMessageToOne(String channelId, String message) {
        Channel channel = webSocketChannelPool.getChannelById(channelId);
        if (channel != null && channel.isActive()) {
            channel.writeAndFlush(new TextWebSocketFrame(message)).addListener(future -> {
                if (!future.isSuccess()) {
                    log.error("发送消息失败,目标客户端:{}", channelId, future.cause());
                }
            });
        } else {
            log.warn("目标客户端未在线:{}", channelId);
        }
    }


    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("处理过程中发生异常:", cause);
        ctx.close();
    }
}

通道初始化,添加websocket的处理

@AllArgsConstructor
@Component
public class NioWebSocketChannelInitializer extends ChannelInitializer<SocketChannel> {


  final   private WebSocketProperties webSocketProperties;

final     private NioWebSocketHandler nioWebSocketHandler;

    @Override
    protected void initChannel(SocketChannel socketChannel) {
        socketChannel.pipeline()
                // 1. HTTP请求处理器
                .addLast(new HttpServerCodec()) // 解码HTTP请求
                .addLast(new ChunkedWriteHandler()) // 处理大文件的分块写操作
                .addLast(new HttpObjectAggregator(8192)) // 合并HTTP请求成完整对象

                //WebSocketServerProtocolHandler 会自动处理 WebSocket 握手。
                // 如果 nioWebSocketHandler 或其他逻辑中重复处理了握手,也会导致冲突。
                // 所以,NioWebSocketHandler 中如果连接时候携带?token=xxx 等参数,发消息就会失败
//                .addLast(new WebSocketServerProtocolHandler(webSocketProperties.getPath(), null, true, 65536))
                .addLast(nioWebSocketHandler)
        ; // 处理其他路径下的 WebSocket 消息
    }

}

实现ioc 的初始化和销毁周期 让netty和tomcat 一起销毁创建

@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean, DisposableBean {

    @Autowired
    private WebSocketProperties webSocketProperties;
    @Autowired
    private NioWebSocketChannelInitializer webSocketChannelInitializer;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workGroup;
    private ChannelFuture channelFuture;

    @Override
    public void afterPropertiesSet() throws Exception {
        try {
            bossGroup = new NioEventLoopGroup(webSocketProperties.getBoss());
            workGroup = new NioEventLoopGroup(webSocketProperties.getWork());
/**
 * 启动 Netty 服务器
 */
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 1024)
                    .group(bossGroup, workGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(webSocketProperties.getPort())
                    .childHandler(webSocketChannelInitializer);

            channelFuture = serverBootstrap.bind().sync();
        } finally {
            if (channelFuture != null && channelFuture.isSuccess()) {
                log.info("netty 服务器启动成功 on port: {} (websocket) with context path '{}'", webSocketProperties.getPort(), webSocketProperties.getPath());
            } else {
                log.error("Netty server startup failed.");
                if (bossGroup != null)
                    bossGroup.shutdownGracefully().sync();
                if (workGroup != null)
                    workGroup.shutdownGracefully().sync();
            }
        }
    }

    @Override
    public void destroy() throws Exception {
        log.info("Shutting down Netty server...");
        if (bossGroup != null)
            bossGroup.shutdownGracefully().sync();
        if (workGroup != null)
            workGroup.shutdownGracefully().sync();
        if (channelFuture != null)
            channelFuture.channel().closeFuture().syncUninterruptibly();
        log.info("Netty server shutdown.");
    }
}

此时启动springboot项目,netty 服务器也会跟着启动和销毁
go脚本极端情况总共10w连接

package main

import (
	"fmt"
	"log"
	"net/http"
	"sync"
	"time"

	"github.com/gorilla/websocket"
)

const (
	serverURL     = "ws://127.0.0.1:4400/wstest?id=" // WebSocket 服务端地址
	totalClients  = 100000                           // 总连接数
	concurrency   = 500                              // 每批并发连接数
	delayPerBatch = 1000 * time.Millisecond          // 每批连接之间的延迟
)

var (
	successCount int64 // 成功连接数
	failureCount int64 // 失败连接数
	mutex        sync.Mutex
	wg           sync.WaitGroup
)

func main() {
	startTime := time.Now()

	log.Println("开始 WebSocket 性能测试...")
	clientChan := make(chan struct{}, concurrency)

	for i := 0; i < totalClients; i++ {
		clientChan <- struct{}{}
		wg.Add(1)

		go func(clientID int) {
			defer wg.Done()
			defer func() { <-clientChan }()

			err := connectWebSocket(clientID)
			if err != nil {
				log.Printf("客户端[%d]连接失败: %v\n", clientID, err)
				mutex.Lock()
				failureCount++
				mutex.Unlock()
			} else {
				mutex.Lock()
				successCount++
				mutex.Unlock()
			}
		}(i)

		// 控制连接速率
		if i > 0 && i%concurrency == 0 {
			time.Sleep(delayPerBatch)
		}
	}

	// 等待所有客户端完成
	wg.Wait()
	duration := time.Since(startTime)

	log.Printf("测试完成: 成功连接数: %d, 失败连接数: %d, 总耗时: %s\n", successCount, failureCount, duration)
}

// connectWebSocket 尝试建立一个 WebSocket 连接
func connectWebSocket(clientID int) error {
	dialer := websocket.Dialer{}
	header := http.Header{}

	// 添加标识符到 URL 或头部 (可选)
	url := fmt.Sprintf("%s?id=%d", serverURL, clientID)

	// 发起 WebSocket 连接
	conn, _, err := dialer.Dial(url, header)
	if err != nil {
		return err
	}
	defer conn.Close()

	// 测试发送和接收消息
	if err := conn.WriteMessage(websocket.TextMessage, []byte("ping")); err != nil {
		return err
	}

	_, message, err := conn.ReadMessage()
	if err != nil {
		return err
	}
	log.Printf("客户端[%d]收到消息: %s\n", clientID, string(message))
	return nil
}

测试条件 16g内存 4bossgroup 64worker
在这里插入图片描述

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐