【SpringBoot3】双向实时通讯 websocket
在Spring Boot中使用WebSocket是一个常见的需求,因为WebSocket提供了一种在客户端和服务器之间进行全双工通信的方式。Spring Boot通过Spring的WebSocket支持,使得在应用程序中集成WebSocket变得非常简单。
文章目录
一、Websocket使用步骤
在Spring Boot中使用WebSocket是一个常见的需求,因为WebSocket提供了一种在客户端和服务器之间进行全双工通信的方式。Spring Boot通过Spring的WebSocket支持,使得在应用程序中集成WebSocket变得非常简单。
以下是在Spring Boot中使用WebSocket的基本步骤:
1、添加依赖:
首先,你需要在你的pom.xml(如果你使用Maven)或build.gradle(如果你使用Gradle)中添加WebSocket的依赖。对于Spring Boot项目,你通常只需要添加spring-boot-starter-websocket依赖。
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
2、配置WebSocket:
接下来,你需要创建一个配置类实现接口WebSocketConfigurer
来启用WebSocket,并配置相关的端点和消息代理。使用@EnableWebSocket
注解来启用WebSocket功能。
3、创建控制器:
一旦WebSocket被配置,你就可以创建一个控制器来处理WebSocket消息。
创建一个class类,集成抽象类AbstractWebSocketHandler
,或者使用注解@ServerEndpoint
来声明Websocket Endpoint
4、客户端连接:
最后,你需要在客户端连接到WebSocket服务器。这可以通过使用WebSocket API来实现。
二、示例1:继承抽象类 AbstractWebSocketHandler
后端代码
1、创建Websocket处理类Ws1Handler,继承抽象类AbstractWebSocketHandler
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;
import org.thymeleaf.util.StringUtils;
import static com.hj.springboot3.ws.demo1.Ws1Pool.broadcast;
/**
* websocket事件处理
* <p>
* 链接:/ws/demo1?userId=xxxx
*/
public class Ws1Handler extends AbstractWebSocketHandler {
private static final Logger logger = LoggerFactory.getLogger(Ws1Handler.class);
@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
Ws1Pool.add(session);
}
@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
Ws1Pool.remove(session);
}
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
if (StringUtils.equals("ping", message.getPayload())) {
// 心跳消息
session.sendMessage(new TextMessage("pong"));
return;
}
logger.info("receive Msg :" + message.getPayload());
TextMessage msg = new TextMessage(message.getPayload());
// 回发信息 给 js前端
session.sendMessage(msg);
}
}
消息对象VO
@Data
@NoArgsConstructor
@AllArgsConstructor
public class WsMsgVo {
private String text;
private Long userId;
}
将请求参数字符串转换成map 工具类
public class ParamUtil {
/** 将请求参数字符串转换成map */
public static Map<String, String> parser(String queryString) {
Map<String, String> map = new HashMap<String, String>();
if (StringUtils.isNotBlank(queryString)) {
String[] params = queryString.split("&");
for (String p : params) {
String[] strs = p.split("=");
if (strs.length == 2) {
map.put(strs[0], strs[1]);
}
}
}
return map;
}
}
2、创建websocket链接池
存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息
/**
* websocket链接池
*/
public class Ws1Pool {
private static final Logger logger = LoggerFactory.getLogger(Ws1Pool.class);
private static final Map<String, WebSocketSession> pool = new ConcurrentHashMap<>();
private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void add(WebSocketSession inbound) {
pool.put(inbound.getId(), inbound);
Map<String, String> map = ParamUtil.parser(inbound.getUri().getQuery());
Long userId = Long.valueOf(map.get("userId"));
logger.info("add userId:{}", userId);
List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());
lstInBound.add(inbound.getId());
logger.info("add connetion {},total size {}", inbound.getId(), pool.size());
}
public static void remove(WebSocketSession socket) {
String sessionId = socket.getId();
List<String> lstInBound = null;
Map<String, String> map = ParamUtil.parser(socket.getUri().getQuery());
Long userId = Long.valueOf(map.get("userId"));
logger.info("remove userId:{}", userId);
if (StringUtils.isNotBlank(sessionId)) {
lstInBound = userMap.get(userId);
if (lstInBound != null) {
lstInBound.remove(sessionId);
if (lstInBound.isEmpty()) {
userMap.remove(userId);
}
}
}
pool.remove(sessionId);
logger.info("remove connetion {},total size {}", sessionId, pool.size());
}
/** 推送信息 */
public static void broadcast(WsMsgVo vo) {
Long userId = vo.getUserId();
List<String> lstInBoundId;
if (userId == null || userId == 0L) {
// 发送给所有人
lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
} else {
lstInBoundId = userMap.get(userId);
}
if (lstInBoundId == null || lstInBoundId.isEmpty()) {
return;
}
threadPool.execute(() -> {
try {
for (String id : lstInBoundId) {
// 发送给指定用户
WebSocketSession connection = pool.get(id);
if (connection != null) {
synchronized (connection) {
TextMessage msg = new TextMessage(vo.getText());
connection.sendMessage(msg);
}
}
}
} catch (Exception e) {
logger.error("broadcast error: userId:{}", userId, e);
}
});
}
}
3、创建Websocket配置类,并注册地址/ws/demo1
,将地址和处理类绑定
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;
/**
* Websocket配置
*/
@Configuration
@EnableWebSocket
public class Ws1Config implements WebSocketConfigurer {
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(ws1Handler(), "/ws/demo1").setAllowedOrigins("*");
}
@Bean
public Ws1Handler ws1Handler() {
return new Ws1Handler();
}
}
前端代码
在这段代码中,我们首先创建了一个新的WebSocket
对象,并传入了WebSocket服务的URL。然后,我们为这个WebSocket连接添加了四个事件监听器:
onopen
:当WebSocket连接打开时触发。onmessage
:当从服务器接收到消息时触发。onerror
:当WebSocket发生错误时触发。onclose
:当WebSocket连接关闭时触发。
你可以根据自己的需求,在这些事件监听器中添加相应的逻辑。例如,在onopen
事件监听器中发送一个消息给服务器,或者在onmessage
事件监听器中处理从服务器接收到的消息。
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"
xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>websocket demo1</title>
</head>
<body>
<div>
<div id="content">
<div>信息面板</div>
</div>
<input type="text" id="input" placeholder="请输入内容" />
<button id="send" onclick="doSend()">发送</button>
</div>
<script type="text/javascript">
// 创建一个新的WebSocket并连接到指定的URL
var socket = new WebSocket('ws://localhost:8080/ws/demo1?userId=001');
// 当WebSocket连接打开时触发
socket.onopen = function(event) {
console.log('Connection opened');
// 可以选择在这里发送一些数据到服务器
socket.send('Hello, server!');
};
// 当从服务器接收到数据时触发
socket.onmessage = function(event) {
if (event.data == null || event.data == '' || "pong" == event.data) {
//心跳消息
console.log("Info: 心跳消息");
} else {
console.log('Message from server ', event.data);
var div_msg = document.createElement('div');
div_msg.textContent = event.data;
document.getElementById('content').appendChild(div_msg)
}
};
// 当WebSocket连接关闭时触发
socket.onclose = function(event) {
console.log('Connection closed');
};
// 当WebSocket连接发生错误时触发
socket.onerror = function(error) {
console.error('WebSocket Error:', error);
};
function doSend(){
var input_dom = document.getElementById('input')
var value = input_dom.value;
input_dom.value=''
input_dom.focus()
socket.send(value);
}
</script>
</body>
</html>
三、示例2:使用注解@ServerEndpoint
后端代码
1、创建Websocket处理类Ws3Handler,并使用注解@ServerEndpoint
声明
import jakarta.websocket.*;
import jakarta.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.thymeleaf.util.StringUtils;
import java.io.IOException;
/**
* websocket事件处理
* <p>
* 链接:/ws/demo3?userId=xxxx
*/
@Component
@ServerEndpoint("/ws/demo3")
public class Ws3Handler {
private static final Logger logger = LoggerFactory.getLogger(Ws3Handler.class);
@OnOpen
public void onOpen(Session session) {
Ws3Pool.add(session);
}
@OnClose
public void OnClose(Session session) {
Ws3Pool.remove(session);
}
@OnMessage
public void onMessage(Session session, String message) throws IOException {
if (StringUtils.equals("ping", message)) {
// 心跳消息
session.getBasicRemote().sendText("pong");
return;
}
logger.info("receive Msg :" + message);
// session.getBasicRemote().sendText(message);
Ws3Pool.broadcast(new WsMsgVo(message, 0L));
}
/**
* 错误时调用
*/
@OnError
public void onError(Session session, Throwable throwable) {
throwable.printStackTrace();
}
}
2、创建websocket链接池
存储所有在线用户链接,并实现发送消息和广播消息的功能;使用异步线给前端发送消息
/**
* websocket链接池
*/
public class Ws3Pool {
private static final Logger logger = LoggerFactory.getLogger(Ws3Pool.class);
private static Map<String, Session> pool = new ConcurrentHashMap<>();
private static final Map<Long, List<String>> userMap = new ConcurrentHashMap<>();
private static final ExecutorService threadPool = Executors.newFixedThreadPool(10);
public static void add(Session session) {
pool.put(session.getId(), session);
Map<String, String> map = ParamUtil.parser(session.getQueryString());
Long userId = Long.valueOf(map.get("userId"));
logger.info("add userId:{}", userId);
List<String> lstInBound = userMap.computeIfAbsent(userId, k -> new ArrayList<>());
lstInBound.add(session.getId());
logger.info("add connetion {},total size {}", session.getId(), pool.size());
}
public static void remove(Session session) {
String sessionId = session.getId();
List<String> lstInBound = null;
Map<String, String> map = ParamUtil.parser(session.getQueryString());
Long userId = Long.valueOf(map.get("userId"));
logger.info("remove userId:{}", userId);
if (StringUtils.isNotBlank(sessionId)) {
lstInBound = userMap.get(userId);
if (lstInBound != null) {
lstInBound.remove(sessionId);
if (lstInBound.isEmpty()) {
userMap.remove(userId);
}
}
}
pool.remove(sessionId);
logger.info("remove connetion {},total size {}", sessionId, pool.size());
}
/** 推送信息 */
public static void broadcast(WsMsgVo vo) {
Long userId = vo.getUserId();
List<String> lstInBoundId;
if (userId == null || userId == 0L) {
// 发送给所有人
lstInBoundId = userMap.values().stream().flatMap(List::stream).collect(Collectors.toList());
} else {
lstInBoundId = userMap.get(userId);
}
if (lstInBoundId == null || lstInBoundId.isEmpty()) {
return;
}
threadPool.execute(() -> {
try {
for (String id : lstInBoundId) {
// 发送给指定用户
Session session = pool.get(id);
if (session != null) {
synchronized (session) {
session.getBasicRemote().sendText(vo.getText());
}
}
}
} catch (Exception e) {
logger.error("broadcast error: userId:{}", userId, e);
}
});
}
}
3、创建Websocket配置类,并注册 ServerEndpointExporter
ServerEndpointExporter
是 Spring Boot 中的一个重要组件,用于导出 WebSocket 服务器端点配置。在 Spring 应用程序中,特别是当你使用 Spring Boot 时,ServerEndpointExporter
能够自动注册使用 @ServerEndpoint
注解声明的 WebSocket 端点。
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
@EnableWebSocket
public class Ws3Config {
/**
* ServerEndpointExporter 作用
* 这个Bean会自动注册使用@ServerEndpoint注解声明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
在这个配置类中,我们定义了一个 serverEndpointExporter
方法,它返回一个 ServerEndpointExporter
的实例。这样,Spring 容器就会管理这个 bean,并自动注册所有使用 @ServerEndpoint
注解声明的 WebSocket 端点。
前端代码
(可以和上面的例子一致)
四、前端代码封装
为了更方便的使用websocket,以及确保在后端服务器重启后,前端websocket能够自动重连,我们可以增加心跳机制
1、创建 ws.handler.js
var WsHander = {};
WsHander.socket = null;
WsHander.connect = (function (host) {
WsHander.host = host;
if ("WebSocket" in window) {
WsHander.socket = new WebSocket(host);
} else if ("MozWebSocket" in window) {
WsHander.socket = new MozWebSocket(host);
} else {
console.log("Error: WebSocket is not supported by this browser.");
return;
}
WsHander.socket.onopen = function () {
console.log("Info: websocket已启动.");
// 心跳检测重置
heartCheck.reset().start(WsHander.socket);
};
WsHander.socket.onclose = function () {
console.log("Info: websocket已关闭.");
WsHander.reconnect();
};
WsHander.socket.onmessage = function (message) {
heartCheck.reset().start(WsHander.socket);
if (message.data == null || message.data === '' || "pong" === message.data) {
//心跳消息
console.log("Info: 心跳消息");
} else {
// 收到 Websocket消息,执行业务操作
if (doOnWsMessage){
doOnWsMessage(message.data);
}
}
};
});
WsHander.reconnect = function (){
WsHander.connect(WsHander.host);
}
WsHander.initialize = function (userId, uri) {
WsHander.currUserId = userId;
if (WsHander.currUserId) {
if (window.location.protocol === "http:") {
WsHander.connect("ws://" + window.location.host + uri+"?userId=" + WsHander.currUserId);
} else {
WsHander.connect("wss://" + window.location.host + uri+"?userId=" + WsHander.currUserId);
}
}
};
WsHander.sendMessage = (function (message) {
if (message !== "") {
WsHander.socket.send(message);
}
});
//心跳检测
var heartCheck = {
timeout: 5000,// 5秒
timeoutObj: null,
reset: function () {
clearTimeout(this.timeoutObj);
return this;
},
start: function (ws) {
var self = this;
this.timeoutObj = setTimeout(function () {
// 这里发送一个心跳,后端收到后,返回一个心跳消息,
// onmessage拿到返回的心跳就说明连接正常
ws.send("ping");
}, this.timeout);
}
}
2、在html页面中使用
<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org"
xmlns:layout="http://www.ultraq.net.nz/thymeleaf/layout">
<head>
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>websocket demo1</title>
</head>
<body>
<div>
<div id="content">
<div>信息面板</div>
</div>
<input type="text" id="input" placeholder="请输入内容" />
<button id="send" onclick="doSend()">发送</button>
</div>
<script src="../static/ws.handler.js" ></script>
<script type="text/javascript">
WsHander.initialize("001", "/ws/demo3")
// 当从服务器接收到数据时触发
function doOnWsMessage(messageData){
console.log('Message from server ', messageData);
var div_msg = document.createElement('div');
div_msg.textContent = messageData;
document.getElementById('content').appendChild(div_msg)
}
function doSend(){
var input_dom = document.getElementById('input')
var value = input_dom.value;
input_dom.value=''
input_dom.focus()
WsHander.sendMessage(value)
}
</script>
</body>
</html>
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)