Go语言并发模型与模式:Fan-out / Fan-in 模式

简介: Fan-out/Fan-in 是一种经典的并发设计模式,用于任务分发与结果聚合。Fan-out 将任务分发给多个 worker 并发执行,Fan-in 将结果汇聚统一处理。适用于数据抓取、批量计算等“多产一收”场景。通过 goroutine 和 channel,可构建高效的数据处理流水线,具备高吞吐与扩展性。使用时需注意通道设计、异常处理及取消控制等问题。

 

在并发系统中,Fan-out / Fan-in 模式是一种经典的设计方式,用于在多个 goroutine 之间进行任务分发和结果聚合,常用于提高处理吞吐量和并发能力。


一、什么是 Fan-out / Fan-in 模式?

  • Fan-out(扇出):将任务从一个入口分发给多个 worker 并发执行。
  • Fan-in(扇入):将多个 worker 的结果汇聚到一个通道中进行统一处理。

这种模式适用于“多产一收”的数据处理流程,如数据抓取、批量计算等。


二、基本结构图

┌────────────┐
     │ 任务生产者 │
     └────┬───────┘
      Fan-out
  ┌──────┴──────┐
  ▼      ▼      ▼
Worker Worker Worker
  │      │      │
  └──────┬──────┘
       Fan-in
   ┌─────▼─────┐
   │ 结果处理器 │
   └───────────┘

三、代码示例

