Linux 多线程(三)

简介: Linux 多线程

生产者消费模型


概念


举个栗子

dd7844b78e315bd3cf80d326761fb6f3_64a03488c12c4f2d86c602be42740204.png


在学校里,超市中存放着许多商品,这些商品并不是超市生产的而是由供应商供货,通过超市这一交易场所为学生提供商品


由于超市的存在,学生与供应商之间就不需要产生联系也就是所谓的“解耦”;学生不需要到供应商那里购买商品,两者在生产和消费之间没有任何联系


在超市中,学生与学生之间可能会存在购买同一商品,且商品库存不够的情景,在线程中称作互斥关系;在供应商向超市提供商品时,同一商品的不同品牌也存在着互斥关系,只能存在其中一个


学生与供应商之间:只有货架上存在商品时,才能去购买,不能说商品边生产边消费,这也是一种互斥关系;供应商一定是生产一部分,学生消费一部分,待消费得差不多时,再进行生产,这里存在着同步的关系


概念:

生产者消费者模型就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力,这个阻塞队列就是用来给生产者和消费者解耦的


优点


生产线程和销毁线程进行解耦

支持生产和消费的一段忙闲不均

提高效率

总结


3种关系:生产者与生产者(互斥),消费者与消费者(互斥),生产者与消费者(互斥,同步)

2种角色:生产者,消费者

1个交易场所:一段特定结构的缓冲区

不过呢,还存在一个问题:

2bb0cdd6c86491ea42c084139cb3d068_2cb6aee471244bef8c5c0ee01c8d12df.png


当超市的商品已经足够时,由于供应商并不知道,所以需要对超市(共享资源)加锁,判断,解锁;学生的优先级较低,只能看着供应商一直在重复着加锁,判断,解锁的操作,条件变量就是用来解决这一问题的


基于阻塞队列的生产消费模型


d4d49c0c144e993f2be56eff2ce625c0_987a4943a210436bbcdd0b5000319c08.jpeg

在多线程编程中阻塞队列是一种常用于实现生产者和消费者模型的数据结构;当队列为空时,从队列中获取元素的操作将会被阻塞,直到队列中放入元素;当队列满时,向队列中存放元素的操作也会被阻塞,直到有元素被从队列中取出


阻塞队列实现


const int bmaxcap=5;
template<class T>
class BlockQueue
{
public:
    BlockQueue(const int&maxcap=bmaxcap)
    :_maxcap(maxcap)
    {
        pthread_mutex_init(&_mutex,nullptr);
        pthread_cond_init(&_pcond,nullptr);
        pthread_cond_init(&_ccond,nullptr);
    }
    //输入型参数  const &
    void push(const T&in)
    {
        pthread_mutex_lock(&_mutex);
        //充当条件判断的语句必须是while
        //可能会存在假唤醒
        while(is_full())
        {
            pthread_cond_wait(&_pcond,&_mutex);
        }
        _q.push(in);
        pthread_cond_signal(&_ccond);
        pthread_mutex_unlock(&_mutex);
    }
    //输出型参数 *
    void pop(T*out)
    {
        pthread_mutex_lock(&_mutex);
        while(is_empty())
        {
            pthread_cond_wait(&_ccond,&_mutex);
        }
        *out=_q.front();
        _q.pop();
        pthread_cond_signal(&_pcond);
        pthread_mutex_unlock(&_mutex);
    }
    ~BlockQueue()
    {
        pthread_mutex_destroy(&_mutex);
        pthread_cond_destroy(&_pcond);
        pthread_cond_destroy(&_ccond);
    }
private:
    bool is_empty()
    {
        return _q.empty();
    }
    bool is_full()
    {
        return _q.size()==_maxcap;
    }
private:
    queue<T> _q;
    int _maxcap;//队列的上限
    //互斥量以保证共享资源安全
    pthread_mutex_t _mutex;
    //生产者对应的条件变量
    pthread_cond_t _pcond;
    //消费者对应的条件变量
    pthread_cond_t _ccond;
};

生产任务代码


class Task
{
    using func_t=function<int(int,int,char)>;
public:
    Task()
    {}
    Task(int x,int y,char op,func_t func)
    :_x(x)
    ,_y(y)
    ,_op(op)
    ,_callback(func)
    {}
    string operator()()
    {
        int result=_callback(_x,_y,_op);
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d = %d",_x,_op,_y,result);
        return buffer;
    }
    string toTaskString()
    {
        char buffer[1024];
        snprintf(buffer,sizeof(buffer),"%d %c %d = ?",_x,_op,_y);
        return buffer;
    }
private:
    int _x;
    int _y;
    char _op;
    func_t _callback;
};


