什么叫进程池
我们知道,一个进程创建子进程通常是为了让这个子进程去为它完成某个任务。例如我们使用的指令,其实就是bash进程创建子进程让子进程去执行的。但是我们需要考虑这样一个问题:是不是遇到问题之后才创建子进程呢?
频繁的创建和销毁进程都是一项较大的开销,涉及到内存分配、上下文切换等操作。于是我们可以提前创建出一批进程,当有任务要做的时候就从这一批子进程中拿出一个空闲的去执行,一旦某个子进程把任务执行完之后也不立即销毁进程,而是等待下一个任务的来临。
我们把这些预先创建的一批子进程就叫做进程池。我们把进程池中的进程也称为工作进程。
进程池的优点
- 通过复用已经创建的进程,减少创建和销毁进程的开销,提高了系统资源的利用与率和系统的性能。
- 动态调整进程池中工作进程的数量。可以让系统根据实际的负载和任务需求,动态的增加或者减少工作进程的数量,以适应不同的工作负载。提高了系统的工作的灵活性。
- 简化编程。进程池提供了一种高级抽象,隐藏了底层进程管理的细节,使得并发进程更加简单和可靠。
创建进程池
现在我们尝试自己去实现一个进程池。
创建进程好很简单,如何管理这些进程呢?每创建一个进程我们都用一个结构体(或者类)维护,于是对工作进程的管理就变成了对结构体的管理,整个进程池从代码角度上来说就是一个结构体数组。
那我们如何将任务分配到工作进程呢?这个问题的本质其实是在问进程之间通信的方式。于是我们很容易的想到使用管道。父进程通过管道,将任务信息传递给子进程。
下面是使用进程池的一个简单模型:
代码实现:
Process.cpp文件
#include <iostream> #include <cerrno> #include <unistd.h> #include <string> #include <vector> #include <sys/types.h> #include <wait.h> #include <sys/stat.h> #include "Task.hpp" using namespace std; // master class Channel // 维护管道信息 { public: Channel(int wfd, int subprocessid, string name) : _wfd(wfd), _subprocessid(subprocessid), _name(name) { } int GetWfd() { return _wfd; } int GetProcessId() { return _subprocessid; } string GetName() { return _name; } void CloseChannel() { // 关闭信道的写端 close(_wfd); } void Wait() { // 关闭工作进程 int status = 0; pid_t rid = waitpid(_subprocessid, NULL, 0); if (rid > 0) { cout << "wait " << rid << " success" << endl; } } ~Channel() { } private: int _wfd; // 信道以写方式打开的文件描述符 int _subprocessid; // 读端进程的pid string _name; // 信道的命名 }; // 创建信道和子进程 void CreatChannelAndSub(int num, vector<Channel> &Channels, task_t task) { // task是一个回调函数 for (int i = 0; i < num; i++) { // 1.创建管道 int pipefd[2]; int n = pipe(pipefd); if (n < 0) exit(0); // 创建子进程 pid_t pid = fork(); if (pid == 0) { // 关闭写端 close(pipefd[1]); dup2(pipefd[0], 0); // 将管道的读端重定向到标准输入 task(); // 子进程执行的任务 close(pipefd[0]); exit(0); } // 父进程关闭读端,只用来输出任务 close(pipefd[0]); string name = "Channel-" + to_string(i); // 将创建的信道存入输出型参数中 Channels.push_back({pipefd[1], pid, name}); } } int NextChannel(int channelnum) { static int next = 0; int channel = next; next = (next + 1) % channelnum; return channel; } void SendTask(int taskcommand, Channel &channel) { write(channel.GetWfd(), &taskcommand, sizeof(taskcommand)); } // 具体任务 void CtrlProcessOnce(vector<Channel> &Channels) { // 1.选择一个任务 int taskcommand = SelectTask(); // 2.选择一个空信道 int taskchannel = NextChannel(Channels.size()); // 3.发送任务编号给信道 SendTask(taskcommand, Channels[taskchannel]); cout << "taskcommand: " << taskcommand << " channel: " << Channels[taskchannel].GetName() << " sub process: " << Channels[taskchannel].GetProcessId() << endl; } // 控制子进程执行任务 void CtrlProcess(vector<Channel> &Channels, int time = -1) { // time表示分配多少次任务给子进程去执行,默认是无限 if (time == -1) { // 默认 while (true) { CtrlProcessOnce(Channels); sleep(1); } } else if (time > 0) { while (time--) { CtrlProcessOnce(Channels); sleep(1); } } } void CleanUpChannel(vector<Channel> &channels) { for (auto &it : channels) { it.CloseChannel(); } for (auto &it : channels) { it.Wait(); } } int main(int argc, char *argv[]) { // 参数列表,argv[1]表示创建的工作进程的数量,即内存池大小 if (argc != 2) { cerr << "Usage: " << argv[0] << " Processnum" << endl; return 1; } int num = stoi(argv[1]); // 加载任务 LoadTask(); vector<Channel> Channels; // 进程池 // 创建信道和子进程 CreatChannelAndSub(num, Channels, work1); // 通过Channel控制子进程 CtrlProcess(Channels, 10); // 关闭子进程和信道的写端 CleanUpChannel(Channels); return 0; }
Task.hpp文件
#include <iostream> #include <ctime> #include <sys/stat.h> #include <cstdlib> #include <unistd.h> #include <sys/types.h> #define TASKNUM 3 using namespace std; typedef void (*task_t)(); // task_t函数指针类型 void Print() { // 任务1 cout << "Printf task" << endl; } void DownLoad() { // 任务2 cout << "DownLoad task" << endl; } void Flush() { // 任务3 cout << "Flush task" << endl; } task_t tasks[TASKNUM]; // 用来存放函数指针 void LoadTask() { srand(time(nullptr)); tasks[0] = Print; tasks[1] = DownLoad; tasks[2] = Flush; } void ExcuteTask(int n) { if (n < 0 || n > 2) return; tasks[n](); } int SelectTask() { int n = rand() % TASKNUM; return n; } void work1() { while (true) { int command = 0; int n = read(0, &command, sizeof command); // 读取管道中的任务编号(int) if (n == sizeof(int)) { ExcuteTask(command); } else if (n == 0) { cout << "sub process: " << getpid() << " quit " << endl; break; } } }
观察运行结果: