python --- mulitprocessing(多进程)模块使用

简介: 1. 什么是进程?  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。

1. 什么是进程?

  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。

一个进程至少包含一个线程。

2. 在python中有了多线程编程为何还需要多进程编程?

  在python中由于有GIL(全局解释器锁)的存在,在任一时刻只有一个线程在运行(无论你的CPU是多少核),无法实现真正的多线程。那么该如何让python程序真正的并行运行呢?答案就是不要使用多线程,使用多进程。python标准库提供了multiprocessing模块(multiprocessing是一个和threading模块类似,提供API,生成进程的模块。multiprocessing包提供本地和远程并发,通过使用子进程而不是线程有效地转移全局解释器锁。),它的API几乎复制了threading模块的API,当然它还有一行threading模块没有的API。

 

例一(multiprocessing模块的简单使用):

 1 import multiprocessing,time
 2 
 3 class Task(multiprocessing.Process):
 4     def __init__(self):
 5         super(Task, self).__init__()
 6 
 7     def run(self):
 8         print("Process---%s" % self.name)
 9         time.sleep(2)
10 
11 
12 if __name__ == "__main__":
13     for i in range(1, 8+1):
14         t = Task()
15         t.start()
View Code

  注:由于multiprocessing模块基本的API同threading模块,就不挨个演示了,本文主要讲解multiprocessing模块不同于threading模块的API的使用。要了解其他同threading模块相同的API的使用,可参见:http://www.cnblogs.com/God-Li/p/7732407.html

 

multiprocessing.Process源码:

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ''
        self.daemon = False      #守护进程标志,必须在start()之前设置
        self.authkey = None      #The process’s authentication key (a byte string).
        self.exitcode = None     #The child’s exit code. This will be None if the process has not yet terminated. A negative value -N indicates that the child was terminated by signal N.
        self.ident = 0
        self.pid = 0          #进程ID。在生成进程之前,这将是Non。
        self.sentinel = None   #A numeric handle of a system object which will become “ready” when the process ends.

    def run(self):
        pass

    def start(self):
        pass

    def terminate(self):
        """
        Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. 
        Note that exit handlers and finally clauses, etc., will not be executed.
        Note that descendant processes of the process will not be terminated – they will simply become orphaned.
        :return: 
        """
        pass

    def join(self, timeout=None):
        pass

    def is_alive(self):
        return False

 

multiprocessing模块中的队列:

  class multiprocessing.Queue([maxsize])实现除task_done()join()之外的queue.Queue的所有方法,下面列出queue.Queue中没有的方法:

class multiprocessing.Queue([maxsize])
    close()
    """
    指示当前进程不会在此队列上放置更多数据。
    The background thread will quit once it has flushed all buffered data to the pipe.
    当队列被垃圾回收时,这被自动调用。
    """

    join_thread()
    """
    加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到pipe。
    默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。
    该进程可以调用cancel_join_thread()使join_thread()不执行任何操作
    """

    cancel_join_thread()
    """
    使join_thread()不执行任何操作
    """

  class multiprocessing.SimpleQueue是class multiprocessing.Queue([maxsize])的简化,只有三个方法------empty(), get(), put()

   class multiprocessing.JoinableQueue([maxsize])是class multiprocessing.Queue([maxsize])的子类,增加了take_done()和join()方法

   注:由于进程之间内存空间不共享,所以必须将实例化后的queue对象当作参数传入其他进程,其他进程才能使用。而且,每传入一次相当于克隆一份,与原来的queue独立,只是python会同步queue中的数据,而不是像在多线程的queue数据只有一份。

 

进程之间的通信:

  multiprocessing.Pipe([duplex])  --------------- 返回表示管道末端的Connection对象(类似与socket中的连接可用于发送和接收数据)的(conn1, conn2)。 

    如果duplex是True(默认值),则管道是双向的。如果duplex是False,则管道是单向的:conn1只能用于接收消息,conn2用于发送消息。

  例二(multiprocessing.Pipe使用演示):

 1 import multiprocessing,time
 2 
 3 class Processing_1(multiprocessing.Process):
 4     def __init__(self, conn):
 5         super(Processing_1, self).__init__()
 6         self.conn = conn
 7     def run(self):
 8         send_data = "this message is from p1"
 9         self.conn.send(send_data)   #使用conn发送数据
