一、Websocket使用步骤

在Spring Boot中使用WebSocket是一个常见的需求,因为WebSocket提供了一种在客户端和服务器之间进行全双工通信的方式。Spring Boot通过Spring的WebSocket支持,使得在应用程序中集成WebSocket变得非常简单。

以下是在Spring Boot中使用WebSocket的基本步骤:

1、添加依赖:
首先,你需要在你的pom.xml(如果你使用Maven)或build.gradle(如果你使用Gradle)中添加WebSocket的依赖。对于Spring Boot项目,你通常只需要添加spring-boot-starter-websocket依赖。

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

2、配置WebSocket:
接下来,你需要创建一个配置类实现接口WebSocketConfigurer来启用WebSocket,并配置相关的端点和消息代理。使用@EnableWebSocket注解来启用WebSocket功能。

3、创建控制器:
一旦WebSocket被配置,你就可以创建一个控制器来处理WebSocket消息。
创建一个class类,集成抽象类AbstractWebSocketHandler,或者使用注解@ServerEndpoint来声明Websocket Endpoint

4、客户端连接:
最后,你需要在客户端连接到WebSocket服务器。这可以通过使用WebSocket API来实现。

二、示例1:继承抽象类 AbstractWebSocketHandler

后端代码

1、创建Websocket处理类Ws1Handler,继承抽象类AbstractWebSocketHandler

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.thymeleaf.util.StringUtils;

import static com.hj.springboot3.ws.demo1.Ws1Pool.broadcast;

/**
 * websocket事件处理
 * <p>
 * 链接:/ws/demo1?userId=xxxx
 */
public class Ws1Handler extends AbstractWebSocketHandler {
    private static final Logger logger = LoggerFactory.getLogger(Ws1Handler.class);

    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        Ws1Pool.add(session);
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        Ws1Pool.remove(session);
    }


    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        if (StringUtils.equals("ping", message.getPayload())) {
            // 心跳消息
            session.sendMessage(new TextMessage("pong"));
            return;
        }
        logger.info("receive Msg :" + message.getPayload());
        TextMessage msg = new TextMessage(message.getPayload());
        // 回发信息 给 js前端
        session.sendMessage(msg);
    }

}

消息对象VO

@Data
@NoArgsConstructor
@AllArgsConstructor
public class WsMsgVo {
	private String text;
	private Long userId;
}

将请求参数字符串转换成map 工具类

public class ParamUtil {

    /** 将请求参数字符串转换成map */
    public static Map<String, String> parser(String queryString) {

        Map<String, String> map = new HashMap<String, String>();
        if (StringUtils.isNotBlank(queryString)) {
            String[] params = queryString.split("&");
            for (String p : params) {
                String[] strs = p.split("=");
                if (strs.length == 2) {
                    map.put(strs[0], strs[1]);
                }
            }
        }
        return map;
    }
}

2、创建websocket链接池

存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息

/**
 * websocket链接池
 */
