Golang
因为有着比线程更加轻量级的协程的出现,使得并发编程的上手难度一下子变得亲民起来。而channel
的引入,使得goroutine
之间的通信变得异常的便捷。
但好用并不意味着毫无风险,go channel
使用不当,也极易引起goroutine
泄漏。
何谓goroutine
泄漏?就是开启了goroutine
,却并没有及时回收,导致goroutine
越积越多,如果程序及时关闭还不会出现问题,如果是在服务器中,程序长期运行,就会导致资源占用十分恐怖。
虽然goroutine
比线程更轻量级,但每个goroutine
至少也会有8~10K的空间,如果goroutine
达到了一个恐怖的量级,内存的占用也是十分可怕的。
笔者近期就遇到了一个生产环境的goroutine
泄漏问题。
问题复盘
我们有一套go语言开发的程序部署在客户的机器,该程序主要用来接收http请求,并将相应的请求解释成客户端配置文件,下发给客户端。服务器与客户端之间采用的是TCP长连接,二者之间靠心跳机制保活。
除此之外,该服务器还会接收客户端发过来的自监控性能指标信息,通过写ES或其他数据库的方式落到硬盘,以供监控分析。
以上这是背景。
起因是发现其中某一台机器上,该程序运行一段时间后,内存占用达到了数个G,而客户端的连接数量其实并不多。
通过pprof
查看,发现goroutine
的数量多得很不正常,甚至达到了13万多个。
用pprof
工具定位,最终发现了问题所在,就是因为goroutine
泄漏了。
那么,goroutine
怎么会泄漏呢?经过分析代码终于发现了端倪,原来问题出在自监控信息上。
为了接受客户端自监控性能指标的信息,我们在服务器的配置文件中配置了数据库信息,如ES
、MySQL
、InfluxDb
等,客户可以根据自己的需要选择合适的数据库进行存储。自监控信息通过心跳包携带上来。
在设计上是这么做的:心跳接受自监控信息和写数据库分别位于两个协程中,彼此之间通过channel
通信。
channel
的定义如下,它有1000
个缓存:
Runner{ input: make(chan ProcessMetric, 1000), config: config, exporters: make(map[string]Exporter, 0), }
在心跳接收的地方:
pm := exporters.ProcessMetric{ AgentId: agentID, Timestamp: time.Unix(agentHeartbeatInfo.Monitor.Timestamp/1000, 0), HostName: agentHeartbeatInfo.System.HostName, Ip: agentHeartbeatInfo.System.Connection.Ip, Pid: subProcess.Pid, ProcessName: subProcess.Procname, Cmd: subProcess.Cmd, CpuUsageRate: subProcess.CpuUsageRate, MemUsage: subProcess.MemUsage, SendLines: subProcess.SendLines, } svr.exporterService.Input() <- pm
写入数据库的地方:
for { select { case metric := <-exporter.input: exporter.process(metric) case <-exporter.t: return } }
乍看起来这样设计似乎没有什么问题,但是为了代码的健壮性,当用户在配置文件里一个数据库都没配置时,就不会去写到数据库。
if len(runner.config.Exporters) == 0 { return nil }
问题就出在这里。本来这段代码只是为了提高健壮性的,因为监控信息一般都会打开,偏偏真有客户因为机器上没有部署相应的数据库,所以没有打开,所以导致心跳一直在往channel
里发信息,但是channel
的接收端由于直接return
了,导致无法读取,当1000
个缓存满了之后,消息就全部阻塞在那里,导致goroutine
越来越多,最终达到了数十万个。
问题重现
为了重现这个问题,我将代码抽象出来了,大致如下程序所示:
package main import ( "fmt" "net/http" _ "net/http/pprof" "runtime" "time" ) func main() { go http.ListenAndServe("0.0.0.0:6060", nil) //注册pprof监控 ch := make(chan int, 5) //go channel,负责go routine之间通信,5个缓存 flag := false //bool类型标记,模拟配置信息,false表示没有配置 //第一个 go routine, 模拟写数据库信息,这里简化,直接读取channel的内容 go func() { //当flag为false时,直接return,这行代码是导致go routine泄漏的关键 if !flag { return } for { select { case recv := <-ch: //读取 channel fmt.Println("recive channel message:", recv) } } }() //for 循环模拟TCP长连接,每隔500ms向channel写一条数据,模拟心跳上报客户端自监控信息 for { i := 0 time.Sleep(500 * time.Millisecond) go func() { fmt.Println("goroutine count:", runtime.NumGoroutine()) i++ ch <- i }() } }
运行上面的程序,发现当channel
中5个缓存满了之后,每向channel
中写一次数据,goroutine
就会多一个,如果不停止程序,goroutine
还将无限增加下去。
问题修复
修复这个问题的方法也很简单,在发送端也做一个判断,当flag
为false
的时候不向channel
发送数据就可以了。
for { i := 0 time.Sleep(500 * time.Millisecond) go func() { fmt.Println("goroutine count:", runtime.NumGoroutine()) i++ if flag { ch <- i } }() }
修复后运行情况如下所示:
可见无论运行多久,goroutine
数量始终保持在3个,不会随着时间的推移无休止增加。
原理剖析
很多人可能不明白,为什么channel
阻塞,会造成goroutine
增加。看现象似乎每向channel
写入一次数据,就会产生一个goroutine
,而每从channel
中取出一个数据,就会销毁这个goroutine
。
其实这么理解也是说得通的。channel
本来就是为了goroutine
通信用的,数据的传输自然要借助于goroutine
。
我们打开channel
的源码(src/runtime/chan.go
),可以看到向channel
写入信息的函数其实是chansend1
,内部调用的是chansend
函数。
// entry point for c <- x from compiled code //go:nosplit func chansend1(c *hchan, elem unsafe.Pointer) { chansend(c, elem, true, getcallerpc()) }
channel
对应的数据结构封装在一个hchan
的结构体里,这个结构体的结构如下所示:
type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
在这个结构体里,维护了两类队列,一个基于数组的循环队列buf
,主要用来缓存数据,还有两个用户缓存阻塞的goroutine
的双向队列sendq
和recvq
。
这个buf
的大小dataqsiz
其实就是我们创建channel
时指定的缓存大小。当qcount
的数量小于dataqsiz
的时候,其实数据是可以放入缓存的。
当buf
满了之后,数据就无法再放入缓存队列中了,它就会阻塞在那地方,这些阻塞的数据会新创建一个goroutine
,并把这个goroutine
存放到sendq
队列中。
// Block on the channel. Some receiver will complete our operation for us. gp := getg() //这里调用acquireSudog()创建一个goroutine mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil //放入sendq队列 c.sendq.enqueue(mysg) //goroutine进入休眠 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
如果数据一直发不不去,那么这个goroutine
将一直在这里休眠,直到有数据发送出去了,就会唤醒它。
// someone woke us up. if mysg != gp.waiting { throw("G waiting list is corrupted") }
最后调用releaseSudog
函数回收这个sudog
(goroutine
)。
通过上面的分析,我想大家已经理解了,为什么channel
有数据阻塞,就会导致goroutine
得不到释放,从而导致更严重的goroutine
泄漏问题。
总结
编程千万条,协程第一条;协程有泄露,亲人两行泪!
[参考资料]