(八)Java网络编程之IO模型篇-内核Select、Poll、Epoll多路复用函数源码深度历险!

简介: select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容。

引言

   select/poll、epoll这些词汇相信诸位都不陌生,因为在Redis/Nginx/Netty等一些高性能技术栈的底层原理中,大家应该都见过它们的身影,接下来重点讲解这块内容,不过在此之前,先上一张图概述Java-NIO的整体结构:

001.png

观察上述结构,其实Buffer、Channel的定义并不算复杂,仅是单纯的三层结构,因此对于源码这块不再去剖析,有兴趣的根据给出的目录结构去调试源码,自然也能摸透其原理实现。

而最关键的是Selector选择器,它是整个NIO体系中较为复杂的一块内容,同时它也作为Java-NIO与内核多路复用模型的“中间者”,但在上述体系中,却出现了之前未曾提及过的SelectorProvider系定义,那么它的作用是干嘛的呢?主要目的是用于创建选择器,在Java中创建一般是通过如下方式:

// 创建Selector选择器
Selector selector = Selector.open();

// Selector类 → open()方法
public static Selector open() throws IOException {
   
   
    return SelectorProvider.provider().openSelector();
}

从源码中可明显得知,选择器最终是由SelectorProvider去进行实例化,不过值得一提的是:Selector的实现是基于工厂模式与SPI机制构建的。对于不同OS而言,其对应的具体实现并不相同,因此在Windows系统下,我们只能观测到WindowsSelectorXXX这一系列的实现,而在Linux系统时,对于的则是EPollSelectorXXX这一系列的实现,所以要牢记的是,Java-NIO在不同操作系统的环境中,提供了不同的实现,如下:

  • Windowsselect
  • Unixpoll
  • Mackqueue
  • Linuxepoll

当然,本次则重点剖析Linux系统下的select、poll、epoll的具体实现,对于其他系统而言,原理大致相同。

一、JDK层面的源码入口

   简单的对于Java-NIO体系有了全面认知后,接下来以JDK源码作为入口进行剖析。在Java中,会通过Selector.select()方法去监听事件是否被触发,如下:

// 轮询监听选择器上注册的通道是否有事件被触发
while (selector.select() > 0){
   
   }

// Selector抽象类 → select()抽象方法
public abstract int select() throws IOException;

// SelectorImpl类 → select()方法
public int select() throws IOException {
   
   
    return this.select(0L);
}
// SelectorImpl类 → select()完整方法
public int select(long var1) throws IOException {
   
   
    if (var1 < 0L) {
   
   
        throw new IllegalArgumentException("Negative timeout");
    } else {
   
   
        return this.lockAndDoSelect(var1 == 0L ? -1L : var1);
    }
}

当调用Selector.select()方法后,最终会调用到SelectorImpl类的select(long var1)方法,而在该方法中,又会调用lockAndDoSelect()方法,如下:

// SelectorImpl类 → lockAndDoSelect()方法
private int lockAndDoSelect(long var1) throws IOException {
   
   
    // 先获取锁确保线程安全
    synchronized(this) {
   
   
        // 在判断当前选择是否处于开启状态
        if (!this.isOpen()) {
   
   
            // 如果已关闭则抛出异常
            throw new ClosedSelectorException();
        } else {
   
    // 如若处于开启状态
            // 获取所有注册在当前选择器上的事件
            Set var4 = this.publicKeys;
            int var10000;
            // 再次加锁
            synchronized(this.publicKeys) {
   
   
                // 获取所有已就绪的事件
                Set var5 = this.publicSelectedKeys;
                // 再次加锁
                synchronized(this.publicSelectedKeys) {
   
   
                    // 真正的调用select逻辑,获取已就绪的事件
                    var10000 = this.doSelect(var1);
                }
            }
            // 返回就绪事件的数量
            return var10000;
        }
    }
}

在该方法中,对于其他逻辑不必太过在意,重点可注意:最终会调用doSelect()触发真正的逻辑操作,接下来再看看这个方法:

// SelectorImpl类 → doSelect()方法
protected abstract int doSelect(long var1) throws IOException;

// WindowsSelectorImpl类 → doSelect()方法
protected int doSelect(long var1) throws IOException {
   
   
    // 先判断一下选择器上是否还有注册的通道
    if (this.channelArray == null) {
   
   
        throw new ClosedSelectorException();
    } else {
   
    // 如果有的话
        // 先获取一下阻塞等待的超时时长
        this.timeout = var1;
        // 然后将一些取消的事件从选择器上移除
        this.processDeregisterQueue();
        // 再判断一下是否存在线程中断唤醒
        // 这里主要是结合之前的wakeup()方法唤醒阻塞线程的
        if (this.interruptTriggered) {
   
   
            this.resetWakeupSocket();
            return 0;
        } else {
   
    // 如果没有唤醒阻塞线程的需求出现
            // 先判断一下辅助线程的数量(守护线程),多则减,少则增
            this.adjustThreadsCount();
            // 更新一下finishLock.threadsToFinish为辅助线程数
            this.finishLock.reset();
            // 唤醒所有的辅助线程
            this.startLock.startThreads();
            try {
   
   
                // 设置主线程中断的回调函数
                this.begin();

                try {
   
   
                    // 最终执行真正的poll逻辑,开始拉取事件
                    this.subSelector.poll();
                } catch (IOException var7) {
   
   
                    this.finishLock.setException(var7);
                }
                // 唤醒并等待所有未执行完的辅助线程完成
                if (this.threads.size() > 0) {
   
   
                    this.finishLock.waitForHelperThreads();
                }
            } finally {
   
   
                this.end();
            }
            // 检测状态
            this.finishLock.checkForException();
            this.processDeregisterQueue();
            // 获取当前选择器监听的事件的触发数量
            int var3 = this.updateSelectedKeys();
            // 本轮poll结束,重置WakeupSocket,为下次执行做准备
            this.resetWakeupSocket();
            // 最终返回获取到的事件数
            return var3;
        }
    }
}

整个过程下来其实也并不短暂,但大体就分为三步:

  • ①前置动作:判断通道数、获取阻塞时长、移除取消的事件以及判断是否需要被唤醒。
  • ②核心动作:更新并唤醒所有辅助线程、设置主线程中断的回调、执行poll拉取事件。
  • ③后置动作:唤醒辅助线程完成工作、检测状态、重置条件、获取事件数并返回。

在这里面,有一个辅助线程的概念,这跟最大文件描述符有关,每当选择器上注册的通道数超过1023时,新增一条线程来管理这些新增的通道。其实是1024,但其中有一个要用于唤醒,所以是1023(这里看可能有些懵,但待会分析过后就理解了)。

在这个过程中,最最最关键点在于其中的一行代码:

this.subSelector.poll();

在这里调用了poll方法,执行具体的事件拉取逻辑,进一步往下走:

// WindowsSelectorImpl类 → poll()方法
private int poll() throws IOException {
   
   
    return this.poll0(WindowsSelectorImpl.this.pollWrapper.pollArrayAddress, 
    Math.min(WindowsSelectorImpl.this.totalChannels, 1024), 
    this.readFds, this.writeFds, this.exceptFds,
    WindowsSelectorImpl.this.timeout);
}

// WindowsSelectorImpl类 → poll0()方法
private native int poll0(long var1, int var3, int[] var4, 
                int[] var5, int[] var6, long var7);

最后会调用WindowsSelectorImpl.poll()方法,而该方法最终会调用本地的native方法:poll0()方法,而在JVM的源码实现中,该方法最终会调用内核所提供的函数。

OK~,由于WindowsIDEA工具辅助,所以方便调试源码,因此这里以WindowsSelectorXXX系的举例说明,但由于整个Java-NIO的核心组件,都是基于工厂模式编写的源码,所以其他操作系统下的源码位置也相同,仅最终调用的内核函数不同!!!

最终稍做总结,JDK层面的源码入口,核心流程如下:

  • Selector抽象类 → select()抽象方法
  • SelectorImpl类 → select()方法
  • SelectorImpl类 → lockAndDoSelect()方法
  • SelectorImpl类 → doSelect()方法
  • XxxSelectorImpl类 → doSelect()方法
  • XxxSelectorImpl类 → poll()方法
  • XxxSelectorImpl类 → JNI本地的poll0()方法

如若在Windows系统下,上述的XxxSelectorImpl类则为WindowsSelectorImpl,同理,如若在Linux系统下,XxxSelectorImpl类则为EpollSelectorImpl

最后,如果大家对于JDK层面的EPoll感兴趣,可自行反编译Linux版的JDK源码,EpollSelectorXXX的相关定义位于:jdk\src\solaris\classes\sun\nio\ch\目录下。

二、JDK源码级别的入口

   经过第一阶段的分析后,会发现最终其实调用了native本地方法poll0(),在之前的《JVM运行时数据区-本地方法栈》的文章提到过,当程序执行时碰到native关键字修饰的方法时,会调用C/C++所编写的本地方法库中的实现,那么又该如何查找native方法对应的源码呢?接着一起来聊一下。


①由于Oracle-jdk是收费的,所以咱们首先下载open-jdk1.8的源码,可以自行在Open-JDK官网下载,但官网下载时,常常会由于网络不稳定而中断,下载起来相当费劲,因此也为大家提供一下《open-jdk1.8》的源码链接。


②下载之后解压源码包,然后进入jdk8-master\jdk\src\目录,在其中你会看到不同操作系统下的Java实现,JDK源码会以操作系统的类型分包,不同系统的对应不同的实现,如下:

002.png

但关于Linux系统下的Java-NIO实现,实际上并不在linux目录中,而是在solaris目录,进入solaris目录如下:

003.png

solaris目录中还包含了LinuxOS、SunOS(SolarisOS/UnixOS)以及MacOS等操作系统下的Java-NIO实现,但关于MacOS下的Java-NIO完整实现,则位于前面的macosx目录中,这里仅包含一部分,结构如下:

004.png

观察上图会发现,solaris目录中包含了KQueue、EPoll、Poll、DevPollIO多路复用模型的Java实现,但关于Mac-KQueue的完整实现则在macosx目录。

OK~,到目前为止大家对于JDK源码的目录结构应该有了基本认知。

稍微总结一下,重点就是搞清楚两个位置:

  • jdk8-master\jdk\src\xxxOS\classes\sun\nio\ch:对应nio包下的Java代码。
  • jdk8-master\jdk\src\xxxOS\native\sun\nio\ch:对应nio包中native方法的JNI代码。

③搞清楚JDK源码目录的结构后,那以之前分析的Windows-NIO为例:

private native int poll0(long var1, int var3, int[] var4, 
                int[] var5, int[] var6, long var7);

对于poll0()这个本地方法,又该如何查找对应的源码呢?根据上述的源码结构,先去到\windows\native\sun\nio\ch目录中,然后找到与之对应的WindowsSelectorImpl.c文件,最终就能在该文件中定位到对应的JNI方法:Java_sun_nio_ch_WindowsSelectorImpl_00024SubSelector_poll0(名字略微有些长)。


④找到对应的JNI方法源码后,其中存在这么一行:

005.png

观察之后不难发现,其实最终还会调用到OS内核的提供的select()函数,所以poll0()实际上会依赖OS提供的多路复用函数实现相应的功能,对于其他操作系统而言,也是同理。

但是接下来只会重点叙述Linux下的三大IO多路复用函数:select、poll、epoll,而对于Windows-select、Mac-kqueue不会进行深入讲解(不是不想分析,而是由于Windows、Mac系统都属于闭源的,想分析也无法获取其具体的源码实现过程)。

三、文件描述符与自实现网络服务器

   到目前可得知:Java中的NIO最终会依赖于操作系统所提供的多路复用函数去实现,而Linux系统下对应的则是epoll模型,但epoll的前身则是select、poll,因此我们先分析select、poll多路复用函数,再分析其缺点,逐步引出epoll的由来,最终进一步对其进行全面剖析。

   相信大家在学习Linux时,都听说过“Linux本质上就是一个文件系统”这句话,在Linux-OS中,万事万物皆为文件,连网络连接也不例外,因此在分析多路复用模型之前,咱们首先对这些基础概念做一定了解。

3.1、文件描述符(FD)

   在上述中提到过:Linux的理念就是“一切皆文件”,在Linux中几乎所有资源都是以文件的形式呈现的。如磁盘的数据是文件,网络套接字是文件,系统配置项也是文件等等,所有的数据内容在Linux都是通过文件系统来管理的。
   既然所有的内容都是文件,那当我们要操作这些内容时,又该如何处理呢?为了方便系统执行,Linux都是通过文件描述符File Descriptor对文件进行操作,对于文件描述符这个概念可以通过一个例子来理解:

Object obj = new Object();

上述是Java创建对象的一行代码,类比Linux的文件系统,后面new Object()实例化出来的对象可以当成是具体的文件内容,而前面的引用obj则可理解为是文件描述符。Linux通过FD操作文件,其实本质上与Java中通过reference引用操作对象的过程无异。

而当出现网络套接字连接时,所有的网络连接都会以文件描述符的形式在内核中存在,也包括后面会提及的多路复用函数select、poll、epoll都会基于FD对网络连接进行操作,因此先阐明这点,作为后续分析的基础。

3.2、自己设计网络连接服务器

   在分析之前,我们先自己设想一下,如果有个需求:请自己设计一套网络连接系统,那么此时你会怎么做呢?此刻例如来了5个网络连接,如下:

006.png

那么又该如何处理这些请求呢?最简单的方式:

007.png

