简单记录一下使用netty方式实现SSE的服务端功能

简要说明

Server-Sent Events (SSE) 是一种用于在客户端和服务器之间建立单向通信的技术。
它允许服务器主动向客户端推送实时更新,而不需要客户端不断地请求数据。
Server-Sent Events (SSE) 的流行可以追溯到 HTML5 的引入,

最大特点:

  • 前端JS原生支持
  • 只接受服务端数据,单向通讯
  • 原生支持断开重连

他和我们现在经常接触的 websocket,mqtt,类rabbitmq 有说明区别,
同样是客户端服务端的数据访问,同样用于取代客户端轮询访问方式,他们有审美不一样或者说使用场景是什么,下面表格简要说明一下:

技术SSE (Server-Sent Events)WebSocketMQTT类RabbitMQ
类型单向通信双向通信发布/订阅模式消息队列
协议基于 HTTP独立于 HTTP轻量级消息传递协议支持多种协议(如 AMQP)
使用场景实时更新(如新闻/股票信息推送)实时双向通信(如聊天)物联网设备通信,例如硬件设备主动上报给服务端信号信息可靠消息传递和任务队列,用于服务端系统之间通讯
优点- 简单易用- 低延迟- 轻量级- 强大的消息路由功能
- 自动重连- 支持双向通信- 支持 QoS 级别- 提供持久化消息存储
- 支持文本数据推送- 支持二进制数据传输- 发布/订阅解耦- 支持多种消息模式
缺点- 仅支持单向通信- 实现相对复杂- 需要 MQTT 代理- 设置和管理相对复杂
- 不支持二进制数据- 需要额外的安全措施- 对简单实时应用复杂- 可能需要更多资源

基于Netty

关于java版本的SSE服务端实现,网上大多举例不正确或者说并没有实现SSE的技术特性 (例如网上举例说 创建一个 servlet,你会发现基本上是http轮询,因为一次service请求后,IO通讯就断开了,前端只会不断重连请求)。

Netty 强大健壮的异步IO通讯框架。

功能需求

  • 在SpringBoot项目中创建SSE服务端功能
  • 基于Netty框架
  • 前端样例可自由断开或连接
  • 支持携带Get请求的参数
  • 前端样例支持断开重连,连接状态展示

后端代码

1. 创建一个SpringBoot 应用

这里测试的SpringBoot 版本是 2.6.14
使用的JDK版本是 17

2. 创建服务端功能

引入netty Maven依赖

        <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.113.Final</version>
        </dependency>

创建 SSE 服务端的EventLoopGroup,假设绑定端口 8849


import com.middol.yfagv.model.oms.properties.SseProperties;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;

import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;


/**
 * SSE服务 server sent events
 *
 * @author admin
 */
@Slf4j
@Service
public class SseServer {
    EventLoopGroup bossGroup = new NioEventLoopGroup();
    EventLoopGroup workerGroup = new NioEventLoopGroup();
    private boolean started = false;

    @PostConstruct
    public void init() {
        log.debug("SSE服务初始化完毕");
    }

    public void shutdown() {
        log.debug("SSE服务 Shutting down server...");
        // 优雅关闭 workerGroup
        if (!workerGroup.isShutdown()) {
            workerGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);
        }
        // 优雅关闭 bossGroup
        if (!bossGroup.isShutdown()) {
            bossGroup.shutdownGracefully(5, 10, TimeUnit.SECONDS);
        }
        log.debug("SSE服务 Server shut down gracefully.");
    }

    @Async
    public void start() throws Exception {
        if (started) {
            return;
        }
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new HttpServerCodec());
                            ch.pipeline().addLast(new HttpObjectAggregator( 1024 * 1024));
                            ch.pipeline().addLast(new ChunkedWriteHandler());
                            ch.pipeline().addLast(new SseHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);

            // 绑定端口并同步
            ChannelFuture f = b.bind(8849).sync();
            log.debug("SSE服务启动完成,绑定端口:{}", 8849);
            started = true;
            // 添加关闭钩子
            Runtime.getRuntime().addShutdownHook(new Thread(this::shutdown));
            // 等待服务器通道关闭
            f.channel().closeFuture().sync();
        } finally {
            shutdown();
        }
    }
}

