go并发编程

简介: go的GMP并发模型,让go天然支持高并发,先了解一下GMP模型吧GMP G协程,M工作线程、P处理器,M必须持有P才可以执行GP维护着一个协程G队列,P依次将G调度到M中运行if M0中G0发生系统调用,M0将释放P,冗余的M1获取P,继续执行P队列中剩余的G。(只要P不空闲就充分利用了CPU)G0系统调用结束后,如果有空闲的P,则获取P继续执行G0,否则将G0放入全局队列,M0进入缓存池睡眠。(全局队列中的G主要来自从系统调用中恢复的G)下面介绍一下编程常用的同步(synchronize)原语互斥锁 mutex rwmutex,要了解自旋和饥饿模式自旋最多4次,cpu核

go的GMP并发模型,让go天然支持高并发,先了解一下GMP模型吧

GMP G协程,M工作线程、P处理器,M必须持有P才可以执行G

P维护着一个协程G队列,P依次将G调度到M中运行

if M0中G0发生系统调用,M0将释放P,冗余的M1获取P,继续执行P队列中剩余的G。(只要P不空闲就充分利用了CPU)

G0系统调用结束后,如果有空闲的P,则获取P继续执行G0,否则将G0放入全局队列,M0进入缓存池睡眠。(全局队列中的G主要来自从系统调用中恢复的G)

下面介绍一下编程常用的同步(synchronize)原语

互斥锁 mutex rwmutex,要了解自旋和饥饿模式

自旋最多4次,cpu核数要大于1,Processor大于1

饥饿模式阻塞超过1ms,饥饿模式下不会启动自旋过程

channel go的精华所在,注意不能关闭已经关闭的channel,不能向已关闭的channel写数据,会panic

Once#

多用于实现单例模式

饿汉模式,一般是直接创建一个包级变量直接使用即可,注意既然是单例模式,就不能让他人随意创建,类型要是私有的,使用接口暴露方法,让外部获得私有变量

懒汉模式,在第一次使用时创建,这里需要注意并发安全,可以使用sync.Once来保证并发安全

type Singleton interface {
  Work() string
}
type singleton2 struct{}
func (s *singleton2) Work() string {
  return "singleton2 is working"
}
func newSingleton2() *singleton2 {
  return &singleton2{}
}
var (
  instance *singleton2
  once     sync.Once
)
// GetSingleton2 用于获取单例模式对象
func GetSingleton2() Singleton {
  once.Do(func() {
    instance = newSingleton2()
  })
  return instance
}

Pool#

sync.Pool 是 Go 语言标准库中的一个并发安全的对象池,用于缓存和重用临时对象,以提高性能。通常用于减少内存分配和垃圾回收的开销。PS:一般将变量Put回Pool中,需要将变量重置

var studentPool = sync.Pool{
  New: func() any {
    return new(Student)
  },
}
func BenchmarkUnmarshalWithPool(b *testing.B) {
  for n := 0; n < b.N; n++ {
    stu := studentPool.Get().(*Student)
    json.Unmarshal(buf, stu)
    studentPool.Put(stu)
  }
}

Cond#

Cond是条件变量,可以让一组goroutine等待某个条件的发生。当条件发生时,调用Broadcast或者Signal来通知所有等待的goroutine继续执行。

需要注意的是 sync.Cond 都要在构造的时候绑定一个 sync.Mutex。Wait() 和 Signal() 函数必须在锁保护下的临界区中执行。Wait()一般放在for循环中,因为可能会出现虚假唤醒

var mu = &sync.Mutex{}
var done = false
func read(name string, c *sync.Cond) {
  c.L.Lock()
  for !done {
    c.Wait()
  }
  log.Println(name, "reading")
  c.L.Unlock()
}
func write(name string, c *sync.Cond) {
  time.Sleep(1 * time.Second)
  c.L.Lock()
  done = true
  c.L.Unlock()
  log.Println(name, "writting finish")
  c.Broadcast()
}
func main() {
  cond := sync.NewCond(mu)
  go read("reader1", cond)
  go read("reader2", cond)
  go read("reader3", cond)
  write("writer", cond)
  time.Sleep(time.Second * 3)
}

WaitGroup#

注意Wait后不能再Add,否则会panic

// WaitGroup 完美写法
func main() {
  taskNum := 3
  ch := make(chan any)
  go func() {
    group := &sync.WaitGroup{}
    for i := 1; i <= taskNum; i++ {
      group.Add(1)
      go func(i int) {
        defer group.Done()
        ch <- i
      }(i)
    }
    // 确保所有取数据的协程都完成了工作,才关闭 ch
    group.Wait()
    close(ch)
  }()
  for i := range ch {
    log.Println("goroutine ", i)
  }
  log.Println("finish")
}

