Go并发调度进阶-GMP初始化,最难啃的有时候耐心看完还是很简单的

简介: Go并发调度进阶-GMP初始化,最难啃的有时候耐心看完还是很简单的

640.png

Go并发调度进阶



2. GMP初始化


1. M的初始化

640.png


M 只有自旋和非自旋两种状态。自旋的时候,会努力找工作;找不到的时候会进入非自旋状态,之后会休眠,直到有工作需要处理时,被其他工作线程唤醒,又进入自旋状态。


// src/runtime/proc.go
func mcommoninit(mp *m, id int64) {
 _g_ := getg()
 ...
 lock(&sched.lock)
 ...
 //random初始化,用于窃取 G
 mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
 mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
 if mp.fastrand[0]|mp.fastrand[1] == 0 {
  mp.fastrand[1] = 1
 }
 // 创建用于信号处理的gsignal,只是简单的从堆上分配一个g结构体对象,然后把栈设置好就返回了
 mpreinit(mp)
 if mp.gsignal != nil {
  mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
 }
 // 把 M 挂入全局链表allm之中
 mp.alllink = allm
 ...
}


这里传入的 id 是-1,初次调用会将 id 设置为 0,这里并未对M0做什么关于调度相关的初始化,所以可以简单的认为这个函数只是把M0放入全局链表allm之中就返回了。当然这个M0就是主M。


2. P的初始化

640.png


  • 通常情况下(在程序运行时不调整 P 的个数),P 只会在上图中的四种状态下进行切换。当程序刚开始运行进行初始化时,所有的 P 都处于 _Pgcstop 状态, 随着 P 的初始化(runtime.procresize),会被置于 _Pidle

  • 当 M 需要运行时,会 runtime.acquirep 来使 P 变成 Prunning 状态,并通过 runtime.releasep 来释放。

  • 当 G 执行时需要进入系统调用,P 会被设置为 _Psyscall, 如果这个时候被系统监控抢夺(runtime.retake),则 P 会被重新修改为 _Pidle

  • 如果在程序运行中发生 GC,则 P 会被设置为 _Pgcstop, 并在 runtime.startTheWorld 时重新调整为 _Prunning

var allp       []*p 
func procresize(nprocs int32) *p {
 // 获取先前的 P 个数
 old := gomaxprocs
 // 更新统计信息
 now := nanotime()
 if sched.procresizetime != 0 {
  sched.totaltime += int64(old) * (now - sched.procresizetime)
 }
 sched.procresizetime = now
 // 根据 runtime.MAXGOPROCS 调整 p 的数量,因为 runtime.MAXGOPROCS 用户可以自行设定
 if nprocs > int32(len(allp)) { 
  lock(&allpLock)
  if nprocs <= int32(cap(allp)) {
   allp = allp[:nprocs]
  } else {
   nallp := make([]*p, nprocs) 
   copy(nallp, allp[:cap(allp)])
   allp = nallp
  }
  unlock(&allpLock)
 }
 // 初始化新的 P
 for i := old; i < nprocs; i++ {
  pp := allp[i]
  // 为空,则申请新的 P 对象
  if pp == nil {
   pp = new(p)
  }
  pp.init(i)
  atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
 }
 _g_ := getg()
 // P 不为空,并且 id 小于 nprocs ,那么可以继续使用当前 P
 if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
  // continue to use the current P
  _g_.m.p.ptr().status = _Prunning
  _g_.m.p.ptr().mcache.prepareForSweep()
 } else { 
  // 释放当前 P,因为已失效
  if _g_.m.p != 0 { 
   _g_.m.p.ptr().m = 0
  }
  _g_.m.p = 0
  p := allp[0]
  p.m = 0
  p.status = _Pidle
  // P0 绑定到当前的 M0
  acquirep(p) 
 }
 // 从未使用的 P 释放资源
 for i := nprocs; i < old; i++ {
  p := allp[i]
  p.destroy() 
  // 不能释放 p 本身,因为他可能在 m 进入系统调用时被引用
 }
 // 释放完 P 之后重置allp的长度
 if int32(len(allp)) != nprocs {
  lock(&allpLock)
  allp = allp[:nprocs]
  unlock(&allpLock)
 }
 var runnablePs *p
 // 将没有本地任务的 P 放到空闲链表中
 for i := nprocs - 1; i >= 0; i-- {
  p := allp[i]
  // 当前正在使用的 P 略过
  if _g_.m.p.ptr() == p {
   continue
  }
  // 设置状态为 _Pidle 
  p.status = _Pidle
  // P 的任务列表是否为空
  if runqempty(p) {
   // 放入到空闲列表中
   pidleput(p)
  } else {
   // 获取空闲 M 绑定到 P 上
   p.m.set(mget())
            // 
   p.link.set(runnablePs)
   runnablePs = p
  }
 }
 stealOrder.reset(uint32(nprocs))
 var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
 return runnablePs
}


