listenStream里面也有核心的三步:
- 通过syscall.Bind 完成绑定
- 通过listenFunc 完成监听
- 调用netFD自身init完成初始化操作:netFD.init ->poll.FD.init->FD.pollDesc.init
我们主要看fd.init逻辑。
最终是调用的pollDesc的init函数。这个函数有重要的两步。
- runtime_pollServerInit:主要会根据不同的平台去调用对应平台的netpollInit函数来创建I/O多路复用的实例。比如linux下创建epoll。
- runtime_pollOpen(uintptr(fd.Sysfd)):主要将已经创建完的Listener fd注册到上述实例当中,比如将fd注册到epoll中,底层通过syscall调用epoll_ctl。
更具体的流程,
首先serviceInit.Do 保证当中的runtime_pollServerInit只会初始化一次。这很好理解,类似epoll实例全局初始化一次即可。
接着我们看下runtime_pollServerInit函数,
这是咋回事,和我们平常看过的函数长的不太一样,执行体呢?
其实这个函数是通过 go:linkname连接到具体实现的函数poll_runtime_pollServerInit。找起来也很简单,
看到poll_runtime_pollServerInit()上面的 //go:linkname xxx 了吗?不了解的可以看看Go官方文档`go:linkname。
所以最终runtime_pollServerInit调用的是,
通过调用poll_runtime_pollServerInit->netpollGenericInit,netpollGenericInit
里调用netpollinit函数完成初始化。
注意。这里的netpollinit,是基于当前系统来调用对应系统的netpollinit函数的。
什么意思?
文章开始有提到Go底层网络模型是基于I/O多路复用。
不同平台对I/O多路复用有不同的实现方式。比如Linux的epoll,MacOS的kqueue,而Windows的icop。
所以对应,如果你当前是Linux,那么最终调用的是src/runtime/netpoll_epoll.go下的 netpollinit函数,然后会创建一个epoll实例,并把值赋给epfd,作为整个runtime中唯一的event-loop使用。
其他的,比如MacOS下的kqueue,也存在netpollinit函数。
以及Windows下的icop。
我们回到pollDesc.init 操作,
完成第一步初始化操作后,第二步就是调用runtime_pollOpen。
老套路通过//go:linkname找到对应的实现,实际上是调用的poll_runtime_pollOpen函数。
这个函数里面再调用netpollopen函数,netpollopen函数和上面的netpollinit函数一样,不同平台都有它的实现。linux平台下,
netpollopen函数,首先会通过指针把pollDesc保存到epollevent的一个字节数组data里。
然后会把传递进来的fd(刚才初始化完成的那个Listener监听器)注册到epoll当中,且通过指定 _EPOLLET将epoll设置为边缘触发(Edge Triggered)模式。
如果让我用一句话来说明epoll水平触发和边缘触发的区别,那就是,
水平触发下epoll_wait在文件描述符没有读写完会一直触发,而边缘触发只在是在变成可读写时触发一次。
到这里整个Listen 动作也就结束了,然后层层返回。最终到业务返回的是一个 Listener,按照本篇的例子,本质上还是一个TCPListener。
Accept解析
接着当我们调用listen.Accept的时候,
最终netFD的accept函数。netFD中通过调用fd.pfd(实际上是FD)的Accept函数获取到socket fd,通过这个fd创建新的netFD表示这是一个新连接的fd。
并且会和Listen时一样调用netFD.init做初始化,因为当前epoll已经初始化一次了,所以这次只是把这个新连接的fd也加入到epoll事件队列当中,用于监听conn fd的读写I/O事件。
具体我们看FD.Accept是咋么执行的。
首先是一个死循环for,死循环里调用了accept函数,本质上通过systcall调用系统accept接收新连接。当有新连接时,最终返回一个文件描述符fd。
当accept获取到一个fd,会调用systcall.SetNonblock把这个fd设置成非阻塞的I/O。然后返回这个连接fd。
因为我们在Listen的时候已经把对应的Listener fd设置成非阻塞I/O了。
所以调用accept这一步是不会阻塞的。只是下面会进行判断,根据判断 err ==syscall.EAGAIN 来调用fd.pd.waitRead阻塞住用户程序。
直到I/O事件ready,被阻塞在fd.pd.waitRead的代码会继续执行continue,重新一轮的accept, 此时对应fd上的 I/O已然ready,最终就返回一个conn类型的fd。
我刚才说的调用fd.pd.waitRead会被阻塞,直到对应I/O事件ready。我们来看它具体逻辑,
最终到runtime_pollWait函数,老套路了,我们找到具体的实现函数。
poll_runtime_pollWait 里的for循环就是为了等待对应的I/O ready才会返回,否则的话一直调用netpollblock函数。
pollDesc结构我们之前提到,它就是底层事件驱动的封装。
其中有两个重要字段: rg和wg,都是指针类型,实际这两个字段存储的就是Go底层的g,更具体点是等待i/O ready的g。
比如当创建完一个Listener,调用Accept开始接收客户端连接。如果没有对应的请求,那么最终会把g放入到pollDesc的rg。
如果是一个conn类型的fd等待可写I/O,那么会把g放入到pollDesc的wg中。
具体就是根据mode来判断当前是什么类型的等待事件。
netpollblock里也有一个for循环,如果已经ready了,那么直接返回给上一层就行了。否则的话,设置gpp为等待状态pdWait。
这里还有一点atomic.Loaduintptr(gpp),这是为了防止异常情况下出现死循环问题。比如如果gpp的值不是pdReady也不是0,那么意味着值是pdWait,那就成了double wait,必然导致死循环。
如果gpp未ready且成功设置成pdWait,正常情况下,最终会调用gopark,会挂起g且把对应的g放入到pollDesc 的wg|rg 当中。
进入gopark。
这一块代码不是很难,基本的字段打了备注,核心还是要看park_m这个函数。
在park_m函数中,首先会通过CAS并发安全地修改g的状态。
然后调用dropg解绑g和m的关系,也就是m把当前运行的g置空,g把当前绑定的m置空。
后面的代码是根据当前场景来解释的。我们知道此时m的waitunlockf 其实就是netpollblockcommit。
netpollblockcommit会把当前已经是_Gwaiting状态下的g赋值给gpp。如果赋值成功,netpollWaiters会加1。
这个全局变量表示当前等待I/O事件ready的g数量,调度器再进行调度的时候可以根据此变量判断是否存在等待I/O事件的g。
如果此时当前gpp下的fd的I/O已经ready。那么gpp的状态必然已不是pdWait,赋值失败。返回false。
回到park_m,
如果netpollblockcommit返回true,那么直接触发新一轮的调度。
如果netpollblockcommit返回false,意味着当前g已经不需要被挂起了,所以需要把状态调整为_Grunnable,然后安排g还是在当前m上执行。
当I/O事件ready,会一层层返回,获取到新的socket fd,创建conn类型的netFD,初始化netFD(其实就是把这个conn类型的fd也加入epoll事件队列,用于监听),最终最上游会获取到一个Conn类型的网络连接,就可以基于这个连接做Read、Write等操作了。
Read/Write 解析
后续的Conn.Read 和 Conn.Write 原理和Accept 类似。
上图给出了Write操作,可以看出核心部分和accept操作时一样的。对于Read操作,就不再重复了。
从上面的分析中我们已经知道,Go的netpoller底层通过对epoll|kqueue|iocp的封装,使用同步的编程手法达到异步执行的效果,无论是一个Listener还是一个Conn,它的核心都是netFD。
netFD又和底层的PollDesc结构绑定,当读写出现EAGAIN错误时,会通过调用gopark把当前g给park住,同时会将当前的g存储到对应netFD的PollDesc的wg|rg当中。
直到这个netFD再次发生对应的读写事件,才会重新把当前g放入到调度系统进行调度。
还有最后一个问题,我们咋么知道哪些FD发生读写事件了?
I/O已就绪
答案就是netpoll()函数。
此函数会调用epollwait函数,本质上就是Linux中epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout)
。
在之前调用epoll_ctl
,注册fd对应的I/O事件到epoll实例当中。
这里的epoll_wait
实际上会阻塞监听epoll实例上所有fd的I/O事件,通过传入的第二个参数(用户内存地址events)。
当有对应的I/O事件到来时,内核就会把发生事件对应的fd复制到这块用户内存地址(events),解除阻塞。
然后我们遍历这个events,去获取到对应的事件类型、pollDesc,再通过调用netpollready函数获取到pollDesc对应被gopark的g,最终把这些g加入到一个链表当中,返回。
也就是说只要调用这个函数,我们就能获取到之前因为I/O未ready而被gopark挂起,现在I/O已ready的g链表了。
我们可以找到四个调用处,如下,
- startTheWorldWithSema
- findrunnable
- pollWork
- sysmon
这和go的调度有关,当然这不是本章的内容。
当这四种方法调用netpoll函数得到一个可运行的g链表时,都会调用同一个函数injectglist。
这个函数本质上就是把链表中所有g的状态从Gwaiting->Grunnable。然后按照策略,把这些g推送到本地处理器p或者全家运行队列中等待被调度器执行。
到这里,整个流程就已经剖析完毕。不能再写了。
总结
Go netpoller通过在底层对epoll/kqueue/iocp这些不同平台下对I/O多路复用实现的封装,加上自带的goroutine(上文我一直用g表达),从而实现了使用同步编程模式达到异步执行的效果。
代码很长,涉及到的模块也很多,整体看完代码还是非常爽的。
另外早有人提出,由于一个连接对应一个goroutine,瞬时并发场景下,大量的goroutine会被不断创建。
原生netpoller无法提供足够的性能和控制力,如无法感知连接状态、连接数量多导致利用率低、无法控制协程数量等。针对这些问题,可以参考下gnet以及 KiteX 这两个项目的网络模型。