创建处理前端请求的 ChannelHandler,这里我们假设只处理url 是 /events的前端请求。

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.*;
import lombok.extern.slf4j.Slf4j;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/**
 * SSE处理器
 *
 * @author admin
 */
@Slf4j
@ChannelHandler.Sharable
public class SseHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

    private static final String PREFIX = "events";

    // 定义一个 AttributeKey 用于存储 ScheduledFuture
    private static final AttributeKey<ScheduledFuture<?>> SCHEDULED_FUTURE_KEY = AttributeKey.valueOf("scheduledFuture");

    @Override
    public void channelActive(io.netty.channel.ChannelHandlerContext ctx) throws Exception {
        // 获取远程地址
        String remoteAddress = ctx.channel().remoteAddress().toString();
        log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: channelActive, remoteAddress={}", remoteAddress);
        super.channelActive(ctx);
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 获取远程地址
        String remoteAddress = ctx.channel().remoteAddress().toString();
        log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: channelInactive, remoteAddress={}", remoteAddress);
        // 从 ChannelHandlerContext 中获取定时任务并取消
        ScheduledFuture<?> scheduledFuture = ctx.channel().attr(SCHEDULED_FUTURE_KEY).get();
        if (scheduledFuture != null) {
            // 显式取消定时任务
            scheduledFuture.cancel(false);
        }
        super.channelInactive(ctx);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest request) {
        if (request.method() == HttpMethod.OPTIONS) {
            // 处理预检请求
            FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
            response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
            response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS");
            response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type");
            ctx.writeAndFlush(response);
            return;
        }

        if (HttpUtil.is100ContinueExpected(request)) {
            send100Continue(ctx);
        }

        // 检查请求的 URI 是否以指定的前缀开始
        String uri = request.uri();
        // 解析 GET 参数
        QueryStringDecoder queryStringDecoder = new QueryStringDecoder(uri);
        Map<String, List<String>> parameters = queryStringDecoder.parameters();
        if (parameters != null && !parameters.isEmpty()) {
            log.debug(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: parameters={}", parameters);
        }
        if (!uri.startsWith("/" + PREFIX)) {
            ctx.writeAndFlush(new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND));
            return;
        }

        // 设置 CORS 头
        HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
        response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/event-stream");
        response.headers().set(HttpHeaderNames.CACHE_CONTROL, "no-cache");
        response.headers().set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);

        // CORS 头
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); // 允许所有域
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS"); // 允许的请求方法
        response.headers().set(HttpHeaderNames.ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type"); // 允许的请求头

        ctx.write(response);

        // 发送初始 SSE 事件
        sendSseEvent(ctx, "Connected to SSE server");

        // 定期发送 SSE 事件
        long initialDelay = 0L;
        long period = 5L;
        ctx.executor().scheduleAtFixedRate(
                () -> sendSseEvent(ctx, "CurrentTimeMillis: " + System.currentTimeMillis()),
                initialDelay, period, java.util.concurrent.TimeUnit.SECONDS);

        // 将定时任务的引用存储在 ChannelHandlerContext 的属性中
        ctx.channel().attr(SCHEDULED_FUTURE_KEY).set(scheduledFuture);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 获取远程地址
        String remoteAddress = ctx.channel().remoteAddress().toString();
        log.error(">>>>>>>>>>>>>>>>>>>>>>>>SseHandler: exceptionCaught, remoteAddress={}", remoteAddress, cause);
        // 关闭连接,自动释放相关资源
        ctx.close();
    }

    protected static void send100Continue(ChannelHandlerContext ctx) {
        FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.CONTINUE);
        ctx.write(response);
    }

    protected void sendSseEvent(ChannelHandlerContext ctx, String data) {
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(("data: " + data + "\n\n").getBytes(StandardCharsets.UTF_8));
        ctx.writeAndFlush(new DefaultHttpContent(buffer));
    }

}