对于每个到来的网络连接都为其创建一条线程,每个连接由单独的线程负责处理,所以最初的BIO也是这样来的,由于设计起来非常简单,所以它成为了最初的网络IO模型,但这种方式的缺陷非常明显,在之前的BIO章节也曾分析过,无法支撑高并发的流量访问,因此这种多线程的方式去实现自然行不通了,兜兜转转又得回到单线程的角度去思考,单线程如何处理多个网络请求呢?最简单的方式,伪代码如下:

// 不断轮询监听所有的网络连接
while(true){
   
   
    // 遍历所有的网络套接字连接
    for(SocketFD xFD : FDS){
   
   
        // 判断网络连接中是否有数据
        if (xFD.data != null){
   
   
            // 从套接字中读取网络数据
            readData();
            // 将网络数据交给应用程序处理(写入对应的程序缓冲区)
            processingData();
            // ......
        }
    }
}

如上代码,当有网络连接到来时,将其加入FDS数组中,然后由单条线程不断的轮询监听所有网络套接字,如果套接字中有数据,则从中将网络数据读取出来,然后将读取到的网络数据交给应用程序处理。

这似乎是不是就通过单线程的方式解决了多个网络连接的问题?答案是Yes,但相较而言,性能自然不堪入目,如果内核是这样去处理网络连接,对于并发支持自然也上不去,那Linux内核具体是如何处理的呢?一起来看看。

四、多路复用函数 - select()

   在JDK1.8的源码中,刚刚似乎并未发现Selectxxx这系列的定义,这是由于Linux内核2.6之后的版本中,已经使用epoll代替了select,所以对应的JDK1.5之后版本,也将Linux-select的实现给移除了,所以如若想观测到Linux-select相关的实现,那还需先安装一个kernel-2.6以下的Linux系统,以及还需要下载JDK1.5的源码,这样才能分析完整的select实现。

我大致过了一下内核中的源码,对于select函数的实现大致在2000行左右,大致看下来后,由于对C语言没有那么熟悉,并且源码实现较长,因此后续不再以全源码链路的方式剖析,而是适当结合部分核心源码进行阐述。当然,如若你的C语言功底还算扎实,那可以下载《Linux2.6.28.6版本内核源码》解压调试。

先讲清楚接下来的分析思路,在后续分析IO多路复用函数时,大体会以调用入口 → 函数定义 → 核心结构体 → 核心源码 → 函数缺陷这个思路进行展开。

4.1、Java-select函数的JNI入口

   对于Open-JDK1.4、1.5的源码,由于年代较久远了,实在没有找到对应的JDK源码,所以在这里分析Linux-select函数时,就以前面分析的Windows-select思路举例说明,如下:

  • Java中通过调用选择器的select()方法监听客户端连接。
  • ②线程执行时,会执行到当前平台对应的选择器实现类的doSelect()方法。
  • ③接着会调用实现类对应的poll()轮询方法,最终在该方法中会调用其native方法。
  • ④当线程需要执行本地方法时,触发JNI调用,会在本地方法库中查找对应的C实现。
  • ⑤定位到native本地方法对应的C语言函数,然后执行对应的C代码。
  • ⑥在C代码的函数中,最终会发起系统调用,那假设此时系统调用的函数为select()

此时,对于Java是如何调用底层操作系统内核函数的过程就分析出来了,但是由于这里没有下载到对应版本的源码,因此无法通过源码进行演示,但就算没有对应的源码作为依据也无大碍,因为无论是什么类型的操作系统,也无论调用的是哪个多路复用函数,本质上入口都是相同的,只是JNI调用时会存在些许差异。

4.2、内核select函数的定义

   OK~,得知了Java-NIO执行的前因后果后,现在来聊一聊最初NIO会调用的系统函数:select,在Linux中的定义如下:

// 定义位于/sys/select.h文件中
int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

select函数定义中,存在五个参数,如下:

  • nfds:表示FDS中有效的FD数量,全部文件描述符的最大值+1
  • readfds:表示需要监控读事件发生的文件描述符集合。
  • writefds:表示需要监控写事件发生的文件描述符集合。
  • exceptfds:表示需要监控异常/错误发生的文件描述符集合。
  • timeout:表示select在没有事件触发的情况下,会阻塞的时间。

4.3、select结构体 - fd_set、timeval

   在上述中简单了解select的定义与参数后,大家可能会有些晕乎乎的,这是由于这五个参数中涉及到两组类型的定义,分别为fd_set、timeval,先来看看它们是如何定义的:

// 相关定义位于linux/types.h、linux/posix_types.h文件中
// -------linux/types.h----------
// 这里定义了一个__kerenl_fd_set的类型,别名为fd_set。
typedef __kerenl_fd_set fd_set;
省略其他.....

// -------linux/posix_types.h----------
/* 
  unsigned long表示无符号长整型,占4bytes/32bits
  sizeof()函数是求字节的长度,sizeof(unsigned long)=4
  因此最终这里的__NFDBITS=(8 * 4)=32
*/
#undef __NFDBITS
#define __NFDBITS (8 * sizeof(unsigned long))

// 这里限制了最大长度为1024(可修改,不推荐)
#undef __FD_SETSIZE
#define __FD_SETSIZE 1024

// 根据前面的__NFDBITS求出long数组的最大容量为:1024/32=32个
#undef __FD_SET_LONGS
#define __FD_SET_LONGS (__FD_SETSIZE/__NFDBITS)

// 这两组定义则是用于置位、复位(清除置位)的
#undef __FDELT
#define __FDELT(d) ((d) / __NFDBITS)
#undef __FDMASK
#define __FDMASK(d) (1UL << (d) % __NFDBITS)

// 这里定义了__kerenl_fd_set类型,本质上是一个long数组
typedef struct {
   
   
    unsigned long fds_bits [__FDSET_LONGS];
} __kerenl_fd_set;

观察上述源码,其实你会发现fd_set的定义是__kerenl_fd_set类型的,而__kerenl_fd_set的定义本质上就是一个long数组,同时在__kerenl_fd_set的定义中,也声明了最大长度为1024,相信了解过多路复用函数的小伙伴都知道select模型的最大缺陷之一就在于:最多只能监听1024个文件描述符,而对于具体是为什么,相信看到这个源码大家就彻底清楚了。

PS:首先基于上述的知识,已经得知最大长度为1024,但这1024并非代表着:数组可以拥有1024long元素,而是限制了这个long数组最多只能有1024个比特位的长度,也就是数组中最多能拥有1024/32=32个元素。对于这点,在源码中也有定义,大家可参考源码中的注释。

OK~,那这个long类型的数组究竟有什么作用呢?简单来说明一下,在这个fd_set的数组中,其实每个位对应着一个FD文件描述符的状态,0代表没有事件发生,1则代表有事件触发,如下图:

008.png

在这个数组中,所有的long元素,在计算机底层本质上都会被转换成bit存储,而每一个bit位都对应着一个FD,所以这个数组本质上就组成了一个位图结构,同时为了方便操作这个位图,在之前的sys/select.h文件中还提供了一组宏函数,如下:

// 位于/sys/select.h文件中
// 将一个fd_set数组所有位都置零
int FD_ZERO(int fd, fd_set *fdset);
// 将指定的某个位复位(赋零)
int FD_CLR(int fd, fd_set *fdset);
// 将指定的某个位置位(赋一)
int FD_SET(int fd, fd_set *fd_set);   
// 检测指定的某个位是否被置位
int FD_ISSET(int fd, fd_set *fdset);

// 这里则是上述宏函数的实现(位操作过程)
# define __FD_ZERO(set)  \
  do {                                          \
    unsigned int __i;                                  \
    fd_set *__arr = (set);                              \
    for (__i = 0; __i < sizeof (fd_set) / sizeof (__fd_mask); ++__i)          \
      __FDS_BITS (__arr)[__i] = 0;                          \
  } while (0)

#define __FD_SET(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] |= __FD_MASK (d)))
#define __FD_CLR(d, set) \
  ((void) (__FDS_BITS (set)[__FD_ELT (d)] &= ~__FD_MASK (d)))
#define __FD_ISSET(d, set) \
  ((__FDS_BITS (set)[__FD_ELT (d)] & __FD_MASK (d)) != 0)

对于定义的几组宏函数,可以参考上述注释中的解释,而对于这些函数是如何实现的,大家可以自行阅读贴出的源码。接下来再看看timeval结构体是如何定义的:

struct timeval {
   
   
   long    tv_sec;         /* 秒 */
   long    tv_usec;        /* 毫秒 */
};

其实这个结构体就是一个阻塞的时间,好比select传入的timeout参数为3,则timeval.tv_sec=3、timeval.tv_usec=3000,代表调用select()没有获取到有效事件的情况下,在3s内会不断循环检测。当然,这个timeout的值会分为三种情况:

  • 0:表示调用select()函数后不等待,没有就绪事件时直接返回。
  • NULL:表示调用select()函数后无限等待,阻塞至出现中断信号或触发事件后返回。
  • 正数:表示调用select()函数后,在指定的时间内等待事件触发,超时则返回。

至此,对于select()函数所需参数中,涉及到的两个结构体已经弄明白了,那么再回来看看select()的五个参数。

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

调用select()时,中间的三个参数要求传入fd_set类型,它们分别对应着:那些文件描述符需要监听读事件发生、那些文件描述符需要监听写事件发生、那些文件描述符需要监听异常错误发生。当调用select()函数后会陷入阻塞,直到有描述符的事件就绪(有数据可读、可写或出现异常错误)或超时后才会返回。而select()函数返回也会存在三种状态:

  • 0:当描述符集合中没有事件触发,并且超出设置的时间后,会返回0
  • -1:当select执行过程中,出现异常/错误时则会返回-1
  • 正数:如果监视的文件描述符集合中有事件发生(有数据),则会对应的事件数量。

4.4、select()函数的使用案例

   在上述中已经对于select()函数的一些基础知识建立了认知,接下来上个伪代码感受一下select()函数的使用过程:

/* ----------①---------- */
// 创建服务端socket套接字,并监听客户端连接
serverSockfd = socket(AF_INET,SOCK_STREAM,0);
// 省略.....
bind(serverSockfd,IP,Port);
listen(serverSockfd,numfds);
// 这里是已经接收的客户端连接集合
fds[numfds] = accept(serverSockfd,.....);

/* ----------②---------- */
// 将所有的客户端连接,分别加入对应的位图中
FD_SET readfds, writefds, exceptfds;
int read_count = 0, write_count = 0, except_count = 0;
for (i = 0; i < numfds; i++) {
   
   
    if (fds[i].events == 读取事件){
   
   
        // 加入readfds
    }
    if (fds[i].events == 写入事件){
   
   
        // 加入writefds
    }
    // 省略.....
}

/* ----------③---------- */
// 求出最大的fds值
maxfds = ....;
struct timeval timevalue, *tv;
// 省略.....

/* ----------④---------- */
while(1){
   
   
    // 初始化位图
    FD_ZERO(readfds);
    FD_ZERO(writefds);
    FD_ZERO(exceptfds);
    // 分别对每个位图中需要监听的FD进行置位
    for (i = 0; i < numfds; i++) {
   
   
        if (fds[i].events == 读取事件){
   
   
        FD_SET(fds[i],&readfds);
    }
    // 省略其他置位处理.....
    }

    // 调用select函数
    int result = select(maxfds+1, &readfds, &writefds, &exceptfds, tv);

    /* ----------⑤---------- */
    if (result == 0){
   
   
    // 处理超时并返回....
    }
    if (result < 0){
   
   
    // 处理异常并返回....
    }

    /* ----------⑥、⑦---------- */
    // 能执行到这里,代表select()返回大于0
    for (i = 0; i < numfds; i++) {
   
   
    if(FD_ISSET(fds[i],&readfds)){
   
   
        // 读取被置位的socket.....
        read(fds[i], buffer,0,MAXBUF);
    }
    // 省略其他......
    }
}

上述的伪代码虽然看着较多,但本质上并不难,大体分为如下几步:

  • ①创建服务端的Socket套接字并绑定相关的地址,建立监听,等待客户端连接。
  • ②将所有的客户端连接,根据注册的事件,分别将其加入到对应的位图中。
  • ③求出文件描述符的最大值,并对于超时时间这个参数进行初始化构建。
  • ④对位图做置位,调用select()函数并传入的相关参数,等待内核处理完成。
  • ⑤根据内核的返回结果,进行对应处理,如超时处理、异常处理、事件处理等。
  • ⑥如果没有超时以及出现错误,那么则遍历判断那个FD有数据的(被置位)。
  • ⑦对于有事件发生的FD,根据其事件类型进行对应的处理(读、写数据)。

对于这个伪代码,其实也是调用select()函数的通用模型,以JavaJNI调用为例,其实大体的过程也是相同的,如下:

009.png

没有下载到JDK1.5的源码,所以以Windows-select的调用为例。

4.5、内核select函数核心源码

   在上述过程中,我们调用了select()函数实现了IO多路复用,但调用之后select()的执行过程,相对而言其实是未知,那么接着再来看看select()的核心源码,剖析一下调用select后,内核究竟会如何处理。

内核源码的执行流程:sys_select() → SYSCALL_DEFINE5() → core_sys_select() → do_select() → f_op->poll/tcp_poll()

所有的系统调用,都可以在它的名字前加上“sys_”前缀,这就是它在内核中对应的函数。比如系统调用open、read、write、select,与之对应的内核函数为:sys_open、sys_read、sys_write、sys_select,因此上述的sys_select()其实就是select()函数再内核中对应的函数。

