Netty源码剖析之Netty启动流程

简介: 了解netty启动流程,有助于学习netty,进行自定义组件扩展

准备

1、NettyServer

public class NettyServer {

    public static void main(String[] args) throws InterruptedException {

        // 1、创建bossGroup线程组:处理网络连接事件。默认线程数:2*处理器线程数
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 2、创建workGroup线程组:处理网络read/write事件。 默认线程数:2*处理器线程数
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        // 3、创建服务端启动助手
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        // 4、服务端启动助手,设置线程组
        serverBootstrap.group(bossGroup,workerGroup)
                // 5、设置服务端Channel实现类
                .channel(NioServerSocketChannel.class)
                // 6、设置bossGroup线程队列中等待连接个数
                .option(ChannelOption.SO_BACKLOG,128)
                // 7、设置workerGroup中线程活跃状态
                .childOption(ChannelOption.SO_KEEPALIVE,true)
                // 使用channelInitializer 可以配置多个handler
                .childHandler(new ChannelInitializer<SocketChannel>() {// 8、设置一个通道初始化对象
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // 9、向pipeline中添加自定义的channelHandler, 处理socketChannel传送的数据
                        ch.pipeline().addLast(new NettyServerHandler());
                    }
                });

        // 10、服务端启动并绑定端口
        ChannelFuture future = serverBootstrap.bind(9999).sync();
        // 给服务器启动绑定结果,对结果进行监听,触发回调
        future.addListener((ChannelFuture channelFuture)-> {
            if(channelFuture.isSuccess()){
                System.out.println("服务器启动成功");
            }else {
                System.out.println("服务器启动失败");
            }
        });


        // 11、关闭监听通道和连接池,将异步改同步
        future.channel().closeFuture().sync();
        bossGroup.shutdownGracefully();
        workerGroup.shutdownGracefully();
    }
}

2、NettyServerHandler

/**
 * 自定义的channelHandler处理器
 *
 * 事件触发,触发相应函数
 */
public class NettyServerHandler implements ChannelInboundHandler {

    /**
     * 通道读取事件
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuffer = (ByteBuf)msg;
        System.out.println("客户端:"+byteBuffer.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道数据读取完毕事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        TimeUnit.SECONDS.sleep(2);
        ctx.writeAndFlush(Unpooled.copiedBuffer("叫我靓仔!!!".getBytes()));
    }

    /**
     * 发生异常捕获事件
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 通道就绪事件
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }
}

3、NettyClient

/**
 * nettyClient
 */
public class NettyClient {

    public static void main(String[] args) throws InterruptedException {
        // 1、创建线程组
        NioEventLoopGroup group = new NioEventLoopGroup();
        // 2、创建客户端启动助手bootstrap
        Bootstrap bootstrap = new Bootstrap();
        // 3、配置线程组
        bootstrap.group(group)
                // 4、定义socketChannel的实现类
                .channel(NioSocketChannel.class)
                // 5、定义channelHandler, 处理socketChannel的数据
                .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //6、向pipeline中添加自定义业务处理handler
                        ch.pipeline().addLast(new NettyClientHandler());
                    }
                });

        // 7、启动客户端, 等待连接服务端, 同时将异步改为同步
        ChannelFuture future = bootstrap.connect(new InetSocketAddress(9999)).sync();
        // 8、关闭通道和关闭连接池
        future.channel().closeFuture().sync();
        group.shutdownGracefully();


    }
}

4、NettyClientHandler

/**
 * 自定义的channelHandler处理器
 * <p>
 * 事件触发,触发相应函数
 */
public class NettyClientHandler implements ChannelInboundHandler {

    /**
     * 通道读取事件
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        System.out.println("服务端:" +
                byteBuf.toString(CharsetUtil.UTF_8));
    }

    /**
     * 通道数据读取完毕事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("不行,不行啊!!!".getBytes()));
    }

    /**
     * 发生异常捕获事件
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * 通道就绪事件
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.writeAndFlush(Unpooled.copiedBuffer("你好哇 小客客!!!".getBytes()));
    }


    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

    }

    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {

    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {

    }
}

Netty启动流程

1、绑定端口,封装程SocketAddress
在这里插入图片描述
在这里插入图片描述

**2、创建初始化Channel,将NioServerSocketChannel绑定到Boss
NioEventLoopGroup中的EventLoop中的Selector上,指定Selector监听事件为accept**

在这里插入图片描述

2.1 反射创建NioServerSocketChannel

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

给ServerSocketChannel绑定id(唯一标识),NioMessageUnsafe(channel数据读写类),ChannelPipeline(channel业务处理管道,可以设置许多ChannelHandler进行编解码,业务处理)

在这里插入图片描述
1、感兴趣的事件是 0 指客户端建立连接
2、ServerSocketChannel开启非堵塞模式,阻塞模式会怎么样?试想下,如果服务器通道阻塞,一个客户端SocketChannel与ServerSocketChannel建立连接,进行相关操作,其它的客户端怎么办?通道被占用了啊,当然这是我的理解

2.2 初始化NioServerSocketChannel

在这里插入图片描述

@Override
    void init(Channel channel) {
        setChannelOptions(channel, newOptionsArray(), logger);
        setAttributes(channel, newAttributesArray());

        ChannelPipeline p = channel.pipeline();

        // 获取workerGroup
        final EventLoopGroup currentChildGroup = childGroup;
        /**
         * ServerBootStrap通过childHandler添加的childHandler,最终会在下面的ServerBootstrapAcceptor中添加进pipeline
         */
        final ChannelHandler currentChildHandler = childHandler;
        final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
        final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

        /**
         * 向pipeline中添加一个channelHandler
         * ChannelInitializer对象也是一个ChannelHandler
         * TODO 这里向pipeline里添加了一个ChannelHandler,会在哪里被触发呢?
         * 答:这里的channelHandler对象会在DefaultChannelPipeline#callHandlerAddedForAllHandlers方法里执行
         * 需要注意的是,这里是在ChannelInitializer抽象类的handlerAdded里调用了initChannel的抽象方法,从而调进
         * 此处的匿名对象类方法里。
         *
         */
        p.addLast(new ChannelInitializer<Channel>() {
            @Override
            public void initChannel(final Channel ch) {
                final ChannelPipeline pipeline = ch.pipeline();

                // TODO 这里是ServerBootStrap中设置的handler
                ChannelHandler handler = config.handler();

                /**
                 * 将ServerBootStrap在初始化的时候添加的ChannelHandler在此处真正的添加进pipeline中(在ServerTest中ServerBootStrap#childHandler添加的)
                 */
                if (handler != null) {
                    pipeline.addLast(handler);
                }


                /**
                 * 拿出NioServerSocketChannel绑定的NioEventLoop来执行以下线程
                 *
                 * TODO 这里addLast进来的ServerBootstrapAccetor是用于workerGroup注册客户端用的!
                 * TODO 这里的pipeline扮演这很重要的作用。
                 */
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
    }

ServerSocketChannel初始化本质上就是为绑定的ChannelPipeline(链表)设置ChannelInitializer(ChannelHandler)。这个ChannelInitializer 主要作用:
1、为ServerSocketChannel, 设置 Channelhandler(自己配置的)
2、通过NioServerSocketChannel绑定的NioEventLoop来执行以下线程任务
ServerBootstrapAccetor是用于workerGroup注册客户端用的! 这个功能后面debug展示

2.3、ServerSocketChannel注册到NioEventLoop中的Selector中

在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

就是从BossEventLoopGroup中拿一个NioEventLoop出来,将ServerSocketChannel注册到NioEventLoop上
在这里插入图片描述
在这里插入图片描述

获取到SocketChannel在创建的时候创建的NioMessageUnsafe类,进行注册

在这里插入图片描述
通过NioEvetLoop 进行 一个通道注册任务,执行任务

private void execute(Runnable task, boolean immediate) {

        // 判断当前线程是在NioEventLoop线程内,还是在外部线程
        boolean inEventLoop = inEventLoop();
        /**
         * 这里添加的Task是一个注册NioServerSocketChannel的任务!
         * 是AbstractChannel$AbstractUnsafe的一个匿名Runnable类
         */
        addTask(task);

        // 如果是外部线程调用的
        if (!inEventLoop) {
            /**
             * 这里启动一个线程,就是NioEventLoop!!启动的是NioEventLoop
             */
            startThread();
            if (isShutdown()) {
                boolean reject = false;
                try {
                    if (removeTask(task)) {
                        reject = true;
                    }
                } catch (UnsupportedOperationException e) {
                    // The task queue does not support removal so the best thing we can do is to just move on and
                    // hope we will be able to pick-up the task before its completely terminated.
                    // In worst case we will log on termination.
                }
                if (reject) {
                    reject();
                }
            }
        }

        if (!addTaskWakesUp && immediate) {
           
             */
            wakeup(inEventLoop);
        }
    }
/**
     * 核心工作就是启动NioEventLoop线程
     */
    private void doStartThread() {
        assert thread == null;

        /**
         * 1. 这里executor是ThreadExecutorMap类中的一个匿名Executor内部类,是Executor apply()这个方法返回的实例对象
         * 2. 这里传入的execute()里面的匿名内部类是SingleThreadEventExecutor中的,所以
         *      是SingleThreadEventExecutor$对象
         * 3. 执行的runnable是FastThreadLocalRunnable对象
         */
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    // 核心,就是启动NioEventLoop!!!
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        if (logger.isErrorEnabled()) {
                            logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                    SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must " +
                                    "be called before run() implementation terminates.");
                        }
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks. At this point the event loop
                        // is in ST_SHUTTING_DOWN state still accepting tasks which is needed for
                        // graceful shutdown with quietPeriod.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }

                        // Now we want to make sure no more tasks can be added from this point. This is
                        // achieved by switching the state. Any new tasks beyond this point will be rejected.
                        for (;;) {
                            int oldState = state;
                            if (oldState >= ST_SHUTDOWN || STATE_UPDATER.compareAndSet(
                                    SingleThreadEventExecutor.this, oldState, ST_SHUTDOWN)) {
                                break;
                            }
                        }

                        // We have the final set of tasks in the queue now, no more can be added, run all remaining.
                        // No need to loop here, this is the final pass.
                        confirmShutdown();
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            // Lets remove all FastThreadLocals for the Thread as we are about to terminate and notify
                            // the future. The user may block on the future and once it unblocks the JVM may terminate
                            // and start unloading classes.
                            // See https://github.com/netty/netty/issues/6596.
                            FastThreadLocal.removeAll();

                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.countDown();
                            int numUserTasks = drainTasks();
                            if (numUserTasks > 0 && logger.isWarnEnabled()) {
                                logger.warn("An event executor terminated with " +
                                        "non-empty task queue (" + numUserTasks + ')');
                            }
                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

在这里插入图片描述
在这里插入图片描述

1、判断当前线程是NioEventLoop内部线程还是外部线程(由于是主线程启动,所以是外部线程)
2、将通道注册任务放入NioEventLoop中任务队列中
3、包装任务,通过NioEventLoop,主要是为了从NioEventLoop的run方法开始执行
3、NioEventLoop通过ThreadFacoty创建线程,线程开始执行NioEventLoop run方法

 /**
     * NIOEventLoop执行核心
     */
    @Override
    protected void run() {
        int selectCnt = 0;      // 阻塞选择次数
        // 从NioEventLoop中的 taskQueue中 判断是否存在事件
        for (;;) {      // 轮训注册到selector的IO事件           为什么for(;;)比while(1)好?因为for(;;)底层的指令更少,效率更高
            try {
                int strategy;   // strategy = 0 default
                try {
                    strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());     // 获取策略。如果有任务则使用非阻塞方式
                    switch (strategy) {
                    case SelectStrategy.CONTINUE:
                        continue;

                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO

                    case SelectStrategy.SELECT:         // select事件执行
                        long curDeadlineNanos = nextScheduledTaskDeadlineNanos();       // 当前截止时间

                        if (curDeadlineNanos == -1L) {      // 表明没有定时任务
                            curDeadlineNanos = NONE; // nothing on the calendar
                        }
                        nextWakeupNanos.set(curDeadlineNanos);
                        try {
                            if (!hasTasks()) {      // 如果没有任务,则select阻塞等待任务     任务存放在SingleThreadEventLoop
                                // TODO 测试
                                System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "]I'm selecting... waiting for selectKey or tasks!");
                                strategy = select(curDeadlineNanos);
                            }
                        } finally {
                            // This update is just to help block unnecessary selector wakeups
                            // so use of lazySet is ok (no race condition)
                            // 标记未唤醒状态
                            nextWakeupNanos.lazySet(AWAKE);
                        }
                        // fall through
                    default:
                    }
                } catch (IOException e) {
                    // If we receive an IOException here its because the Selector is messed up. Let's rebuild
                    // the selector and retry. https://github.com/netty/netty/issues/8566
                    rebuildSelector0();
                    selectCnt = 0;
                    handleLoopException(e);
                    continue;
                }

                System.err.println("[CurrentThread = " + Thread.currentThread().getName() + "] select() 调用完了,此时已经有事件进来了?");
                selectCnt++;    // 选择次数+1
                cancelledKeys = 0;
                needsToSelectAgain = false;

                final int ioRatio = this.ioRatio;       // 这里的ioRatio默认是50
                boolean ranTasks;
                if (ioRatio == 100) {
                    try {
                        if (strategy > 0) {
                            processSelectedKeys();      // 处理选择key,处理io相关的逻辑
                        }
                    } finally {
                        ranTasks = runAllTasks();   // 处理外部线程扔到taskQueue里的任务,这里的taskQueue是一个mpscQueue
                    }
                } else if (strategy > 0) {
                    final long ioStartTime = System.nanoTime();     // 计算处理选择key的时间
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                } else {
                    /**
                     * 在Netty中,有两种任务,普通任务和定时任务。在执行任务的时候,会把定时任务队列里的task扔进普通任务队列里,
                     * 这里的普通任务队列就是mpscQueue,接着就挨个执行mpscQueue里的任务。
                     *
                     * 任务:普通任务 、定时任务
                     * 队列:普通任务队列mpscQueue 、 定时任务队列
                     *
                     */
                    ranTasks = runAllTasks(0); // This will run the minimum number of tasks
                }

                if (ranTasks || strategy > 0) {
                    if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
                        logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
                                selectCnt - 1, selector);
                    }
                    selectCnt = 0;
                } else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)       解决空轮训Bug,重置selectCnt,重新生成selector
                    selectCnt = 0;
                }
            } catch (CancelledKeyException e) {
                // Harmless exception - log anyway
                if (logger.isDebugEnabled()) {
                    logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
                            selector, e);
                }
            } catch (Error e) {
                throw (Error) e;
            } catch (Throwable t) {
                handleLoopException(t);
            } finally {
                // Always handle shutdown even if the loop processing threw an exception.
                try {
                    if (isShuttingDown()) {
                        closeAll();
                        if (confirmShutdown()) {
                            return;
                        }
                    }
                } catch (Error e) {
                    throw (Error) e;
                } catch (Throwable t) {
                    handleLoopException(t);
                }
            }
        }
    }

就是NioEventLoop中的线程进行死循环处理事件,通过事件进行驱动处理

1、判断NioEventLoop中的任务队列中是否存在任务 (存在通道注册任务哦!!),从而定义处理策略

 strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());     // 获取策略。如果有任务则使用非阻塞方式

在这里插入图片描述
有任务,那么 通过 selector 获取select 值,selector如果获取这个值呢?没看懂
在这里插入图片描述

2、拿到的select值为0,好像是没有SelectedKey,那么执行runTask,从任务队列中获取 之前的通道注册事件

protected boolean runAllTasks(long timeoutNanos) {

        // 从定时任务队列中把任务聚合到普通队列里
        fetchFromScheduledTaskQueue();


        // 从普通任务队列里拿任务
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = timeoutNanos > 0 ? ScheduledFutureTask.nanoTime() + timeoutNanos : 0;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {

            // 真正执行了任务
            safeExecute(task);
            taskTimes++;

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            // 当累积到64个任务的时候,这里判断是因为任务的执行是比较耗时的
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();

                // 如果当前时间 >= 截止时间,即已经超过了截止时间了
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            // 将任务从任务队列中弹出
            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

在这里插入图片描述
任务真正被执行了哦!
在这里插入图片描述

@Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                /**
                 * 调用selectableChannel的register方法,用于在给定的NioEventLoop上的selector上注册这个通道channel,并返回一个选择键
                 *
                 * OP_READ = 1 << 0                 读操作位
                 * OP_WRITE = 1 << 2                写操作位
                 * OP_CONNECT = 1 << 3              客户端连接到服务端操作位
                 * OP_ACCEPT = 1 << 4               服务端接受客户端链接操作位
                 *
                 * 此处调用了jdk nio的selectableChannel的register方法,传入的操作位是0,表明对任何事件都不感兴趣,仅仅是完成注册操作。
                 *
                 * 向selector注册channel成功后,会返回一个selectionKey,后续可以拿着这个selectionKey获取到channel。
                 *
                 * javaChannel()拿到的是:AbstractSelectableChannel,在其register方法里,会调用addKey,给selectionKey添加默认数组大小3
                 * 并最终调用jdk底层register
                 *
                 */
                selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    // Force the Selector to select now as the "canceled" SelectionKey may still be
                    // cached and not removed because no Select.select(..) operation was called yet.
                    /**
                     * 由于未调用select#select(),因此可能仍然在缓存,而未删除但是已经取消了的selectionKey,强制调用selectNow()
                     * 将selectionKey从Selector上删除
                     */
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    // We forced a select operation on the selector before but the SelectionKey is still cached
                    // for whatever reason. JDK bug ?
                    throw e;
                }
            }
        }
    }

拿到ServerSocketChannel,注册到NioEventLoop中的Selector中 ,注册成功返回一个SelectionKey 。 对应代码 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); 这个0表示通道注册

 @Override
    protected final SelectionKey register(AbstractSelectableChannel ch,
                                          int ops,
                                          Object attachment)
    {
        if (!(ch instanceof SelChImpl))
            throw new IllegalSelectorException();
        SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
        k.attach(attachment);

        // register (if needed) before adding to key set
        implRegister(k);

        // add to the selector's key set, removing it immediately if the selector
        // is closed. The key is not in the channel's key set at this point but
        // it may be observed by a thread iterating over the selector's key set.
        keys.add(k);
        try {
            k.interestOps(ops);
        } catch (ClosedSelectorException e) {
            assert ch.keyFor(this) == null;
            keys.remove(k);
            k.cancel();
            throw e;
        }
        return k;
    }

1、将ServerSocketChannel,Selector 封装成 SelectionKeyImpl(所谓的通道注册凭证)
2、将ski (SelectionKeyImpl)放入 selector newKeys 队列
3、将ski 放入 selector的 SelectionKey集合中
4、设置NioEvenLoop中当前这个Selector 感兴趣的事件/操作

至此成功将ServerSocketChannel 放入 Selector中,并设置Selector所感兴趣的事件 0

2.4、调用ServerSocketChannel中的pipeLine,进行AddHandler

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

这里又会走到NioEventLoop中的run方法中,然后执行上图任务,整个流程走完

问题:

1、Selector如何进行Select
在NioEventLoop,ServerSocketChannel创建完后,会将channel注册到Selector中并设置通道感兴趣的事件,这里是注册通道事件。NioEventLoop会开启一个线程死循环,run方法内部 Selector会通过 select()/selectNow() 以阻塞非阻塞方式等待感兴趣的事件。具体需要看Selector 选择的源码

如果没有任务,select会阻塞
在这里插入图片描述
如果有任务,以非阻塞的方式执行

在这里插入图片描述

如果通道有IO事件,那么进行处理,内部会对感兴趣的客户端建立连接事件/通道注册 进行处理

在这里插入图片描述

总结

在这里插入图片描述

相关文章
|
7月前
|
Java
【Netty 网络通信】Netty 工作流程分析
【1月更文挑战第9天】Netty 工作流程分析
|
存储 缓存 NoSQL
跟着源码学IM(十一):一套基于Netty的分布式高可用IM详细设计与实现(有源码)
本文将要分享的是如何从零实现一套基于Netty框架的分布式高可用IM系统,它将支持长连接网关管理、单聊、群聊、聊天记录查询、离线消息存储、消息推送、心跳、分布式唯一ID、红包、消息同步等功能,并且还支持集群部署。
13510 1
|
7月前
|
消息中间件 Oracle Dubbo
Netty 源码共读(一)如何阅读JDK下sun包的源码
Netty 源码共读(一)如何阅读JDK下sun包的源码
132 1
|
NoSQL Java Redis
跟着源码学IM(十二):基于Netty打造一款高性能的IM即时通讯程序
关于Netty网络框架的内容,前面已经讲了两个章节,但总归来说难以真正掌握,毕竟只是对其中一个个组件进行讲解,很难让诸位将其串起来形成一条线,所以本章中则会结合实战案例,对Netty进行更深层次的学习与掌握,实战案例也并不难,一个非常朴素的IM聊天程序。 原本打算做个多人斗地主练习程序,但那需要织入过多的业务逻辑,因此一方面会带来不必要的理解难度,让案例更为复杂化,另一方面代码量也会偏多,所以最终依旧选择实现基本的IM聊天程序,既简单,又能加深对Netty的理解。
168 1
|
7月前
|
编解码 前端开发 网络协议
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
Netty Review - ObjectEncoder对象和ObjectDecoder对象解码器的使用与源码解读
166 0
|
7月前
|
编解码 安全 前端开发
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
Netty Review - StringEncoder字符串编码器和StringDecoder 解码器的使用与源码解读
260 0
|
分布式计算 网络协议 前端开发
【Netty底层数据交互源码】
【Netty底层数据交互源码】
|
Java 容器
【深入研究NIO与Netty线程模型的源码】
【深入研究NIO与Netty线程模型的源码】
|
编解码 弹性计算 缓存
Netty源码和Reactor模型
Netty源码和Reactor模型
106 0
|
设计模式 监控 前端开发
第 10 章 Netty 核心源码剖析
第 10 章 Netty 核心源码剖析
132 0