以上核心业务方法是 channelRead0, 里面设置了可以跨越,
为什么要跨越?原因是netty服务端绑定的端口和本身 SpringBoot的应用端口不是一样,前端页面可能即要请求SpringBoot的业务接口也需要SSE服务接口。

以上是简单模拟向前端页面推送时间戳信息,每隔5秒一次,如果我要推送Bean方法里面的业务数据该如何做?

最简单方法是修改sendSseEvent里面的业务逻辑,使用SpringUtil获得Bean

    protected void sendSseEvent(ChannelHandlerContext ctx, String data) {
        ByteBuf buffer = ctx.alloc().buffer();
        buffer.writeBytes(("data: " + JSONObject.toJSONString(SpringUtil.getBean(YourService.class).querySome() +  "\n\n")).getBytes(StandardCharsets.UTF_8));
        
        ctx.writeAndFlush(new DefaultHttpContent(buffer));
    }

最后写一个手动启动Netty服务的Controller方法,这个主要用于测试,可以设置SpringBoot启动时自动启动Netty服务。

    @Lazy
    @Resource
    private SseServer sseServer;

    @ApiOperation(value = "启动SSE服务")
    @PostMapping("startSseServer")
    public ResponseVO<String> startSseServer() {
        try {
            sseServer.start();
        } catch (Exception e) {
            return ResponseVO.fail(e.getMessage(), DateUtil.now());
        }
        return ResponseVO.success(ResponseVO.SUCCESS_MSG, DateUtil.now());
    }

3. 创建前端功能