接着来看看SYSCALL_DEFINE5()、core_sys_select()函数的内容:

// 位于fs/select.c文件中(sys_select函数)
SYSCALL_DEFINE5(select, int, n, fd_set __user *, inp, fd_set __user *, outp,
    fd_set __user *, exp, struct timeval __user *, tvp)
{
    struct timespec end_time, *to = NULL;
    struct timeval tv;
    int ret;
    // 判断是否传入了超时时间
    if (tvp) {
        if (copy_from_user(&tv, tvp, sizeof(tv))) 
            return -EFAULT;

        to = &end_time;
        // 如果已经到了超时时间,则中断执行并返回
        if (poll_select_set_timeout(to,
                tv.tv_sec + (tv.tv_usec / USEC_PER_SEC),
                (tv.tv_usec % USEC_PER_SEC) * NSEC_PER_USEC))
            return -EINVAL;
    }
    // 未超时或没有设置超时时间的情况下,调用core_sys_select
    ret = core_sys_select(n, inp, outp, exp, to);
    ret = poll_select_copy_remaining(&end_time, tvp, 1, ret);

    return ret;
}

// 位于fs/select.c文件中(core_sys_select函数)
int core_sys_select(int n, fd_set __user *inp, fd_set __user *outp,
               fd_set __user *exp, struct timespec *end_time)
{
    fd_set_bits fds;
    void *bits;
    int ret, max_fds;
    unsigned int size;
    struct fdtable *fdt;
    /* 由于涉及到了用户态和内核态的切换,因此将位图存储在栈上,
       (尽量提升状态切换时的效率,这里采用栈的方式存储) */
    long stack_fds[SELECT_STACK_ALLOC/sizeof(long)];

    ret = -EINVAL;
    if (n < 0)
        goto out_nofds;

    // 先计算出max_fds值
    rcu_read_lock();
    fdt = files_fdtable(current->files);
    max_fds = fdt->max_fds;
    rcu_read_unlock();
    if (n > max_fds)
        n = max_fds;

    // 根据前面计算的max_fds值,判断一下前面开栈空间是否足够
    // (在这里涉及到一个新的结构体:fd_set_bits,稍后详细分析)
    size = FDS_BYTES(n);
    bits = stack_fds;
    if (size > sizeof(stack_fds) / 6) {
        // 如果空间不够则调用内核的kmalloc为fd_set_bits分配更大的空间
        ret = -ENOMEM;
        bits = kmalloc(6 * size, GFP_KERNEL);
        if (!bits)
            goto out_nofds;
    }
    // 将fd_set_bits中六个位图指针指向分配好的内存位置
    fds.in      = bits;
    fds.out     = bits +   size;
    fds.ex      = bits + 2*size;
    fds.res_in  = bits + 3*size;
    fds.res_out = bits + 4*size;
    fds.res_ex  = bits + 5*size;

    // 将用户空间提交的三个fd_set拷贝到内核空间
    if ((ret = get_fd_set(n, inp, fds.in)) ||
        (ret = get_fd_set(n, outp, fds.out)) ||
        (ret = get_fd_set(n, exp, fds.ex)))
        goto out;
    zero_fd_set(n, fds.res_in);
    zero_fd_set(n, fds.res_out);
    zero_fd_set(n, fds.res_ex);

    // 调用select模型的核心函数do_select()
    ret = do_select(n, &fds, end_time);

    if (ret < 0)
        goto out;
    // 检测到有信号则系统调用退出,返回用户空间执行信号处理函数
    if (!ret) {
        ret = -ERESTARTNOHAND;
        if (signal_pending(current))
            goto out;
        ret = 0;
    }

    if (set_fd_set(n, inp, fds.res_in) ||
        set_fd_set(n, outp, fds.res_out) ||
        set_fd_set(n, exp, fds.res_ex))
        ret = -EFAULT;
// goto跳转的对应点
out:
    if (bits != stack_fds)
        kfree(bits);
out_nofds:
    return ret;
}

源码看过去,看起来有些多,对于C语言不太熟悉的小伙伴可能看的会一脸懵,但没关系,我们不去讲细了,重点理解其主干内容,上述源码分为如下几步:

  • ①先判断调用select()时,是否设置了超时时间:
    • 是:记录一下超时的时间点,并判断一下是否超时,超时则中断并返回。
    • 否:没有超时或没设置超时时间,则调用core_sys_select()函数。
  • ②计算出最大的文件描述符,然后采用开栈方式存储递交的参数值。
  • ③根据计算出的max_fds值,判断开栈空间能否可以存储递交的参数值:
    • 不能:调用内核的kmalloc分配器为fd_set_bits分配更大的空间(新分配的内存是在堆)。
    • 能:更改fd_set_bits中的指针指向,然后将递交的三个fd_set拷贝到内核空间。
  • ④上述工作全部已就绪后,调用select()函数中的核心函数:do_select()处理。

在上述过程中,理解起来并不复杂,唯一的疑惑点就在于多出了一个新的结构体:fd_set_bits,那它究竟是什么意思呢?先来看看它的定义:

typedef struct {
   
   
    unsigned long *in, *out, *ex;
    unsigned long *res_in, *res_out, *res_ex;
} fd_set_bits;

很明显,fd_set_bits是由六个元素组成的,这六个元素分别对应着六个位图,其中前三个则对应调用select()函数时递交的三个参数:readfds、writefds、exceptfds,而后三个则对应着select()执行完成之后返回的位图,为什么还需要有后面三个呢?

因为select()在遍历需要监听的文件描述符列表时,也需要三个对应的位图来记录哪些FD中是有数据的,因此也需要有三个位图对应着传入的三个位图,在select()执行完成后,如若有Socket中存在数据需要处理,那则会将这三个位图中对应的Socket位置进行置位,然后从内核空间再将其拷贝回用户空间,以供程序处理。

OK~,了解fd_set_bits结构后,对于core_sys_select函数中做的工作就自然理解了,一句话总结一下这个函数做的工作:

core_sys_select只不过是在为后面要调用的do_select()函数做准备工作而已。

当然,在上述的core_sys_select函数中还涉及到两个函数:get_fd_set()、set_fd_set(),其实现如下:

// 调用了copy_from_user()函数,也就是从用户空间拷贝数据到内核空间
static inline
int get_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
   
   
    nr = FDS_BYTES(nr);
    if (ufdset)
        return copy_from_user(fdset, ufdset, nr) ? -EFAULT : 0;

    memset(fdset, 0, nr);
    return 0;
}

// 调用了__copy_to_user()函数,也就是将数据从内核空间拷贝回用户空间
static inline unsigned long __must_check
set_fd_set(unsigned long nr, void __user *ufdset, unsigned long *fdset)
{
   
   
    if (ufdset)
        return __copy_to_user(ufdset, fdset, FDS_BYTES(nr));
    return 0;
}

从最终调用的copy_from_user()、copy_to_user()两个函数中就能得知,这就是用于用户空间与内核空间之间数据拷贝的函数而已。

那么再来看看select()的核心函数do_select()吧,先上源码:

int do_select(int n, fd_set_bits *fds, struct timespec *end_time)
{
   
   
    ktime_t expire, *to = NULL;
    // -------- 核心结构:poll_wqueues -------------
    struct poll_wqueues table;
    poll_table *wait;
    int retval, i, timed_out = 0;
    unsigned long slack = 0;

    // 先获取一下最大的文件描述符
    rcu_read_lock();
    retval = max_select_fd(n, fds);
    rcu_read_unlock();
    // 如果获取到的值为负数,则返回select()执行过程中错误
    if (retval < 0)
        return retval;
    n = retval;

    // 初始化poll_wqueues结构体中的poll_table,并更改__pollwait的指针指向
    poll_initwait(&table);
    wait = &table.pt;

    // 如果系统调用select()函数时,设置的超时时间为0,
    // 那么赋值timed_out = 1,表示未获取到事件的情况下不阻塞,直接返回。
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
   
   
        wait = NULL;
        timed_out = 1;
    }

    // 如果设置了超时时间,则预估一下还剩下多少时间
    if (end_time && !timed_out)
        slack = estimate_accuracy(end_time);

    retval = 0; // 这个是最终返回的值
    // 开启轮询,这里是核心!!!
    for (;;) {
   
   
        unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp;

        // 对于每个需要监听的fd,向其等待队列中注册后一个entry
        set_current_state(TASK_INTERRUPTIBLE);

        // 准备工作
        inp = fds->in; outp = fds->out; exp = fds->ex;
        rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex;
        for (i = 0; i < n; ++rinp, ++routp, ++rexp) {
   
   
            unsigned long in, out, ex, all_bits, bit = 1, mask, j;
            unsigned long res_in = 0, res_out = 0, res_ex = 0;
            const struct file_operations *f_op = NULL;
            struct file *file = NULL;

            // 做一次位或操作,对于并集为0的FD直接忽略
            // (在前面分析过,只有置位=1的,才代表这个FD需要被监听事件)
            in = *inp++; out = *outp++; ex = *exp++;
            all_bits = in | out | ex;
            if (all_bits == 0) {
   
   
                i += __NFDBITS;
                continue;
            }

            // 内层循环:开始对需要监听的FD进行扫描(核心中的核心!!)
            for (j = 0; j < __NFDBITS; ++j, ++i, bit <<= 1) {
   
   
                int fput_needed;
                if (i >= n)
                    break;
                if (!(bit & all_bits))
                    continue;
                file = fget_light(i, &fput_needed);
                // 这里是重点:主要做了f_op->poll这个操作(具体含义后面细聊)
                if (file) {
   
   
                    f_op = file->f_op;
                    mask = DEFAULT_POLLMASK;
                    // 检测对应的FD是否能够进行IO操作
                    if (f_op && f_op->poll)
                        // 会调用具体设备的poll()方法
                        mask = (*f_op->poll)(file, retval ? NULL : wait);
                    fput_light(file, fput_needed);

                    // 判断对应的文件描述符目前的状态
                    // 如果是可读状态,则将其res_in集合对应的坑位置1
                    if ((mask & POLLIN_SET) && (in & bit)) {
   
   
                        res_in |= bit;
                        retval++;
                    }
                    // 如果是可写状态,则将其res_out集合.......
                    if ((mask & POLLOUT_SET) && (out & bit)) {
   
   
                        res_out |= bit;
                        retval++;
                    }
                    if ((mask & POLLEX_SET) && (ex & bit)) {
   
   
                        res_ex |= bit;
                        retval++;
                    }
                }
            }
            // 对于监听到有数据的FD,赋值给之前要返回的位图中
            if (res_in)
                *rinp = res_in;
            if (res_out)
                *routp = res_out;
            if (res_ex)
                *rexp = res_ex;
            cond_resched();
        }
        // 如果扫描到了活跃FD、或出现超时、出现唤醒信号以及指向碰到错误
        // 中断循环扫描,返回到之前的core_sys_select()函数中
        // 如若是被唤醒或超时了,则会重新扫描一次所有FD
        wait = NULL;
        if (retval || timed_out || signal_pending(current))
            break;
        if (table.error) {
   
   
            retval = table.error;
            break;
        }

        // 第一次循环时,如果设置了超时时间,那么则将时间赋值给to指针
        if (end_time && !to) {
   
   
            expire = timespec_to_ktime(*end_time);
            to = &expire;
        }
        /* 未扫描到活跃的FD,则调用schedule_hrtimeout_range函数,
        函数作用:让当前程序进入睡眠,让出CPU资源,避免无效扫描浪费CPU,
        调用时传入了to,这是调用时指定的阻塞时间,超时则返回0,
        如果在睡眠过程中,被socket唤醒则返回-EINTR  */
        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
            timed_out = 1; // 睡眠超时后置1,方便后面退出循环返回到上层
    }
    __set_current_state(TASK_RUNNING);

    // 清理各个驱动程序的等待队列头,
    // 同时释放所有空出来的poll_table_page页(包含的poll_table_entry)
    poll_freewait(&table);

    // 返回扫描到的活跃FD数量
    return retval;
}

对于源码的执行过程,在上面都已给出了相关注释,但看起来有些费力,我们稍后再去总结一遍,但在此之前我们需要先理解两个内容:活跃FD数、poll_wqueues结构体。

活跃FD数:表示有事件发生的文件描述符,比如一个网络套接字中有数据可读,那么这个Socket对应的FD则可记为一次活跃数。如果一个FD同时触发了两个事件,那么则会计算两次活跃数。

poll_wqueues结构体则属于do_select()函数中的一个核心结构,定义如下:

// 位于include/linux/poll.h文件中
struct poll_wqueues {
   
   
    // 驱动注册,回调函数__pollwait的指针
    poll_table pt;
    // 如果下面的inline_entries不够 就会需要
    struct poll_table_page * table;
    int error;
     // 记录下面的table使用过的下标
    int inline_index;
    // 对应下述的poll_table_entry结构
    struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES];
};

// 加入等待队列的节点
struct poll_table_entry {
   
   
    struct file * filp;
    wait_queue_t wait;
    wait_queue_head_t * wait_address;
};

// 回调函数的指针
typedef struct poll_table_struct {
   
   
    poll_queue_proc qproc;
} poll_table;

对于这个结构体而言,核心就在于其中的pt成员,它是poll_table类型的,不过想要理解它,那首先必须明白一个知识点:

