Limit Concurrency你真的会吗

简介: Limit Concurrency你真的会吗

一. 按照正常思路来一个架构设计



1. 基础知识必备


  • sync.WaitGroup
  • chan
  • go func


2. 原始消费架构模式设计


package main
import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)
func main() {
 const concurrencyProcesses = 10 // 限制最大并发处理数量
 const jobCount = 100 // 有多少任务
 var wg sync.WaitGroup
 wg.Add(jobCount)
 found := make(chan int)
 limitCh := make(chan struct{}, concurrencyProcesses) // 用chan去控制并发数量
 for i := 0; i < jobCount; i++ { //处理100个job
  limitCh <- struct{}{} // 控制并发数量
  go func(val int) {
   defer func() {
    wg.Done()
    <-limitCh
   }()
   waitTime := rand.Int31n(1000)
   fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
   time.Sleep(time.Duration(waitTime) * time.Millisecond)
   found <- val
  }(i)
 }
 go func() { // 等待goroutine结束并且关闭结果chan
  wg.Wait()
  close(found)
 }()
 var results []int
 for p := range found { // 从结果chan获取数据
  fmt.Println("Finished job:", p)
  results = append(results, p)
 }
 fmt.Println("result:", results)
}


大家猜猜结果是怎么样的?


3. 公布结果

640.png


为什么deadlock?


4. 究其原因


这就是Limit Concurrency问题,即限制并发问题


在读取第一个job的时候,现将空的struct放进limitCh中,这个时候limitCh中就只剩下9个可以继续处理,接着重复这个步骤继续放入job,直到10个job装满limitCh。但是当第11个job需要处理的时候,程序就直接停止在limitCh <- struct{}{}处,因为前10个goroutine不知道什么时候才能处理结束,这样第11个job它后面的代码就完全没机会执行,造成整个系统deadlock


但是如果你的job数量小于等于10,是完全看不出来任何问题的,系统可以正常运行,只有job数量大于并发数量的时候才会出这个BUG。


二. 按照BUG再设计一版



相信大家想到了一种解决方式,既然程序是卡在limitCh <- struct{}{},那么就直接将这段处理逻辑丢到go func中处理就好了。


...
    for i := 0; i < jobCount; i++ {
        go func() { //丢到go func中去处理
            limitCh <- struct{}{}
        }()
        go func(val int) {
        ...

结果:


640.png


发现100个job同时处理直到结束,并没有达到限制并发处理的要求,虽然满足最终把这些job处理完成,但是我们要的可是Limit Concurrency


三. 最佳实践,也是大家拿来就可以用的限制并发框架模式



既然要限制并发数量,那么就建立特定数量的worker,每个worker读取chan就可以了,所以

第一步就是先建立queue的通道,将所有job都放入queue中,但是以go func方式去处理,避免阻塞main程序。


第二步就是建立特定的worker数量来消化全部的job。


package main
import (
 "fmt"
 "math/rand"
 "sync"
 "time"
)
func main() {
 const concurrencyProcesses = 10 // 限制最大并发处理数量
 const jobCount = 100 // 有多少任务
 var wg sync.WaitGroup
 wg.Add(jobCount)
 found := make(chan int)
 queue := make(chan int)
 go func(queue chan<- int) { // 生产端负责生产任务到queue
  for i := 0; i < jobCount; i++ {
   queue <- i
  }
  close(queue)
 }(queue)
 for i := 0; i < concurrencyProcesses; i++ { // 当前只有10个并发 他们做的事情就是处理任务 并且将结果set in found
  go func(queue <-chan int, found chan<- int) {
   for val := range queue {
    defer wg.Done()
    waitTime := rand.Int31n(1000)
    fmt.Println("job:", val, "wait time:", waitTime, "millisecond")
    time.Sleep(time.Duration(waitTime) * time.Millisecond)
    found <- val
   }
  }(queue, found) // 传递进去 防止data race
 }
 go func() { // 等待goroutine结束并且关闭结果chan
  wg.Wait()
  close(found)
 }()
 var results []int
 for p := range found { // 从结果chan获取数据
  fmt.Println("Finished job:", p)
  results = append(results, p)
 }
 fmt.Println("result:", results)
}

可以看到这里的for循环是以concurrencyProcesses为并发数量去处理job了。10个goroutine通过内层for循环不断读取chan中的job,直到queue中没有job为止,这样生产端生产完job之后会关闭queue,那么goroutine最后收到关闭消息后退出for循环,结束goroutine。

可以看下结果:

640.png


小结



其实还有很多常见的其他限制并发处理模式,但是我喜欢用上面这种处理模式。这里有个细节需要大家注意哈,就是控制并发的时候要处理好数据竞争(data race)问题,如果处理不好,可能最后结果不一定是对的。

- END -

相关文章
|
6月前
|
缓存 监控 时序数据库
influxdb报错:cache-max-memory-size exceeded
influxdb报错:cache-max-memory-size exceeded
209 0
|
存储 缓存 大数据
Starrocks执行查询报错:Memory of process exceed limit. Used: XXX, Limit: XXX. Mem usage has exceed the limit of BE
Starrocks执行查询报错:Memory of process exceed limit. Used: XXX, Limit: XXX. Mem usage has exceed the limit of BE
|
算法 安全 Shell
译 | Concurrency is not Parallelism(二)
译 | Concurrency is not Parallelism(二)
68 0
|
程序员 Linux Go
译 | Concurrency is not Parallelism(一)
译 | Concurrency is not Parallelism
82 0
|
缓存 Go
译 | Concurrency is not Parallelism(四)
译 | Concurrency is not Parallelism
70 0
|
负载均衡 安全 Go
译 | Concurrency is not Parallelism(三)
译 | Concurrency is not Parallelism(三)
75 0
|
SQL 关系型数据库 MySQL
postgre分页查询报错:ERROR: LIMIT #,# syntax is not supported 建议:Use separate LIMIT and OFFSET clauses
postgre分页查询报错:ERROR: LIMIT #,# syntax is not supported 建议:Use separate LIMIT and OFFSET clauses
407 0
postgre分页查询报错:ERROR: LIMIT #,# syntax is not supported 建议:Use separate LIMIT and OFFSET clauses
|
Oracle Java 关系型数据库
GC Overhead Limit Exceeded Error
GC Overhead Limit Exceeded Error
170 0
Using 1 worker with 2048MB memory limit 异常
这里使用的是node 16版本,在家里电脑可以,但是公司电脑不行,不知道为啥 最后换了14版本,就可以了。
2004 0
Using 1 worker with 2048MB memory limit 异常
|
存储 缓存 算法
Block Throttle - Low Limit
传统的 block throttle 的语义是,cgroup 不能超过用户配置的 IOPS/BPS,此时所有 cgroup 会自由竞争 IO 资源;那么其存在的问题就是,如果用户配置的 IOPS/BPS 过高,所有 cgroup 之间就会完全自由竞争 IO 资源,从而无法保证 QoS,而如果用户配置的 IOPS/BPS 过低,又无法充分发挥 block 设备的性能 Facebook 的 Shao
600 0