Skip to content

Latest commit

 

History

History
98 lines (72 loc) · 3.23 KB

netty2_work_group.md

File metadata and controls

98 lines (72 loc) · 3.23 KB

这篇主要关注下事件是如何通过管道一步步分发到worker线程进行处理, 而worker线程, 则主要用于处理客户端的读写事件

根据上一篇的分析,调用流程:

logHandle-->ServerBootstrapAcceptor-->用户处理器

  1. 连接接入

首先NioEventLoop作为一个持续监听用户连接的入口,在其run方法维护着一个死循环,

run方法同时处理IO事件和普通任务,这里我感觉代码不是很直观,虽然通过ioRatio对IO事件和普通任务做了一定的优先级侧重,

个人认为从源码角度看,职责不够清晰,如果没有性能考虑,应该拆分成两个线程

还是分析调用流程吧:

AbstractChannel.unsafe.read()--->doReadMessages-->

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel());

    try {
        if (ch != null) {
            //注意此NioSocketChannel的构建
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

doReadMessages中,通过调用channel.accept得到客户端socketchannel, 将ServerSocketChannel和客户端socketChannel包装成一个NioSocketChannel,存放到List<Object>

我们需要注意下NioSocketChannel的构造函数,因为这里隐含了workGroup的绑定事件

//NioSocketChannel.java
public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

//AbstractNioByteChannel.java
protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    //这里指明了客户端需要绑定的监听事件:OP_READ
    super(parent, ch, SelectionKey.OP_READ);
}

----> 循环调用 pipeline.fireChannelRead(NioSocketChannel) ----->HeadContext--->LogHandler-->ServerBootstrapAcceptor

下面是ServerBootstrip.ServerBootstrapAcceptor.java

public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //msg就是NioSocketChannel,包含ServerSocketChannel和客户端socketChannel
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    setChannelOptions(child, childOptions, logger);
    setAttributes(child, childAttrs);

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

后续的register流程和bosswork相同,不再赘述。

bossGroup传递过来的channel对象,在workgroup处会注册其read/write事件,

即同一个channel贯穿 bossGroupworkGroup

bossGroup处注册的是accept事件,在workGroup处注册的是read/write事件