当某个进程需要对一个IO设备(例如socket)进行读写时,如果发现此设备的数据暂且还未就绪,所以不能进行读写操作,当前进程就需要阻塞等待。为了实现阻塞进程,那每个socket/IO设备都有个等待队列,当进程需要阻塞等待数据时,就可以将该进程添加到对应的等待队列中进行休眠,当socket数据就绪后,再唤醒队列中的进程。

poll_table结构就是为了将进程添加到等待队列中而创造的,在上述源码中调用poll_initwait()函数后,就会将poll_wqueues中的poll_table成员的poll_queue.proc设置为__pollwait()回调函数,当后续执行到f_op->poll()时会调用poll_wait()函数,最终就会执行到这里设置的__pollwait()回调,这两个函数实现如下:

// 将当前进程添加到wait参数指定的等待列表(poll_table)中
poll_wait(struct file *filp, wait_queue_head_t *queue, poll_table *wait)
{
    if (p && wait_address)
        p->qproc(filp, wait_address, p);
}

// 设置唤醒回调函数为pollwake函数,并将poll_table_entry.wait加入等待队列
static void __pollwait(struct file *filp, wait_queue_head_t *wait_address,
                poll_table *p)
{
    struct poll_wqueues *pwq = container_of(p, struct poll_wqueues, pt);
    struct poll_table_entry *entry = poll_get_entry(p);
    if (!entry)
        return;
    get_file(filp);
    entry->filp = filp;
    // 设置等待队列头
    entry->wait_address = wait_address;
    // 设置关注的事件
    entry->key = p->key;
    // 设置等待队列节点的回调函数为pollwake()
    init_waitqueue_func_entry(&entry->wait, pollwake);
    // 私有数据 poll_wqueues
    entry->wait.private = pwq;
    // 将 poll_table_entry 添加到对应的等待队列上
    add_wait_queue(wait_address, &entry->wait);
}

OK~,到这里看的可能会有些懵,因为这是跟后续的唤醒动作有关的,待会儿结合具体的设备驱动一起来理解,现在咱们重点先分析一下do_select()函数的核心过程:

  • ①准备阶段:获取最大文件描述符值、设置阻塞回调、处理超时时间等。
  • ②开启轮询,将不需要监听的FD忽略,需要监听的FD都向其等待队列注册一个entry
  • ③开启循环将所有需要监听的FD全部扫描一遍,判断FD对应的设备是否有数据可读写:
    • 有:直接跳到步骤⑤。
    • 没有:内核调用schedule让当前进程睡眠xx秒,让出cpu进入阻塞。
  • ④如果有FD主动唤醒了当前进程,或xx秒后自己醒了,再次跳回步骤③。
  • ⑤如果从文件描述符集合中扫描到了有数据可读写的FD,记录相应的活跃个数。
  • ⑥将就绪事件结果保存在fdsres_in、res_out、res_ex集合中,然后调用poll_freewait()函数移除各个驱动程序的等待队列头,最后返回对应的活跃FD数。

do_select()函数的核心流程总结给出来了,其实粗略理解起来也不难,唯一有些绕的估计就是进程阻塞/唤醒这块的内容,下面重点来说一下这块。

do_select()中,扫描FD时有一个核心操作:
mask = (*f_op->poll)(file, retval ? NULL : wait);
在这步操作中,会调用文件描述符对应设备的poll检测当前是否能够进行IO操作,那么对于网络Socket套接字而言,调用poll之后,对应的接口就是sock_poll(),其定义位于net/ipv4/,如下:

static unsigned int sock_poll(struct file *file, poll_table * wait)
{
   
   
    struct socket *sock;

    sock = socki_lookup(file->f_dentry->d_inode);
    return sock->ops->poll(file, sock, wait);
}

实现很简单,首先会通过socki_lookup()函数将文件描述符转换为具体的Socket套接字,然后会调用该socket.poll()函数,例如这里的套接字是TCP类型的,那么对应的实现就是tcp_poll()函数:

// 位于net/ipv4/tcp/目录下
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
{
   
   
    unsigned int mask;
    struct sock *sk = sock->sk;
    struct tcp_sock *tp = tcp_sk(sk);

    poll_wait(file, sk->sk_sleep, wait);
    if (sk->sk_state == TCP_LISTEN)
        return inet_csk_listen_poll(sk);
    // 用mask来记录socket数据是否可被读写
    mask = 0;
    // 开始进行判断
    if (sk->sk_err)
        mask = POLLERR;
    if (sk->sk_shutdown == SHUTDOWN_MASK || sk->sk_state == TCP_CLOSE)
        mask |= POLLHUP;
    if (sk->sk_shutdown & RCV_SHUTDOWN)
        mask |= POLLIN | POLLRDNORM | POLLRDHUP;
    if ((1 << sk->sk_state) & ~(TCPF_SYN_SENT | TCPF_SYN_RECV)) {
   
   
        int target = sock_rcvlowat(sk, 0, INT_MAX);

        if (tp->urg_seq == tp->copied_seq &&
            !sock_flag(sk, SOCK_URGINLINE) &&
            tp->urg_data)
            target--;
        if (tp->rcv_nxt - tp->copied_seq >= target)
            mask |= POLLIN | POLLRDNORM;

        if (!(sk->sk_shutdown & SEND_SHUTDOWN)) {
   
   
            if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk)) {
   
   
                mask |= POLLOUT | POLLWRNORM;
            } else {
   
     /* send SIGIO later */
                set_bit(SOCK_ASYNC_NOSPACE,
                    &sk->sk_socket->flags);
                set_bit(SOCK_NOSPACE, &sk->sk_socket->flags);

                /* Race breaker. If space is freed after
                 * wspace test but before the flags are set,
                 * IO signal will be lost.
                 */
                if (sk_stream_wspace(sk) >= sk_stream_min_wspace(sk))
                    mask |= POLLOUT | POLLWRNORM;
            }
        }
        if (tp->urg_data & TCP_URG_VALID)
            mask |= POLLPRI;
    }
    // 最终返回当前socket是否可被读写
    return mask;
}

在这个函数中,首先会调用poll_wait()函数将当前进程添加到wait等待列表中,然后检测socket目前数据是否可以被读写,最终通过mask变量来记录当前套接字的数据是否可被读写,如果可读写会将对应的FD记录为活跃状态。如若不可读写则会先返回,然后等当前进程遍历完所有FD后,所有的FD都不能进行I/O操作的情况下,当前进程则会进入休眠阻塞状态。

如果进程陷入休眠阻塞状态后,它被再次唤醒只有两种情况:
①为进程设置的休眠时间到了自己醒来。
②由对应的驱动设备主动唤醒。

第一种情况都懂就不聊了,重点来说说第二种,这种唤醒则是由I/O设备决定的,之前分析__pollwait函数时,在最后调用了add_wait_queue(wait_address, &entry->wait)函数,在对应的等待队列上插入了一个entry,那当I/O设备的数据就绪后,就会去遍历等待队列找到这个entry,然后会调用设置好的pollwake()回调函数唤醒对应的进程。此时由于数据已经准备好了,所以当select被唤醒后,自然就能扫描到对应的FD变为了可读写状态,然后返回给用户态的程序。

当然,对于唤醒这块的具体实现位于/sys/wait.h、wait.c文件中,感兴趣的可自行研究。

至此,select()函数被调用后,在内核具体是如何工作的,整个源码流程也就大致分析清楚了,现在咱们会简单总结一下,梳理清楚完整流程。

4.6、select底层原理小结

   在经过上述一系列分析后,我们大致摸透了select()运行的底层原理,但估摸着大家看下来都有一点云里雾里的感觉,因此再简单的写一个完整流程的总结:

  • ①外部调用select()函数,传入最大文件描述符值、三个FD集合以及超时时间。
  • ②用六个位图组成的fd_set_bits结构存储传入的FD集合,用kmalloc为其分配栈空间。
  • ③将用户态传递的fd_set拷贝到内核空间,紧接着调用do_select()函数。
  • ④获取传入的最大文件描述符值、设置阻塞回调函数、处理超时时间等。
  • ⑤开启轮询,将不需要监听的FD忽略,需要监听的FD都向其等待队列注册一个entry
  • ⑥开启循环将所有需要监听的FD全部扫描一遍,判断FD对应的设备是否有数据可读写:
    • 有:直接跳到步骤⑧。
    • 没有:内核调用schedule让当前进程睡眠xx秒,让出cpu进入阻塞。
  • ⑦如果有FD主动唤醒了当前进程,或xx秒后自己醒了,再次跳回步骤⑥。
  • ⑧如果从文件描述符集合中扫描到了有数据可读写的FD,记录相应的活跃个数。
  • ⑨将就绪事件结果保存在fdsres_in、res_out、res_ex集合中,然后调用poll_freewait()函数移除各个驱动程序的等待队列头,最后返回对应的活跃FD数。
  • ⑩将扫描到的FD从内核拷贝会用户态空间,同时向程序返回已触发的事件数。

其实整个流程下来,select分析的内容颇多,这是因为它也是后续两个函数的基础,把它的过程弄明白了,在分析后面的函数时,过程也是换汤不换药的,步骤都大致相同。

4.7、select的缺点分析与思考

   详细了解了select()函数后,再来想想它有哪些不足的地方呢?

①由32long元素组成的fd_set,最大只能表示1024位,因此最多只能监听1024socket,所以对于高并发的I/O场景很难提供支持。

②因为监听FD的工作是内核完成的,所以每次调用select()时,都需将FD集合从用户态拷贝到内核态空间,这个过程开销会较大。

③当监听的FD集合中,某个Socket上有数据可读写后,会唤醒陷入睡眠的select,但select醒来后也不知道那个FD有数据,因此会重新将整个集合遍历一次,造成了很大程度上的浪费。

④每次调用select函数时,由于需要监听的文件描述符不同,所以需要构建新的fd_set集合,也就是上一次使用过的fd_set不可被重用,造成较大的资源开销。

上述四点,则是select多路复用模型的四个致命缺陷,由于这些原因导致它并不适合于一些高性能的场景,因此才有后续的poll、epoll等模型出现。

但在分析其他两个函数之前,再思考一个问题,假设此时CPU正在处理一个IO数据,但此刻另外一个Socket上也来了数据,那么这个数据会被丢弃吗?

答案是不会的,因为有专门用于处理I/O数据的硬件:DMA控制器以及网卡,在网络连接到来时,如果CPU正在处理另外一条网络连接的数据,新连接的网络数据并不会被丢弃,而是会由网卡将数据接收并放入内核缓冲区。同理,如果是本地IO,则会由DMA控制器处理。

五、多路复用函数 - poll()

   poll函数则是基于select函数创造出来的,其实它和select的区别不大,唯一一点区别就在于:核心结构不同了,在poll中出现了一种新的结构体pollfd,它不存在最大数量的限制。但其实poll的性能与select差距是不大的,因此可以将poll理解成增强版select

5.1、poll()函数的定义

poll的定义也和select相差不大,准确来说,所有的多路复用函数定义都差不多,如下:

int poll(struct pollfd* fds, nfds_t nfds, int timeout);

相较之前的select函数,poll的入参少了两个,这是因为在其中将结构体优化成了pollfd,关于这点待会儿聊。先看看三个入参:

  • fds:这是由pollfd组成的数组,数组每个元素表示要监听的文件描述符及相应的事件。
  • nfds:这里表示数组中一共传了多少个元素。
  • timeout:这个参数很好理解,表示未获取到就绪事件时,允许的等待时间。

然后再看看这个函数的返回,也是一个int值,和select函数的返回值含义相同,也包括后续会分析的epoll,返回值也是int类型,其含义也都一样。

5.2、poll()的核心结构体:pollfd

紧接着再来看看poll的结构体:pollfd,其定义如下:

struct pollfd{
   
   
  int fd;
  short events;
  short revents;
};

这个结构体中有三个成员,分别对应着:文件描述符、需要监听的事件以及触发的事件,其中fd、events是在调用时就需传入的,而revents则是由内核监听到事件触发后填充的。

5.3、poll()底层源码分析

   了解了pollfd结构之后,对于poll()的使用方式和select大致相同,所以不再举例说明,接下来再看看poll()的源码过程,其实过程也大致与select相似,并且其实现也同样位于select.c文件中,执行流程如下:

sys_poll → SYSCALL_DEFINE3 → do_sys_poll → do_poll → f_op->poll

整个过程中,最核心的就是do_poll()函数,但先来看看前面的函数实现:

// 位于fs/select.c文件中(sys_select函数)
SYSCALL_DEFINE3(poll, struct pollfd __user *, ufds, unsigend int, 
    nfds, long, timeout_msesc)
{
   
   
    struct timespec end_time, *to = NULL;
    int ret;
    // 判断是否传入了超时时间,如果传入了则进行相应的超时处理
    if (timeout_msesc > 0) {
   
   
        to = &end_time;
        poll_select_timeout(to, timeout_msesc / MSEC_PER_SEC,
            NSEC_PER_MSEC * (timeout_msecs % MSEC_PER_SEC));
    }

    // 调用最为核心的 do_sys_poll()函数
    ret = do_sys_poll(n, inp, outp, exp, to);

    if (ret == -EINTR) {
   
   
        struct restart_block *restart_block;

        restart_block = &current_thread_info()->restart_block;
        restart_block->fn = do_restart_poll;
        restart_block->poll.ufds = ufds;
        restart_block->poll.nfds = nfds;

        if (timeout_msesc >= 0) {
   
   
            restart_block->poll.tv_sec = end_time.tv_sec;
            restart_block->poll.tv_nsec = end_time.tv_nsec;
            restart_block->poll.has_timeout = 1;
        } else 
            restart_block->poll.has_timeout = 0;

        ret = -ERESTART_REESTARTBLOCK;
    }
    return ret;
}

这个函数仅是过渡的作用,稍微做了一些辅助工作,然后就直接调用了do_sys_poll,那么再来看看这个,源码实现如下:

int do_sys_poll(struct pollfd __user *ufds, unsigned int nfds,
        struct timespec *end_time)
{
   
      
    // 这里依旧用到了poll_wqueues结构
    struct poll_wqueues table;
     int err = -EFAULT, fdcount, len, size;
    // 这里和select相同,采用栈方式存储用户态传递的数据
    // 同时在这里是基于long做了对齐填充的,能够充分利用局部性原理
    long stack_pps[POLL_STACK_ALLOC/sizeof(long)];
    // poll_list是新的结构体:本质上是一个单向链表
    struct poll_list *const head = (struct poll_list *)stack_pps;
     struct poll_list *walk = head; // 定义链表头结点
     unsigned long todo = nfds;

    if (nfds > current->signal->rlim[RLIMIT_NOFILE].rlim_cur)
        return -EINVAL;

    len = min_t(unsigned int, nfds, N_STACK_PPS);
    // 将用户空间传入的所有FD,以链表的形式填充在poll_list中
    for (;;) {
   
   
        walk->next = NULL;
        walk->len = len;
        if (!len)
            break;
        // 将用户态传递的数据拷贝到内核空间
        if (copy_from_user(walk->entries, ufds + nfds-todo,
                    sizeof(struct pollfd) * walk->len))
            goto out_fds;

        todo -= walk->len;
        if (!todo)
            break;

        // 在这里如若空间不够,也会调用kmalloc分配更大的空间
        len = min(todo, POLLFD_PER_PAGE);
        size = sizeof(struct poll_list) + sizeof(struct pollfd) * len;
        walk = walk->next = kmalloc(size, GFP_KERNEL);
        if (!walk) {
   
   
            err = -ENOMEM;
            goto out_fds;
        }
    }
    // 这里依旧调用了poll_initwait函数做了初始化工作
    poll_initwait(&table);
    // 然后这里是重点:调用了do_poll()函数对FD做监听
    fdcount = do_poll(nfds, head, &table, end_time);
    // 善后工作:清空各设备等待队列上的节点信息
    poll_freewait(&table);

    for (walk = head; walk; walk = walk->next) {
   
   
        struct pollfd *fds = walk->entries;
        int j;
        // 这是最后的工作:将监听到的事件填充到revents中,
        // 然后通过__put_user写回用户态空间,最后利用goto跳转返回
        for (j = 0; j < walk->len; j++, ufds++)
            if (__put_user(fds[j].revents, &ufds->revents))
                goto out_fds;
      }

    err = fdcount;
out_fds:
    walk = head->next;
    while (walk) {
   
   
        struct poll_list *pos = walk;
        walk = walk->next;
        kfree(pos);
    }

    return err;
}

在这里面,其实就是将之前的do_select()函数的工作拆开了,拆为了do_sys_poll、do_poll两个函数实现,其他过程大致与do_select函数相同,不同点在于这里面又出现了一个新的结构体:poll_list,定义如下:

struct poll_list {
   
   
    struct poll_list *next;
    int len;
    struct pollfd entries[0];
};

从上述结构体的成员很明显就可看出,这是一个典型的单向的数组链表结构,第一个成员代表下一个节点(数组链表)是谁,第二个成员代表后面可变长数组的元素数量,第三个成员则是一个变长数组,里面存放当前这段内存上的pollfd

对于这个变长数组大家会存在些许疑惑,明明上面定义的长度为[0],为何可以变长呢?这是利用到了C语言里的数组拓展技术,感兴趣的可点击>>这里<<详细了解。

同时,对于“数组链表结构”大家可能有些许疑惑,链表、数组这是两个结构,为何会被组合在一块呢?这是由于poll中,会先采用栈上分配的方式存储pollfd,但是当用户态传入的pollfd过多时,栈上内存可能不太够用,因此就会调用kmalloc分配新的内存,而前面分析select时提过:kmalloc分配的新空间是基于堆内存的,所以此时poll就会同时使用多块内存,示意图如下:

010.png

也就是说:如果栈上能存储用户空间传递的pollfd,那么只会出现一个poll_list在栈上,如果存储不下则会有多个,除开第一个数组外,其他的都在堆上,因此poll_list结构中的指针会指向另外一个数组。

OK~,弄明白了poll_list结构体后,对于do_sys_poll函数的执行流程就不再重复了,大家可参考我源码中给出的备注,下面直入主题,一起来看看do_poll()函数会做什么工作:

static int do_poll(unsigned int nfds,  struct poll_list *list,
           struct poll_wqueues *wait, struct timespec *end_time)
{
   
      
    // 在这里会注册等待阻塞时的回调函数
    poll_table* pt = &wait->pt;
    ktime_t expire, *to = NULL;
    int timed_out = 0, count = 0;
    unsigned long slack = 0;

    // 处理超时时间
    if (end_time && !end_time->tv_sec && !end_time->tv_nsec) {
   
   
        pt = NULL;
        timed_out = 1;
    }
    if (end_time && !timed_out)
        slack = estimate_accuracy(end_time);

    // 开启轮询:一直监听所有的pollfd
    for (;;) {
   
   
        struct poll_list *walk;

        set_current_state(TASK_INTERRUPTIBLE);
        // 外层循环:遍历所有的poll_list
        for (walk = list; walk != NULL; walk = walk->next) {
   
   
            struct pollfd * pfd, * pfd_end;
            pfd = walk->entries;
            pfd_end = pfd + walk->len;
            // 内存循环:遍历poll_list.entries数组中的所有pollfd
            for (; pfd != pfd_end; pfd++) {
   
   
                // 对于每个pollfd对应的驱动的poll()
                if (do_pollfd(pfd, pt)) {
   
   
                    // 返回值不为0,表示当前FD有数据可读写,count++
                    count++;
                    pt = NULL;
                }
            }
        }
        // 这里是防止下次循环时再次注册等待队列
        pt = NULL;
        if (!count) {
   
   
            count = wait->error;
            if (signal_pending(current))
                count = -EINTR;
        }
        if (count || timed_out)
            break;

        // 在第一次循环时,如果设置了超时时间,那么做一次转换
        if (end_time && !to) {
   
   
            expire = timespec_to_ktime(*end_time);
            to = &expire;
        }

        // 如若没有FD出现读写事件,则让当前进程陷入睡眠阻塞状态
        if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))
            timed_out = 1;
    }
    __set_current_state(TASK_RUNNING);
    // 最终返回扫描出的活跃FD数量
    return count;
}

其实在这个过程中,无非就是开启轮询对之前的poll_list进行遍历,然后会对每个pollfd调用do_pollfd函数,就是检测每个FD上数据是否可读写,如果所有pollfd都遍历完成后,依旧没有发现可读写的FD,则让当前进程睡眠阻塞,由于在函数最开始也设置了回调函数,因此当某个FD数据准备就绪后,会由对应的驱动程序唤醒poll。最后再把do_pollfd函数的实现放出来:

static inline unsigned int do_pollfd(struct pollfd *pollfd, poll_table *pwait)
{
   
   
    // 定义了一个mask接收FD的可读写状态
    unsigned int mask;
    int fd;

    mask = 0;
    fd = pollfd->fd;
    if (fd >= 0) {
   
   
        int fput_needed;
        struct file * file;

        file = fget_light(fd, &fput_needed);
        mask = POLLNVAL;
        if (file != NULL) {
   
   
            mask = DEFAULT_POLLMASK;
            // 在这里又再次对每个FD的poll进行了调用
            if (file->f_op && file->f_op->poll)
                // 这行代码与之前select函数的相同
                mask = file->f_op->poll(file, pwait);
            /* Mask out unneeded events. */
            mask &= pollfd->events | POLLERR | POLLHUP;
            fput_light(file, fput_needed);
        }
    }
    pollfd->revents = mask;
    return mask;
}

这个函数的工作不出所料,的确就是对每个FD进行了询问:“你的数据可不可以让我进行读写操作呀”?剩下的工作与select函数后面的过程相同,因此不再继续分析,想要加深印象的再跳回select最后那段分析即可。

5.4、poll()总结与劣势浅谈

   到目前为止,关于多路复用模型中的poll()函数也分析明白了,其实有了select函数的基础后,对于poll而言,看起来相信应该是十分轻松的。当然,由于poll()函数的实现和select大致是相同的,因此也不再花费时间去对它进行总结。

相较于select而言,由于poll内部是基于数组链表构建的,所以没有select位图的限制,也就解决了select中最多只能监听1024个连接的缺陷。同时由于内核返回监听到的事件时,是通过pollfd.revents进行传递的,因此pollfd是可以被重用的,在下次使用时将pollfd.revents置零即可。但对于其他两点缺陷,在poll中也依旧存在,不过在epoll中却得到了解决,所以接下来重点分析epoll实现。

六、多路复用函数 - epoll()

   epoll也是IO多路复用模型中最重要的函数,几乎目前绝大部分的高性能框架,都是基于它构建的,例如Nginx、Redis、Netty等,所以对于掌握epoll知识的必要性显得越发重要。因为在你理解epoll之前,你只知道这些技术栈性能很高,但不清楚为什么,而你理解epoll之后,对于这些技术栈性能高的原因也就自然就懂了,那接下来我们一起聊聊epoll

   epoll与之前的select、poll函数不同,它整个过程由epoll_create、epoll_ctl、epoll_wait三个函数组成,同时最关键的点也在于:epoll直接在内核中维护着一个FD集合,外部不再需要将整个要监听的FD集合拷贝到内核了,而是调用epoll_ctl函数进行管理即可。

对于Java-NIO中的JNI入口,和之前分析的思路相同,因此就不再进行演示,感兴趣的自己根据执行流程,打开相应的目录文件就可以看到。

接着重点看看epoll系列的函数定义。

6.1、epoll函数定义

   刚刚聊到过,epoll存在三个函数,它们的定义都位于sys/epoll.h文件中,那么接下来一个个瞧瞧,先看看epoll_create

int epoll_create(int size);
int epoll_create1(int flags);

你没看错,create函数其实有两个,但对于入参为size的函数在很早之前就被弃用了,因此一般调用create函数都是在调用epoll_create1(),这个函数的作用是申请内核创建一个epollfd文件,同时申请一个eventpoll结构体(稍后讲),然后返回epollfd对应的文件描述符。最后再聊聊它的入参:

  • size:代表指定内核中维护的FD集合长度,2.6.8版本之后成为了动态集合,被弃用。
  • flags:这个参数主要有两个传递值:
    • 0:正常创建epollfd传入的值。
    • EPOLL_CLOEXEC:当fork子进程时,子进程不会包含epollfd(多进程epoll时使用)。

了解了create函数后,再来看看epoll_ctl函数的定义:

int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event);

先来说说ctl函数的作用吧,这个函数主要就是对于内核维护的epollfd集合进行增删改操作,参数释义如下:

  • epfd:表示指定要操作的epollfd
  • op:表示当前要进行的操作,选项如下:
    • EPOLL_CTL_ADD:注册操作,代表要往内核维护的集合中新增一个epollfd
    • EPOLL_CTL_MOD:修改操作,代表要更改某个epollfd所对应的事件。
    • EPOLL_CTL_DEL:删除操作,代表要哦承诺内核的集合中移除一个epollfd
  • fd:表示epollfd对应的文件描述符。
  • event:表示当前描述符的事件队列。

最后看看epoll_wait函数的定义:

int epoll_wait(int epfd, struct epoll_event* evlist, int maxevents, int timeout);

这个函数的作用就类似于之前的select、poll,调用之后会阻塞等待至I/O事件发生,参数释义如下:

  • epfd:表示一个等待事件发生的epollfd
  • evlist:这里用于接收内核已监听到的事件集合。
  • maxevents:指上述集合出现就绪事件时,一次能够拷贝的最大长度。
    • 如果上述集合中的就绪事件小于该值,则一次性全部拷贝过来。
    • 如果上述集合中的就绪事件大于该值,则一次最多拷贝maxevents个事件。
  • timeout:这个参数和之前的select、poll相同,指定超时时间。

OK,简单了解三个函数后,大家需牢记的一点是:这三个函数都是配套使用的,遵循上述的顺序,以create、ctl、wait这种方式依次进行调用,然后就能对一或多个文件描述符进行监听。当然,对于调用后究竟发生了什么?我们接下来通过源码的方式去揭开面纱。

6.2、epoll的核心结构体

   在epoll中存在两个核心结构体:epoll_event、eventpoll,这两个结构体贯穿了epoll整个流程,这里先简单看看它们的定义:

struct epoll_event
{
   
   
  // epoll注册的事件
  uint32_t events; 
  // 这个可以理解成epoll要监听的FD详细结构体
  epoll_data_t data;
} __attribute__ ((__packed__));

// 上述data成员的结构定义
typedef union epoll_data
{
   
   
    // 自定义的附带信息,一般传事件的回调函数,当事件发生时,
    // 通过回调函数将事件添加到list上(Java-Linux-AIO的实现原型)
    void *ptr;
    // 要监听的描述符对应
    int fd;
    uint32_t u32;
    uint64_t u64;
} epoll_data_t;

上述的epoll_event简单来说,可以将其理解成由“文件描述符-需要监听的事件”组成的键值对结构,其中data是文件描述符的详细结构(可以理解成对应着一个FD),而events则代表着该FD需要监听的事件,这些事件是在调用epoll_ctl函数时,由用户态程序指定的,主要有下述一些事件项:

  • EPOLLIN:表示文件描述符可读。
  • EPOLLOUT:表示文件描述符可写。
  • EPOLLPRI:表示文件描述符有带外数据可读。
  • EPOLLERR:表示文件描述符发生错误。
  • EPOLLHUP:表示文件描述符被挂断。
  • EPOLLET:将 EPOLL 设为边缘触发(Edge Trigger)模式(后续分析)。
  • EPOLLONESHOT:表示对这个文件描述符只监听一次事件。

简单有个概念之后,再来看看另外一个核心结构体eventpoll

struct eventpoll {
   
   
    // 这个是一把自旋锁(多线程Epoll时使用)
    spinlock_t lock;
    // 这个是一把互斥锁(多线程Epoll时使用)
    // 添加、修改、删除、监听、返回时都会使用这把锁确保线程安全
    struct mutex mtx;
    // 调用epoll_wait()时, 会在这个等待队列上休眠阻塞
    wait_queue_head_t wq;
    // 这个是用于epollfd本身被poll时使用(一般用不上)
    wait_queue_head_t poll_wait;
    // 存储所有I/O事件已经就绪的FD链表
    struct list_head rdllist;
    // 红黑树结构:存放所有需要监听的节点
    struct rb_root rbr;
    // 一个连接着所有树节点的单向链表
    struct epitem *ovflist;
    // 这里保存一些用户变量, 如fd监听数量的最大值等
       struct user_struct *user;
};

struct epitem {
   
   
    // 红黑树节点(red_black_node的缩写)
    struct rb_node rbn;
    // 链表节点,方便存储到eventpoll.rdllist中
    struct list_head rdllink;
    // 下一个节点指针
    struct epitem *next;
    // 当前epitem对应的fd
    struct epoll_filefd ffd;
    // 这两个不太懂,似乎跟等待队列有关
    int nwait;
    struct list_head pwqlist;
    // 当前epitem属于那个eventpoll
    struct eventpoll *ep;
    // 链表头
    struct list_head fllink;
    // 当前epitem对应的事件(FD需要监听的事件)
    struct epoll_event event;
};

在前面提到过,epoll舍弃了select、poll函数中的思想,不再从用户态全量拷贝FD集合到内核,而是自己在内核中维护了一个FD集合,而对于FD的管理则是基于eventpoll结构实现的,eventpoll主要负责管理epoll事件,在其内部主要有三个成员需要咱们重点关注:

  • list_head rdllist:存放所有I/O事件已就绪的列表。
  • rb_root rbr:用于存放注册时epollfd描述符的红黑树结构。
  • wait_queue_head_t wq:休眠阻塞时的等待队列。

当然,对于这个结构体在后续源码中会经常看到,因此稍后会结合源码理解。

6.3、Epoll源码深度历险

   整个Epoll机制由于是三个函数组成的,因此调试源码时则需要依次调试,我们依旧按照epoll的调用顺序对其源码进行剖析。

6.3.1、epoll_create()函数源码分析

epoll_create()函数对应的系统调用为SYSCALL_DEFINE1(),展开后则对应着内核的sys_epoll_create函数,如下:

SYSCALL_DEFINE1(epoll_create, int, size)
{
   
   
    if (size <= 0)
        return -EINVAL;
    // 直接调用了create1函数
    return sys_epoll_create1(0);
}

从上述这点即可看出,为何说size入参实际上在后续的版本被弃用了,因为无论传入的size等于多少,本质上只会判断一下是否小于0,然后就调用了create1()函数,入参则被写死为0了。接着来看看sys_epoll_create1()

SYSCALL_DEFINE1(epoll_create1, int, flags)
{
    int error;
    struct eventpoll *ep = NULL;//主描述符
    // 检查一下常量一致性(没啥用)
    BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
    // 判断一下flags是否传递了CLOEXEC
    if (flags & ~EPOLL_CLOEXEC)
        return -EINVAL;
    // 创建一个eventpoll并为其分配空间,分配出错则直接返回执行错误
    error = ep_alloc(&ep);
    if (error < 0)
        return error;
    // 这里是创建一个匿名真实的FD并与eventpoll关联(稍后细聊)
    error = anon_inode_getfd("[eventpoll]", &eventpoll_fops, ep,
                 O_RDWR | (flags & O_CLOEXEC));
    // 如果前面匿名FD创建失败,释放之前为ep分配的空间
    if (error < 0)
        ep_free(ep);
    // 返回匿名FD或错误码
    return error;
}

static int ep_alloc(struct eventpoll **pep)
{
    int error;
    struct user_struct *user;
    struct eventpoll *ep;
    // 调用kzalloc为ep分配空间
    user = get_current_user();
    error = -ENOMEM;
    ep = kzalloc(sizeof(*ep), GFP_KERNEL);
    if (unlikely(!ep))
        goto free_uid;
    // 对ep的每个成员进行初始化
    spin_lock_init(&ep->lock);
    mutex_init(&ep->mtx);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT;
    ep->ovflist = EP_UNACTIVE_PTR;
    ep->user = user;

    *pep = ep;
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_alloc() ep=%p\n",
             current, ep));
    return 0;
// 分配空间失败时,清空之前的初始化值,返回错误码
free_uid:
    free_uid(user);
    return error;
}

sys_epoll_create1()源码也并不复杂,总共就两步:

  • ①调用ep_alloc()函数创建并初始化一个eventpoll对象。
  • ②调用anon_inode_getfd()函数把eventpoll对象映射到一个FD上,并返回这个FD

不过对于第二步,这玩意儿说起来也比较复杂,想深入研究的可以看看 Linux创建匿名FD 的知识,我们这里就简单的概述一下:

由于epollfd本身在操作系统上并不存在真正的文件与之对应,所以内核需要为其分配一个真正的struct file结构,并且能够具备真正的FD,然后前面创建出的eventpoll对象则会作为一个私有数据保存在file.private_data指针上。这样做的目的在于:为了能够通过FD找到一个真实的struct file,并且能够通过这个file找到eventpoll对象,然后再通过eventpoll找到epollfd,从而能够形成一条“关系链”。

6.3.2、epoll_ctl()函数源码分析

epoll_create()函数的源码并不复杂,现在紧接着再来看看管理操作epollepoll_ctl()源码实现,这个函数与之对应的系统调用为SYSCALL_DEFINE4(),展开后则对应sys_epoll_ctl(),下面一起看看:

SYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,
        struct epoll_event __user *, event)
{
   
   
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;
    // 错误处理动作及从用户空间将epoll_event结构拷贝到内核空间
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p)\n",
             current, epfd, op, fd, event));
    error = -EFAULT;
    if (ep_op_has_event(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto error_return;

    // 通过传入的epfd得到前面创建的真实struct file结构
    error = -EBADF;
    file = fget(epfd);
    if (!file)
        goto error_return;

    // 这里是获取到需要监听的FD对应的真实struct file结构
    tfile = fget(fd);
    if (!tfile)
        goto error_fput;

    // 判断一下要监听的目标设备是否实现了poll逻辑
    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll)
        goto error_tgt_fput;

    // 判断一下传递的epfd是否有对应的eventpoll对象
    error = -EINVAL;
    if (file == tfile || !is_file_epoll(file))
        goto error_tgt_fput;

    // 根据private_data指针获取其中存放的eventpoll对象(上面聊过的)
    ep = file->private_data;

    // 接下来的操作会开始对内核中的结构进行修改,先加锁确保操作安全
    mutex_lock(&ep->mtx);
    // 先从红黑树结构中,根据FD查找一下对应的节点是否存在
    epi = ep_find(ep, tfile, fd);
    error = -EINVAL;
    // 开始判断用户具体要执行何种操作
    switch (op) {
   
   
    // 如果是要注册(向内核添加一个FD)
    case EPOLL_CTL_ADD:
        // 先判断之前是否已经添加过一次当前FD
        if (!epi) {
   
   
            // 如果没有添加,则调用ep_insert()函数将当前fd注册
            epds.events |= POLLERR | POLLHUP;
            error = ep_insert(ep, &epds, tfile, fd);
        } else // 之前这个FD添加过一次,则返回错误码,不允许重复注册
            error = -EEXIST;
        break;
    // 如果是删除(从内核中移除一个FD)
    case EPOLL_CTL_DEL:
        // 如果前面从红黑树中能找到与FD对应的节点
        if (epi)
            // 调用ep_remove()函数移除相应的节点
            error = ep_remove(ep, epi);
        else // 如果红黑树上都没有FD对应的节点,则无法移除,返回错误码
            error = -ENOENT;
        break;
    // 如果是修改(修改内核中FD对应的事件)
    case EPOLL_CTL_MOD:
        // 和上面的删除同理,调用ep_modify()修改FD对应的节点信息
        if (epi) {
   
   
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else // 树上没有对应的节点,依旧返回错误码
            error = -ENOENT;
        break;
    }
    // 修改完成之后,为了确保其他进程可操作,记得释放锁哦~
    mutex_unlock(&ep->mtx);

// 这里是对应上述各种错误情况的goto
error_tgt_fput:
    fput(tfile);
error_fput:
    fput(file);
error_return:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
             current, epfd, op, fd, event, error));
    return error;
}

epoll_ctl()函数的实现过程,看起来是相当直观明了,总结一下:

  • ①先将用户传递的epoll事件集合epoll_event结构从用户空间拷贝到内核。
  • ②通过epfd找到与之对应的struct file结构,再找到FD对应的file结构。
  • ③判断要监听的FD设备是否实现了poll功能,再根据private_data获取eventpoll
  • ④上锁,然后通过传入的fd在红黑树中查找有没有对应的节点,然后处理用户操作。
  • ⑤如果是注册操作,先判断当前FD之前是否注册过,在树上是否有相应节点:
    • 有:代表之前已经添加过一次,不能重复添加,返回错误码。
    • 没有:调用ep_insert()函数,将当前FD添加到红黑树中。
  • ⑥如果是删除操作,先看一下树上有没有与目标FD对应的节点:
    • 有:调用ep_remove()函数,将当前FD对应的节点从树上移除。
    • 没有:代表FD之前都没有添加过,找不到要移除的节点,返回错误码。
  • ⑦如果是修改操作,先看一下树上有没有与目标FD对应的节点:
    • 有:调用ep_modify()函数,根据用户的操作项,修改对应节点信息。
    • 没有:代表FD之前都没有添加过,找不到要修改的节点,返回错误码。
  • ⑧操作完成后,释放锁,同时如果前面有错误则利用goto处理前面的错误信息。

相信认真看一遍源码,以及上述流程后,对于epoll_ctl()函数的逻辑就明白了,当然,诸位有些绕的地方估计在epoll内部结构之间的关系,上个图理解:

011.png

整个结构关联起来略显复杂,但如若之前的epoll_create()函数真正理解后,其实也并不难懂,调用epoll_create后会先创建两个结构体:一个file结构、一个eventpoll结构,然后会将eventpoll保存在file.private_data指针中,同时再将这个file的文件描述符返回给调用者(用户态程序),此时这个返回的FD就是所谓的epollfd

然后当我们调用epoll_ctl()尝试将一个要监听的SocketFD加入到内核时,我们首先需要传递一个epfd,而后再ctl()函数内部会根据这个epfd找到之前创建的file结构,再根据其private_data指针找到前面创建的eventpoll对象,然后定位该对象的内部成员:rbr红黑树,在将要监听的FD封装成epitem节点加入树中。

当然,为了求证上述观点,接下来再看看ep_insert()函数的实现。

static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
             struct file *tfile, int fd)
{
   
   
    int error, revents, pwake = 0;
    unsigned long flags;
    struct epitem *epi;
    // 这里是一个新的结构体(类似于select、poll中的poll_wqueues结构)
    struct ep_pqueue epq;
    // 检查目前是否达到了当前用户进程的最大监听数
    if (unlikely(atomic_read(&ep->user->epoll_watches) >=
             max_user_watches))
        return -ENOSPC;
    // 利用SLAB机制分配一个epitem节点
    if (!(epi = kmem_***_alloc(epi_***, GFP_KERNEL)))
        return -ENOMEM;
    // 初始化epitem节点的一些成员
    INIT_LIST_HEAD(&epi->rdllink);
    INIT_LIST_HEAD(&epi->fllink);
    INIT_LIST_HEAD(&epi->pwqlist);
    epi->ep = ep;
    // 将要监听的FD以及它的file结构设置到epitem.ffd成员中
    ep_set_ffd(&epi->ffd, tfile, fd);
    epi->event = *event;
    epi->nwait = 0;
    epi->next = EP_UNACTIVE_PTR;
    // 同时开始准备调用fd对应设备的poll
    epq.epi = epi;
    // 这里和select、poll差不多,设置执行poll_wait()时,
    // 其回调函数为ep_ptable_queue_proc(稍后分析)
    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
    // tfile是需要监听的fd对应的file结构
    // 这里就是去调用fd对应设备的poll,询问I/O数据是否可读写
    revents = tfile->f_op->poll(tfile, &epq.pt);
    // 这里是防止执行出现错误的检测动作
    error = -ENOMEM;
    if (epi->nwait < 0)
        goto error_unregister;
    // 每个FD会将所有监听自己的epitem链起来
    spin_lock(&tfile->f_lock);
    list_add_tail(&epi->fllink, &tfile->f_ep_links);
    spin_unlock(&tfile->f_lock);
    // 上述所有工作完成后,将epitem插入到红黑树中
    ep_rbtree_insert(ep, epi);
    // 判断一下前面调用poll之后,对应设备上的I/O事件是否就绪
    if ((revents & event->events) && !ep_is_linked(&epi->rdllink)) {
   
   
        // 如果已经就绪,那直接将当前epitem节点添加到eventpoll.rdllist中
        list_add_tail(&epi->rdllink, &ep->rdllist);
        // 同时唤醒正在阻塞等待的进程
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    atomic_inc(&ep->user->epoll_watches);
    // 调用poll_wait()执行回调函数(为了防止锁资源占用,这是在锁外调用)
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 0;
// 执行出错的goto代码块
error_unregister:
    ep_unregister_pollwait(ep, epi);
    spin_lock_irqsave(&ep->lock, flags);
    if (ep_is_linked(&epi->rdllink))
        list_del_init(&epi->rdllink);
    spin_unlock_irqrestore(&ep->lock, flags);
    kmem_***_free(epi_***, epi);
    return error;
}

其实对于ep_insert()这个函数呢,说清楚来也并不复杂,简单总结一下:

