Linux C实现线程池

简介: 主要是在linux下C语言代码实现线程池,关于链表的操作,多线程,以及多线程锁、条件变量等知识点请自行参考其他博客。

主要目的

主要是在linux下C语言代码实现线程池,关于链表的操作,多线程,以及多线程锁、条件变量等知识点请自行参考其他博客。


线程池

使用场景

高性能服务器处理大量客户端的情景,比如火车售票系统,购物网,炒股网站等。

为什么使用线程池

想想在一个百万级客户端使用的服务器,客户集中在某个时刻访问服务器,服务器是否在某一时刻为所有客户开启一个线程去处理任务,显然不现实,比如1W个客户同时访问服务器,以posix来说,每个线程需要的内存资源在8M左右,那么1w个需要多少内存?

线程池作用

前文说了为什么要使用线程池,那么线程池的主要作用除了上面说的避免线程太多,导致服务器内存耗尽。那么另外创建线程的另外2个作用是:避免创建于销毁线程的代价和任务与执行分离的作用

或许有的朋友对任务与执行分离感到疑惑,那么举个简单例子吧。对于游戏服务器来说,某个时刻有大量客户登录,这时候需要将所有的客户的登录信息(登录时间等)记录下来,这些信息是需要记录到数据库文件的。我们知道,磁盘写入和内存读写相比,是很慢的,那么我们只需要在主线程告诉某个客户登录了,然后将客户登录的信息记录到数据库由线程池来实现写入。这就是任务与执行分离的一个例子。

生活例子来说明线程池

我们以银行办理业务来说明线程池,通过这个例子我们可以知道线程池的主要数据结构。

银行为客户办理业务的过程中,主要有客户任务、柜员、公示牌(排队号)3个角色,客户是办理业务的,因此对应于线程要执行的任务,柜员为客户服务,即柜员是线程池中线程的概念,公示牌的作用是连接柜员与办理业务的桥梁,主要的作用是柜员呼叫客户的标志。在深入一点,我们将客户的任务当做线程中的临界资源,所有的柜员都有机会去为某个客户办理服务, 这取决于柜员当前是否处于忙碌中,我们可以将柜员的服务过程用伪代码来表示

while(1)
{
  加锁
  while(当前没有客户)
  {
  释放锁,让公示牌有机会去增加客户号码
  (如果这里不释放锁,那么就会造成死锁)
  摸鱼中....
  当银行来客户了,那么柜员就呼叫客户去服务
  (这里要注意,是所有柜员去争夺这个客户,
  当然现实柜员没有那么认真,一个柜员去服务即可,
  想象力更丰富一点,洗脚店...多个技师就你一个客户...)
     如果柜员中途有事或者被银行经理叫走,
     那么应该直接将自己服务的标志设置为停止,
     并且将自己从线程池中取出,直接退出循环
     加锁(我抢到了这个客户,其他柜员不允许为其服务)  
  }
  从任务列表中取出一个任务(该客户的号码不会
    出现等待队列中,被其他柜员呼叫)
  释放锁
  为取出的任务服务。
}
释放自身的线程资源

不知道你们对上面的伪代码是否能理解上文的代码,是否可以根据上文的描述,知道使用C语言的相关代码来描述柜员(即线程)的主要逻辑。

通过上面的描述,我们知道了得到了线程池中主要需要3个结构体:

(1) 执行队列(线程的概念): 对应柜员

(2)任务队列: 对应客户任务

(3)管理组件(线程池):对应公示牌

先来的客户先服务,我们将所有的客户进行排队,因此想到任务结构体应该使用队列的形式来表示,所有的柜员使用链表来表示,为了代码的简单性,将柜员和客户都使用双向链表表示,公示牌是连接二者的桥梁,因此应该具有线程和任务,在数据结构中即是将2个对象放入到线程池中,在后面的代码很容易理解。另外,银行服务分为VIP客户和普通客户,那么我们知道柜员被分为2类,即有2种线程池,一类线程池(服务VIP的柜员),一类线程池(服务普通客户的柜员),因此在线程的结构中还应该有线程池的对象。因此如果任务有优先级的特性,还应该使用多种线程池(这是自我理解,可能是错误的)


线程池主要API

一般线程池在一个软件中,是作为基础组件为上层服务的,那么实现一个线程池需要实现哪些API供上层调用呢?

(1)首先,我们从线程池的名字可以看出,线程池是由一些线程构成的,通常,我们在使用线程之前,需要创建线程,那么肯定线程池也少不了初始化的功能,其包括了创建线程的功能,初始化锁和条件变量等

(2)其次,线程池的主要功能是告诉线程有任务执行,因此必须要有一个接口用于插入任务的接口。