扩展:errgroup,可以捕获goroutine中的panic,只要有一个goroutine出错,就会取消所有的goroutine

atomic#

原子操作,用于解决并发问题,比如计数器,锁等

func main() {
  // 定义一个共享的计数器,使用 int64 类型
  var counter int64
  // 使用 WaitGroup 来等待所有 goroutine 完成
  var wg sync.WaitGroup
  // 启动多个 goroutine 来增加计数器的值
  for i := 0; i < 5; i++ {
    wg.Add(1)
    go func() {
      // 在每个 goroutine 中原子地增加计数器的值
      for j := 0; j < 100000; j++ {
        atomic.AddInt64(&counter, 1)
      }
      wg.Done()
    }()
  }
  // 等待所有 goroutine 完成
  wg.Wait()
  // 输出最终的计数器值
  fmt.Println("Final Counter Value:", atomic.LoadInt64(&counter))
}

context#

WithCancel#

context.WithCancel() 创建可取消的 Context 对象,即可以主动通知子协程退出。

func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
          fmt.Println("stop", name)
return
default:
          fmt.Println(name, "send request")
          time.Sleep(1 * time.Second)
       }
    }
}
func main() {
    ctx, cancel := context.WithCancel(context.Background())
go reqTask(ctx, "worker1")
    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(3 * time.Second)
}

WithValue#

如果需要往子协程中传递参数,可以使用 context.WithValue()

type Options struct{ Interval time.Duration }
func reqTask(ctx context.Context, name string) {
for {
select {
case <-ctx.Done():
          fmt.Println("stop", name)
return
default:
          fmt.Println(name, "send request")
          op := ctx.Value("options").(*Options)
          time.Sleep(op.Interval * time.Second)
       }
    }
}
func main() {
    ctx, cancel := context.WithCancel(context.Background())
    vCtx := context.WithValue(ctx, "options", &Options{1})
go reqTask(vCtx, "worker1")
go reqTask(vCtx, "worker2")
    time.Sleep(3 * time.Second)
    cancel()
    time.Sleep(3 * time.Second)
}

WithTimeout#

如果需要控制子协程的执行时间,可以使用 context.WithTimeout 创建具有超时通知机制的 Context 对象。

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

WithDeadlineWithTimeout 类似,不同的是 WithDeadline 可以指定一个具体的时间点。

ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Second))

semaphore#

用于限制对一组共享资源的访问。也可以手动用channel实现

// 创建一个具有初始计数为 2 的信号量
sem := semaphore.NewWeighted(2)
// 启动多个 goroutine 模拟并发访问
for i := 1; i <= 5; i++ {
  go func(id int) {
    fmt.Printf("Goroutine %d is trying to acquire the semaphore\n", id)
    // 尝试获取信号量,如果已经达到限制,则阻塞
    err := sem.Acquire(context.Background(), 1)
    if err != nil {
      fmt.Printf("Error acquiring semaphore in goroutine %d: %v\n", id, err)
      return
    }
    fmt.Printf("Goroutine %d has acquired the semaphore\n", id)
    time.Sleep(2 * time.Second)
    // 释放信号量
    sem.Release(1)
    fmt.Printf("Goroutine %d has released the semaphore\n", id)
  }(i)
}
// 等待所有 goroutine 完成
time.Sleep(10 * time.Second)

singleflight#

golang.org/x/sync/singleflight 防止缓存击穿

