Netty——WebSocket开发
WebSocket是HTML5开始提供的一种浏览器与服务器间进行全双工通信的网络技术,WebSocket通信协议于2011年被IETF定为RFC6455,WebSockeet API被W3C定为标准。在WebSocket API中,浏览器和服务器只需要做一个握手动作,然后浏览器和服务器之间就行程了一条快速通道,两者就可以直接相互传送数据了。WebSocket基于TCP双向全双工进行消息传递,在同一
目录
- 1、WebSocket协议
- 2、HTML5 WebSocket API
- 3、Netty中的WebSocket
- 4、实例(自行实现WebSocket握手)
- 5、实例(使用WebSocketServerProtocolHandler)
- 5.1、WebSocketServerProtocolHandlerhandler的Added处理器添加事件
- 5.2、WebSocketServerProtocolHandshakeHandler的channelRead
- 5.3、isNotWebSocketPath验证URL
- 5.4、sendHttpResponse发送消息
- 5.5、WebSocketServerHandshakerFactory的newHandshaker创建握手对象
- 5.6、forbiddenHttpRequestResponder
- 5.7、WebSocketServerHandshaker的handshake
- 5.8、applyHandshakeTimeout
- 5.9、改写后的代码
1、WebSocket协议
1.1、协议简介
WebSocket是HTML5开始提供的一种浏览器与服务器间进行全双工通信的网络技术,WebSocket通信协议于2011年被IETF定为RFC6455,WebSockeet API被W3C定为标准。
在WebSocket API中,浏览器和服务器只需要做一个握手动作,然后浏览器和服务器之间就行程了一条快速通道,两者就可以直接相互传送数据了。WebSocket基于TCP双向全双工进行消息传递,在同一时刻,既可以发送消息,,也可以接收消息,相比HTTP的半双工协议,性能得到很大提升。
WebSocket的特点:
- 单一的TCP连接,采用全双工模式通信
- 对代理、防火墙和路由器透明
- 无头部信息、Cookie和身份验证
- 无安全开销
- 通过"ping/pong"帧保持链路激活
- 服务器可以主动传递消息给客户端,不再需要客户端轮询
1.2、WebSocket连接建立
建立WebSocket连接时,需要通过客户端或者浏览器发出握手请求,请求消息示例如图:
为了建立一个WebSocket连接,客户端浏览器首先要向服务器发起一个HTTP请求,这个请求和通常的HTTP请求不同,包含了一些附加头信息,其中附加头信息"Upgrade:WebSocket"表明这是一个申请协议升级的HTTP请求。服务器端解析这些附加的头信息,然后生成应答信息返回给客户端,客户端和服务器端的WebSocket连接就建立起来了,双方可以通过这个连接通道自由的传递信息,并且这个连接会持续存在直到客户端或者服务器端的某一方主动关闭连接。
服务端返回给客户端的应答消息如图:
请求消息中的"Sec-WebSocket-Key"是随机的,服务器端会用这些数据来构造出一个SHA-1的信息摘要,把"Sec-WebSocket-Key"加上一个魔法字符串"258EAFA5-E914-47DA-95CA-C5AB0DC85B11"。使用SHA-1加密,然后进行BASE-64编码,将结果作为"Sec-WebSocket-Accept"头的值,返回给客户端。
1.3、WebSocket生命周期
握手成功之后,服务端和客户端就可以通过"messages"的方式进行通信了,一个消息由一个或者多个帧组成,WebSocket的消息并不一定对应一个特定网络层的帧,它可以被分割成多个帧或者被合并。
帧都有自己对应的类型,属于同一个消息的多个帧具有相同类型的数据。从广义上将,数据类型可以是文本数据(UTF-8[RFC3629]文字)、二进制数据和控制帧。
1.4、 WebSocket连接关闭
为关闭WebSocket连接,客户端和服务端需要通过一个安全的方法关闭底层TCP连接以及TLS会话。如果合适,丢弃任何可能已经接收的字节,必要时(比如受到攻击)可以通过任何可用的手段关闭连接。
底层的TCP连接,在正常情况下,应该首先由服务器关闭。在异常情况下(例如在一个合理的时间周期后没有接收到服务器的TCP Close),客户端可以发起TCP Close。因此,当服务器被指示关闭WebSocket连接时,它应该立即发起一个TCP Close草走;客户端应该等待服务器的TCP Close。
WebSocket的握手关闭消息带有一个状态码和一个可选的关闭原因,它必须按照协议要求发送一个Close控制帧,当对端受到关闭控制帧指令时,需要主动关闭WebSocket连接。
2、HTML5 WebSocket API
2.1、创建实例
创建一个 WebSocket 实例,即和 WebSocket 服务器之间建立了连接:
var socket = new WebSocket('ws://domain:port');
2.2、发送数据
它是用来给 WebSocket 服务器发送数据的方法:
var socket = new WebSocket('ws://domain:port');
socket.send("send message to server");
2.3、关闭连接
它是用来关闭和 WebSocket 服务器之间的连接:
var socket = new WebSocket('ws://domain:port');
socket.close();
2.4、回调函数
onopen
它是用来监听客户端链接 WebSocket 服务器成功的事件:
var socket = new WebSocket('ws://domain:port');
socket.onopen = function(evt){
};
onerror
它是用来监听客户端和 WebSocket 服务器数据交互产生错误(连接失败、发送或者接收数据失败、处理事件失败等)的事件:
var socket = new WebSocket('ws://domain:port');
socket.onerror = function(evt){
};
onmessage
它是用来监听客户端和 WebSocket 服务器之间发送数据的事件,其中数据会包含在回调函数的传入参数evt中:
var socket = new WebSocket('ws://domain:port');
socket.onmessage = function(evt){
};
onclose
它是用来监听客户端与 WebSocket 服务器断开连接的事件:
var socket = new WebSocket('ws://domain:port');
socket.onclose = function(evt){
};
3、Netty中的WebSocket
3.1、WebSocket的版本
WebSocketVersion是一个枚举类型,它里面定义了websocket的4个版本,除了UNKNOWN之外,我们可以看到websocket的版本有0,7,8,13这几个。
public enum WebSocketVersion {
UNKNOWN,
V00,
V07,
V08,
V13;
/**
* @return Value for HTTP Header 'Sec-WebSocket-Version'
*/
public String toHttpHeaderValue() {
if (this == V00) {
return "0";
}
if (this == V07) {
return "7";
}
if (this == V08) {
return "8";
}
if (this == V13) {
return "13";
}
throw new IllegalStateException("Unknown web socket version: " + this);
}
}
3.2、FrameDecoder和FrameEncoder
我们知道websocket的消息是通过frame来传递的,因为不同websocket的版本影响到的是frame的格式的不同。所以我们需要不同的FrameDecoder和FrameEncoder来在WebSocketFrame和ByteBuf之间进行转换。
既然websocket有四个版本,那么相对应的就有4个版本的decoder和encoder:
WebSocket00FrameDecoder
WebSocket00FrameEncoder
WebSocket07FrameDecoder
WebSocket07FrameEncoder
WebSocket08FrameDecoder
WebSocket08FrameEncoder
WebSocket13FrameDecoder
WebSocket13FrameEncoder
3.3、WebSocketServerHandshaker
netty提供了一个WebSocketServerHandshaker类来统一使用encoder和decoder的使用。netty提供一个工厂类WebSocketServerHandshakerFactory根据客户端请求header的websocket版本不同,来返回不同的WebSocketServerHandshaker。
public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
return new WebSocketServerHandshaker13(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 10 of the draft hybi specification.
return new WebSocketServerHandshaker08(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 07 of the draft hybi specification.
return new WebSocketServerHandshaker07(
webSocketURL, subprotocols, decoderConfig);
} else {
return null;
}
} else {
// Assume version 00 where version header was not specified
return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
}
}
同样的, 我们可以看到,netty为websocket也定义了4种不同的WebSocketServerHandshaker。
WebSocketServerHandshaker中定义了handleshake方法,通过传入channel,并向其添加encoder和decoder
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise)
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
而添加的这两个newWebSocketEncoder和newWebsocketDecoder就是各个WebSocketServerHandshaker的具体实现中定义的。
3.4、WebSocketFrame
所有的ecode和decode都是在WebSocketFrame和ByteBuf中进行转换。WebSocketFrame继承自DefaultByteBufHolder,表示它是一个ByteBuf的容器。除了保存有ByteBuf之外,它还有两个额外的属性,分别是finalFragment和rsv。
finalFragment表示该frame是不是最后一个Frame。对于大数据量的消息来说,会将消息拆分成为不同的frame,这个属性特别有用。
我们再看一下websocket协议消息的格式:
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
rsv代表的是消息中的扩展字段,也就是RSV1,RSV2和RSV3。
除此之外就是ByteBuf的一些基本操作了。
WebSocketFrame是一个抽象类,它的具体实现类有下面几种:
BinaryWebSocketFrame //二进制的消息帧
CloseWebSocketFrame //关闭连接的控制帧
ContinuationWebSocketFrame //表示消息中多于一个帧的标识
PingWebSocketFrame //客户端发送的心跳帧
PongWebSocketFrame //服务端响应的心跳帧
TextWebSocketFrame //文本的消息帧
4、实例(自行实现WebSocket握手)
4.1、Netty服务端
public class WebSocketServer {
public void run(int port) throws Exception {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//HttpServerCodec,将请求和应答消息编码或者解码为HTTP消息
pipeline.addLast("http-codec", new HttpServerCodec());
//HttpObjectAggregator,将HTTP消息的多个部分组合成一条完整的HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
//ChunkedWriteHandler,用来向客户端发送HTML5文件,主要用于支持浏览器和服务端进行WebSocket通信
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
//自定义处理协议内容
pipeline.addLast("handler", new WebSocketServerHandler());
}
});
Channel ch = server.bind(port).sync().channel();
System.out.println("Web socket server started at port " + port + ".");
System.out.println("Open your browser and navigate to http://localhost:" + port + "/");
ch.closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new WebSocketServer().run(port);
}
}
public class WebSocketServerHandler extends ChannelInboundHandlerAdapter {
private WebSocketServerHandshaker handshaker;
private static String getWebSocketLocation(FullHttpRequest req) {
String location = req.headers().get("Host") + "/ws";
return "ws://" + location;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//传统HTTP接入
if (msg instanceof FullHttpRequest) {
handleHttpRequest(ctx, (FullHttpRequest) msg);
}
//WebSocket接入
else if (msg instanceof WebSocketFrame) {
handleWebSocketFrame(ctx, (WebSocketFrame) msg);
}
}
/**
* 处理WebSocket接入
* @param ctx
* @param frame
*/
private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
//判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame) {
handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
return;
}
//判断是否是Ping消息
if (frame instanceof PingWebSocketFrame) {
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//只支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame)) {
throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
}
if (ctx == null || this.handshaker == null || ctx.isRemoved()) {
throw new Exception("尚未握手成功,无法向客户端发送WebSocket消息");
}
//返回应答消息
String request = ((TextWebSocketFrame) frame).text();
ctx.channel().write(new TextWebSocketFrame(
request + ", 欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString()));
}
/**
* 处理HTTP接入
* @param ctx
* @param req
*/
private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
//如果HTTP解码失败,返回HTTP异常
//如果消息头中没有包含Upgrade字段或者它的值不是websocket,则返回Http 400响应
if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
return;
}
//构造握手响应返回,本机测试
//构造握手工厂
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false);
//创建握手处理类WebSocketServerHandshaker
handshaker = wsFactory.newHandshaker(req);
if (handshaker == null) {
//不支持websocket协议
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
//构造我后响应消息返回给客户端
ChannelFuture future = handshaker.handshake(ctx.channel(), req);
if (future.isSuccess()) {
//dosomething
}
}
}
/**
* 发送HTTP响应
* @param ctx
* @param req
* @param res
*/
private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) {
//返回应答给客户端
if (res.status().code() != 200) {
ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8);
res.content().writeBytes(buf);
buf.release();
HttpUtil.setContentLength(res, res.content().readableBytes());
}
//发送应答消息
ChannelFuture f = ctx.channel().writeAndFlush(res);
//如果是非Keep-Alive,关闭连接
if (!HttpUtil.isKeepAlive(req) || res.status().code() != 200) {
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
4.2、浏览器客户端
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
Netty WebSocket 时间服务器
</head>
<br/>
<body>
<br>
<script type="text/javascript">
var socket;
if (!window.WebSocket) {
window.WebSocket = window.MozWebSocket;
}
if (window.WebSocket) {
socket = new WebSocket("ws://localhost:8080/websocket");
socket.onmessage = function (event) {
console.log("xxx" + event)
var ta = document.getElementById('responseText');
ta.value = ta.value + '\n' + event.data;
};
socket.onopen = function (event) {
var ta = document.getElementById('responseText');
ta.value = "打开WebSocket服务正常,浏览器支持WebSocket!";
};
socket.onclose = function (event) {
var ta = document.getElementById('responseText');
ta.value = "WebSocket 关闭!";
}
} else {
alert("抱歉,您的浏览器不支持WebSocket协议!");
}
function send(message) {
if (!window.WebSocket) {
return;
}
if (socket.readyState == WebSocket.OPEN) {
socket.send(message);
}else {
alert("WebSocket连接没有建立成功!");
}
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="NettyWebSocket">
<br/><br/>
<input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)">
<hr color="blue"/>
<h3>服务端返回的应答消息</h3>
<textarea id="responseText" style="width:500px; height: 300px;"></textarea>
</form>
</body>
</html>
4.3、运行效果
5、实例(使用WebSocketServerProtocolHandler)
5.1、WebSocketServerProtocolHandlerhandler的Added处理器添加事件
WebSocketServerProtocolHandler是Netty提供的用于将HTTP协议升级为ws协议的Handler。
首先就是添加WebSocketServerProtocolHandler之后的handlerAdded事件。因为是WebSocket协议,肯定需要一些处理器,所以这里就会添加一些处理器,比如第一次的握手处理器,UFT8帧验证器来验证文本帧,还要关闭帧处理器,用来响应关闭帧
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
ChannelPipeline cp = ctx.pipeline();
if (cp.get(WebSocketServerProtocolHandshakeHandler.class) == null) {
// Add the WebSocketHandshakeHandler before this one.在前面添加一个握手处理器
cp.addBefore(ctx.name(), WebSocketServerProtocolHandshakeHandler.class.getName(),
new WebSocketServerProtocolHandshakeHandler(serverConfig));
}
if (serverConfig.decoderConfig().withUTF8Validator() && cp.get(Utf8FrameValidator.class) == null) {
// Add the UFT8 checking before this one.在前面添加帧验证器
cp.addBefore(ctx.name(), Utf8FrameValidator.class.getName(),
new Utf8FrameValidator());
}
if (serverConfig.sendCloseFrame() != null) {//添加关闭帧处理器
cp.addBefore(ctx.name(), WebSocketCloseFrameHandler.class.getName(),
new WebSocketCloseFrameHandler(serverConfig.sendCloseFrame(), serverConfig.forceCloseTimeoutMillis()));
}
}
添加完之后就是这个样子(先不管自定义的处理器):
5.2、WebSocketServerProtocolHandshakeHandler的channelRead
它负责做以下几件事情:
- 验证协议url。
- 验证GET的请求升级。
- 替换当前处理器为forbiddenHttpRequestResponder。
- 创建握手WebSocketServerHandshaker 对象,进行握手。
- 启动一个定义任务进行超时回调。
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
final FullHttpRequest req = (FullHttpRequest) msg;
if (isNotWebSocketPath(req)) {//不是websocket路径就不管
ctx.fireChannelRead(msg);
return;
}
try {
if (!GET.equals(req.method())) {//只有GET支持的升级的
sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN, ctx.alloc().buffer(0)));
return;
}
//创建握手工厂
final WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
getWebSocketLocation(ctx.pipeline(), req, serverConfig.websocketPath()),
serverConfig.subprotocols(), serverConfig.decoderConfig());
final WebSocketServerHandshaker handshaker = wsFactory.newHandshaker(req);//创建一个握手处理器
final ChannelPromise localHandshakePromise = handshakePromise;//握手回调
if (handshaker == null) {//不支持的版本
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
} else {
WebSocketServerProtocolHandler.setHandshaker(ctx.channel(), handshaker);//设置处理器
ctx.pipeline().replace(this, "WS403Responder",
WebSocketServerProtocolHandler.forbiddenHttpRequestResponder());//把当前处理器替换掉,变成403
final ChannelFuture handshakeFuture = handshaker.handshake(ctx.channel(), req);
handshakeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {//发送不成功
localHandshakePromise.tryFailure(future.cause());
ctx.fireExceptionCaught(future.cause());
} else {//发送成功
localHandshakePromise.trySuccess();
// 保持兼容性 触发事件
ctx.fireUserEventTriggered(//这个HANDSHAKE_COMPLETE是过时的
WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE);
ctx.fireUserEventTriggered(//这个是新的
new WebSocketServerProtocolHandler.HandshakeComplete(
req.uri(), req.headers(), handshaker.selectedSubprotocol()));
}
}
});
applyHandshakeTimeout();
}
} finally {
req.release();
}
}
5.3、isNotWebSocketPath验证URL
这个主要就是验证URL是否是WebSocke的URL,主要就是判断创建时候传进去的这个"/wc":
private boolean isNotWebSocketPath(FullHttpRequest req) {
return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
}
5.4、sendHttpResponse发送消息
如果响应的状态码不是200或者请求不是设置长连接,就关闭通道了。
private static void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, HttpResponse res) {
ChannelFuture f = ctx.channel().writeAndFlush(res);
if (!isKeepAlive(req) || res.status().code() != 200) {//req不支持KeepAlive,或者res状态码不是200就等写完成了关闭通道
f.addListener(ChannelFutureListener.CLOSE);
}
}
ChannelFutureListener CLOSE = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
future.channel().close();
}
};
5.5、WebSocketServerHandshakerFactory的newHandshaker创建握手对象
根据请求头信息的sec-websocket-version来决定要哪个版本的握手对象,一般都是13,如果都不支持就会返回null。
public WebSocketServerHandshaker newHandshaker(HttpRequest req) {
//从请求头获取WEBSOCKET版本,根据不同版本,返回不同握手对象
CharSequence version = req.headers().get(HttpHeaderNames.SEC_WEBSOCKET_VERSION);
if (version != null) {
if (version.equals(WebSocketVersion.V13.toHttpHeaderValue())) {
// Version 13 of the wire protocol - RFC 6455 (version 17 of the draft hybi specification).
return new WebSocketServerHandshaker13(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V08.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 10 of the draft hybi specification.
return new WebSocketServerHandshaker08(
webSocketURL, subprotocols, decoderConfig);
} else if (version.equals(WebSocketVersion.V07.toHttpHeaderValue())) {
// Version 8 of the wire protocol - version 07 of the draft hybi specification.
return new WebSocketServerHandshaker07(
webSocketURL, subprotocols, decoderConfig);
} else {
return null;
}
} else {//没指定版本的情况
// Assume version 00 where version header was not specified
return new WebSocketServerHandshaker00(webSocketURL, subprotocols, decoderConfig);
}
}
5.6、forbiddenHttpRequestResponder
这个就是用来创建禁止HTTP请求的响应器,只要握手对象创建好了,就不需要响应HTTP了,直接就把当前处理器WebSocketServerProtocolHandler给替换了。
static ChannelHandler forbiddenHttpRequestResponder() {
return new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof FullHttpRequest) {
((FullHttpRequest) msg).release();
FullHttpResponse response =
new DefaultFullHttpResponse(HTTP_1_1, HttpResponseStatus.FORBIDDEN, ctx.alloc().buffer(0));
ctx.channel().writeAndFlush(response);//从通道尾部开始
} else {
ctx.fireChannelRead(msg);
}
}
};
}
替换之后就是这样:
5.7、WebSocketServerHandshaker的handshake
握手对象进行握手,其实就是发送响应数据。先会创建一个FullHttpResponse 响应,然后把跟HTTP相关的聚合,压缩处理器删除,如果有HttpServerCodec,那就在前面添加websocket的编解码器,等发送响应成功了把HttpServerCodec删了。如果是HTTP编解码器,就把解码器先替换成websocket的解码器,等发送响应成功了,再把编码器替换成websocket的编码器。
public final ChannelFuture handshake(Channel channel, FullHttpRequest req,
HttpHeaders responseHeaders, final ChannelPromise promise) {
if (logger.isDebugEnabled()) {
logger.debug("{} WebSocket version {} server handshake", channel, version());
}
FullHttpResponse response = newHandshakeResponse(req, responseHeaders);//创建响应
ChannelPipeline p = channel.pipeline();
if (p.get(HttpObjectAggregator.class) != null) {
p.remove(HttpObjectAggregator.class);//删除聚合
}
if (p.get(HttpContentCompressor.class) != null) {//删除压缩
p.remove(HttpContentCompressor.class);
}
ChannelHandlerContext ctx = p.context(HttpRequestDecoder.class);//请求解码器
final String encoderName;
if (ctx == null) {//不存在
// this means the user use an HttpServerCodec
ctx = p.context(HttpServerCodec.class);//HttpServerCodec是否存在
if (ctx == null) {//也不存在,就没办法解码http了,失败了
promise.setFailure(
new IllegalStateException("No HttpDecoder and no HttpServerCodec in the pipeline"));
return promise;
}//在之前添加WebSocket编解码
p.addBefore(ctx.name(), "wsencoder", newWebSocketEncoder());
p.addBefore(ctx.name(), "wsdecoder", newWebsocketDecoder());
encoderName = ctx.name();
} else {
p.replace(ctx.name(), "wsdecoder", newWebsocketDecoder());//替换HttpRequestDecoder
encoderName = p.context(HttpResponseEncoder.class).name();
p.addBefore(encoderName, "wsencoder", newWebSocketEncoder());//在HttpResponseEncoder之前添加编码器
}//监听发出事件
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
ChannelPipeline p = future.channel().pipeline();
p.remove(encoderName);//成功了就把http的编码器删除了,HttpServerCodec或者HttpResponseEncoder
promise.setSuccess();
} else {
promise.setFailure(future.cause());
}
}
});
return promise;
}
发送回调前是这样:
发送回调成功后是这样:
5.8、applyHandshakeTimeout
发送可能会等好久,所以就给了个超时的定时任务,默认设置是10秒,超时了就触发超时事件,然后关闭通道,如果发送回调了,就把定时任务取消。
private void applyHandshakeTimeout() {
final ChannelPromise localHandshakePromise = handshakePromise;
final long handshakeTimeoutMillis = serverConfig.handshakeTimeoutMillis();
if (handshakeTimeoutMillis <= 0 || localHandshakePromise.isDone()) {
return;//完成了就不管了
}
//起一个定时任务
final Future<?> timeoutFuture = ctx.executor().schedule(new Runnable() {
@Override
public void run() {
if (!localHandshakePromise.isDone() &&
localHandshakePromise.tryFailure(new WebSocketHandshakeException("handshake timed out"))) {
ctx.flush()//没完成就刷出去,触发超时事件,然后关闭
.fireUserEventTriggered(ServerHandshakeStateEvent.HANDSHAKE_TIMEOUT)
.close();
}
}
}, handshakeTimeoutMillis, TimeUnit.MILLISECONDS);
//如果成功了,就把超时任务取消
// Cancel the handshake timeout when handshake is finished.
localHandshakePromise.addListener(new FutureListener<Void>() {
@Override
public void operationComplete(Future<Void> f) throws Exception {
timeoutFuture.cancel(false);
}
});
}
完成握手后:
5.9、改写后的代码
public class WebSocketServer {
public void run(int port) throws Exception {
EventLoopGroup boosGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(boosGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
//HttpServerCodec,将请求和应答消息编码或者解码为HTTP消息
pipeline.addLast("http-codec", new HttpServerCodec());
//HttpObjectAggregator,将HTTP消息的多个部分组合成一条完整的HTTP消息
pipeline.addLast("aggregator", new HttpObjectAggregator(65535));
//ChunkedWriteHandler,用来向客户端发送HTML5文件,主要用于支持浏览器和服务端进行WebSocket通信
pipeline.addLast("http-chunked", new ChunkedWriteHandler());
//自定义处理协议内容
// pipeline.addLast("handler", new WebSocketServerHandler());
/*
说明
1. 对应websocket ,它的数据是以 帧(frame) 形式传递
2. 可以看到WebSocketFrame 下面有六个子类
3. 浏览器请求时 ws://localhost:7000/hello ws://localhost:7000后面的表示请求的uri
4. WebSocketServerProtocolHandler 核心功能是将 http协议升级为 ws协议 , 只有升级为ws协议才能保持长连接
5. 是通过一个 状态码 101
WebSocketServerProtocolHandler对应的地址就要和ws://localhost:7000/hello他的uri的地址要对应
netty通过WebSocketServerProtocolHandler将http协议做了一个提升,返回的状态码变成了101
缺点是:后续只能响应websocket协议,无法再响应普通http请求。
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
//自定义的websocket文本帧处理
pipeline.addLast(new MyWebSocketFrameHandler());
}
});
Channel ch = server.bind(port).sync().channel();
System.out.println("Web socket server started at port " + port + ".");
System.out.println("Open your browser and navigate to http://localhost:" + port + "/");
ch.closeFuture().sync();
} finally {
boosGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
try {
port = Integer.parseInt(args[0]);
} catch (NumberFormatException e) {
e.printStackTrace();
}
}
new WebSocketServer().run(port);
}
}
public class MyWebSocketFrameHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
String request = msg.text();
System.out.println("服务器收到消息:" + request);
//返回响应
ctx.channel().writeAndFlush(new TextWebSocketFrame(
request + ", 欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString()));
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
//id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一
System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText());
System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("异常发生 " + cause.getMessage());
ctx.close(); //关闭连接
}
}
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)