(3)有创建,那当然少不了释放线程池的接口

其次,像获取线程的个数,或者空闲线程都是一些为线程池锦上添花的一些功能,在我们的代码中没有实现。


代码实现

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <string.h>
//以银行办理业务来说明
//柜员的处理--对应线程(执行)逻辑
//客户的任务--对应任务
//所有柜员-对应执行队列
//所有客户的任务-对应任务队列
//要明白客户的任务相对于柜员是临界资源,柜员叫号就相当于去争取资源。
//公示牌-对应线程池(通知某个客户到某个柜员处办理业务-
//柜员主要去拉客户的任务来执行
//--对应任务的线程池来说就是某一个线程会在某一时刻去执行任务
//执行队列
typedef struct NWORKER
{
    pthread_t id;  //线程id(用来控制线程)
    int terminate; //是否停止的标志
    struct NWORKER *prev;
    struct NWORKER *next;
    struct NTHREADPOLL *pool; //线程所属的线程池
} nworker;
//任务队列
typedef struct NJOB
{
    void *user_data;用来做任务的参数
    void (*job_func)(struct NJOB *job);//任务执行的函数指针
    struct NJOB *prev;
    struct NJOB *next;
} njob;
//(管理组件)线程池
typedef struct NTHREADPOLL
{
    pthread_mutex_t mutex;  //线程锁(为了使任务有序的执行)
    //条件变量(等待任务的到来的变量,如果没有任务时,会释放锁,
    //有任务时,又会去争夺任务)
    pthread_cond_t cond;    
    struct NWORKER *workers; //执行队列(指向首节点)
    struct NJOB *njobs;      //任务队列(指向首节点)
} nthreadpoll;
//头插法-插入结点
//(list)加()是保证*的优先于->,否则不会得到正确的结果
#define LL_ADD(item, list)     \
    do                         \
    {                          \
        item->prev = NULL;     \
        item->next = (list);     \
        if ((list) != NULL)      \
            (list)->prev = item; \
        (list) = item;           \
    } while (0);
//删除结点(不释放内存)
#define LL_REMOVE(item, list)              \
    do                                     \
    {                                      \
        if (item->prev != NULL)            \
            item->prev->next = item->next; \
        if (item->next != NULL)            \
            item->next->prev = item->prev; \
        if (item == (list))                  \
            (list) = (list)->next;             \
        item->next = item->prev = NULL;    \
    } while (0);
/**
 * @description: 线程的回调逻辑
 *(银行柜员的工作逻辑:有客户就为客户执行任务,没有客户就等待客户来)
 * @param {*arg worker}
 * @return {*}
 */
