开发者学堂课程【Python 入门 2020年版:进程池的使用】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/639/detail/10475
进程池的使用
内容介绍:
一、进程池
二、进程池中的 Queue
一、进程池
当需要创建的子进程数量不多时,可以直接利用 multiprocessing 中的 Process 动态成生多个进程,但如果是上百甚至上千个目标,手动的去创建进程的工作量巨大,此时就可以用到 multiprocessing 模块提供的 Pool 方法。
初始化 Pool 时,可以指定一个最大进程数,当有新的请求提交到 Pool 中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到指定的最大值,那么该请求就会等待,直到池中有进程结束,才会用之前的进程来执行新的任务。
理解:规定一个池子,最多四个进程,如果有四个任务能创建四个进程,如果有第五个任务就不再创建新的进程,那怎么办呢,就是等待前面的任务执行完,再执行第五个。
请看下面的实例:
from multiprocessing import Pool
import os, time, random
def worker(msg):
t_ start = time. time()
print("%s开始执行,进程号为%d" % (msg, os.getpid()))
# random. random()随机生成0~1之间的浮点数
time.sleep(random. random() * 2)
t_ stop = time. time()
print(msg,” 执行完毕,耗时%0.2f"% (t_stop - t_ start))
If_name_==’_main_’: #快捷键,直接打main会有个提示+回车
po = Pool(3) #定义一个进程池,最大进程数3
for i in range(0, 10):
# Pool(). apply_ async(要调用的目标,(传递给目标的参数元祖,))
#每次循环将会用空闲出来的子进程去调用目标
po.apply_ async(worker, (i,))
print("----start----")
po.close() #关闭进程池,关闭后po不再接收新的请求
po.join() #等待po中所有子进程执行完成,必须放在close语句之后
print(“-----end-----”)
运行结果:
0开始执行,进程号为5820
1开始执行,进程号为6500
2开始执行,进程号为11912
0执行完毕,耗时0.89
3开始执行,进程号为5820
1执行完毕,耗时0.86
4开始执行,进程号为6500
2执行完毕,耗时0.79
5开始执行,进程号为11912
5执行完毕,耗时1.15
6开始执行 ,进程号为11912
4执行完毕,耗时1.33
7开始执行,进程号为6500
3执行完毕,耗时1.81
8开始执行,进程号为5820
6执行完毕,耗时1.69
9开始执行,进程号为11912
7执行完毕,耗时1.66|
8执行完毕,耗时1.40
9执行完毕,耗时1.19
-----end-----
二、进程池中的 Queue
如果要使用 Pool 创建进程,就需要使用
multiprocessing.Manager()
中的 Queue()
而不是
multiprocessing.Queue()
,否则会得到一条如下的错误信息:
RuntimeError: Queue objects should only be shared between processes through inheritance.
下面的实例演示了进程池中的进程如何通信:
#修改 import 中的 Queue 为 Manager
from multiprocessing import Manager, Pool
import os, time, random
def reader(q):
print("reader启动(%s),父进程为(%s)" %(os.getpid(), os.getppid()))
for i in range(q.qsize()):
print("reader 从 Queue 获取到消息:%s" % q.get(True))
def writer(q):
print("writer 启动(%s),父进程为(%s)" %(os.getpid(), os.getppid()))
for i in "helloworld" :
q.put(i)
If_name_ == "_main_ ":
print("(%s) start" % os.getpid())
q=Manager().Queue() # 使用Manager中的Queue
po = Pool()
po.apply_async(writer, (q,))
time.sleep(1) # 先让上面的任务向Queue存入数据,然后再让下面的任务开始从中取数据
po.apply_ async(reader, (q,))
po.close()
po.join()
print("(%s) End" % os.getpid())
运行结果:
(4171) start
writer启动(4173),父进程为(4171)
reader启动(4174),父进程为(4171)
reader 从 Queue 获取到信息:h
reader 从 Queue 获取到信息:e
reader 从 Queue 获取到信息:l
reader 从 Queue 获取到信息:l
reader 从 Queue 获取到信息:o
reader 从 Queue 获取到信息:w
reader 从 Queue 获取到信息:o
reader 从 Queue 获取到信息:r
reader 从 Queue 获取到信息:l
reader 从 Queue 获取到信息:d
(4171) End