前言
我们有很多情况下需要主动关闭goroutine,如需要实现一个系统自动熔断的功能就需要主动关闭goroutine
为什么要中断GoRoutine?
场景:
俩个相互依赖的的操作,“依赖”是指如果其中一个失败,那么另一个就没有意义,而不是第二个操作依赖第一个操作的结果(那种情况下,两个操作不能并行)。在这种情况下,如果我们很早就知道其中一个操作失败,那么我们就会希望能取消所有相关的操作。
goroutine介绍
goroutine是Go语言实现并发编程的利器,是 Go语言中的轻量级线程实现,由 Go 运行时(runtime)管理,简单的一个指令go function就能启动一个goroutine;Go 程序会智能地将 goroutine 中的任务合理地分配给每个 CPU。
但是,Go语言并没有提供终止goroutine的接口,也就是说,我们不能从外部去停止一个goroutine,只能由goroutine内部退出(main函数终止除外);
几种停止的办法
1. 使用 for-range
for-range 从 channel 上接收值,直到 channel 关闭,该结构在Go并发编程中很常用,这对于从单一通道上获取数据去执行某些任务是十分方便的
package main import ( "fmt" "sync" ) //源码&面试>>https://javapub.blog.csdn.net/category_11938137.html var wg sync.WaitGroup //等待组,用来阻塞程序 func worker(ch chan int) { defer wg.Done() //等待组 -1 for v := range ch { fmt.Println(v) } } func main() { ch := make(chan int) wg.Add(1) //等待组 +1 go worker(ch) for i := 0; i < 5; i++ { ch <- i } close(ch) //必须要加close,因为在打印完0、1、2、3、4后会发生阻塞,直到chan关闭。 wg.Wait() }
去掉close的情况
2. 使用 for-select (向退出通道发出退出信号)
当channel比较多时,for-range结构借不是很方便了;
Go语言提供了另外一种和channel相关的语法: select;
select能够让goroutine在多个通信操作上等待(可以理解为监听多个channel);
由于这个特性,for-select结构在Go并发编程中使用的频率很高;
我在使用Go的开发中,这是我用的最多的一种组合形式:
这里用 quit通道接收退出信号。
package main import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func worker(in, quit <-chan int) { defer wg.Done() for { select { case <-quit: fmt.Println("收到退出信号") return //必须return,否则goroutine不会结束 case v := <-in: fmt.Println(v) } } } func main() { quit := make(chan int) //退出通道 in := make(chan int) wg.Add(1) go worker(in, quit) for i := 0; i < 3; i++ { in <- i time.Sleep(1 * time.Second) } quit <- 1 //想通道写入退出信号 wg.Wait() }
3. 使用for-select(关闭退出通道)
当我们就需要向 quit 通道中发送100次数据,如果再用以上的代码就很麻烦,有一个很简单的方法,关闭 channel,这样所有监听 quit channel 的 goroutine 就都会收到关闭信号。
package main //源码&面试>>https://javapub.blog.csdn.net/category_11938137.html import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func worker(in, quit <-chan int) { defer wg.Done() for { select { case <-quit: fmt.Println("收到退出信号") return //必须return,否则goroutine不会结束 case v := <-in: fmt.Println(v) } } } func main() { quit := make(chan int) //退出通道 in := make(chan int) wg.Add(1) go worker(in, quit) for i := 0; i < 3; i++ { in <- i time.Sleep(1 * time.Second) } // quit <- 1 //想通道写入退出信号 close(quit) // 直接关闭通道,程序退出 wg.Wait() }
4. 使用for-select(关闭多个channel)
如果select上监听了多个通道,需要所有的通道都关闭后才能结束goroutine,这里就利用select的一个特性,select不会在nil的通道上进行等待,因此将channel赋值为nil即可,此外,还需要利用channel的ok值。
package main //源码&面试>>https://javapub.blog.csdn.net/category_11938137.html import ( "fmt" "sync" "time" ) var wg sync.WaitGroup func worker(in1, in2 <-chan int) { defer wg.Done() for { select { case v, ok := <-in1: if !ok { fmt.Println("收到退出信号1") in1 = nil } fmt.Println(v) case v, ok := <-in2: if !ok { fmt.Println("收到退出信号2") in2 = nil } fmt.Println(v) } if in1 == nil && in2 == nil { return } } } func main() { in1 := make(chan int) in2 := make(chan int) wg.Add(2) go worker(in1, in2) go worker(in1, in2) for i := 0; i < 3; i++ { in1 <- i time.Sleep(1 * time.Second) in2 <- i } close(in1) close(in2) wg.Wait() }
5. 使用context包
context包是官方提供的一个用于控制多个goroutine写作的包;
使用context的cancel信号,可以终止goroutine的运行,context是可以向下传递的
package main //源码&面试>>https://javapub.blog.csdn.net/category_11938137.html import ( "context" "errors" "fmt" "time" ) func operation1(ctx context.Context) error { // 让我们假设这个操作会因为某种原因失败 // 我们使用time.Sleep来模拟一个资源密集型操作 time.Sleep(100 * time.Millisecond) return errors.New("failed") } func operation2(ctx context.Context) { // 我们使用在前面HTTP服务器例子里使用过的类似模式 select { case <-time.After(500 * time.Millisecond): fmt.Println("done") case <-ctx.Done(): fmt.Println("halted operation2") } } func main() { // 新建一个上下文 ctx := context.Background() // 在初始上下文的基础上创建一个有取消功能的上下文 ctx, cancel := context.WithCancel(ctx) //需要取消时,就调用cancel(),发出取消事件。 // 在不同的goroutine中运行operation2 go func() { operation2(ctx) }() err := operation1(ctx) fmt.Println(err) // 如果这个操作返回错误,取消所有使用相同上下文的操作 if err != nil { cancel() } } // func main() { // // 创建一个监听8000端口的服务器 // http.ListenAndServe(":8000", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // ctx := r.Context() // // 输出到 STDOUT 展示处理已经开始 // fmt.Fprint(os.Stdout, "processing request\n") // // 通过select监听多个channel // select { // case <-time.After(2 * time.Second): // // 如果两秒后接受到了一个消息后,意味请求已经处理完成 // // 我们写入"request processed"作为响应 // w.Write([]byte("request processed")) // case <-ctx.Done(): // // 如果处理完成前取消了,在STDERR中记录请求被取消的消息 // fmt.Fprint(os.Stderr, "request cancelled\n") // } // })) // }