前端代码,我直接ChatGPT帮我生成一个用于测试SSE功能的页面:

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSE Example</title>
    <script>
        let eventSource; // 声明 eventSource 变量
        let isConnected = false; // 连接状态标志

        function toggleEventSource() {
            const button = document.getElementById('toggleButton');
            const inputUrl = document.getElementById('urlInput').value.trim(); // 获取输入框中的 URL

            if (!inputUrl) {
                alert('请输入有效的 URL');
                return;
            }

            if (isConnected) {
                eventSource.close(); // 关闭连接
                button.innerText = "开启 EventSource"; // 更新按钮文本
                button.classList.remove('close'); // 移除关闭状态的样式
                button.classList.add('open'); // 添加开启状态的样式
                document.getElementById('status').innerText = "Disconnected from SSE server.";
                document.getElementById('status').style.color = "red";
                isConnected = false; // 更新连接状态
            } else {
                eventSource = new EventSource(inputUrl); // 使用输入框中的 URL 创建新的 EventSource 实例

                eventSource.onopen = function() {
                    console.log("Connection to server opened.");
                    const status = document.getElementById('status');
                    status.innerText = "Connected to SSE server.";
                    status.style.color = "green";
                    status.style.fontWeight = "bold";
                    button.innerText = "关闭 EventSource"; // 更新按钮文本
                    button.classList.remove('open'); // 移除开启状态的样式
                    button.classList.add('close'); // 添加关闭状态的样式
                    isConnected = true; // 更新连接状态
                };

                eventSource.onmessage = function(event) {
                    console.log("Received message: " + event.data);
                    const messagesDiv = document.getElementById('messages');
                    messagesDiv.innerHTML += `<p>${event.data}</p>`;

                    // 检查行数并在超过100行时清空内容
                    const lines = messagesDiv.getElementsByTagName('p').length;
                    if (lines > 100) {
                        messagesDiv.innerHTML = ''; // 清空内容
                        console.log("Messages cleared after exceeding 100 lines.");
                    }
                };

                eventSource.onerror = function() {
                    console.error("EventSource failed.");
                    const status = document.getElementById('status');
                    status.innerText = "Exception: Connection to SSE server lost.";
                    status.style.color = "red";
                    status.style.fontWeight = "bold";
                    button.innerText = "开启 EventSource"; // 更新按钮文本
                    button.classList.remove('close'); // 移除关闭状态的样式
                    button.classList.add('open'); // 添加开启状态的样式
                    isConnected = false; // 更新连接状态
                };
            }
        }
    </script>
    <style>
        body {
            font-family: Arial, sans-serif;
            margin: 20px;
            padding: 20px;
            background-color: #f4f4f4;
            border-radius: 8px;
        }
        #toggleButton {
            padding: 10px 20px; /* 增加内边距 */
            font-size: 16px; /* 增大字体 */
            color: white; /* 字体颜色 */
            border: none; /* 去掉边框 */
            border-radius: 5px; /* 圆角 */
            cursor: pointer; /* 鼠标悬停时显示手型 */
            transition: background-color 0.3s; /* 背景颜色过渡效果 */
        }
        #toggleButton.open {
            background-color: #007bff; /* 开启状态按钮背景颜色 */
        }
        #toggleButton.open:hover {
            background-color: #0056b3; /* 开启状态悬停时的背景颜色 */
        }
        #toggleButton.open:active {
            background-color: #004080; /* 开启状态点击时的背景颜色 */
        }
        #toggleButton.close {
            background-color: #f17b87; /* 关闭状态按钮背景颜色 */
        }
        #toggleButton.close:hover {
            background-color: #d9534f; /* 关闭状态悬停时的背景颜色 */
        }
        #toggleButton.close:active {
            background-color: #c9302c; /* 关闭状态点击时的背景颜色 */
        }
        #urlInput {
            width: 400px; /* 输入框宽度 */
            padding: 8px; /* 增加内边距 */
            font-size: 16px; /* 增大字体 */
            margin-right: 10px; /* 添加与按钮的间距 */
        }
        #messages {
            margin-top: 20px;
            padding: 10px;
            background-color: #fff;
            border: 1px solid #ccc;
            border-radius: 4px;
            max-height: 300px;
            overflow-y: auto;
        }
        p {
            margin: 5px 0;
            font-size: 18px; /* 增大内容字体大小 */
        }
        #status {
            font-size: 20px; /* 增大状态字体大小 */
        }
    </style>
</head>
<body>
<h1>SSE Example</h1>
<label for="urlInput"></label><input type="text" id="urlInput" placeholder="Enter SSE URL" value="http://localhost:8849/events"/> <!-- URL 输入框 -->
<button id="toggleButton" class="open" onclick="toggleEventSource()">开启 EventSource</button>
<br/><br/><br/>
<div id="status">请点击 [开启 EventSource] 按钮开启 EventSource。</div>
<br/>
<div id="messages"></div>
</body>
</html>

4. 测试SSE

启动SpringBoot服务,启动Netty服务

谷歌浏览器输入 http://localhost:8088/test1.html
这里的8088是SpringBoot应用端口,test1.html 是以上创建的页面,放在SpringBoot 的静态资源文件目录下的页面文件。
在这里插入图片描述
点击 【开启 EventSource】
在这里插入图片描述
点击 关闭 按钮
在这里插入图片描述
再次开启,然后关闭后台服务,然后再次开启后台服务测试 SSE自动重连
在这里插入图片描述

在这里插入图片描述

封装为组件

依据Netty的高性能实现的SSE服务端功能,基本上实现了SSE的所有技术特点,在理想情况下,一台 普通的 32GB 内存的服务器可以支持数几十万个前端连接,基本满足中小企业业务需求量。

使用SSE其实针对中小企业来说最大的优点其实是部署的便捷性,无需引入其他消息队列中间件等服务。

以上是测试样例,以下是封装成 Spring-boot-starter 组件,代码仓库如下:

https://github.com/dwhgygzt/sse-spring-boot-starter

https://gitee.com/banana6/sse-spring-boot-starter

欢迎下载测试交流!

Logo

开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!

更多推荐