  • ①对于要监听的FD会先分配一个epitem节点,并且根据FD对节点进行初始化。
  • ②最开始声明了一个结构体ep_pqueue,然后会利用它为FD设置poll_wait回调函数。
  • ③尝试调用FD对应设备的poll,询问当前FDI/O数据是否可被读写。
  • ④调用ep_rbtree_insert()函数将已经构建好的epitem节点插入红黑树中。
  • ⑤判断一下当前FDI/O事件是否已就绪(可被读写),如果可以则唤醒等待的进程。

当然,对于新的结构体ep_pqueue,它的功能和之前聊到的poll_wqueues功能大致相同,主要用于设置唤醒回调,定义如下:

struct ep_pqueue {
   
   
    poll_table pt;
    struct epitem *epi;
};

很明显就可以看出,与poll_wqueues结构中同样存在poll_table成员,不熟悉的跳回之前讲poll_wqueues的环节。不过EpollSelect、Poll还是存在些许不同的,在之前设置poll_wait()的回调函数是__pollwait(),但在这里设置的确是ep_ptable_queue_proc()函数,那这个函数会做什么事情呢?来看看:

static void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,
                 poll_table *pt)
{
   
   
    struct epitem *epi = ep_item_from_epqueue(pt);
    struct eppoll_entry *pwq;
    if (epi->nwait >= 0 && (pwq = kmem_***_alloc(pwq_***, GFP_KERNEL))) {
   
   
        // 初始化等待队列, 指定ep_poll_callback为唤醒时的回调函数
        init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
        pwq->whead = whead;
        pwq->base = epi;
        // 将节点加入到等待队列中.....
        add_wait_queue(whead, &pwq->wait);
        list_add_tail(&pwq->llink, &epi->pwqlist);
        epi->nwait++;
    } else {
   
   
        /* We have to signal that an error occurred */
        epi->nwait = -1;
    }
}

上述重心我们只需要知道一个点,这个函数会在执行f_op->poll时被调用的,在这里最重要的是设置了一个唤醒时的回调函数ep_poll_callback(),也就是当某个设备上I/O事件就绪后,唤醒进程时会调用的函数,实现如下:

static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
{
   
   
    int pwake = 0;
    unsigned long flags;
    // 从等待队列获取epitem节点(主要目的在于要确认哪个进程在等待当前设备就绪)
    struct epitem *epi = ep_item_from_wait(wait);
    // 获取当前epitem节点所在的eventpoll
    struct eventpoll *ep = epi->ep;
    spin_lock_irqsave(&ep->lock, flags);
    if (!(epi->event.events & ~EP_PRIVATE_BITS)) // 检测错误
        goto out_unlock;
    // 检测目前设备上就绪的事件是否为我们要监听的事件
    if (key && !((unsigned long) key & epi->event.events))
        // 如果不是,则直接跳转goto
        goto out_unlock;
    // 这里是用来处理事件并发出现时的情况,
    // 假设当前的回调方法被执行,但epoll_wait()已经获取到了别的IO事件,
    // 那么此时将当前设备发生的事件,epitem会用一个链表存储,
    // 此时不立即发给应用程序,也不丢弃本次IO事件,
    // 而是等待下次调用epoll_wait()函数时返回
    if (unlikely(ep->ovflist != EP_UNACTIVE_PTR)) {
   
   
        if (epi->next == EP_UNACTIVE_PTR) {
   
   
            epi->next = ep->ovflist;
            ep->ovflist = epi;
        }
        // 然后直接跳转goto
        goto out_unlock;
    }
    // 正常情况下,将当前已经触发IO事件的epitem节点放入readylist就绪列表
    if (!ep_is_linked(&epi->rdllink))
        list_add_tail(&epi->rdllink, &ep->rdllist);
    // 唤醒调用epoll_wait后阻塞的进程...
    if (waitqueue_active(&ep->wq))
        wake_up_locked(&ep->wq);
    // 如果epollfd也在被poll, 也唤醒队列里面的所有成员(多进程epoll情况)
    if (waitqueue_active(&ep->poll_wait))
        pwake++;
// 前面的goto跳转处理
out_unlock:
    spin_unlock_irqrestore(&ep->lock, flags);
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return 1;
}

这个唤醒回调函数,主要干的事情就是处理了几种特殊情况,然后将IO事件就绪的节点添加到了eventpoll.readylist就绪列表,紧接着唤醒了调用epoll_wait()函数后阻塞的进程。

至此,epoll_ctl()函数调用后,会执行的流程就已经分析明白了。当然,对于具体如何插入、移除、修改节点的函数就不分析了,这里就是红黑树结构的知识,大家可参考HashMap集合的元素管理原理。接下来重点看看epoll_wait()函数。

6.3.3、epoll_wait()函数源码分析

epoll_wait()也是整个Epoll机制中最重要的一步,前面的create、ctl函数都仅仅是在为wait函数做准备工作,epoll_wait()是一个阻塞函数,调用后会导致当前程序发生阻塞等待,直至获取到有效的IO事件或超时为止。

epoll_wait()对应的系统调用为SYSCALL_DEFINE4()函数,展开后则是sys_epoll_wait(),不多说直接上源码:

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,
        int, maxevents, int, timeout)
{
   
   
    int error;
    struct file *file;
    struct eventpoll *ep;
    // 获取的最大事件数量必须大于0,并且不超出ep的最大事件数
    if (maxevents <= 0 || maxevents > EP_MAX_EVENTS)
        return -EINVAL;
     // 内核会验证用户接收事件的这一段内存空间是不是有效的.
    if (!access_ok(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))) {
   
   
        error = -EFAULT;
        goto error_return;
    }
    error = -EBADF;
    // 根据epollfd获取对应的struct file真实文件
    file = fget(epfd);
    if (!file)
        goto error_return;
    error = -EINVAL;
    // 检查一下获取到的file是不是一个epollfd
    if (!is_file_epoll(file))
        goto error_fput;
    // 获取file的private_data数据,也就是根据file获取eventpoll对象
    ep = file->private_data;
    // 获取到eventpoll对象调用ep_poll()函数(这个是核心函数!)
    error = ep_poll(ep, events, maxevents, timeout);
error_fput:
    fput(file);
error_return:
    return error;
}

上述逻辑也不难,首先对于用户态调用epoll_wait()函数时传递的一些参数进行了效验,因为内核对于进程采取的态度是绝对不信任,因此对于用户进程递交的任何参数都会进行效验,确保无误后才会采取下一步措施。当上述代码前面效验了参数的“合法性”后,又根据epfd获取了对应的file,然后又根据file获取到了eventpoll对象,最后调用了ep_poll()函数并传入了eventpoll对象,再看看这个函数:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
   
   
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    // 等待队列
    wait_queue_t wait;
    // 如果调用epoll_wait时传递了阻塞时间,那么先计算休眠时间, 
    // 毫秒要转换为HZ电磁波动的频率(比较严谨,控制的非常精细)
    jtimeout = (timeout < 0 || timeout >= EP_MAX_MSTIMEO) ?
        MAX_SCHEDULE_TIMEOUT : (timeout * HZ + 999) / 1000;
retry:
    spin_lock_irqsave(&ep->lock, flags);
    res = 0;
    // 判断一下eventpoll.readylist事件就绪列表是否为空
    if (list_empty(&ep->rdllist)) {
   
   
        // 初始化等待队列,准备将当前进程挂起阻塞
        init_waitqueue_entry(&wait, current);
        // 挂载到如果eventpoll.wq等待队列中
        __add_wait_queue_exclusive(&ep->wq, &wait);

        // 核心循环!
        for (;;) {
   
   
            // 准备进入阻塞,先将当前进程设置为睡眠状态(可被信号唤醒)
            set_current_state(TASK_INTERRUPTIBLE);
            // 如果睡眠之前,readylist中有数据了或已经到了给定的超时事件
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break; // 不睡了,直接中断循环
            // 如果出现唤醒信号,也中断循环,不睡了退出干活
            if (signal_pending(current)) {
   
   
                res = -EINTR;
                break;
            }
            // 上述的几个情况都未发生,在这里准备正式进入睡眠状态
            spin_unlock_irqrestore(&ep->lock, flags);
            // 开始睡觉(如果指定了阻塞时间,jtimeout时间过后会自动醒来)
            jtimeout = schedule_timeout(jtimeout); // 正式进入睡眠阻塞
            spin_lock_irqsave(&ep->lock, flags);
        }
        // 出现唤醒信号、或事件已就绪、或已超时,则将进程从等待队列移除
        __remove_wait_queue(&ep->wq, &wait);
        // 这里设置一下当前进程的状态为运行状态(活跃状态)
        set_current_state(TASK_RUNNING);
    }
    // 因为超时的情况下当前进程也会醒来,所以这里需要再次判断一下:
    // 目前到底是否已经有数据就绪了,这里会返回一个布尔值给 eavail
    eavail = !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;
    spin_unlock_irqrestore(&ep->lock, flags);
    // 如果确定有事件发生,那则调用ep_send_events()将事件拷贝给用户进程
    if (!res && eavail &&
        !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
        goto retry;
    // 最后返回本次监听到的事件数
    return res;
}

上述的ep_poll()函数,对比select、poll而言要简单很多,整个流程也没几步:

  • ①转换给定的阻塞时间,并判断就绪列表中事件是否已就绪,没有则准备休眠阻塞。
  • ②先初始化等待队列并挂载到eventpoll.wq中,然后进入循环,设置进程的状态。
  • ③在正式进入睡眠之前,再次检测是否有事件就绪、是否已超时、是否出现唤醒信号。
  • ④如果都没有出现,则调用schedule_timeout(jtimeout)函数让进程睡眠一定时间。
  • ⑤当睡眠超时、或出现唤醒信号、或事件已就绪,将当前进程从队列移除并设置运行状态。
  • ⑥有事件到来非超时的情况下,则调用ep_send_events()将就绪事件拷贝给用户进程。

OK~,上述总结的流程已经描述的十分清晰了,接下来再看看拷贝就绪事件的函数ep_send_events()

static int ep_send_events(struct eventpoll *ep,
              struct epoll_event __user *events, int maxevents)
{
   
   
    struct ep_send_events_data esed;
    // 获取用户进程传递的maxevents值
    esed.maxevents = maxevents;
    // 获取用户态存放就绪事件的集合
    esed.events = events;
    // 调用ep_scan_ready_list()函数进行具体处理
    return ep_scan_ready_list(ep, ep_send_events_proc, &esed);
}

这个函数非常简单,一眼看明白了,其实最终的拷贝工作是由ep_scan_ready_list()完成的,那么再来看看它:

