本文主要介绍python异步并发模块concurrent.futures。它非常简单易用,主要用来实现多线程和多进程的异步并发。
1. 模块安装
1) python 3.x中自带了concurrent.futures模块
2) python 2.7需要安装futures模块,使用命令pip install futures安装即可
pypi地址:https://pypi.python.org/pypi/futures/
2. Executor对象
class concurrent.futures.Executor
Executor是一个抽象类,它提供了异步执行调用的方法。它不能直接使用,但可以通过它的两个子类ThreadPoolExecutor或者ProcessPoolExecutor进行调用。
2.1 Executor.submit(fn, *args, **kwargs)
fn:需要异步执行的函数
*args, **kwargs:fn参数
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
|
#-*- coding:utf-8 -*-
from concurrent import futures
def test(num):
import time
return time.ctime(),num
with futures.ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(test,1)
print future.result()
>>>
('Tue Jan 17 15:23:10 2017', 1)
|
2.2 Executor.map(func, *iterables, timeout=None)
相当于map(func, *iterables),但是func是异步执行。timeout的值可以是int或float,如果操作超时,会返回raisesTimeoutError;如果不指定timeout参数,则不设置超时间。
func:需要异步执行的函数
*iterables:可迭代对象,如列表等。每一次func执行,都会从iterables中取参数。
timeout:设置每次异步操作的超时时间
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
#-*- coding:utf-8 -*-
from concurrent import futures
def test(num):
import time
return time.ctime(),num
data=[1,2,3]
with futures.ThreadPoolExecutor(max_workers=1) as executor:
for future in executor.map(test,data):
print future
>>>
('Tue Jan 17 15:23:47 2017', 1)
('Tue Jan 17 15:23:47 2017', 2)
('Tue Jan 17 15:23:47 2017', 3)
|
2.3 Executor.shutdown(wait=True)
释放系统资源,在Executor.submit()或 Executor.map()等异步操作后调用。使用with语句可以避免显式调用此方法。
3. ThreadPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用线程池执行异步调用.
class concurrent.futures.ThreadPoolExecutor(max_workers)
使用max_workers数目的线程池执行异步调用
4. ProcessPoolExecutor对象
ThreadPoolExecutor类是Executor子类,使用进程池执行异步调用.
class concurrent.futures.ProcessPoolExecutor(max_workers=None)
使用max_workers数目的进程池执行异步调用,如果max_workers为None则使用机器的处理器数目(如4核机器max_worker配置为None时,则使用4个进程进行异步并发)。
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
#-*- coding:utf-8 -*-
from concurrent import futures
def test(num):
import time
return time.ctime(),num
def muti_exec(m,n):
#m 并发次数
#n 运行次数
with futures.ProcessPoolExecutor(max_workers=m) as executor: #多进程
#with futures.ThreadPoolExecutor(max_workers=m) as executor: #多线程
executor_dict=dict((executor.submit(test,times), times) for times in range(m*n))
for future in futures.as_completed(executor_dict):
times = executor_dict[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (times,future.exception()))
else:
print('RunTimes:%d,Res:%s'% (times, future.result()))
if __name__ == '__main__':
muti_exec(5,1)
>>>
RunTimes:0,Res:('Tue Jan 17 15:56:53 2017', 0)
RunTimes:4,Res:('Tue Jan 17 15:56:53 2017', 4)
RunTimes:3,Res:('Tue Jan 17 15:56:53 2017', 3)
RunTimes:1,Res:('Tue Jan 17 15:56:53 2017', 1)
RunTimes:2,Res:('Tue Jan 17 15:56:53 2017', 2)
|
5. Python GIL相关
要理解GIL的含义,我们需要从Python的基础讲起。像C++这样的语言是编译型语言,所谓编译型语言,是指程序输入到编译器,编译器再根据语言的语 法进行解析,然后翻译成语言独立的中间表示,最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化,是因为它可以看到整 个程序(或者一大块独立的部分)。这使得它可以对不同的语言指令之间的交互进行推理,从而给出更有效的优化手段。
与此相反,Python是解释型语言。程序被输入到解释器来运行。解释器在程序执行之前对其并不了解;它所知道的只是Python的规则,以及在执行过程 中怎样去动态的应用这些规则。它也有一些优化,但是这基本上只是另一个级别的优化。由于解释器没法很好的对程序进行推导,Python的大部分优化其实是 解释器自身的优化。
现在我们来看一下问题的症结所在。要想利用多核系统,Python必须支持多线程运行。作为解释型语言,Python的解释器必须做到既安全又高效。我们都知道多线程编程会遇到的问题,解释器要留意的是避免在不同的线程操作内部共享的数据,同时它还要保证在管理用户线程时保证总是有最大化的计算资源。
那么,不同线程同时访问时,数据的保护机制是怎样的呢?答案是解释器全局锁。从名字上看能告诉我们很多东西,很显然,这是一个加在解释器上的全局(从解释器的角度看)锁(从互斥或者类似角度看)。这种方式当然很安全,但是它有一层隐含的意思(Python初学者需要了解这个):对于任何Python程序,不管有多少的处理器,任何时候都总是只有一个线程在执行。
”为什么我全新的多线程Python程序运行得比其只有一个线程的时候还要慢?“许多人在问这个问题时还是非常犯晕的,因为显然一个具有两个线程的程序要比其只有一个线程时要快(假设该程序确实是可并行的)。事实上,这个问题被问得如此频繁以至于Python的专家们精心制作了一个标准答案:”不要使用多线程,请使用多进程”。
所以,对于计算密集型的,我还是建议不要使用python的多线程而是使用多进程方式,而对于IO密集型的,还是劝你使用多进程方式,因为使用多线程方式出了问题,最后都不知道问题出在了哪里,这是多么让人沮丧的一件事情!
6. 参考文档