procresize方法的执行过程如下:

  1. allp 是全局变量 P 的资源池,如果 allp 的切片中的处理器数量少于期望数量,会对切片进行扩容;
  2. 扩容的时候会使用 new 申请一个新的 P ,然后使用 init 初始化,需要注意的是初始化的 P 的 id 就是传入的 i 的值,状态为 _Pgcstop;
  3. 然后通过 g.m.p 获取 M0,如果 M0 已与有效的 P 绑定上,则将 被绑定的 P 的状态修改为 _Prunning。否则获取 allp[0] 作为 P0 调用 runtime.acquirep 与 M0 进行绑定;
  4. 超过处理器个数的 P 通过p.destroy释放资源,p.destroy会将与 P 相关的资源释放,并将 P 状态设置为 _Pdead;
  5. 通过截断改变全局变量 allp 的长度保证与期望处理器数量相等;
  6. 遍历 allp 检查 P 的是否处于空闲状态,是的话放入到空闲列表中;


3. G的初始化

640.png


这是G的状态流转图,G的初始化相当复杂,需要大家下去对照源码再看一遍。


func newproc(siz int32, fn *funcval) {
 //从 fn 的地址增加一个指针的长度,从而获取第一参数地址
 argp := add(unsafe.Pointer(&fn), sys.PtrSize)
 gp := getg()
 pc := getcallerpc() // 获取调用方 PC/IP 寄存器值
 // 用 g0 系统栈创建 Goroutine 对象
 // 传递的参数包括 fn 函数入口地址, argp 参数起始地址, siz 参数长度, gp(g0),调用方 pc(goroutine)
 systemstack(func() {
  newg := newproc1(fn, argp, siz, gp, pc)
  _p_ := getg().m.p.ptr() //获取p
  runqput(_p_, newg, true) //并将其放入 P 本地队列的队头或全局队列
//检查空闲的 P,将其唤醒,准备执行 G,但我们目前处于初始化阶段,主 Goroutine 尚未开始执行,因此这里不会唤醒 P。
  if mainStarted {
   wakep()
  }
 })
}
// 创建一个运行 fn 的新 g,具有 narg 字节大小的参数,从 argp 开始。
// callerps 是 go 语句的起始地址。新创建的 g 会被放入 g 的队列中等待运行。
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
 _g_ := getg()//因为是在系统栈运行所以此时的 g 为 g0
 acquirem() //禁止抢占
 if fn == nil {
  _g_.m.throwing = -1 // do not dump full stacks
  throw("go of nil func value")
 }
 ...
 siz := narg
 siz = (siz + 7) &^ 7
 ...
// 当前工作线程所绑定的 p
// 初始化时 _p_ = g0.m.p,也就是 _p_ = allp[0]
 _p_ := _g_.m.p.ptr()
// 从 p 的本地缓冲里获取一个没有使用的 g,初始化时为空,返回 nil
 newg := gfget(_p_)
 if newg == nil {
  // 创建一个拥有 _StackMin 大小的栈的 g
  newg = malg(_StackMin)
  // 将新创建的 g 从 _Gidle 更新为 _Gdead 状态
  casgstatus(newg, _Gidle, _Gdead)
  // 将 Gdead 状态的 g 添加到 allg,这样 GC 不会扫描未初始化的栈
  allgadd(newg) 
 }
 if newg.stack.hi == 0 {
  throw("newproc1: newg missing stack")
 }
 if readgstatus(newg) != _Gdead {
  throw("newproc1: new g is not Gdead")
 }
 ...
 // 确定 sp 位置
 sp := newg.stack.hi - totalSize
 // 确定参数入栈位置
 spArg := sp
 ...
 if narg > 0 {
  // 将参数从执行 newproc 函数的栈拷贝到新 g 的栈
  memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
  ...
 }
 // 设置newg的调度相关信息
 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
 newg.sched.sp = sp
 newg.stktopsp = sp
 newg.sched.pc = funcPC(goexit) + sys.PCQuantum 
 newg.sched.g = guintptr(unsafe.Pointer(newg))
 gostartcallfn(&newg.sched, fn)
 newg.gopc = callerpc
 newg.ancestors = saveAncestors(callergp)
 newg.startpc = fn.fn
 if _g_.m.curg != nil {
  newg.labels = _g_.m.curg.labels
 }
 if isSystemGoroutine(newg, false) {
  atomic.Xadd(&sched.ngsys, +1)
 }
 // 设置 g 的状态为 _Grunnable,可以运行了
 casgstatus(newg, _Gdead, _Grunnable)
 // 设置goid
 newg.goid = int64(_p_.goidcache)
 _p_.goidcache++
 ...
 releasem(_g_.m) //恢复抢占 本质上是加锁
 return newg
}


