主要目的
主要是在linux下C语言代码实现线程池,关于链表的操作,多线程,以及多线程锁、条件变量等知识点请自行参考其他博客。
线程池
使用场景
高性能服务器处理大量客户端的情景,比如火车售票系统,购物网,炒股网站等。
为什么使用线程池
想想在一个百万级客户端使用的服务器,客户集中在某个时刻访问服务器,服务器是否在某一时刻为所有客户开启一个线程去处理任务,显然不现实,比如1W个客户同时访问服务器,以posix来说,每个线程需要的内存资源在8M左右,那么1w个需要多少内存?
线程池作用
前文说了为什么要使用线程池,那么线程池的主要作用除了上面说的避免线程太多,导致服务器内存耗尽。那么另外创建线程的另外2个作用是:避免创建于销毁线程的代价和任务与执行分离的作用
或许有的朋友对任务与执行分离感到疑惑,那么举个简单例子吧。对于游戏服务器来说,某个时刻有大量客户登录,这时候需要将所有的客户的登录信息(登录时间等)记录下来,这些信息是需要记录到数据库文件的。我们知道,磁盘写入和内存读写相比,是很慢的,那么我们只需要在主线程告诉某个客户登录了,然后将客户登录的信息记录到数据库由线程池来实现写入。这就是任务与执行分离的一个例子。
生活例子来说明线程池
我们以银行办理业务来说明线程池,通过这个例子我们可以知道线程池的主要数据结构。
银行为客户办理业务的过程中,主要有客户任务、柜员、公示牌(排队号)3个角色,客户是办理业务的,因此对应于线程要执行的任务,柜员为客户服务,即柜员是线程池中线程的概念,公示牌的作用是连接柜员与办理业务的桥梁,主要的作用是柜员呼叫客户的标志。在深入一点,我们将客户的任务当做线程中的临界资源,所有的柜员都有机会去为某个客户办理服务, 这取决于柜员当前是否处于忙碌中,我们可以将柜员的服务过程用伪代码来表示
while(1) { 加锁 while(当前没有客户) { 释放锁,让公示牌有机会去增加客户号码 (如果这里不释放锁,那么就会造成死锁) 摸鱼中.... 当银行来客户了,那么柜员就呼叫客户去服务 (这里要注意,是所有柜员去争夺这个客户, 当然现实柜员没有那么认真,一个柜员去服务即可, 想象力更丰富一点,洗脚店...多个技师就你一个客户...) 如果柜员中途有事或者被银行经理叫走, 那么应该直接将自己服务的标志设置为停止, 并且将自己从线程池中取出,直接退出循环 加锁(我抢到了这个客户,其他柜员不允许为其服务) } 从任务列表中取出一个任务(该客户的号码不会 出现等待队列中,被其他柜员呼叫) 释放锁 为取出的任务服务。 } 释放自身的线程资源
不知道你们对上面的伪代码是否能理解上文的代码,是否可以根据上文的描述,知道使用C语言的相关代码来描述柜员(即线程)的主要逻辑。
通过上面的描述,我们知道了得到了线程池中主要需要3个结构体:
(1) 执行队列(线程的概念): 对应柜员
(2)任务队列: 对应客户任务
(3)管理组件(线程池):对应公示牌
先来的客户先服务,我们将所有的客户进行排队,因此想到任务结构体应该使用队列的形式来表示,所有的柜员使用链表来表示,为了代码的简单性,将柜员和客户都使用双向链表表示,公示牌是连接二者的桥梁,因此应该具有线程和任务,在数据结构中即是将2个对象放入到线程池中,在后面的代码很容易理解。另外,银行服务分为VIP客户和普通客户,那么我们知道柜员被分为2类,即有2种线程池,一类线程池(服务VIP的柜员),一类线程池(服务普通客户的柜员),因此在线程的结构中还应该有线程池的对象。因此如果任务有优先级的特性,还应该使用多种线程池(这是自我理解,可能是错误的)
线程池主要API
一般线程池在一个软件中,是作为基础组件为上层服务的,那么实现一个线程池需要实现哪些API供上层调用呢?
(1)首先,我们从线程池的名字可以看出,线程池是由一些线程构成的,通常,我们在使用线程之前,需要创建线程,那么肯定线程池也少不了初始化的功能,其包括了创建线程的功能,初始化锁和条件变量等
(2)其次,线程池的主要功能是告诉线程有任务执行,因此必须要有一个接口用于插入任务的接口。
(3)有创建,那当然少不了释放线程池的接口
其次,像获取线程的个数,或者空闲线程都是一些为线程池锦上添花的一些功能,在我们的代码中没有实现。
代码实现
#include <stdio.h> #include <stdlib.h> #include <pthread.h> #include <string.h> //以银行办理业务来说明 //柜员的处理--对应线程(执行)逻辑 //客户的任务--对应任务 //所有柜员-对应执行队列 //所有客户的任务-对应任务队列 //要明白客户的任务相对于柜员是临界资源,柜员叫号就相当于去争取资源。 //公示牌-对应线程池(通知某个客户到某个柜员处办理业务- //柜员主要去拉客户的任务来执行 //--对应任务的线程池来说就是某一个线程会在某一时刻去执行任务 //执行队列 typedef struct NWORKER { pthread_t id; //线程id(用来控制线程) int terminate; //是否停止的标志 struct NWORKER *prev; struct NWORKER *next; struct NTHREADPOLL *pool; //线程所属的线程池 } nworker; //任务队列 typedef struct NJOB { void *user_data;用来做任务的参数 void (*job_func)(struct NJOB *job);//任务执行的函数指针 struct NJOB *prev; struct NJOB *next; } njob; //(管理组件)线程池 typedef struct NTHREADPOLL { pthread_mutex_t mutex; //线程锁(为了使任务有序的执行) //条件变量(等待任务的到来的变量,如果没有任务时,会释放锁, //有任务时,又会去争夺任务) pthread_cond_t cond; struct NWORKER *workers; //执行队列(指向首节点) struct NJOB *njobs; //任务队列(指向首节点) } nthreadpoll; //头插法-插入结点 //(list)加()是保证*的优先于->,否则不会得到正确的结果 #define LL_ADD(item, list) \ do \ { \ item->prev = NULL; \ item->next = (list); \ if ((list) != NULL) \ (list)->prev = item; \ (list) = item; \ } while (0); //删除结点(不释放内存) #define LL_REMOVE(item, list) \ do \ { \ if (item->prev != NULL) \ item->prev->next = item->next; \ if (item->next != NULL) \ item->next->prev = item->prev; \ if (item == (list)) \ (list) = (list)->next; \ item->next = item->prev = NULL; \ } while (0); /** * @description: 线程的回调逻辑 *(银行柜员的工作逻辑:有客户就为客户执行任务,没有客户就等待客户来) * @param {*arg worker} * @return {*} */ void *thread_callback(void *arg) { nworker *worker = (nworker *)arg; while (1) { pthread_mutex_lock(&worker->pool->mutex); while (worker->pool->njobs == NULL) { //没有执行的任务 if (worker->terminate) break; pthread_cond_wait(&worker->pool->cond, &worker->pool->mutex); } //柜员下班或者中途有事,释放自己占有的资源(客户的任务) //让其他柜员去执行 if (worker->terminate)// { pthread_mutex_unlock(&worker->pool->mutex); break; } struct NJOB *job = worker->pool->njobs;//取出队列首任务 if(job) { LL_REMOVE(job, worker->pool->njobs); } pthread_mutex_unlock(&worker->pool->mutex); //暂时不清楚这里为啥还要判断! //老师讲的是如果有一个任务,有多个线程去争夺, //可能njob *job = worker->poll->njobs取出来时为空 //那么pthread_cond_wait在释放锁时,资源被其他线程争夺了, //任务队列再次为空, //那么上面的while (!worker->poll->njobs)会往下走吗? //难道是pthread_cond_wait还没加锁 //完成时,这时候while循环判断不为空, //其他线程执行了 njob *job = worker->poll->njobs,这样 //导致该线程执行 njob *job = worker->poll->njobs为空? if(!job) continue; 执行任务(这里的job一定不会被其他线程获取到, //因为前面使用LL_REMOVE从任务队列中删除了) job->job_func(job);//参数也是自己 } free(worker);//释放线程内存空间 } /** * @description: * @param {poll 要创建的线程池对象} * @param {thread_num 要创建的线程数量} * @return {成功创建线程的个数,小于0为错误}} */ int pthreadpool_create(nthreadpoll *pool, int thread_num) { //参数判断 if (!pool) return -1; memset(pool, 0, sizeof(nthreadpoll)); // if (thread_num < 1) thread_num = 1; //初始化poll参数 // cond pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(&pool->cond, &blank_cond, sizeof(pthread_cond_t)); //metex pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(&pool->mutex, &blank_mutex, sizeof(pthread_mutex_t)); //构造线程 int idx; for (idx = 0; idx < thread_num; idx++) { nworker *worker = (nworker *)malloc(sizeof(nworker)); if (worker == NULL) { perror("malloc worker error!"); return idx; } memset(worker, 0, sizeof(nworker)); worker->pool = pool; int ret = pthread_create(&worker->id, NULL, thread_callback, worker); if (ret) { perror("pthread create error!"); //释放最后一个分配poll失败的空间, //前面的线程都创建成功了,不用释放空间。 free(worker); return idx; // } LL_ADD(worker, pool->workers); } return idx; } //往线程池丢任务 int pthreadpool_push_task(nthreadpoll *pool, njob *njob) { pthread_mutex_lock(&pool->mutex); LL_ADD(njob, pool->njobs); //通知等待的线程,已经有任务可以执行了 pthread_cond_signal(&pool->cond); pthread_mutex_unlock(&pool->mutex); return 0; } //释放线程池资源 int pthreadpool_destory(nthreadpoll *pool) { nworker *worker = NULL; for(worker = pool->workers; worker != NULL; worker = pool->workers->next) { worker->terminate = 1; } pthread_mutex_lock(&pool->mutex); //广播给所有线程,告诉他们应该停止工作,释放自己的空间 pthread_cond_broadcast(&pool->cond); pthread_mutex_unlock(&pool->mutex); return 0; } //debug 以下是测试代码 #define TASK_COUNT 1000 //要完成的任务 void counter(struct NJOB *job) { if (job == NULL) return ; int idx = *(int*)job->user_data; printf("idx : %d, selfid: %lu\n", idx, pthread_self()); free(job->user_data); free(job); } int main(int argc, char *argv[]) { int thread_num = 50; nthreadpoll pool = {0}; pthreadpool_create(&pool, thread_num); int idx; for(idx = 0; idx < TASK_COUNT; ++idx) { njob *job = (njob *)malloc(sizeof(njob)); if(job == NULL) exit(0); job->job_func = counter; //任务的参数需要在其他函数中使用,需要在堆上分配内存 job->user_data = malloc(sizeof(int)); *(int *)(job->user_data) = idx;//任务编号 pthreadpool_push_task(&pool, job); } getchar(); pthreadpool_destory(&pool); return 0; }
代码说明
代码来源:腾讯课堂-零声学院king老师(尊重他人成果,不是为了该学院打广告)
个人感觉代码不合理的地方:任务队列使用头插法的双链表感觉不合适,使用队列更合适,或者使用尾插法也可以。否则可能会导致后来的请求被先执行的问题,不过这里保证任务不丢失特性即可
支持多系统的线程池代码:多平台的线程池代码版本