public class Ws1Pool {
    private static final Logger logger = LoggerFactory.getLogger(Ws1Pool.class);
    private static final Map<String, WebSocketSession> pool = new ConcurrentHashMap<>();
    private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void add(WebSocketSession inbound) {
        pool.put(inbound.getId(), inbound);
        Map<String, String> map = ParamUtil.parser(inbound.getUri().getQuery());
        Long userId = Long.valueOf(map.get("userId"));
        logger.info("add userId:{}", userId);
        List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());
        lstInBound.add(inbound.getId());
        logger.info("add connetion {},total size {}", inbound.getId(), pool.size());
    }

    public static void remove(WebSocketSession socket) {
        String sessionId = socket.getId();
        List<String> lstInBound = null;
        Map<String, String> map = ParamUtil.parser(socket.getUri().getQuery());
        Long userId = Long.valueOf(map.get("userId"));
        logger.info("remove userId:{}", userId);
        if (StringUtils.isNotBlank(sessionId)) {
            lstInBound = userMap.get(userId);
            if (lstInBound != null) {
                lstInBound.remove(sessionId);
                if (lstInBound.isEmpty()) {
                    userMap.remove(userId);
                }
            }
        }

        pool.remove(sessionId);
        logger.info("remove connetion {},total size {}", sessionId, pool.size());
    }

    /** 推送信息 */
    public static void broadcast(WsMsgVo vo) {
        Long userId = vo.getUserId();
        List<String> lstInBoundId;
        if (userId == null || userId == 0L) {
            // 发送给所有人
            lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
        } else {
            lstInBoundId = userMap.get(userId);
        }
        if (lstInBoundId == null || lstInBoundId.isEmpty()) {
            return;
        }
        threadPool.execute(() -> {
            try {
                for (String id : lstInBoundId) {
                    // 发送给指定用户
                    WebSocketSession connection = pool.get(id);
                    if (connection != null) {
                        synchronized (connection) {
                            TextMessage msg = new TextMessage(vo.getText());
                            connection.sendMessage(msg);
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("broadcast error: userId:{}", userId, e);
            }
        });
    }

}

3、创建Websocket配置类,并注册地址/ws/demo1,将地址和处理类绑定

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

/**
 * Websocket配置
 */
@Configuration
@EnableWebSocket
public class Ws1Config implements WebSocketConfigurer {

	@Override
	public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
		registry.addHandler(ws1Handler(), "/ws/demo1").setAllowedOrigins("*");
	}

	@Bean
	public Ws1Handler ws1Handler() {
		return new Ws1Handler();
	}

}

前端代码

在这段代码中,我们首先创建了一个新的WebSocket对象,并传入了WebSocket服务的URL。然后,我们为这个WebSocket连接添加了四个事件监听器:

  1. onopen:当WebSocket连接打开时触发。
  2. onmessage:当从服务器接收到消息时触发。
  3. onerror:当WebSocket发生错误时触发。
  4. onclose:当WebSocket连接关闭时触发。

你可以根据自己的需求,在这些事件监听器中添加相应的逻辑。例如,在onopen事件监听器中发送一个消息给服务器,或者在onmessage事件监听器中处理从服务器接收到的消息。

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"
      xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>websocket demo1</title>
</head>
<body>
<div>
    <div id="content">
        <div>信息面板</div>
    </div>
    <input type="text" id="input" placeholder="请输入内容" />
    <button id="send" onclick="doSend()">发送</button>

</div>
<script type="text/javascript">
    // 创建一个新的WebSocket并连接到指定的URL
    var socket = new WebSocket('ws://localhost:8080/ws/demo1?userId=001');

    // 当WebSocket连接打开时触发
    socket.onopen = function(event) {
        console.log('Connection opened');

        // 可以选择在这里发送一些数据到服务器
        socket.send('Hello, server!');
    };

    // 当从服务器接收到数据时触发
    socket.onmessage = function(event) {
        if (event.data == null || event.data == '' || "pong" == event.data) {
            //心跳消息
            console.log("Info: 心跳消息");
        } else {
            console.log('Message from server ', event.data);
            var div_msg = document.createElement('div');
            div_msg.textContent = event.data;
            document.getElementById('content').appendChild(div_msg)
        }
    };

    // 当WebSocket连接关闭时触发
    socket.onclose = function(event) {
        console.log('Connection closed');
    };

    // 当WebSocket连接发生错误时触发
    socket.onerror = function(error) {
        console.error('WebSocket Error:', error);
    };
    function doSend(){
        var input_dom = document.getElementById('input')
        var value = input_dom.value;
        input_dom.value=''
        input_dom.focus()
        socket.send(value);
    }

</script>

</body>
</html>

三、示例2:使用注解@ServerEndpoint

后端代码

1、创建Websocket处理类Ws3Handler,并使用注解@ServerEndpoint声明

import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.thymeleaf.util.StringUtils;

import java.io.IOException;

/**
 * websocket事件处理
 * <p>
 * 链接:/ws/demo3?userId=xxxx
 */
@Component
@ServerEndpoint("/ws/demo3")
public class Ws3Handler {
    private static final Logger logger = LoggerFactory.getLogger(Ws3Handler.class);

    @OnOpen
    public void onOpen(Session session) {
        Ws3Pool.add(session);
    }

    @OnClose
    public void OnClose(Session session) {
        Ws3Pool.remove(session);
    }

    @OnMessage
    public void onMessage(Session session, String message) throws IOException {
        if (StringUtils.equals("ping", message)) {
            // 心跳消息
            session.getBasicRemote().sendText("pong");
            return;
        }
        logger.info("receive Msg :" + message);
//        session.getBasicRemote().sendText(message);
        Ws3Pool.broadcast(new WsMsgVo(message, 0L));
    }

    /**
     * 错误时调用
     */
    @OnError
    public void onError(Session session, Throwable throwable) {
        throwable.printStackTrace();
    }

}

2、创建websocket链接池

存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息

/**
 * websocket链接池
 */
public class Ws3Pool {
    private static final Logger logger = LoggerFactory.getLogger(Ws3Pool.class);
    private static Map<String, Session> pool = new ConcurrentHashMap<>();
    private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();
    private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);

    public static void add(Session session) {
        pool.put(session.getId(), session);
        Map<String, String> map = ParamUtil.parser(session.getQueryString());
        Long userId = Long.valueOf(map.get("userId"));
        logger.info("add userId:{}", userId);
        List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());
        lstInBound.add(session.getId());
        logger.info("add connetion {},total size {}", session.getId(), pool.size());
    }

    public static void remove(Session session) {
        String sessionId = session.getId();
        List<String> lstInBound = null;
        Map<String, String> map = ParamUtil.parser(session.getQueryString());
        Long userId = Long.valueOf(map.get("userId"));
        logger.info("remove userId:{}", userId);
        if (StringUtils.isNotBlank(sessionId)) {
            lstInBound = userMap.get(userId);
            if (lstInBound != null) {
                lstInBound.remove(sessionId);
                if (lstInBound.isEmpty()) {
                    userMap.remove(userId);
                }
            }
        }

        pool.remove(sessionId);
        logger.info("remove connetion {},total size {}", sessionId, pool.size());
    }

    /** 推送信息 */
    public static void broadcast(WsMsgVo vo) {
        Long userId = vo.getUserId();
        List<String> lstInBoundId;
        if (userId == null || userId == 0L) {
            // 发送给所有人
            lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
        } else {
            lstInBoundId = userMap.get(userId);
        }
        if (lstInBoundId == null || lstInBoundId.isEmpty()) {
            return;
        }
        threadPool.execute(() -> {
            try {
                for (String id : lstInBoundId) {
                    // 发送给指定用户
                    Session session = pool.get(id);
                    if (session != null) {
                        synchronized (session) {
                            session.getBasicRemote().sendText(vo.getText());
                        }
                    }
                }
            } catch (Exception e) {
                logger.error("broadcast error: userId:{}", userId, e);
            }
        });
    }

}

