线程池
一、线程池的概念
线程池: 一种线程使用模式,线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
优点:避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。
其中线程池中的线程数量应该取决于:可用的并发处理器、处理器内核、内存、网络sockets等的数量。
二、线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。 WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
三、线程池的实现
下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理。
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
线程池的代码实现
- 首先,我们要能够确定线程的数量,所以我们要有一个
_cap
变量代表线程数目。 - 然后我们还要能把线程(对象)存储起来,方便我们后续进行管理线程。所以我们需要一个容器,这里我们选择
vector
- 同理,我们的线程的任务是不确定的,如果有任务我们应该保存执行的任务,方便分配给线程。
- 由于是多线程程序,所以多个线程从任务队列中取任务时会有线程安全的问题,为了解决这个问题我们还要给任务队列配一把锁。
- 当线程被创建以后却没有任务时,线程应该被挂起等待,所以我们还需要一个条件变量来让线程完成等待任务。
#include <pthread.h> #include <vector> #include <queue> template <class T> class ThreadPool { public: ThreadPool(int cap = 5) :_cap(cap),_threads(cap) { pthread_mutex_init(&_tasks_mtx, nullptr); pthread_cond_init(&_cond, nullptr); } void push(const T& data) { QueueLock(); _tasks.push(data); // 唤醒休眠的线程 pthread_cond_signal(&_cond); QueueUnlock(); } void start() { for (auto& e : _threads) { pthread_create(&e, nullptr, threadRoutine, this); } } // 线程的执行历程,必须是static函数不然有this指针,导致线程无法执行 static void* threadRoutine(void* args) { // 分离线程 pthread_detach(pthread_self()); ThreadPool<T>* ptp = static_cast<ThreadPool<T>*>(args); while (true) { ptp->QueueLock(); while (ptp->_tasks.empty()) { // 当前没有任务,线程应该挂起等待 ptp->threadWait(); } // 将任务从公共区域拿到线程自己的独立栈中 T task = ptp->pop(); ptp->QueueUnlock(); // 在临界区外运行任务,提高性能! // 下面两行代码是为了测试用的 task(); std::cout << task.formatRet() << std::endl; } return nullptr; } ~ThreadPool() { pthread_mutex_destroy(&_tasks_mtx); pthread_cond_destroy(&_cond); } private: void threadWait() { pthread_cond_wait(&_cond, &_tasks_mtx); } T pop() { T task = _tasks.front(); _tasks.pop(); return task; } void QueueLock() { pthread_mutex_lock(&_tasks_mtx); } void QueueUnlock() { pthread_mutex_unlock(&_tasks_mtx); } private: int _cap; // 线程的数量 std::vector<pthread_t> _threads; // 存储线程的容器 std::queue<T> _tasks; // 任务队列 pthread_mutex_t _tasks_mtx; // 任务队列的锁 pthread_cond_t _cond; // 线程的等待条件变量 };
为了测试我们的代码我们设计了下面的计算任务,输入两个数和一个操作符,计算其结果。
#include <iostream> #include <string> #include <unistd.h> class Task { public: Task() :_x(0), _y(0), _op('+'), _result(0), _exitcode(0) {} Task(int x, int y, char op) :_x(x), _y(y), _op(op), _result(0), _exitcode(0) {} // 进行计算 std::string operator()() { switch (_op) { case '+': _result = _x + _y; break; case '-': _result = _x - _y; break; case '*': _result = _x * _y; break; case '/': if (_y != 0) { _result = _x / _y; } else { _exitcode = -1; } break; case '%': if (_y != 0) { _result = _x % _y; } else { _exitcode = -2; } break; default: break; } return std::to_string(_result); } // 格式化表达式 std::string formatArg() { return std::to_string(_x) + _op + std::to_string(_y) + '='; } // 格式化输出结果 std::string formatRet() { return std::to_string(_result) + '(' + std::to_string(_exitcode) + ')'; } private: int _x; // 左操作数 int _y; // 右操作数 char _op; // 操作符 int _result; // 结果 int _exitcode; // 退出码 };
主线程逻辑:生成大量随机数,形成计算任务,让线程去执行。
#include <iostream> #include <pthread.h> #include "ThreadPool_V1.hpp" #include "Task.hpp" using namespace std; int main() { ThreadPool<Task> tp; tp.start(); int x, y; char op; const char* ops = "+-*/%"; while (true) { x = rand() % 100; y = rand() % 10; op = ops[rand() % 5]; Task t(x, y, op); cout << t.formatArg() << endl; tp.push(t); // 休眠1s sleep(1); } return 0; }
运行代码后一瞬间就有六个线程,其中一个是主线程,另外五个是线程池内处理任务的线程。
此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。