一个golang并行库源码解析

本文涉及的产品
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
简介: ## 场景 有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识: ``` jobA jobB jobC \ \ / \ \ / \ middle \ /

场景

有这样一种场景:四个任务A、B、C, D,其中任务B和C需要并发执行,得到结果1, 任务A执行得到结果2, 结果1和2作为任务D的参数传入,然后执行任务D得到最终结果。我们可以将任务执行顺序用如下图标识:

jobA  jobB   jobC
 \      \     /
  \      \   /
   \      middle
    \      /
     \    /
     jobD

这是一个典型的多任务并发场景,实际上随着任务数量的增多,任务逻辑会更加复杂,如何编写可维护健壮的逻辑代码变得十分重要,虽然golang提供了同步机制,但是需要写很多重复无用的Add/Wait/Done代码,而且代码可读性也很差,这是不能容忍的。

本文介绍一个开源的golang并行库,源码地址https://github.com/buptmiao/parallel

数据结构

1. parallel结构体

type Parallel struct {
        wg        *sync.WaitGroup
        pipes     []*Pipeline
        wgChild   *sync.WaitGroup
        children  []*Parallel
        exception *Handler
}

parallel定义了一个多任务并发实例,主要包括:并发任务管道(pipes)、子任务并发实例(children)、子任务实例等待锁(wgChild)、当前并发任务实例等待锁(wg)

2. pipeline结构体

type Pipeline struct {
        handlers []*Handler
}  
type Handler struct {
        f    interface{}
        args []interface{}
        receivers []interface{}
}       

这里pipeline实际上是一系列并发任务实例handler,每一个handler包括任务函数f, 传入参数args以及返回结果receivers

parallel相关代码

新建parallel实例

func NewParallel() *Parallel {
        res := new(Parallel)
        res.wg = new(sync.WaitGroup)
        res.wgChild = new(sync.WaitGroup)
        res.pipes = make([]*Pipeline, 0, 10)
        return res
}       

注册handler

func (p *Parallel) Register(f interface{}, args ...interface{}) *Handler {
        return p.NewPipeline().Register(f, args...)
}
func (p *Parallel) NewPipeline() *Pipeline {
        pipe := NewPipeline()
        p.Add(pipe)
        return pipe
} 
func (p *Parallel) Add(pipes ...*Pipeline) *Parallel {
        p.wg.Add(len(pipes))
        p.pipes = append(p.pipes, pipes...)
        return p
}

新建子parallel实例

func (p *Parallel) NewChild() *Parallel {
        child := NewParallel()
        child.exception = p.exception
        p.AddChildren(child)
        return child
}
func (p *Parallel) AddChildren(children ...*Parallel) *Parallel {
        p.wgChild.Add(len(children))
        p.children = append(p.children, children...)
        return p
}

任务运行

func (p *Parallel) Run() {
        for _, child := range p.children {
                // this func will never panic
                go func(ch *Parallel) {
                        ch.Run()
                        p.wgChild.Done()
                }(child)
        }
        p.wgChild.Wait() //wait children instance done
        p.do() //run
        p.wg.Wait() //wait all job done
}
func (p *Parallel) do() {
        for _, pipe := range p.pipes {
                go p.Do()
        }
}

pipeline相关代码

新建pipeline实例

func NewPipeline() *Pipeline {
        res := new(Pipeline)
        return res
}       

注册handler

func (p *Pipeline) Register(f interface{}, args ...interface{}) *Handler {
        h := NewHandler(f, args...)
        p.Add(h)
        return h
}       

添加handler

func (p *Pipeline) Add(hs ...*Handler) *Pipeline {
        p.handlers = append(p.handlers, hs...)
        return p
}

任务运行

func (p *Pipeline) Do() {
        for _, h := range p.handlers {
                h.Do()
        }
}

handler相关代码

新建handler实例

func NewHandler(f interface{}, args ...interface{}) *Handler {
        res := new(Handler)
        res.f = f
        res.args = args
        return res
}

运行任务