创建 G 的过程也是相对比较复杂的,我们来总结一下这个过程:

  1. 首先尝试从 P 本地 gfree 链表或全局 gfree 队列获取已经执行过的 g
  2. 初始化过程中程序无论是本地队列还是全局队列都不可能获取到 g,因此创建一个新的 g,并为其分配运行线程(执行栈),这时 g 处于 _Gidle 状态
  3. 创建完成后,g 被更改为 _Gdead 状态,并根据要执行函数的入口地址和参数,初始化执行栈的 SP 和参数的入栈位置,并将需要的参数拷贝一份存入执行栈中
  4. 根据 SP、参数,在 g.sched 中保存 SP 和 PC 指针来初始化 g 的运行现场
  5. 将调用方、要执行的函数的入口 PC 进行保存,并将 g 的状态更改为 _Grunnable
  6. 给 Goroutine 分配 id,并将其放入 P 本地队列的队头或全局队列(初始化阶段队列肯定不是满的,因此不可能放入全局队列)
  7. 检查空闲的 P,将其唤醒,准备执行 G,但我们目前处于初始化阶段,主 Goroutine 尚未开始执行,因此这里不会唤醒 P。


4. 小结


结合上篇GMP结构和这篇的初始化,那么对于GMP的调度我相信会有一定的理解,起码你知道了底层的一些点点滴滴,后续文章还会给大家继续讲解GMP调度,你坚持学习到最后发现,最难啃的GMP也就那么回事!


相关文章
|
2月前
|
存储 负载均衡 监控
如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
在数字化时代,构建高可靠性服务架构至关重要。本文探讨了如何利用Go语言的高效性、并发支持、简洁性和跨平台性等优势,通过合理设计架构、实现负载均衡、构建容错机制、建立监控体系、优化数据存储及实施服务治理等步骤,打造稳定可靠的服务架构。
55 1
|
2月前
|
Go 调度 开发者
探索Go语言中的并发模式:goroutine与channel
在本文中,我们将深入探讨Go语言中的核心并发特性——goroutine和channel。不同于传统的并发模型,Go语言的并发机制以其简洁性和高效性著称。本文将通过实际代码示例,展示如何利用goroutine实现轻量级的并发执行,以及如何通过channel安全地在goroutine之间传递数据。摘要部分将概述这些概念,并提示读者本文将提供哪些具体的技术洞见。
|
3月前
|
Java 大数据 Go
Go语言:高效并发的编程新星
【10月更文挑战第21】Go语言:高效并发的编程新星
65 7
|
2月前
|
并行计算 安全 Go
Go语言的并发特性
【10月更文挑战第26天】Go语言的并发特性
26 1
|
3月前
|
安全 程序员 Go
深入浅出Go语言的并发之道
在本文中,我们将探索Go语言如何优雅地处理并发编程。通过对比传统多线程模型,我们将揭示Go语言独特的goroutine和channel机制是如何简化并发编程,并提高程序的效率和稳定性。本文不涉及复杂的技术术语,而是用通俗易懂的语言,结合生动的比喻,让读者能够轻松理解Go语言并发编程的核心概念。
|
25天前
|
存储 监控 算法
员工上网行为监控中的Go语言算法:布隆过滤器的应用
在信息化高速发展的时代,企业上网行为监管至关重要。布隆过滤器作为一种高效、节省空间的概率性数据结构,适用于大规模URL查询与匹配,是实现精准上网行为管理的理想选择。本文探讨了布隆过滤器的原理及其优缺点,并展示了如何使用Go语言实现该算法,以提升企业网络管理效率和安全性。尽管存在误报等局限性,但合理配置下,布隆过滤器为企业提供了经济有效的解决方案。
72 8
员工上网行为监控中的Go语言算法:布隆过滤器的应用
|
1月前
|
存储 Go 索引
go语言中数组和切片
go语言中数组和切片
45 7
|
1月前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
1月前
|
程序员 Go
go语言中结构体(Struct)
go语言中结构体(Struct)
112 71
|
1月前
|
存储 Go 索引
go语言中的数组(Array)
go语言中的数组(Array)
115 67