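Analyzing NioEventLoopGroup raises two main questions:
1. How does next() assign a NioEventLoop?
2. How do the boss group and the child group cooperate at runtime?
The EventLoopGroup interface specifies that its register methods take a Channel or a ChannelPromise and return a ChannelFuture.
The next method is the one that hands out a NioEventLoop.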
    public interface EventLoopGroup extends EventExecutorGroup {

        @Override
        EventLoop next();

        ChannelFuture register(Channel channel);

        ChannelFuture register(ChannelPromise promise);

        @Deprecated
        ChannelFuture register(Channel channel, ChannelPromise promise);
    }
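The register contract can be illustrated without Netty on the classpath. The following is only a minimal sketch: MiniChannel, MiniPromise and MiniLoop are hypothetical stand-ins invented for this example, not Netty types, and the registration completes synchronously here, whereas Netty completes the promise on the event loop thread.

```java
import java.util.concurrent.CompletableFuture;

public class RegisterSketch {

    /** Hypothetical stand-in for a channel; it only records the loop it is bound to. */
    static final class MiniChannel {
        MiniLoop loop;
    }

    /** Hypothetical stand-in for a promise that carries its channel. */
    static final class MiniPromise extends CompletableFuture<Void> {
        final MiniChannel channel;

        MiniPromise(MiniChannel channel) {
            this.channel = channel;
        }
    }

    /** Hypothetical stand-in for a single event loop. */
    static final class MiniLoop {
        final String name;

        MiniLoop(String name) {
            this.name = name;
        }

        // register(channel) only wraps the channel in a promise and delegates.
        MiniPromise register(MiniChannel channel) {
            return register(new MiniPromise(channel));
        }

        // register(promise) binds the channel to this loop and completes the promise.
        MiniPromise register(MiniPromise promise) {
            if (promise == null) {
                throw new NullPointerException("promise");
            }
            promise.channel.loop = this;
            promise.complete(null);
            return promise;
        }
    }

    public static void main(String[] args) {
        MiniLoop loop = new MiniLoop("loop-0");
        MiniChannel channel = new MiniChannel();
        MiniPromise future = loop.register(channel);
        System.out.println(future.isDone() + " bound to " + channel.loop.name);
    }
}
```

The point of the two overloads is that the caller always gets a future back, whether it supplies its own promise or lets the loop create one.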
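To save space, the code below has been condensed.
1. When constructed, NioEventLoopGroup binds SelectorProvider.provider(), and newChild creates each individual EventLoop.
2. next is implemented as a round-robin over the executors array.
3. register wraps the Channel in a DefaultChannelPromise, passes it to the channel's unsafe().register(...), and returns the promise as a ChannelFuture.
Interested readers can set breakpoints in these methods and step through them.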
    // Condensed excerpt: methods from several Netty classes are shown together,
    // so this listing does not compile as-is.
    public class NioEventLoopGroup extends MultithreadEventLoopGroup {

        public NioEventLoopGroup(int nThreads, Executor executor) {
            this(nThreads, executor, SelectorProvider.provider());
        }

        @Override
        protected EventLoop newChild(Executor executor, Object... args) throws Exception {
            return new NioEventLoop(this, executor, (SelectorProvider) args[0],
                    ((SelectStrategyFactory) args[1]).newSelectStrategy(),
                    (RejectedExecutionHandler) args[2]);
        }

        // next() as implemented by GenericEventExecutorChooser
        @Override
        public EventExecutor next() {
            return executors[Math.abs(idx.getAndIncrement() % executors.length)];
        }

        // register() as implemented by SingleThreadEventLoop
        @Override
        public ChannelFuture register(Channel channel) {
            return register(new DefaultChannelPromise(channel, this));
        }

        @Override
        public ChannelFuture register(final ChannelPromise promise) {
            ObjectUtil.checkNotNull(promise, "promise");
            promise.channel().unsafe().register(this, promise);
            return promise;
        }
    }
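The round-robin expression in next() can be tried on its own. The following is a minimal sketch using plain strings in place of event loops; the class name RoundRobinSketch and the start parameter are assumptions made for illustration. It also shows why Math.abs is applied after the modulo: once the counter overflows to a negative value the remainder is negative, and taking the absolute value of the remainder keeps the index inside the array.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinSketch {
    private final String[] loops;
    private final AtomicInteger idx;

    // 'start' is a hypothetical parameter that lets the overflow case be reached quickly.
    RoundRobinSketch(int start, String... loops) {
        this.idx = new AtomicInteger(start);
        this.loops = loops;
    }

    // Same expression as the next() shown above, applied to a String array.
    String next() {
        return loops[Math.abs(idx.getAndIncrement() % loops.length)];
    }

    public static void main(String[] args) {
        RoundRobinSketch fresh = new RoundRobinSketch(0, "a", "b", "c");
        for (int i = 0; i < 4; i++) {
            System.out.print(fresh.next() + " ");   // a b c a
        }
        System.out.println();

        // Start at Integer.MAX_VALUE so the second call sees a negative counter.
        RoundRobinSketch nearOverflow = new RoundRobinSketch(Integer.MAX_VALUE, "a", "b", "c");
        System.out.println(nearOverflow.next() + " " + nearOverflow.next());
    }
}
```

After the overflow the order of visits reverses, because negative remainders count down rather than up, but every index is still valid and the load stays evenly spread.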
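Let's simulate how NioEventLoopGroup is used, written in a procedural style.
As readers may recall, a Netty server is typically set up with two NioEventLoopGroups: one is the boss and the other is the child.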
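    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.channels.spi.AbstractSelector;
    import java.nio.channels.spi.SelectorProvider;
    import java.util.Iterator;
    import java.util.Set;

    import org.junit.Test;

    public class TestBossChildGroup {
        static SocketAddress address = new InetSocketAddress("localhost", 8877);

        @Test
        public void server() throws IOException {
            SelectorProvider bossProvider = SelectorProvider.provider();
            SelectorProvider childProvider = SelectorProvider.provider();
            int count = 2;
            AbstractSelector bossSelector = bossProvider.openSelector();
            AbstractSelector[] childSelectors = new AbstractSelector[count];
            for (int i = 0; i < count; i++) {
                childSelectors[i] = childProvider.openSelector();
            }
            // Bind the server to its port and register OP_ACCEPT with the boss selector.
            ServerSocketChannel serverSocketChannel = bossProvider.openServerSocketChannel();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.bind(address);
            serverSocketChannel.register(bossSelector, SelectionKey.OP_ACCEPT);
            int i = 0;
            while (true) {
                int s = bossSelector.select(300);
                if (s > 0) {
                    Set<SelectionKey> keys = bossSelector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {
                        SelectionKey key = it.next();
                        // Why not else-if: the ready set is a bitmask, so a single key
                        // can report several ready operations at once.
                        if (key.isAcceptable()) {
                            System.out.println("isAcceptable");
                            // The neat part: accept the connection here, then register it
                            // for OP_READ with a different selector. A ServerSocketChannel
                            // itself only supports OP_ACCEPT, so the accepted SocketChannel
                            // is the one handed over.
                            SocketChannel accepted = ((ServerSocketChannel) key.channel()).accept();
                            if (accepted != null) {
                                accepted.configureBlocking(false);
                                accepted.register(childSelectors[i++ % count], SelectionKey.OP_READ);
                            }
                        }
                        // The branches below are what a child event loop would handle;
                        // they never fire on the boss selector.
                        if (key.isConnectable()) {
                            System.out.println("isConnectable");
                        }
                        if (key.isWritable()) {
                            System.out.println("isWritable");
                        }
                        if (key.isReadable()) {
                            System.out.println("isReadable");
                        }
                        // Interest ops stay unchanged so later connections are still accepted.
                        it.remove();
                    }
                }
            }
        }

        @Test
        public void client() throws IOException {
            SocketChannel clientSocketChannel = SelectorProvider.provider().openSocketChannel();
            clientSocketChannel.configureBlocking(true);
            clientSocketChannel.connect(address);
        }
    }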
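The test above only polls the boss selector, so the read side of the handoff is never exercised. The following is a minimal single-threaded sketch of the full path, assuming a loopback connection on an ephemeral port and a message small enough to arrive in one read. The class and method names (BossChildHandoffSketch, roundTrip) are made up for this example. A real child group would run each selector on its own thread.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

public class BossChildHandoffSketch {

    /** Accepts one connection via the boss selector and reads one message via the child selector. */
    static String roundTrip(String message) {
        try (Selector boss = Selector.open();
             Selector child = Selector.open();
             ServerSocketChannel server = ServerSocketChannel.open()) {

            // Boss setup: non-blocking server socket, interested in OP_ACCEPT only.
            server.configureBlocking(false);
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            server.register(boss, SelectionKey.OP_ACCEPT);

            // Blocking client: connect and send the message.
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress())) {
                client.write(ByteBuffer.wrap(message.getBytes(StandardCharsets.UTF_8)));

                // Boss side: wait for the pending connection and accept it.
                if (boss.select(2000) == 0) {
                    throw new IOException("no connection became acceptable");
                }
                Iterator<SelectionKey> it = boss.selectedKeys().iterator();
                SelectionKey acceptKey = it.next();
                it.remove();

                try (SocketChannel accepted = ((ServerSocketChannel) acceptKey.channel()).accept()) {
                    // Handoff: the accepted channel is registered with the child selector.
                    accepted.configureBlocking(false);
                    accepted.register(child, SelectionKey.OP_READ);

                    // Child side: wait until the channel is readable, then read.
                    if (child.select(2000) == 0) {
                        throw new IOException("no data became readable");
                    }
                    child.selectedKeys().clear();
                    ByteBuffer buffer = ByteBuffer.allocate(256);
                    int n = accepted.read(buffer);
                    if (n < 0) {
                        throw new IOException("connection closed before any data arrived");
                    }
                    return new String(buffer.array(), 0, n, StandardCharsets.UTF_8);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("ping"));
    }
}
```

The boss selector never sees OP_READ and the child selector never sees OP_ACCEPT, which is the division of labour between the two groups.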