测试代码


const string oper="+-*/%";
int mymath(int x,int y,char op)
{
    int result=0;
    switch(op)
    {
        case '+':
            result=x+y;
            break;
        case '-':
            result=x-y;
            break;
        case '*':
            result=x*y;
            break;
        case '/':
            if(y==0)
            {
                cout<<"div zero!"<<endl;
                result=-1;
            }
            else
                result=x/y;
            break;
        case '%':
            if(y==0)
            {
                cout<<"mod zero!"<<endl;
                result=-1;
            }
            else
                result=x%y;
        break;
        default:
            break;
    };
}
//消费者
void*consumer(void*_bq)
{
    BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(_bq);
    while(true)
    {
        Task t;
        bq->pop(&t);
        cout<<"消费任务: "<<t()<<endl;
    }
    return nullptr;
}
//生产者
void*productor(void*_bq)
{
    BlockQueue<Task>*bq=static_cast<BlockQueue<Task>*>(_bq);
    while(true)
    {
        int x=rand()%10+1;
        int y=rand()%5;
        int opercode=rand()%oper.size();
        Task t(x,y,oper[opercode],mymath);
        bq->push(t);
        cout<<"生产任务: "<<t.toTaskString()<<endl;
        sleep(1);
    }
    return nullptr;
}
int main()
{
    srand((unsigned long)time(nullptr)^getpid());
    BlockQueue<Task>*bq=new BlockQueue<Task>();
    pthread_t c,p;
    pthread_create(&c,nullptr,consumer,bq);
    pthread_create(&p,nullptr,productor,bq);
    pthread_join(c,nullptr);
    pthread_join(p,nullptr);
    return 0;
}

运行结果


b223bc4be389776b03464afa53d3c60e_8c074382abdd48ec9a595d33d4388d48.png


在这里可以理解生产消费模型的高效:生产者在将任务放到队列之前,获取和构建任务需要消耗大量时间;消费者从队列中取出任务之后,执行任务同样也需要时间;模型的出现使线程在任务在生产之前,消费之后并行执行,因此而变得高效


POSIX信号量


概念


在上面阻塞队列中,生产者产生任务时,队列必须有空的资源,也就是满足产生任务的条件;判断公共资源是否满足条件在没有访问之前是无法得知的,只能通过先加锁,进行检测,接着操作,最后解锁,而且默认情况下只要对资源加锁,就意味着对整个资源都使用,但事实并不是如此,也可能是使用同一资源的不同区域


信号量的引入就可以提前得知资源的情况,也可以实现访问同一资源的不同区域


概念:

信号量本质是一个计数器,用来衡量资源数量的计数器。在线程访问某一资源之前先申请信号量,如果申请成功,就说明资源满足条件,自己也就拥有资源的这一区域,如果失败说明条件不满足,只能等待;所以申请信号量的本质是对资源中特定小块资源的预定


举个栗子

临界资源就是电影院,在电影开幕之前,观众需要进行买票,买票这一操作就是预定,座位就可以看作是资源的不同区域;买票成功,就表示可以进行观影


线程想要访问临界资源中的某一区域,需要先申请信号量,申请的前提是所有线程都能看到,也就是说信号量本质也是公共资源


信号量 sem_t sem的结构能够保证其操作是原子性的


信号量初始化


int sem_init(sem_t *sem, int pshared, unsigned int value);


pshared:0表示线程间共享;!0表示进程间共享

value:信号量初值,表示资源的多少


销毁信号量


int sem_destroy(sem_t *sem);


等待信号量


int sem_wait(sem_t *sem);


等待信号量,相当于 sem--;申请资源


发布信号量


int sem_post(sem_t *sem);


发布信号量,表示资源使用完毕,相当于sem++;归还资源


基于环形队列的生产消费模型

8b49c964f1baa9193cea60d86082618e_881fd3ad296a432b8b943958ac9ea8a0.png


环形队列为空或为满时,生产者和消费者可能会访问同一个位置


为空时,生产者先生产,所以消费者只能在生产者后面的位置,不可能超越

1d101ee27749d7c3ca8c3f7130aff2af_3a33316e2e9c46a78ec2c3d1f217e812.png


为满时,只有消费者消费一个任务,生产者才能生产任务,所以生产者在消费者的后面,不可能超越

4e741b966f92a9a0db4f35bb6a1c074c_f1a6f73dc40849d588d20a28a8eda55f.png


对于生产者:productor_sem