func (h *Handler) Do() {
        f := reflect.ValueOf(h.f)
        typ := f.Type()
        //check if f is a function
        if typ.Kind() != reflect.Func {
                panic(ErrArgNotFunction)
        }
        //check input length, only check '>' is to allow varargs.
        if typ.NumIn() > len(h.args) {
                panic(ErrInArgLenNotMatch)
        }
        //check output length
        if typ.NumOut() != len(h.receivers) {
                panic(ErrOutArgLenNotMatch)
        }
        //check if output args is ptr
        for _, v := range h.receivers {
                t := reflect.ValueOf(v)
                if t.Type().Kind() != reflect.Ptr {
                        panic(ErrRecvArgTypeNotPtr)
                }
                if t.IsNil() {
                        panic(ErrRecvArgNil)
                }
        }

        inputs := make([]reflect.Value, len(h.args))
        for i := 0; i < len(h.args); i++ {
                if h.args[i] == nil {
                        inputs[i] = reflect.Zero(f.Type().In(i))
                } else {
                        inputs[i] = reflect.ValueOf(h.args[i])
                }
        }
        out := f.Call(inputs)

        for i := 0; i < len(h.receivers); i++ {
                v := reflect.ValueOf(h.receivers[i])
                v.Elem().Set(out[i])
        }
}

demo

package main

import "github.com/buptmiao/parallel"

func testJobA(x, y int) int {
        return x - y
}

func testJobB(x, y int) int {
        return x + y
}

func testJobC(x, y *int, z int) float64 {
        return float64((*x)*(*y)) / float64(z)
}

func main() {
        var x, y int
        var z float64

        p := parallel.NewParallel()

        ch1 := p.NewChild()
        ch1.Register(testJobA, 1, 2).SetReceivers(&x)

        ch2 := p.NewChild()
        ch2.Register(testJobB, 1, 2).SetReceivers(&y)

        p.Register(testJobC, &x, &y, 2).SetReceivers(&z)

        p.Run()

        if x != -1 || y != 3 || z != -1.5 {
                panic("unexpected result")
        }
}
目录
相关文章
|
2月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
2月前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
2月前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
15天前
|
自然语言处理 数据处理 索引
mindspeed-llm源码解析(一)preprocess_data
mindspeed-llm是昇腾模型套件代码仓,原来叫"modelLink"。这篇文章带大家阅读一下数据处理脚本preprocess_data.py(基于1.0.0分支),数据处理是模型训练的第一步,经常会用到。
33 0
|
2月前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
存储 JavaScript 前端开发
go源码解析-Println的故事
本文主要通过平常常用的go的一个函数,深入源码,了解其底层到底是如何实现的。 Println Println函数接受参数a,其类型为…interface{}。用过Java的对这个应该比较熟悉,Java中也有…的用法。
|
2月前
|
存储 监控 算法
员工上网行为监控中的Go语言算法:布隆过滤器的应用
在信息化高速发展的时代,企业上网行为监管至关重要。布隆过滤器作为一种高效、节省空间的概率性数据结构,适用于大规模URL查询与匹配,是实现精准上网行为管理的理想选择。本文探讨了布隆过滤器的原理及其优缺点,并展示了如何使用Go语言实现该算法,以提升企业网络管理效率和安全性。尽管存在误报等局限性,但合理配置下,布隆过滤器为企业提供了经济有效的解决方案。
85 8
员工上网行为监控中的Go语言算法:布隆过滤器的应用
|
2月前
|
存储 Go 索引
go语言中数组和切片
go语言中数组和切片
47 7
|
2月前
|
Go 开发工具
百炼-千问模型通过openai接口构建assistant 等 go语言
由于阿里百炼平台通义千问大模型没有完善的go语言兼容openapi示例,并且官方答复assistant是不兼容openapi sdk的。 实际使用中发现是能够支持的,所以自己写了一个demo test示例,给大家做一个参考。
|
2月前
|
程序员 Go
go语言中结构体(Struct)
go语言中结构体(Struct)
116 71

推荐镜像

更多