10         time.sleep(0.8)
11         recv_data = self.conn.recv() #使用conn接收数据
12         print("p1 recv: " + recv_data)
13         self.conn.close()
14 
15 
16 class Processing_2(multiprocessing.Process):
17     def __init__(self, conn):
18         super(Processing_2, self).__init__()
19         self.conn = conn
20 
21     def run(self):
22         send_data = "this message is from p2"
23         self.conn.send(send_data)
24         time.sleep(0.8)
25         recv_data = self.conn.recv()
26         print("p2 recv: " + recv_data)
27         self.conn.close()
28 
29 if __name__ == "__main__":
30     conn1, conn2 = multiprocessing.Pipe()   #实例化Pipe对象,conn1, conn2分别代表连接两端
31 
32     p1 = Processing_1(conn1)   #将连接对象当作参数传递给子进程
33     p2 = Processing_2(conn2)
34 
35     p1.start()
36     p2.start()
37 
38     p1.join()
39     p2.join()
multiprocessing.Pipe使用演示

 

 

进程之间的数据共享:

  multiprocessing.Manager() ----------- 返回开始的SyncManager对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并且具有将创建共享对象并返回相应代理的方法。管理器进程将在垃圾收集或其父进程退出时立即关闭。

  例三(Manager的简单使用):

 1 import multiprocessing,time
 2 import os
 3 
 4 class Processing(multiprocessing.Process):
 5     def __init__(self, d, l):
 6         super(Processing, self).__init__()
 7         self.d = d
 8         self.l = l
 9 
10     def run(self):
11         self.d[os.getpid()] = os.getpid()    #当作正常dict使用即可
12         self.l.append(1)
13         print(self.l)
14 
15 if __name__ == "__main__":
16 
17     manager = multiprocessing.Manager()   #生成Manager 对象
18     d = manager.dict()   #生成共享dict
19     l = manager.list()   #生成共享list
20 
21     p_s = []
22     for i in range(10):
23         p = Processing(d, l)
24         p.start()
25         p_s.append(p)
26 
27     for p in p_s:
28         p.join()
29 
30     print(d)
31     print(l)
Manager简单使用

  manager可以生成以下共享数据对象(常用):

Event ()

Create a shared threading.Event object and return a proxy for it.

Lock ()

Create a shared threading.Lock object and return a proxy for it.

Namespace ()

Create a shared Namespace object and return a proxy for it.

Queue ([maxsize])

Create a shared queue.Queue object and return a proxy for it.

RLock ()

Create a shared threading.RLock object and return a proxy for it.

Semaphore ([value])

Create a shared threading.Semaphore object and return a proxy for it.

Array (typecodesequence)

Create an array and return a proxy for it.

Value (typecodevalue)

Create an object with a writable value attribute and return a proxy for it.

dict ()
dict (mapping)
dict (sequence)

Create a shared dict object and return a proxy for it.

list ()
list (sequence)

Create a shared list object and return a proxy for it.

 

 进程锁:

  进程锁有两种multiprocessing.Lock(非递归锁)和multiprocessing.RLock(递归锁)。

  multiprocessing.Lock(非递归锁):一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。

  multiprocessing.RLock(递归锁): A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.

  这两种锁都只用两种方法:acquire(block=Truetimeout=None)和release(),它们的使用基本和线程锁类似(只不是要把锁的示例对象当作参数传入其他的进程):http://www.cnblogs.com/God-Li/p/7732407.html

 

进程池:

  为了便于对多进程的管理,通常使用进程池来进行多进程编程(而不是使用multiprocessing.Process)。

  例:

 1 import multiprocessing,os
 2 import time
 3 
 4 
 5 def run():
 6     print(str(os.getpid()) + "-----running")
 7     time.sleep(2)
 8     print(str(os.getpid()) + "-----done")
 9 
10 def done():
11     print("done")
12 
13 def error():
14     print("error")
15 
16 if __name__ == "__main__":
17     pool = multiprocessing.Pool(processes=4)    #实力化进程池对象
18 
19     for i in range(8):
20         # pool.apply(func=run)      #进程池中的进程串行运行
21         pool.apply_async(func=run)
22 
23     pool.close()
24     pool.join()
25     print("finish....")
View Code

  Pool对象常用方法:

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

将任务提交到进程池,只有一个进程在工作,其他进程处于阻塞状态(相当于串行运行)。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

将任务提交到进程池,多个进程(进程数量由之前实例化时的processes参数设置)同时运行,callback工作进程完成时(由当前进程的父进程)调用由此传入的任务,error_callback工作进程出错时(由当前进程的父进程)调用由此传入的任务。

close()

Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

调用此方法后进程池不能在提交新的任务

terminate()

Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.

立即停止工作进程,而不需要等待未完成的工作进程。

join()

Wait for the worker processes to exit. One must call close() or terminate() before using join().

等待进程池中的工作进程结束(在此之前必须调用close()或者terminate())。

注:Pool对象在生成时进程内的进程(阻塞)就已经启动,使用apply(或者apply_async)方法只是将任务提交给线程池,不会再建立新进程。

 