申请成功,将生产的任务放到环形队列中,生产者信号量总数减少P(productor_sem),消费者的信号量增加V(consumer_sem)


对于消费者:consumer_sem

消费一个任务,消费者信号量减少P(consumer_ssem),生产者信号量增加V(productor_sem)


生产者和消费者的位置其实就是队列的下标


static const int gcap=5;
template<class T>
class Ringqueue
{
private:
    void P(sem_t &sem)
    {
        int n=sem_wait(&sem);
        assert(n==0);
        (void)n;
    }
    void V(sem_t &sem)
    {
        int n=sem_post(&sem);
        assert(n==0);
        (void)n;
    }
public:
    Ringqueue(const int&cap=gcap)
    :_queue(cap)
    ,_cap(cap)
    {
        int n=sem_init(&_spaceSem,0,_cap);
        assert(n==0);
        n=sem_init(&_dataSem,0,0);
        assert(n==0);
        _productor=_consumer=0;
        pthread_mutex_init(&_pmutex,nullptr);
        pthread_mutex_init(&_cmutex,nullptr);
    }
    //生产者
    void Push(const T&in)
    {
        //申请信号量-减少空间信号量
        P(_spaceSem);
        pthread_mutex_lock(&_pmutex);
        _queue[_productor++]=in;
        _productor%=_cap;
        pthread_mutex_unlock(&_pmutex);
        //增加数据信号量
        V(_dataSem);
    }
    //消费者
    void Pop(T*out)
    {
        //减少数据信号量
        P(_dataSem);
        pthread_mutex_lock(&_cmutex);
        *out=_queue[_consumer++];
        _consumer%=_cap;
        pthread_mutex_unlock(&_cmutex);
        //减少消费者信号量
        V(_spaceSem);
    }
    ~Ringqueue()
    {
        sem_destroy(&_spaceSem);
        sem_destroy(&_dataSem);
        pthread_mutex_destroy(&_pmutex);
        pthread_mutex_destroy(&_cmutex);
    }
private:
    vector<T> _queue;
    //队列上限
    int _cap;
    sem_t _spaceSem;//生产者,空间资源
    sem_t _dataSem;//消费者,数据资源
    int _productor;//生产者位置
    int _consumer;//消费者位置
    pthread_mutex_t _pmutex;
    pthread_mutex_t _cmutex;
};


线程池


一种线程使用的模式。线程过多会带来调度开销,进而影响效率;线程池维护着多个线程,等待着分配的可并发的任务,避免了在处理短时间任务创建和销毁线程的代价


86081c189d31131029747919f1dbb615_1e5efe52385c474f811193a0502b1a96.png


简单来说,线程池就是一批提前创建完成的线程。当有任务到来时,就去处理任务;没有任务时,线程就进行休眠,通过空间来换取时间


目录
相关文章
|
存储 Linux API
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
在计算机系统的底层架构中,操作系统肩负着资源管理与任务调度的重任。当我们启动各类应用程序时,其背后复杂的运作机制便悄然展开。程序,作为静态的指令集合,如何在系统中实现动态执行?本文带你一探究竟!
【Linux进程概念】—— 操作系统中的“生命体”,计算机里的“多线程”
|
并行计算 Linux
Linux内核中的线程和进程实现详解
了解进程和线程如何工作,可以帮助我们更好地编写程序,充分利用多核CPU,实现并行计算,提高系统的响应速度和计算效能。记住,适当平衡进程和线程的使用,既要拥有独立空间的'兄弟',也需要在'家庭'中分享和并行的成员。对于这个世界,现在,你应该有一个全新的认识。
394 67
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
506 0
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
通过本文,您可以了解如何在业务线程中注册和处理Linux信号。正确处理信号可以提高程序的健壮性和稳定性。希望这些内容能帮助您更好地理解和应用Linux信号处理机制。
268 26
|
Linux
Linux编程: 在业务线程中注册和处理Linux信号
本文详细介绍了如何在Linux中通过在业务线程中注册和处理信号。我们讨论了信号的基本概念,并通过完整的代码示例展示了在业务线程中注册和处理信号的方法。通过正确地使用信号处理机制,可以提高程序的健壮性和响应能力。希望本文能帮助您更好地理解和应用Linux信号处理,提高开发效率和代码质量。
291 17
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
326 0
Linux C/C++之线程基础
|
存储 设计模式 NoSQL
Linux线程详解
Linux线程详解
|
缓存 Linux C语言
Linux线程是如何创建的
【8月更文挑战第5天】线程不是一个完全由内核实现的机制,它是由内核态和用户态合作完成的。
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解

热门文章

最新文章