Netty——ChannelHandlerContext
ChannelHandlerContext接口ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandler- Context。ChannelHandlerContext 的主要功能是管理它所关联的 Chan
ChannelHandlerContext接口
ChannelHandlerContext 代表了 ChannelHandler 和 ChannelPipeline 之间的关 联,每当有 ChannelHandler 添加到 ChannelPipeline 中时,都会创建 ChannelHandler- Context。ChannelHandlerContext 的主要功能是管理它所关联的 ChannelHandler 和在 同一个 ChannelPipeline 中的其他 ChannelHandler 之间的交互。
ChannelHandlerContext 有很多的方法,其中一些方法也存在于 Channel 和 ChannelPipeline 本身上,但是有一点重要的不同。
- 如果调用 Channel 或者 ChannelPipeline 上的这 些方法,它们将沿着整个 ChannelPipeline 进行传播。
- 而调用位于 ChannelHandlerContext 上的相同方法,则将从当前所关联的 ChannelHandler 开始,并且只会传播给位于该 ChannelPipeline 中的下一个能够处理该事件的 ChannelHandler。
当使用 ChannelHandlerContext 的 API 的时候,请牢记以下两点:
- ChannelHandlerContext 和 ChannelHandler 之间的关联(绑定)是永远不会改
变的,所以缓存对它的引用是安全的; - 相对于其他类的同名方法,ChannelHandler Context的方法将产生更短的事件流,应该尽可能地利用这个特性来获得最大的性能。
使用ChannelHandlerContext
通过 ChannelHandlerContext 获取到 Channel 的引用。调用 Channel 上的 write()方法将会导致写入事件从尾端到头部地流经 ChannelPipeline。
//从ChannelHandlerContext访问Channel
ChannelHandlerContext ctx = ...;
Channel channel = ctx.channel();
channel.write(Unpooled.copieBuffer("Netty in Action", CharsetUtil.UTF_8))
//通过ChannelHandlerContext访问ChannelPipeline
ChannelHandlerContext ctx = ...;
ChannelPipeline pipeline = ctx.pipeline();
pipeline.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
虽然被调用的 Channel 或 ChannelPipeline 上的 write()方法将一直传播事件通 过整个 ChannelPipeline,但是在 ChannelHandler 的级别上,事件从一个 ChannelHandler 到下一个 ChannelHandler 的移动是由 ChannelHandlerContext 上的调用完成的。
为什么会想要从 ChannelPipeline 中的某个特定点开始传播事件呢?
- 为了减少将事件传经对它不感兴趣的 ChannelHandler 所带来的开销。
- 为了避免将事件传经那些可能会对它感兴趣的 ChannelHandler。
要想调用从某个特定的 ChannelHandler 开始的处理过程,必须获取到在(ChannelPipeline)该 ChannelHandler 之前的 ChannelHandler 所关联的 ChannelHandlerContext。这个 ChannelHandlerContext 将调用和它所关联的 ChannelHandler 之后的 ChannelHandler。
//调用ChannelHandlerContext的write()方法
ChannelHandlerContext ctx = ...;
ctx.write(Unpooled.copiedBuffer("Netty in Action", CharsetUtil.UTF_8));
//write()方法将把缓冲区数据发送到下一个ChannelHandler
消息将从下一个 ChannelHandler 开始流经 ChannelPipeline,绕过了 所有前面的 ChannelHandler。
ChannelHandler和ChannelHandlerContext的高级用法:
可以通过调用 ChannelHandlerContext 上的 pipeline()方法来获得被封闭的 ChannelPipeline 的引用。这使得运行时得以操作 ChannelPipeline 的 ChannelHandler,我们可以利用这一点来实现一些复杂的设计。例如, 你可以通过将 ChannelHandler 添加到 ChannelPipeline 中来实现动态的协议切换。
另一种高级的用法是缓存到 ChannelHandlerContext 的引用以供稍后使用,这可能会发 生在任何的 ChannelHandler 方法之外,甚至来自于不同的线程。
//缓存到ChannelHandlerContext的引用
public class WriteHandler extends ChannelHandlerAdapter {
private ChannelHandlerContext ctx;
@Over
public void handlerAdded(ChannelHandlerContext ctx){
this.ctx = ctx;
}
public void send(String msg){
//使用之前的ctx来发送消息
ctx.writeAndFlush(msg);
}
}
因为一个 ChannelHandler 可以从属于多个 ChannelPipeline,所以它也可以绑定到多 个 ChannelHandlerContext 实例。对于这种用法指在多个 ChannelPipeline 中共享同一 个 ChannelHandler,`对应的 ChannelHandler 必须要使用@Sharable 注解标注;否则, 试图将它添加到多个 ChannelPipeline 时将会触发异常`。显而易见,为了安全地被用于多个 并发的 Channel(即连接),这样的 ChannelHandler 必须是线程安全的。
```java
//可共享的ChannelHandler
@ChannelHandler.Sharable
public class SharableHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Channel read message: " + msg);
//记录方法调用,并转发给下一个ChannelHandler
ctx.fireChannelRead(msg);
}
}
前面的 ChannelHandler 实现符合所有的将其加入到多个 ChannelPipeline 的需求, 即它使用了注解@Sharable 标注,并且也不持有任何的状态。相反,下面中的实现将 会导致问题。
@ChannelHandler.Sharable
public class UnsharableHandler extends ChannelInboundHandlerAdapter {
private int count;
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
count++;
System.out.println("channelRead(...) called the " + count + "tieme");
ctx.fireChannelRead(msg);
}
}
这段代码的问题在于它拥有状态,即用于跟踪方法调用次数的实例变量count。将这个类 的一个实例添加到ChannelPipeline将极有可能在它被多个并发的Channel访问时导致问 题。(当然,这个简单的问题可以通过使channelRead()方法变为同步方法来修正。)
总之,只应该在确定了你的 ChannelHandler 是线程安全的时才使用@Sharable 注解。
为何要共享同一个ChannelHandler 在多个ChannelPipeline中安装同一个ChannelHandler的一个常见的原因是用于收集跨越多个 Channel 的统计信息。
异常处理
处理入站异常:
如果在处理入站事件的过程中有异常被抛出,那么它将从它在 ChannelInboundHandler 里被触发的那一点开始流经 ChannelPipeline。要想处理这种类型的入站异常,你需要在你 的 ChannelInboundHandler 实现中重写下面的方法:
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception
//基本的入站异常处理
public class InboundException extends ChannelInboundHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
因为异常将会继续按照入站方向流动(就像所有的入站事件一样),所以实现了前面所示逻 辑的 ChannelInboundHandler 通常位于 ChannelPipeline 的最后。这确保了所有的入站 异常都总是会被处理,无论它们可能会发生在 ChannelPipeline 中的什么位置。
- ChannelHandler.exceptionCaught()的默认实现是简单地将当前异常转发给ChannelPipeline 中的下一个 ChannelHandler;
- 如果异常到达了 ChannelPipeline 的尾端,它将会被记录为未被处理;
- 要想定义自定义的处理逻辑,你需要重写
exceptionCaught()
方法。然后你需要决定是否需要将该异常传播出去。
处理出站异常:
用于处理出站操作中的正常完成以及异常的选项,都基于以下的通知机制:
- 每个出站操作都将返回一个ChannelFuture。注册到ChannelFuture的ChannelFutureListener 将在操作完成时被通知该操作是成功了还是出错了。
- 几乎所有的 ChannelOutboundHandler 上的方法都会传入一个 ChannelPromise 的实例。作为 ChannelFuture 的子类,ChannelPromise 也可以被分配用于异步通
知的监听器。但是,ChannelPromise 还具有提供立即通知的可写方法:
ChannelPromise setSuccess();
ChannelPromise setFailure(Throwable cause);
添加 ChannelFutureListener 只需要调用 ChannelFuture 实例上的 addListener (ChannelFutureListener)
方法,并且有两种不同的方式可以做到这一点。其中最常用的方式是, 调用出站操作(如 write()方法)所返回的 ChannelFuture 上的 addListener()方法。
//添加ChannelFutureListener到ChannelFuture
ChannelFuture future = channel.write(someMessage);
future.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture f){
if(!f.isSuccess()){
f.cause().printStackTrace();
f.channel().close();
}
}
});
第二种方式是将 ChannelFutureListener 添加到即将作为参数传递给 ChannelOutboundHandler 的方法的 ChannelPromise。
//添加ChannelFutureListener到ChannelPromise
public class OutboundExceptionHandler extens ChannelOutboundHandlerAdapter {
@Over
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise){
promise.addListener(new ChannelFutureListener(){
@Override
public void operationComplete(ChannelFuture f){
if(!f.isSuccess()){
f.cause().printStackTrance();
f.channel().close();
}
}
});
}
}
为何选择一种方式而不是另一种呢?
对于细致的异常处理,你可能会发现,在调用出站操 作时添加 ChannelFutureListener 更合适,而对于一般的异常处 理,你可能会发现,自定义的 ChannelOutboundHandler 实现的方式 更加的简单。
如果你的 ChannelOutboundHandler 本身抛出了异常会发生什么呢?在这种情况下, Netty 本身会通知任何已经注册到对应 ChannelPromise 的监听器。
开放原子开发者工作坊旨在鼓励更多人参与开源活动,与志同道合的开发者们相互交流开发经验、分享开发心得、获取前沿技术趋势。工作坊有多种形式的开发者活动,如meetup、训练营等,主打技术交流,干货满满,真诚地邀请各位开发者共同参与!
更多推荐
所有评论(0)