目录
相关文章
|
2月前
|
SQL 关系型数据库 数据库
Python SQLAlchemy模块:从入门到实战的数据库操作指南
免费提供Python+PyCharm编程环境,结合SQLAlchemy ORM框架详解数据库开发。涵盖连接配置、模型定义、CRUD操作、事务控制及Alembic迁移工具,以电商订单系统为例,深入讲解高并发场景下的性能优化与最佳实践,助你高效构建数据驱动应用。
378 7
|
2月前
|
监控 安全 程序员
Python日志模块配置:从print到logging的优雅升级指南
从 `print` 到 `logging` 是 Python 开发的必经之路。`print` 调试简单却难维护,日志混乱、无法分级、缺乏上下文;而 `logging` 支持级别控制、多输出、结构化记录,助力项目可维护性升级。本文详解痛点、优势、迁移方案与最佳实践,助你构建专业日志系统,让程序“有记忆”。
270 0
|
2月前
|
JSON 算法 API
Python中的json模块:从基础到进阶的实用指南
本文深入解析Python内置json模块的使用,涵盖序列化与反序列化核心函数、参数配置、中文处理、自定义对象转换及异常处理,并介绍性能优化与第三方库扩展,助你高效实现JSON数据交互。(238字)
398 4
|
2月前
|
Java 调度 数据库
Python threading模块:多线程编程的实战指南
本文深入讲解Python多线程编程,涵盖threading模块的核心用法:线程创建、生命周期、同步机制(锁、信号量、条件变量)、线程通信(队列)、守护线程与线程池应用。结合实战案例,如多线程下载器,帮助开发者提升程序并发性能,适用于I/O密集型任务处理。
304 0
|
2月前
|
XML JSON 数据处理
超越JSON:Python结构化数据处理模块全解析
本文深入解析Python中12个核心数据处理模块,涵盖csv、pandas、pickle、shelve、struct、configparser、xml、numpy、array、sqlite3和msgpack,覆盖表格处理、序列化、配置管理、科学计算等六大场景,结合真实案例与决策树,助你高效应对各类数据挑战。(238字)
214 0
|
3月前
|
安全 大数据 程序员
Python operator模块的methodcaller:一行代码搞定对象方法调用的黑科技
`operator.methodcaller`是Python中处理对象方法调用的高效工具,替代冗长Lambda,提升代码可读性与性能。适用于数据过滤、排序、转换等场景,支持参数传递与链式调用,是函数式编程的隐藏利器。
145 4
|
3月前
|
存储 数据库 开发者
Python SQLite模块:轻量级数据库的实战指南
本文深入讲解Python内置sqlite3模块的实战应用,涵盖数据库连接、CRUD操作、事务管理、性能优化及高级特性,结合完整案例,助你快速掌握SQLite在小型项目中的高效使用,是Python开发者必备的轻量级数据库指南。
336 0
|
9月前
|
Linux 数据库 Perl
【YashanDB 知识库】如何避免 yasdb 进程被 Linux OOM Killer 杀掉
本文来自YashanDB官网,探讨Linux系统中OOM Killer对数据库服务器的影响及解决方法。当内存接近耗尽时,OOM Killer会杀死占用最多内存的进程,这可能导致数据库主进程被误杀。为避免此问题,可采取两种方法:一是在OS层面关闭OOM Killer,通过修改`/etc/sysctl.conf`文件并重启生效;二是豁免数据库进程,由数据库实例用户借助`sudo`权限调整`oom_score_adj`值。这些措施有助于保护数据库进程免受系统内存管理机制的影响。
|
9月前
|
Linux Shell
Linux 进程前台后台切换与作业控制
进程前台/后台切换及作业控制简介: 在 Shell 中,启动的程序默认为前台进程,会占用终端直到执行完毕。例如,执行 `./shella.sh` 时,终端会被占用。为避免不便,可将命令放到后台运行,如 `./shella.sh &`,此时终端命令行立即返回,可继续输入其他命令。 常用作业控制命令: - `fg %1`:将后台作业切换到前台。 - `Ctrl + Z`:暂停前台作业并放到后台。 - `bg %1`:让暂停的后台作业继续执行。 - `kill %1`:终止后台作业。 优先级调整:
732 5
|
运维 关系型数据库 MySQL
掌握taskset:优化你的Linux进程,提升系统性能
在多核处理器成为现代计算标准的今天,运维人员和性能调优人员面临着如何有效利用这些处理能力的挑战。优化进程运行的位置不仅可以提高性能,还能更好地管理和分配系统资源。 其中,taskset命令是一个强大的工具,它允许管理员将进程绑定到特定的CPU核心,减少上下文切换的开销,从而提升整体效率。
掌握taskset:优化你的Linux进程,提升系统性能

推荐镜像

更多