static int ep_scan_ready_list(struct eventpoll *ep,
                  int (*sproc)(struct eventpoll *,
                       struct list_head *, void *),
                  void *priv)
{
   
   
    int error, pwake = 0;
    unsigned long flags;
    struct epitem *epi, *nepi;
    LIST_HEAD(txlist);
    // 先上锁,防止出现安全问题
    mutex_lock(&ep->mtx);
    spin_lock_irqsave(&ep->lock, flags);
    // 将rdllist中所有就绪的节点转移到txlist,然后清空rdllist
    list_splice_init(&ep->rdllist, &txlist);
    ep->ovflist = NULL;
    spin_unlock_irqrestore(&ep->lock, flags);
    // sproc是前面调用当前函数时传递的ep_send_events_proc(),
    // 会通过这个函数处理每个epitem节点
    error = (*sproc)(ep, &txlist, priv);
    spin_lock_irqsave(&ep->lock, flags);
    // 之前曾讲到过,如果epoll正在拷贝数据时又发生了IO事件,
    // 那么则会将这些IO事件保存在ovflist组成一个链表,现在来处理这些事件
    for (nepi = ep->ovflist; (epi = nepi) != NULL;
         nepi = epi->next, epi->next = EP_UNACTIVE_PTR) {
   
   
        // 将这些直接放入readylist列表中
        if (!ep_is_linked(&epi->rdllink))
            list_add_tail(&epi->rdllink, &ep->rdllist);
    }
    ep->ovflist = EP_UNACTIVE_PTR;
    // 上一次没有处理完的epitem节点, 重新插入到readylist
    // 因为epoll一次只能拷贝maxevents个事件返回用户态
    list_splice(&txlist, &ep->rdllist);
    // readylist不为空, 直接唤醒
    if (!list_empty(&ep->rdllist)) {
   
   
        // 唤醒的前置工作
        if (waitqueue_active(&ep->wq))
            wake_up_locked(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }
    spin_unlock_irqrestore(&ep->lock, flags);
    mutex_unlock(&ep->mtx);
    // 为了防止长时间占用锁,在锁外执行唤醒工作
    if (pwake)
        ep_poll_safewake(&ep->poll_wait);
    return error;
}

流程就不写了,源码中标注的很清楚,下面再来看看ep_send_events_proc()函数是如何处理每个epitem节点的:

// 注意点:这里的入参list_head并不是readylist,而是上面函数的txlist
static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,
                   void *priv)
{
   
   
    struct ep_send_events_data *esed = priv;
    int eventcnt;
    unsigned int revents;
    struct epitem *epi;
    struct epoll_event __user *uevent;
    // 先用循环扫描整个列表(不一定会全部处理,最多只处理maxevents个)
    for (eventcnt = 0, uevent = esed->events;
         !list_empty(head) && eventcnt < esed->maxevents;) {
   
   
        // 依次获取到其中的一个epitem节点
        epi = list_first_entry(head, struct epitem, rdllink);
        // 紧接着从列表中将这个节点移除
        list_del_init(&epi->rdllink);
        // 再次读取当前节点对应FD所触发的事件,其实在唤醒回调函数中,
        // 这个工作也执行过一次,那为啥这里还需要做一次呢?
        // 答案是:为了确保读到最新的事件,因为有些FD可能前面触发了读就
        // 绪事件,后面又触发了写就绪事件,因此这里要确保严谨性。
        revents = epi->ffd.file->f_op->poll(epi->ffd.file, NULL) &
            epi->event.events;
        if (revents) {
   
   
            // 调用__put_user将就绪的事件拷贝至用户进程传递的事件集合中
            if (__put_user(revents, &uevent->events) ||
                __put_user(epi->event.data, &uevent->data)) {
   
   
                list_add(&epi->rdllink, head);
                return eventcnt ? eventcnt : -EFAULT;
            }
            eventcnt++;
            uevent++;
            if (epi->event.events & EPOLLONESHOT)
                epi->event.events &= EP_PRIVATE_BITS;
            else if (!(epi->event.events & EPOLLET)) {
   
   
                // 大名鼎鼎的ET和LT,就在这一步会有不同(稍后分析)
                list_add_tail(&epi->rdllink, &ep->rdllist);
            }
        }
    }
    return eventcnt;
}

处理每个epitem节点的函数中,重点就做了两件事:

  • ①读取了每个epitem节点最新的就绪事件。
  • ②调用__put_user()函数将就绪的事件拷贝至用户进程传递的evlist集合中。

至此,epoll_wait()函数的核心源码也全都走了一遍,最后来简单的总结一下。

6.3.4、Epoll被阻塞的进程是如何唤醒的?

   到这里,大家应该有个疑惑:epoll_wait()函数中,本质上只做了进程休眠阻塞的工作,那它什么时候会被唤醒呢?先对于这个点回答一下:大家还记得前面分析epoll_ctl()函数时,在其中调用了每个FDpoll吗?

revents = tfile->f_op->poll(tfile, &epq.pt);

也就是这行代码,在调用epoll_ctl()函数向内核插入一个节点时,就会先询问一次FDIO数据是否可被读写,此时如果可以就会直接将这个节点添加到readylist列表中,但如果对应驱动设备的IO事件还未就绪,则会将当前进程注册到每个FD对应设备的等待队列上,并设置唤醒回调函数为ep_poll_callback()

这样也就意味着,如果某个FD的事件就绪了,就会由对应的驱动设备执行这个回调,在ep_poll_callback()函数中,会先将对应的节点先插入到readylist列表,然后会尝试唤醒eventpoll等待队列中阻塞的进程。

当后续调用epoll_wait()函数时,会先判断readylist列表中是否有事件就绪,如果有就直接读取返回了,如果没有则会让当前进程阻塞休眠,并将当前进程添加到eventpoll等待队列中,然后某个FD的数据就绪后,则会唤醒这个队列中阻塞的进程,此时调用epoll_wait()陷入阻塞的进程就被唤醒工作了!!发现没有,Epoll的源码设计中是环环相扣的,十分巧妙!

OK~,搞清楚这点之后,Epoll的核心逻辑已经讲完了十之八九,还剩下Epoll的两种事件触发机制未讲到,来聊一聊吧。

6.4、Epoll的两种事件触发机制

   相信之前有简单了解过Epoll的小伙伴都明白,在Epoll中有两种事件触发模式,分别被称为水平触发与边缘触发,一般来说,边缘触发的性能远超于水平触发。

6.4.1、Epoll水平触发机制-LT模式

   LT模式也是Epoll的默认事件触发机制,也就是当某个FDepitem节点)被处理后,如果还依旧存在事件或数据,则会再次将这个epitem节点加入readylist列表中,当下次调用epoll_wait()时依旧会返回给用户进程。

大家还记得前面分析ep_send_events_proc()函数时,最后的那两行代码吗?

else if (!(epi->event.events & EPOLLET)) {
   
   
    list_add_tail(&epi->rdllink, &ep->rdllist);
}

在这个函数中,最后面做了一个简单的判断,如果当前Epoll的工作模式没有设置成EL,同时当前节点还有事件未处理,就会调用list_add_tail()函数将当前的epitem节点重新加入readylist列表。反之,ET模式下则不会这么做。

6.4.2、Epoll边缘触发机制-ET模式

   在ET模式中,就是和LT模式反过来的,当处理一个epitem节点时,就算其中还有事件没处理完,那我也不会将这个节点重新加入readylist列表,除非这个节点对应的FD又再次触发了新事件,然后再次执行了ep_poll_callback()回调函数,此时才会将其重新加入到readylist

说人话简单一点:就是ET模式下,对于当前触发的事件,只会通知用户进程一次,就算没有处理也不会重复通知,除非这个FD发生新的事件。而LT模式下则相反,无论何种情况下都能确保事件不丢失。

那又该如何设置ET触发机制呢?其实也就是在调用epoll_ctl()函数时,指定感兴趣的监听事件时,多加一个EPOLLET即可。

基于Epoll机制构建的大部分高性能应用,一般都会采用ET模式,例如Nginx

6.5、Epoll小结与一个争议的问题

   相较于之前的select函数存在的四个问题,在Epoll中得到了合理解决,但也并非Epoll的性能就一定比Select、Poll要好,在监听的文件描述符较少、且经常更换监听的目标FD的情况下,Select、Poll的性能反而会更佳。

当然,epoll在高并发的性能下,会有非常优异的表现,这是由于多方面原因造就的,比如在内核中维护FD避免反复拷贝切换、对于就绪事件回调通知,无需用户进程再次轮询查找、内部采用红黑树结构维护节点、退出ET事件触发机制等.......

   同时对于网上一个较为争议的问题:Epoll到底有没有试用MMAP共享内存呢?从Epoll源码的角度来看,其实是并未使用的,在向用户进程返回就绪事件时,本质上是调用了__put_user()函数将数据从内核拷贝到了用户态。当然,我Epoll的源码是基于内核3.x版本的,但听网上说在早版本里面用到了,但翻阅分析select源码时用的内核2.6版本源码,在里面Epoll都还未定义完整,仅有部分实现,所以也没有发现mmap相关的API调用。

不过在Java-NIOFileChannel.transferTo()方法中,以及在Linux系统的sendfile()函数中确实用到了,因此操作本地文件数据时,确实会用到mmap共享内存。因此,Java-NIO中用到了mmap,但Epoll中应该未曾用到。

七、多路复用模型总结

   看到到这里,也就接近尾声了,上面已经对于Linux系统下提供的多路复用函数进行了全面深入剖析,大家反复阅读几遍,自然能够彻底弄懂select、poll、epoll这些函数的工作原理。

不过对于Epoll的分析还有一些内容未曾提及,也就是Epoll唤醒时的惊群问题,大家感兴趣的可自行去研究,这里只埋下一个引子,当然也并不复杂。

   而整个Java-NIO都是基于底层的多路复用函数构建的,但本篇仅分析了Linux系统下的多路复用实现,在本文开篇也提到过:JVM为了维持自己的跨平台性,因此在不同的系统下会分别调用不同的多路复用函数,比如Windows-Select、Mac-KQueue函数,其中Windows-SelectLinux-Select的实现类似,因此我从JNI调用的入参上来说大致相同。而Mac-KQueue则与Linux-Epoll类似。但对于其内部实现我并不清楚,毕竟是闭源的系统,大家有兴趣可以自行研究。

最后,还有Java-AIO的内容,其底层是如何实现的呢?在异步非阻塞式IO的支持方面,Windows系统反而做的更好,因为它有专门实现IOCP机制,但Linux、Mac系统则是通过KQueue、Epoll模拟实现的。

至此,最后也简单的总结一下本篇分析的select、poll、epoll三者之间的区别:

对比项 Select Poll Epoll
内部数据结构 数组位图 数组链表 红黑树
最大监听数 1024 理论无限制 理论无限制
事件查找机制 线性轮询 线性轮询 回调事件直接写回用户态
事件处理时间复杂度 O(n) O(n) O(1)
性能对比 FD越多越差 FD越多越差 FD增多不会造成影响
FD传递机制 用~核拷贝 用~核拷贝 内核维护结构

......,除上述之外,三者还存在很多细微差异,大家认真看懂本篇自然能心中明了,因此不再赘述,到此为本文画上句号。

本文由于整篇涉及到Linux内核源码的调试,由于自己本身不是C开发,所以调试起来也显得心里憔悴,但至少对于多路复用函数的核心源码都已阐明。当然,如若文中存在不足还请谅解,对于存在误区、疑义的地方也欢迎留言指正。最后,也希望大家动动手指点赞支持,在此万分感谢^_^

相关文章
|
2月前
|
网络协议 安全 Linux
Linux C/C++之IO多路复用(select)
这篇文章主要介绍了TCP的三次握手和四次挥手过程,TCP与UDP的区别,以及如何使用select函数实现IO多路复用,包括服务器监听多个客户端连接和简单聊天室场景的应用示例。
95 0
|
9天前
|
数据采集 人工智能 Java
Java产科专科电子病历系统源码
产科专科电子病历系统,全结构化设计,实现产科专科电子病历与院内HIS、LIS、PACS信息系统、区域妇幼信息平台的三级互联互通,系统由门诊系统、住院系统、数据统计模块三部分组成,它管理了孕妇从怀孕开始到生产结束42天一系列医院保健服务信息。
25 4
|
15天前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
45 2
|
2月前
|
存储 Linux C语言
Linux C/C++之IO多路复用(aio)
这篇文章介绍了Linux中IO多路复用技术epoll和异步IO技术aio的区别、执行过程、编程模型以及具体的编程实现方式。
91 1
Linux C/C++之IO多路复用(aio)
|
20天前
|
人工智能 监控 数据可视化
Java智慧工地信息管理平台源码 智慧工地信息化解决方案SaaS源码 支持二次开发
智慧工地系统是依托物联网、互联网、AI、可视化建立的大数据管理平台,是一种全新的管理模式,能够实现劳务管理、安全施工、绿色施工的智能化和互联网化。围绕施工现场管理的人、机、料、法、环五大维度,以及施工过程管理的进度、质量、安全三大体系为基础应用,实现全面高效的工程管理需求,满足工地多角色、多视角的有效监管,实现工程建设管理的降本增效,为监管平台提供数据支撑。
34 3
|
25天前
|
运维 自然语言处理 供应链
Java云HIS医院管理系统源码 病案管理、医保业务、门诊、住院、电子病历编辑器
通过门诊的申请,或者直接住院登记,通过”护士工作站“分配患者,完成后,进入医生患者列表,医生对应开具”长期医嘱“和”临时医嘱“,并在电子病历中,记录病情。病人出院时,停止长期医嘱,开具出院医嘱。进入出院审核,审核医嘱与住院通过后,病人结清缴费,完成出院。
60 3
|
1月前
|
JavaScript Java 项目管理
Java毕设学习 基于SpringBoot + Vue 的医院管理系统 持续给大家寻找Java毕设学习项目(附源码)
基于SpringBoot + Vue的医院管理系统,涵盖医院、患者、挂号、药物、检查、病床、排班管理和数据分析等功能。开发工具为IDEA和HBuilder X,环境需配置jdk8、Node.js14、MySQL8。文末提供源码下载链接。
|
2月前
|
移动开发 前端开发 JavaScript
java家政系统成品源码的关键特点和技术应用
家政系统成品源码是已开发完成的家政服务管理软件,支持用户注册、登录、管理个人资料,家政人员信息管理,服务项目分类,订单与预约管理,支付集成,评价与反馈,地图定位等功能。适用于各种规模的家政服务公司,采用uniapp、SpringBoot、MySQL等技术栈,确保高效管理和优质用户体验。
|
2月前
|
Linux C++
Linux C/C++之IO多路复用(poll,epoll)
这篇文章详细介绍了Linux下C/C++编程中IO多路复用的两种机制:poll和epoll,包括它们的比较、编程模型、函数原型以及如何使用这些机制实现服务器端和客户端之间的多个连接。
27 0
Linux C/C++之IO多路复用(poll,epoll)
|
2月前
|
Java Linux
【网络】高并发场景处理:线程池和IO多路复用
【网络】高并发场景处理:线程池和IO多路复用
47 2