void *thread_callback(void *arg)
{
    nworker *worker = (nworker *)arg;
    while (1)
    {
        pthread_mutex_lock(&worker->pool->mutex);
        while (worker->pool->njobs == NULL)
        { //没有执行的任务
            if (worker->terminate)
                break;  
            pthread_cond_wait(&worker->pool->cond, 
            &worker->pool->mutex);
        }
        //柜员下班或者中途有事,释放自己占有的资源(客户的任务)
        //让其他柜员去执行
        if (worker->terminate)//
        {
            pthread_mutex_unlock(&worker->pool->mutex);
            break;
        }
        struct NJOB *job = worker->pool->njobs;//取出队列首任务
        if(job) 
        {
            LL_REMOVE(job, worker->pool->njobs);
        }
        pthread_mutex_unlock(&worker->pool->mutex);
        //暂时不清楚这里为啥还要判断! 
        //老师讲的是如果有一个任务,有多个线程去争夺,
        //可能njob *job = worker->poll->njobs取出来时为空
        //那么pthread_cond_wait在释放锁时,资源被其他线程争夺了,
        //任务队列再次为空,
        //那么上面的while (!worker->poll->njobs)会往下走吗?
        //难道是pthread_cond_wait还没加锁
        //完成时,这时候while循环判断不为空,
        //其他线程执行了 njob *job = worker->poll->njobs,这样
        //导致该线程执行 njob *job = worker->poll->njobs为空?
        if(!job) continue; 
        执行任务(这里的job一定不会被其他线程获取到,
        //因为前面使用LL_REMOVE从任务队列中删除了)
        job->job_func(job);//参数也是自己
    }
    free(worker);//释放线程内存空间
}
/**
 * @description:
 * @param {poll 要创建的线程池对象} 
 * @param {thread_num 要创建的线程数量} 
 * @return {成功创建线程的个数,小于0为错误}}
 */
int pthreadpool_create(nthreadpoll *pool, int thread_num)
{
    //参数判断
    if (!pool)
        return -1;
    memset(pool, 0, sizeof(nthreadpoll)); //
    if (thread_num < 1)
        thread_num = 1;
    //初始化poll参数
    // cond
    pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
    memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t));
    //metex
    pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
    memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t));
    //构造线程
    int idx;
    for (idx = 0; idx < thread_num; idx++)
    {
        nworker *worker = (nworker *)malloc(sizeof(nworker));
        if (worker == NULL)
        {
            perror("malloc worker error!");
            return idx;
        }
        memset(worker, 0, sizeof(nworker));
        worker->pool = pool;
        int ret = pthread_create(&worker->id, NULL, 
        thread_callback, worker);
        if (ret)
        {
            perror("pthread create error!");
             //释放最后一个分配poll失败的空间,
             //前面的线程都创建成功了,不用释放空间。
            free(worker);
            return idx;   //
        }
        LL_ADD(worker, pool->workers);
    }
    return idx;
}
//往线程池丢任务
int pthreadpool_push_task(nthreadpoll *pool, njob *njob)
{
    pthread_mutex_lock(&pool->mutex);
    LL_ADD(njob, pool->njobs);
    //通知等待的线程,已经有任务可以执行了
    pthread_cond_signal(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
//释放线程池资源
int pthreadpool_destory(nthreadpoll *pool)
{
    nworker *worker = NULL;
    for(worker = pool->workers; worker != NULL;
     worker = pool->workers->next)
    {
        worker->terminate = 1;
    }
    pthread_mutex_lock(&pool->mutex);
    //广播给所有线程,告诉他们应该停止工作,释放自己的空间
    pthread_cond_broadcast(&pool->cond);
    pthread_mutex_unlock(&pool->mutex);
    return 0;
}
//debug 以下是测试代码
#define TASK_COUNT 1000
//要完成的任务
void counter(struct NJOB *job)
{
    if (job == NULL) return ;
  int idx = *(int*)job->user_data;
  printf("idx : %d, selfid: %lu\n", idx, pthread_self());
  free(job->user_data);
  free(job);
}
int main(int argc, char *argv[])
{
    int thread_num = 50;
    nthreadpoll pool = {0};
    pthreadpool_create(&pool, thread_num);
    int idx;
    for(idx = 0; idx < TASK_COUNT; ++idx)
    {   
        njob *job = (njob *)malloc(sizeof(njob));
        if(job == NULL) exit(0);
        job->job_func = counter;
        //任务的参数需要在其他函数中使用,需要在堆上分配内存
        job->user_data = malloc(sizeof(int));
  *(int *)(job->user_data) = idx;//任务编号
         pthreadpool_push_task(&pool, job);
    }
    getchar();
    pthreadpool_destory(&pool);
    return 0;
}

代码说明

代码来源:腾讯课堂-零声学院king老师(尊重他人成果,不是为了该学院打广告)

个人感觉代码不合理的地方:任务队列使用头插法的双链表感觉不合适,使用队列更合适,或者使用尾插法也可以。否则可能会导致后来的请求被先执行的问题,不过这里保证任务不丢失特性即可


支持多系统的线程池代码:多平台的线程池代码版本


相关文章
|
7月前
|
消息中间件 存储 缓存
【嵌入式软件工程师面经】Linux系统编程(线程进程)
【嵌入式软件工程师面经】Linux系统编程(线程进程)
140 1
|
5月前
|
算法 Unix Linux
linux线程调度策略
linux线程调度策略
110 0
|
3月前
|
资源调度 Linux 调度
Linux C/C++之线程基础
这篇文章详细介绍了Linux下C/C++线程的基本概念、创建和管理线程的方法,以及线程同步的各种机制,并通过实例代码展示了线程同步技术的应用。
47 0
Linux C/C++之线程基础
|
3月前
|
安全 Linux
Linux线程(十一)线程互斥锁-条件变量详解
Linux线程(十一)线程互斥锁-条件变量详解
|
5月前
|
存储 设计模式 NoSQL
Linux线程详解
Linux线程详解
|
5月前
|
缓存 Linux C语言
Linux线程是如何创建的
【8月更文挑战第5天】线程不是一个完全由内核实现的机制,它是由内核态和用户态合作完成的。
|
5月前
|
负载均衡 Linux 调度
在Linux中,进程和线程有何作用?
在Linux中,进程和线程有何作用?
|
5月前
|
缓存 Linux C语言
Linux中线程是如何创建的
【8月更文挑战第15天】线程并非纯内核机制,由内核态与用户态共同实现。
|
7月前
|
API
linux---线程互斥锁总结及代码实现
linux---线程互斥锁总结及代码实现
|
7月前
|
Linux API
Linux线程总结---线程的创建、退出、取消、回收、分离属性
Linux线程总结---线程的创建、退出、取消、回收、分离属性