3、创建Websocket配置类,并注册 ServerEndpointExporter

ServerEndpointExporter 是 Spring Boot 中的一个重要组件,用于导出 WebSocket 服务器端点配置。在 Spring 应用程序中,特别是当你使用 Spring Boot 时,ServerEndpointExporter 能够自动注册使用 @ServerEndpoint 注解声明的 WebSocket 端点。

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebSocket
public class Ws3Config {

    /**
     * ServerEndpointExporter 作用
     * 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }
}

在这个配置类中,我们定义了一个 serverEndpointExporter 方法,它返回一个 ServerEndpointExporter 的实例。这样,Spring 容器就会管理这个 bean,并自动注册所有使用 @ServerEndpoint 注解声明的 WebSocket 端点。

前端代码

(可以和上面的例子一致)

四、前端代码封装

为了更方便的使用websocket,以及确保在后端服务器重启后,前端websocket能够自动重连,我们可以增加心跳机制

1、创建 ws.handler.js

var WsHander = {};
WsHander.socket = null;
WsHander.connect = (function (host) {
    WsHander.host = host;
    if ("WebSocket" in window) {
        WsHander.socket = new WebSocket(host);
    } else if ("MozWebSocket" in window) {
        WsHander.socket = new MozWebSocket(host);
    } else {
        console.log("Error: WebSocket is not supported by this browser.");
        return;
    }
    WsHander.socket.onopen = function () {
        console.log("Info: websocket已启动.");
        // 心跳检测重置
        heartCheck.reset().start(WsHander.socket);
    };
    WsHander.socket.onclose = function () {
        console.log("Info: websocket已关闭.");
        WsHander.reconnect();
    };
    WsHander.socket.onmessage = function (message) {
        heartCheck.reset().start(WsHander.socket);
        if (message.data == null || message.data === '' || "pong" === message.data) {
            //心跳消息
            console.log("Info: 心跳消息");
        } else {
            // 收到 Websocket消息,执行业务操作
            if (doOnWsMessage){
                doOnWsMessage(message.data);
            }
        }
    };
});
WsHander.reconnect = function (){
    WsHander.connect(WsHander.host);
}
WsHander.initialize = function (userId, uri) {
    WsHander.currUserId = userId;
    if (WsHander.currUserId) {
        if (window.location.protocol === "http:") {
            WsHander.connect("ws://" + window.location.host + uri+"?userId=" + WsHander.currUserId);
        } else {
            WsHander.connect("wss://" + window.location.host + uri+"?userId=" + WsHander.currUserId);
        }
    }
};
WsHander.sendMessage = (function (message) {
    if (message !== "") {
        WsHander.socket.send(message);
    }
});



//心跳检测
var heartCheck = {
    timeout: 5000,// 5秒
    timeoutObj: null,
    reset: function () {
        clearTimeout(this.timeoutObj);
        return this;
    },
    start: function (ws) {
        var self = this;
        this.timeoutObj = setTimeout(function () {
            // 这里发送一个心跳,后端收到后,返回一个心跳消息,
            // onmessage拿到返回的心跳就说明连接正常
            ws.send("ping");
        }, this.timeout);
    }
}

2、在html页面中使用

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"
      xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>websocket demo1</title>
</head>
<body>
<div>
    <div id="content">
        <div>信息面板</div>
    </div>
    <input type="text" id="input" placeholder="请输入内容" />
    <button id="send" onclick="doSend()">发送</button>

</div>
<script src="../static/ws.handler.js" ></script>
<script type="text/javascript">

    WsHander.initialize("001", "/ws/demo3")

    // 当从服务器接收到数据时触发
    function doOnWsMessage(messageData){
        console.log('Message from server ', messageData);
        var div_msg = document.createElement('div');
        div_msg.textContent = messageData;
        document.getElementById('content').appendChild(div_msg)
    }

    function doSend(){
        var input_dom = document.getElementById('input')
        var value = input_dom.value;
        input_dom.value=''
        input_dom.focus()
        WsHander.sendMessage(value)
    }

</script>

</body>
</html>
Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