var sf singleflight.Group
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
  wg.Add(1)
  go func(id int) {
    defer wg.Done()
    // 以 "key" 为参数调用 Do 方法
    val, err, _ := sf.Do("key", func() (interface{}, error) {
      fmt.Printf("Goroutine %d is performing the operation\n", id)
      time.Sleep(2 * time.Second)
      return fmt.Sprintf("Result from goroutine %d", id), nil
    })
    if err != nil {
      fmt.Printf("Goroutine %d encountered an error: %v\n", id, err)
    } else {
      fmt.Printf("Goroutine %d got result: %v\n", id, val)
    }
  }(i)
}
wg.Wait()
相关文章
|
3月前
|
Go 调度 开发者
Go语言中的并发编程:深入理解与实践###
探索Go语言在并发编程中的独特优势,揭秘其高效实现的底层机制。本文通过实例和分析,引导读者从基础到进阶,掌握Goroutines、Channels等核心概念,提升并发处理能力。 ###
|
1月前
|
并行计算 安全 Go
Go语言中的并发编程:掌握goroutines和channels####
本文深入探讨了Go语言中并发编程的核心概念——goroutine和channel。不同于传统的线程模型,Go通过轻量级的goroutine和通信机制channel,实现了高效的并发处理。我们将从基础概念开始,逐步深入到实际应用案例,揭示如何在Go语言中优雅地实现并发控制和数据同步。 ####
|
2月前
|
存储 Go 开发者
Go语言中的并发编程与通道(Channel)的深度探索
本文旨在深入探讨Go语言中并发编程的核心概念和实践,特别是通道(Channel)的使用。通过分析Goroutines和Channels的基本工作原理,我们将了解如何在Go语言中高效地实现并行任务处理。本文不仅介绍了基础语法和用法,还深入讨论了高级特性如缓冲通道、选择性接收以及超时控制等,旨在为读者提供一个全面的并发编程视角。
|
2月前
|
安全 Serverless Go
Go语言中的并发编程:深入理解与实践####
本文旨在为读者提供一个关于Go语言并发编程的全面指南。我们将从并发的基本概念讲起,逐步深入到Go语言特有的goroutine和channel机制,探讨它们如何简化多线程编程的复杂性。通过实例演示和代码分析,本文将揭示Go语言在处理并发任务时的优势,以及如何在实际项目中高效利用这些特性来提升性能和响应速度。无论你是Go语言的初学者还是有一定经验的开发者,本文都将为你提供有价值的见解和实用的技巧。 ####
|
2月前
|
Go 调度 开发者
Go语言中的并发编程:深入理解goroutines和channels####
本文旨在探讨Go语言中并发编程的核心概念——goroutines和channels。通过分析它们的工作原理、使用场景以及最佳实践,帮助开发者更好地理解和运用这两种强大的工具来构建高效、可扩展的应用程序。文章还将涵盖一些常见的陷阱和解决方案,以确保在实际应用中能够避免潜在的问题。 ####
|
2月前
|
安全 Go 数据处理
Go语言中的并发编程:掌握goroutine和channel的艺术####
本文深入探讨了Go语言在并发编程领域的核心概念——goroutine与channel。不同于传统的单线程执行模式,Go通过轻量级的goroutine实现了高效的并发处理,而channel作为goroutines之间通信的桥梁,确保了数据传递的安全性与高效性。文章首先简述了goroutine的基本特性及其创建方法,随后详细解析了channel的类型、操作以及它们如何协同工作以构建健壮的并发应用。此外,还介绍了select语句在多路复用中的应用,以及如何利用WaitGroup等待一组goroutine完成。最后,通过一个实际案例展示了如何在Go中设计并实现一个简单的并发程序,旨在帮助读者理解并掌
|
2月前
|
安全 Java Go
Go语言中的并发编程:掌握goroutine与通道的艺术####
本文深入探讨了Go语言中的核心特性——并发编程,通过实例解析goroutine和通道的高效使用技巧,旨在帮助开发者提升多线程程序的性能与可靠性。 ####
|
2月前
|
Go 开发者
Go语言中的并发编程:掌握goroutines和channels####
本文深入探讨了Go语言中并发编程的核心概念,重点介绍了goroutines和channels的工作原理及其在实际开发中的应用。文章通过实例演示如何有效地利用这些工具来编写高效、可维护的并发程序,旨在帮助读者理解并掌握Go语言在处理并发任务时的强大能力。 ####
|
2月前
|
算法 安全 程序员
Go语言的并发编程:深入理解与实践####
本文旨在探讨Go语言在并发编程方面的独特优势及其实现机制,通过实例解析关键概念如goroutine和channel,帮助开发者更高效地利用Go进行高性能软件开发。不同于传统的摘要概述,本文将以一个简短的故事开头,引出并发编程的重要性,随后详细阐述Go语言如何简化复杂并发任务的处理,最后通过实际案例展示其强大功能。 --- ###
|
2月前
|
存储 安全 Go
Go 语言以其高效的并发编程能力著称,主要依赖于 goroutines 和 channels 两大核心机制
Go 语言以其高效的并发编程能力著称,主要依赖于 goroutines 和 channels 两大核心机制。本文介绍了这两者的概念、用法及如何结合使用,实现任务的高效并发执行与数据的安全传递,强调了并发编程中的注意事项,旨在帮助开发者更好地掌握 Go 语言的并发编程技巧。
36 2