package main
import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)
func producer(count int) <-chan int {
    out := make(chan int)
    go func() {
        for i := 0; i < count; i++ {
            out <- i
        }
        close(out)
    }()
    return out
}
func worker(id int, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for job := range in {
            time.Sleep(time.Millisecond * time.Duration(rand.Intn(500))) // 模拟处理
            fmt.Printf("Worker %d processed job %d\n", id, job)
            out <- job * 2
        }
        close(out)
    }()
    return out
}
func merge(channels ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)
    output := func(c <-chan int) {
        for val := range c {
            out <- val
        }
        wg.Done()
    }
    wg.Add(len(channels))
    for _, c := range channels {
        go output(c)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
func main() {
    rand.Seed(time.Now().UnixNano())
    input := producer(10)
    // Fan-out:启动3个worker处理任务
    w1 := worker(1, input)
    w2 := worker(2, input)
    w3 := worker(3, input)
    // Fan-in:合并3个worker输出
    result := merge(w1, w2, w3)
    for res := range result {
        fmt.Println("结果:", res)
    }
}

四、应用场景

Fan-out / Fan-in 非常适合如下场景:

应用场景 示例
并发抓取网页 多个 URL 同时请求并聚合结果
批量图像处理 多图片缩放或加水印
数据清洗与计算 并发处理 CSV/日志数据
大量任务排队处理 多任务分发并收集结果

五、注意事项

✅ 优点:

  • • 利用多核并发,显著提高处理效率;
  • • 模块清晰,生产者-工作者-聚合器分离;
  • • 易于扩展和监控。

⚠️ 注意事项:

  • • 输入通道必须是“广播型”,即可被多个 worker 消费;
  • • 合并函数 merge 要注意关闭输出通道;
  • • worker 中如有异常(如 panic)应提前恢复;
  • • 可加 context 实现取消控制;

六、小结

Fan-out / Fan-in 是构建并发处理流水线的核心模式,结合 goroutine 和 channel,可以构建高吞吐、高可扩展的数据处理系统。

 

相关文章
|
6月前
|
消息中间件 编解码 Kafka
Go语言并发模型与模式:Worker Pool 模式
Worker Pool(工作池)模式是Go语言中管理高并发任务的有效方法。通过限制 Goroutine 数量,避免资源耗尽或系统崩溃。其核心包括任务通道、工作者 Goroutine、结果通道(可选)及同步机制。示例代码展示了如何分配与处理任务,同时支持带返回值的实现。该模式适用于网络服务、批量任务处理、消息消费等场景,具有限制并发、提高稳定性和结构清晰的优点。但需注意通道关闭时机、任务取消机制及错误处理等问题。Worker Pool 是构建高效任务处理系统的强大工具。
|
7月前
|
存储 Go
Go语言之接口与多态 -《Go语言实战指南》
Go 语言中的接口是实现多态的核心机制,通过一组方法签名定义行为。任何类型只要实现接口的所有方法即视为实现该接口,无需显式声明。本文从接口定义、使用、底层机制、组合、动态行为到工厂模式全面解析其特性与应用,帮助理解 Go 的面向接口编程思想及注意事项(如 `nil` 陷阱)。
216 22
|
7月前
|
Go
循环语句:for、range -《Go语言实战指南》
Go 语言使用 `for` 实现所有循环控制,无 `while` 和 `do...while`。支持经典三段式、条件判断(类似 `while`)、无限循环及 `range` 遍历数组、切片、字符串、map 和 channel。配合 `break` 和 `continue` 控制流程,字符串遍历按 Unicode 处理,避免乱码问题,map 遍历顺序随机。总结涵盖多种用法,灵活高效。
165 11
|
7月前
|
Go C++
Go语言方法与接收者 -《Go语言实战指南》
本文介绍了 Go 语言中方法的相关概念和用法。方法是绑定到特定类型上的函数,包含值接收者和指针接收者两种形式。值接收者不会改变原始数据,而指针接收者可修改原始数据,且在处理大型结构体时性能更优。文章详细对比了方法与普通函数的区别,并说明了选择指针接收者的原因,如修改原始值、提升性能及保持一致性。此外,Go 支持为任意自定义类型定义方法,不仅限于结构体。最后通过表格总结了方法的核心概念和使用场景。
211 34
|
5月前
|
JSON 安全 Go
Go语言项目工程化 —— 日志、配置、错误处理规范
本章详解Go语言项目工程化核心规范,涵盖日志、配置与错误处理三大关键领域。在日志方面,强调其在问题排查、性能优化和安全审计中的作用,推荐使用高性能结构化日志库zap,并介绍日志级别与结构化输出的最佳实践。配置管理部分讨论了配置分离的必要性,对比多种配置格式如JSON、YAML及环境变量,并提供viper库实现多环境配置的示例。错误处理部分阐述Go语言显式返回error的设计哲学,讲解标准处理方式、自定义错误类型、错误封装与堆栈追踪技巧,并提出按调用层级进行错误处理的建议。最后,总结各模块的工程化最佳实践,助力构建可维护、可观测且健壮的Go应用。
|
6月前
|
Go
Go语言同步原语与数据竞争:WaitGroup
本文介绍了 Go 语言中 `sync.WaitGroup` 的使用方法和注意事项。作为同步原语,它通过计数器机制帮助等待多个 goroutine 完成任务。核心方法包括 `Add()`(设置等待数量)、`Done()`(减少计数)和 `Wait()`(阻塞直到计数归零)。文章详细讲解了其基本原理、典型用法(如等待 10 个 goroutine 执行完毕),并提供了代码示例。同时指出常见错误,例如 `Add()` 必须在 goroutine 启动前调用,以及 WaitGroup 不可重复使用。最后总结了适用场景和使用要点,强调避免竞态条件与变量捕获陷阱。
|
6月前
|
安全 Go 调度
Go同步原语与数据竞争:原子操作(atomic)
本文介绍了Go语言中`sync/atomic`包的使用,帮助避免多goroutine并发操作时的数据竞争问题。原子操作是一种不可中断的操作,确保变量读写的安全性。文章详细说明了常用函数如`Load`、`Store`、`Add`和`CompareAndSwap`的功能与应用场景,并通过并发计数器示例展示了其实现方式。此外,对比了原子操作与锁的优缺点,强调原子操作适用于简单变量的高效同步,而不适合复杂数据结构。最后提醒开发者注意使用场景限制,合理选择同步工具以优化性能。
|
7月前
|
Go
Go语言接口的定义与实现
Go 语言的接口提供了一种灵活的多态机制,支持隐式实现和抽象编程。本文介绍了接口的基本定义、实现方式、空接口的使用、类型断言以及接口组合等核心概念,并探讨了接口与 nil 的关系及应用场景。通过示例代码详细说明了如何利用接口提升代码的可扩展性和可测试性,总结了接口的关键特性及其在依赖注入、规范定义和多态调用中的重要作用。
304 14
|
7月前
|
Go
匿名函数与闭包(Anonymous Functions and Closures)-《Go语言实战指南》
本文介绍了 Go 语言中的匿名函数与闭包特性。匿名函数是没有名字的函数,可立即调用或赋值使用;闭包能捕获外部变量并持续访问,适用于状态保存、工厂函数等场景。同时,文章探讨了闭包在并发中的注意事项,并通过示例展示了其用法。这些特性为 Go 提供了函数式编程的能力,增强了代码灵活性与抽象能力。
157 18
|
7月前
|
Go
Go语言之匿名字段与组合 -《Go语言实战指南》
Go 语言通过匿名字段(embedding)实现类似继承的组合机制。匿名字段是在结构体中嵌套类型而不显式命名字段名,自动获取嵌入类型的字段和方法访问权限。支持方法提升、指针嵌入、字段冲突处理及多重组合,强调“组合优于继承”的设计理念,助力灵活高效的代码组织方式。