资源竞争
所谓资源竞争,就是在程序中,同一块内存同时被多个 goroutine 访问。对于这个共享的资源(内存)每个 goroutine 都有不同的操作,就有可能造成数据紊乱。
示例:
package main import ( "fmt" "time" ) var sum = 0 func main() { //开启100个协程来让 sum + 1 for i := 1; i <= 100; i++ { go add() } // 睡眠两秒防止程序提前退出 time.Sleep(2 * time.Second) fmt.Println("sum:",sum) } func add(){ sum += 1 } //运行结果: sum:98 或 sum:99 或 ... 复制代码
- 多次运行上面的程序,发现打印的结果可能存在不同,因为我们用多个协程来操作 sum,而 sum 不是并发安全的,存在竞争。
- 我们使用 go build、go run、go test 命令时,添加 -race 标识可以检查代码中是否存在资源竞争。
解决这个问题,我们可以给资源进行加锁,让其在同一时刻只能被一个协程来操作。
sync.Mutex
- 互斥锁,使同一时刻只能有一个协程执行某段程序,其他协程等待该协程执行完再依次执行。
- 互斥锁只有两个方法 Lock (加锁)和 Unlock(解锁),当一个协程对资源上锁后,只有等该协程解锁,其他协程才能再次上锁。
- Lock 和 Unlock 是成对出现,为了防止上锁后忘记释放锁,我们可以使用 defer 语句来释放锁。
示例:
package main import ( "fmt" "sync" "time" ) var sum = 0 var mutex = sync.Mutex{} func main() { //开启100个协程来让 sum + 1 for i := 1; i <= 100; i++ { go add() } // 睡眠两秒防止程序提前退出 time.Sleep(2 * time.Second) fmt.Println("sum:",sum) } func add(){ mutex.Lock() defer mutex.Unlock() //使用defer语句,确保锁一定会被释放 sum += 1 } 复制代码
symc.RWMutex
- 上面我们使用互斥锁,来防止多个协程同时对 sum 做加法操作的时候产生数据错乱。RWMutex为读写锁,当读取竞争资源的时候,因为数据不会改变,所以不管多少个 goroutine 读都是并发安全的。
- 因为可以多个协程同时读,不再相互等待,所以在性能上比互斥锁会有很大的提升。
示例:
package main import ( "fmt" "sync" "time" ) var sum = 0 var mutex = sync.Mutex{} var rwmutex = sync.RWMutex{} func main() { //开启100个协程来让 sum + 1 for i := 1; i <= 100; i++ { go add() } for i := 1; i<= 10; i++ { go fmt.Println("sum:",getSum()) } // 睡眠两秒防止程序提前退出 time.Sleep(2 * time.Second) fmt.Println("sum:", sum) } func add(){ mutex.Lock() defer mutex.Unlock() //使用defer语句,确保锁一定会被释放 sum += 1 } func getSum() int { rwmutex.RLock() //使用读写锁 defer rwmutex.RUnlock() return sum } 复制代码
sync.WaitGroup
- 上面的示例中,我们都是要了 time.Sleep(2 * time.Second),来防止:主函数 mian 返回,提前退出程序。但是我们并不知道程序真正什么时候执行完,所以只能设置个长点的时间避免程序提前退出,这样会产生性能问题。
- 这时候我们就用到了 sync.WaitGroup ,它可以监听程序的执行,一旦全部执行完毕,程序就能马上退出。
示例:
package main import ( "fmt" "sync" ) var sum = 0 var mutex = sync.Mutex{} var rwmutex = sync.RWMutex{} func run() { var wg sync.WaitGroup //因为要监控110个协程,所以设置计数器为110 wg.Add(110) for i := 1; i <= 100; i++ { go func() { //计数器值减1 defer wg.Done() add() }() } for i := 1; i <= 10; i++ { go func() { //计数器值减1 defer wg.Done() fmt.Println("sum:", getSum()) }() } //一直等待,只要计数器值为0 wg.Wait() } func main() { run() } func add() { mutex.Lock() defer mutex.Unlock() //使用defer语句,确保锁一定会被释放 sum += 1 } func getSum() int { rwmutex.RLock() //使用读写锁 defer rwmutex.RUnlock() return sum } 复制代码
- 示例中我们先声明了 sync.WaitGroup ,然后通过 Add() 方法设置计数器的值,也就是说有多少个协程监听。
- 在每个协程执行完毕后,调用 Done 方法来使计算器减 1。
- 最后调用 Wait 方法一直等待,直到计数器为 0,所以协程全部执行完毕。
sync.Once
有时候我们只希望代码执行一次,即使是在高并发的场景下,比如创建一个单例。这种情况可以使用 sync.Once 来保证代码只执行一次。
示例:
package main import ( "fmt" "sync" ) func main() { var once sync.Once onceBody := func() { fmt.Println("Only once") } //用于等待协程执行完毕 done := make(chan bool) //启动10个协程执行once.Do(onceBody) for i := 0; i < 10; i++ { go func() { //把要执行的函数(方法)作为参数传给once.Do方法即可 once.Do(onceBody) done <- true }() } for i := 0; i < 10; i++ { <-done } } //运行结果: Only once 复制代码
- 上面这个是 Go 语言自带的示例,虽然启动了 10 个协程来执行 onceBody 函数,但是 once.DO 方法保证 onceBody 函数只会执行一次。
- sync.Once 适合用于创建单例、只加载一次资源等只需要执行一次的场景。
条件变量 sync.Cond
- 我们有一项任务,只有满足了条件情况下才能执行,否则就等着。如何获取这个条件呢?可以使用 channel 的方式,但是 channel 适用于一对一,一对多就需要用到 sync.Cond
- sync.Cond 是基于互斥锁的基础上,增加了一个通知队列,协程刚开始是等待的,通知的协程会从通知队列中唤醒一个或多个被通知的协程。
- sync.Cond 主要有以下几个方法:
- sync.NewCond(&mutex) //sync.Cond 通过 sync.NewCond 初始化,需要传入一个 mutex,因为阻塞等待通知的操作以及通知解除阻塞的操作就是基于 sync.Mutex 来实现的。
- sync.Wait() //等待通知
阻塞当前协程,直到被其他协程调用 Broadcast 或者 Signal 方法唤醒,使用的时候需要加锁,使用 sync.Cond 中的锁即可 - sync.Signal() //单发通知,随机唤醒一个协程
- sync.Broadcat() //广播通知,唤醒所有等待的协程。
示例:
package main import ( "fmt" "sync" "time" ) func main() { //3个人赛跑,1个裁判员发号施令 cond := sync.NewCond(&sync.Mutex{}) var wg sync.WaitGroup wg.Add(4) //3选手+1裁判 for i := 1; i <= 3; i++ { go func(num int) { defer wg.Done() fmt.Println(num, "号选手已经就位") cond.L.Lock() cond.Wait() //等待发令枪响 fmt.Println(num, "号选手开始跑……") cond.L.Unlock() }(i) } //等待所有goroutine都进入wait状态 time.Sleep(2 * time.Second) go func() { defer wg.Done() fmt.Println("裁判:“各就各位~~预备~~”") fmt.Println("啪!!!") cond.Broadcast() //发令枪响 }() //防止函数提前返回退出 wg.Wait() } 复制代码
运行结果:
3 号选手已经就位 1 号选手已经就位 2 号选手已经就位 裁判:“各就各位~~预备~~” 啪!!! 2 号选手开始跑…… 3 号选手开始跑…… 1 号选手开始跑…… 复制代码
最后贴一下 sync.Cond 几个方法的源码:
// Wait atomically unlocks c.L and suspends execution // of the calling goroutine. After later resuming execution, // Wait locks c.L before returning. Unlike in other systems, // Wait cannot return unless awoken by Broadcast or Signal. // Wait方法释放锁,并阻塞协程执行。满足条件解除阻塞后,当前协程需要获得锁然后Wait方法返回。 // // Because c.L is not locked when Wait first resumes, the caller // typically cannot assume that the condition is true when // Wait returns. Instead, the caller should Wait in a loop: // 由于解除阻塞后,当前协程不一定能马上获得锁,因此返回后需要再次检查条件,所以通常 // 使用循环。 // c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() // func (c *Cond) Wait() { c.checker.check() t := runtime_notifyListAdd(&c.notify) c.L.Unlock() // 释放锁 runtime_notifyListWait(&c.notify, t) // 等待满足条件,解除阻塞 c.L.Lock() // 获取锁 } // Signal wakes one goroutine waiting on c, if there is any. // // It is allowed but not required for the caller to hold c.L // during the call. func (c *Cond) Signal() { c.checker.check() runtime_notifyListNotifyOne(&c.notify) } // Broadcast wakes all goroutines waiting on c. // // It is allowed but not required for the caller to hold c.L // during the call. func (c *Cond) Broadcast() { c.checker.check() runtime_notifyListNotifyAll(&c.notify) } 复制代码
条件变量的 Wait 方法主要做了四件事:
- 把调用它的 goroutine(也就是当前的 goroutine)加入到当前条件变量的通知队列中。
- 解锁当前的条件变量基于的那个互斥锁。
- 让当前的 goroutine 处于等待状态,等到通知到来时再决定是否唤醒它。此时,这个 goroutine 就会阻塞在调用这个 Wait 方法的那行代码上。
- 如果通知到来并且决定唤醒这个 goroutine,那么就在唤醒它之后重新锁定当前条件变量基于的互斥锁。自此之后,当前的 goroutine 就会继续执行后面的代码了。
注意事项
- 调用 wait 方法的时候一定要加锁,否则会导致程序发生 panic.
- wait 调用时需要检查等待条件是否满足,也就说 goroutine 被唤醒了不等于等待条件被满足,等待者被唤醒,只是得到了一次检查的机会而已,推荐写法如下:
// c.L.Lock() // for !condition() { // c.Wait() // } // ... make use of condition ... // c.L.Unlock() 复制代码
- Signal 和 Boardcast 两个唤醒操作不需要加锁
sync.Map
map 同时读写是线程不安全的,会发生了竞态问题。而 sync.Map 和 map 类型一样,只不过它是并发安全的。
sync.Map 的方法:
- Store : 存储 key-value 值
- Load: 根据 key 获取对应的 value 值,还可以判断 key 是否存在。
- LoadOrStore: 如果 key 对应的 value 存在,则返回 value ;不存在则存储 key-value 值。
- Delete: 删除一个 key-value 键值对
- Range:遍历 sync.Map
示例:
package main import ( "fmt" "sync" ) func main() { var syMap sync.Map // 将键值对保存到sync.Map syMap.Store("aaa", 111) syMap.Store("bbb", 222) syMap.Store("ccc", 333) fmt.Println(syMap.LoadOrStore("ddd", 444)) // 从sync.Map中根据键取值 fmt.Println(syMap.Load("aaa")) // 根据键删除对应的键值对 syMap.Delete("aaa") // 遍历所有sync.Map中的键值对 syMap.Range(func(k, v interface{}) bool { fmt.Println("k:", k, "=》 v:", v) return true }) } 复制代码
运行结果:
444 false 111 true k: bbb =》 v: 222 k: ccc =》 v: 333 k: ddd =》 v: 444 复制代码
sync.Map 没有获取 map 数量的方法,可以在 遍历的时候自行计算数量,sync.Map 为了保证并发安全,牺牲了一些性能,如果没有并发场景,推荐使用内置的 map 类。