python --- mulitprocessing(多进程)模块使用

简介: 1. 什么是进程?  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。

1. 什么是进程?

  进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。在早期面向进程设计的计算机结构中,进程是程序的基本执行实体;在当代面向线程设计的计算机结构中,进程是线程的容器。

一个进程至少包含一个线程。

2. 在python中有了多线程编程为何还需要多进程编程?

  在python中由于有GIL(全局解释器锁)的存在,在任一时刻只有一个线程在运行(无论你的CPU是多少核),无法实现真正的多线程。那么该如何让python程序真正的并行运行呢?答案就是不要使用多线程,使用多进程。python标准库提供了multiprocessing模块(multiprocessing是一个和threading模块类似,提供API,生成进程的模块。multiprocessing包提供本地和远程并发,通过使用子进程而不是线程有效地转移全局解释器锁。),它的API几乎复制了threading模块的API,当然它还有一行threading模块没有的API。

 

例一(multiprocessing模块的简单使用):

 1 import multiprocessing,time
 2 
 3 class Task(multiprocessing.Process):
 4     def __init__(self):
 5         super(Task, self).__init__()
 6 
 7     def run(self):
 8         print("Process---%s" % self.name)
 9         time.sleep(2)
10 
11 
12 if __name__ == "__main__":
13     for i in range(1, 8+1):
14         t = Task()
15         t.start()
View Code

  注:由于multiprocessing模块基本的API同threading模块,就不挨个演示了,本文主要讲解multiprocessing模块不同于threading模块的API的使用。要了解其他同threading模块相同的API的使用,可参见:http://www.cnblogs.com/God-Li/p/7732407.html

 

multiprocessing.Process源码:

class Process(object):
    def __init__(self, group=None, target=None, name=None, args=(), kwargs={}):
        self.name = ''
        self.daemon = False      #守护进程标志,必须在start()之前设置
        self.authkey = None      #The process’s authentication key (a byte string).
        self.exitcode = None     #The child’s exit code. This will be None if the process has not yet terminated. A negative value -N indicates that the child was terminated by signal N.
        self.ident = 0
        self.pid = 0          #进程ID。在生成进程之前,这将是Non。
        self.sentinel = None   #A numeric handle of a system object which will become “ready” when the process ends.

    def run(self):
        pass

    def start(self):
        pass

    def terminate(self):
        """
        Terminate the process. On Unix this is done using the SIGTERM signal; on Windows TerminateProcess() is used. 
        Note that exit handlers and finally clauses, etc., will not be executed.
        Note that descendant processes of the process will not be terminated – they will simply become orphaned.
        :return: 
        """
        pass

    def join(self, timeout=None):
        pass

    def is_alive(self):
        return False

 

multiprocessing模块中的队列:

  class multiprocessing.Queue([maxsize])实现除task_done()join()之外的queue.Queue的所有方法,下面列出queue.Queue中没有的方法:

class multiprocessing.Queue([maxsize])
    close()
    """
    指示当前进程不会在此队列上放置更多数据。
    The background thread will quit once it has flushed all buffered data to the pipe.
    当队列被垃圾回收时,这被自动调用。
    """

    join_thread()
    """
    加入后台线程。这只能在调用close()之后使用。它阻塞直到后台线程退出,确保缓冲区中的所有数据都已刷新到pipe。
    默认情况下,如果进程不是队列的创建者,那么在退出时它将尝试加入队列的后台线程。
    该进程可以调用cancel_join_thread()使join_thread()不执行任何操作
    """

    cancel_join_thread()
    """
    使join_thread()不执行任何操作
    """

  class multiprocessing.SimpleQueue是class multiprocessing.Queue([maxsize])的简化,只有三个方法------empty(), get(), put()

   class multiprocessing.JoinableQueue([maxsize])是class multiprocessing.Queue([maxsize])的子类,增加了take_done()和join()方法

   注:由于进程之间内存空间不共享,所以必须将实例化后的queue对象当作参数传入其他进程,其他进程才能使用。而且,每传入一次相当于克隆一份,与原来的queue独立,只是python会同步queue中的数据,而不是像在多线程的queue数据只有一份。

 

进程之间的通信:

  multiprocessing.Pipe([duplex])  --------------- 返回表示管道末端的Connection对象(类似与socket中的连接可用于发送和接收数据)的(conn1, conn2)。 

    如果duplex是True(默认值),则管道是双向的。如果duplex是False,则管道是单向的:conn1只能用于接收消息,conn2用于发送消息。

  例二(multiprocessing.Pipe使用演示):

 1 import multiprocessing,time
 2 
 3 class Processing_1(multiprocessing.Process):
 4     def __init__(self, conn):
 5         super(Processing_1, self).__init__()
 6         self.conn = conn
 7     def run(self):
 8         send_data = "this message is from p1"
 9         self.conn.send(send_data)   #使用conn发送数据
10         time.sleep(0.8)
11         recv_data = self.conn.recv() #使用conn接收数据
12         print("p1 recv: " + recv_data)
13         self.conn.close()
14 
15 
16 class Processing_2(multiprocessing.Process):
17     def __init__(self, conn):
18         super(Processing_2, self).__init__()
19         self.conn = conn
20 
21     def run(self):
22         send_data = "this message is from p2"
23         self.conn.send(send_data)
24         time.sleep(0.8)
25         recv_data = self.conn.recv()
26         print("p2 recv: " + recv_data)
27         self.conn.close()
28 
29 if __name__ == "__main__":
30     conn1, conn2 = multiprocessing.Pipe()   #实例化Pipe对象,conn1, conn2分别代表连接两端
31 
32     p1 = Processing_1(conn1)   #将连接对象当作参数传递给子进程
33     p2 = Processing_2(conn2)
34 
35     p1.start()
36     p2.start()
37 
38     p1.join()
39     p2.join()
multiprocessing.Pipe使用演示

 

 

进程之间的数据共享:

  multiprocessing.Manager() ----------- 返回开始的SyncManager对象,可用于在进程之间共享对象。返回的管理器对象对应于生成的子进程,并且具有将创建共享对象并返回相应代理的方法。管理器进程将在垃圾收集或其父进程退出时立即关闭。

  例三(Manager的简单使用):

 1 import multiprocessing,time
 2 import os
 3 
 4 class Processing(multiprocessing.Process):
 5     def __init__(self, d, l):
 6         super(Processing, self).__init__()
 7         self.d = d
 8         self.l = l
 9 
10     def run(self):
11         self.d[os.getpid()] = os.getpid()    #当作正常dict使用即可
12         self.l.append(1)
13         print(self.l)
14 
15 if __name__ == "__main__":
16 
17     manager = multiprocessing.Manager()   #生成Manager 对象
18     d = manager.dict()   #生成共享dict
19     l = manager.list()   #生成共享list
20 
21     p_s = []
22     for i in range(10):
23         p = Processing(d, l)
24         p.start()
25         p_s.append(p)
26 
27     for p in p_s:
28         p.join()
29 
30     print(d)
31     print(l)
Manager简单使用

  manager可以生成以下共享数据对象(常用):

Event ()

Create a shared threading.Event object and return a proxy for it.

Lock ()

Create a shared threading.Lock object and return a proxy for it.

Namespace ()

Create a shared Namespace object and return a proxy for it.

Queue ([maxsize])

Create a shared queue.Queue object and return a proxy for it.

RLock ()

Create a shared threading.RLock object and return a proxy for it.

Semaphore ([value])

Create a shared threading.Semaphore object and return a proxy for it.

Array (typecodesequence)

Create an array and return a proxy for it.

Value (typecodevalue)

Create an object with a writable value attribute and return a proxy for it.

dict ()
dict (mapping)
dict (sequence)

Create a shared dict object and return a proxy for it.

list ()
list (sequence)

Create a shared list object and return a proxy for it.

 

 进程锁:

  进程锁有两种multiprocessing.Lock(非递归锁)和multiprocessing.RLock(递归锁)。

  multiprocessing.Lock(非递归锁):一旦进程或线程获得了锁,随后从任何进程或线程获取它的尝试将阻塞,直到它被释放;任何进程或线程都可以释放它。

  multiprocessing.RLock(递归锁): A recursive lock must be released by the process or thread that acquired it. Once a process or thread has acquired a recursive lock, the same process or thread may acquire it again without blocking; that process or thread must release it once for each time it has been acquired.

  这两种锁都只用两种方法:acquire(block=Truetimeout=None)和release(),它们的使用基本和线程锁类似(只不是要把锁的示例对象当作参数传入其他的进程):http://www.cnblogs.com/God-Li/p/7732407.html

 

进程池:

  为了便于对多进程的管理,通常使用进程池来进行多进程编程(而不是使用multiprocessing.Process)。

  例:

 1 import multiprocessing,os
 2 import time
 3 
 4 
 5 def run():
 6     print(str(os.getpid()) + "-----running")
 7     time.sleep(2)
 8     print(str(os.getpid()) + "-----done")
 9 
10 def done():
11     print("done")
12 
13 def error():
14     print("error")
15 
16 if __name__ == "__main__":
17     pool = multiprocessing.Pool(processes=4)    #实力化进程池对象
18 
19     for i in range(8):
20         # pool.apply(func=run)      #进程池中的进程串行运行
21         pool.apply_async(func=run)
22 
23     pool.close()
24     pool.join()
25     print("finish....")
View Code

  Pool对象常用方法:

apply(func[, args[, kwds]])

Call func with arguments args and keyword arguments kwds. It blocks until the result is ready. Given this blocks, apply_async() is better suited for performing work in parallel. Additionally, func is only executed in one of the workers of the pool.

将任务提交到进程池,只有一个进程在工作,其他进程处于阻塞状态(相当于串行运行)。

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it, that is unless the call failed, in which case the error_callback is applied instead.

If error_callback is specified then it should be a callable which accepts a single argument. If the target function fails, then the error_callback is called with the exception instance.

Callbacks should complete immediately since otherwise the thread which handles the results will get blocked.

将任务提交到进程池,多个进程(进程数量由之前实例化时的processes参数设置)同时运行,callback工作进程完成时(由当前进程的父进程)调用由此传入的任务,error_callback工作进程出错时(由当前进程的父进程)调用由此传入的任务。

close()

Prevents any more tasks from being submitted to the pool. Once all the tasks have been completed the worker processes will exit.

调用此方法后进程池不能在提交新的任务

terminate()

Stops the worker processes immediately without completing outstanding work. When the pool object is garbage collected terminate() will be called immediately.

立即停止工作进程,而不需要等待未完成的工作进程。

join()

Wait for the worker processes to exit. One must call close() or terminate() before using join().

等待进程池中的工作进程结束(在此之前必须调用close()或者terminate())。

注:Pool对象在生成时进程内的进程(阻塞)就已经启动,使用apply(或者apply_async)方法只是将任务提交给线程池,不会再建立新进程。

 

目录
相关文章
|
10天前
|
消息中间件 安全 Kafka
Python IPC机制全攻略:让进程间通信变得像呼吸一样自然
【9月更文挑战第12天】在编程领域,进程间通信(IPC)是连接独立执行单元的关键技术。Python凭借简洁的语法和丰富的库支持,提供了多种IPC方案。本文将对比探讨Python的IPC机制,包括管道与消息队列、套接字与共享内存。管道适用于简单场景,而消息队列更灵活,适合高并发环境。套接字广泛用于网络通信,共享内存则在本地高效传输数据。通过示例代码展示`multiprocessing.Queue`的使用,帮助读者理解IPC的实际应用。希望本文能让你更熟练地选择和运用IPC机制。
32 10
|
6天前
|
监控 Ubuntu API
Python脚本监控Ubuntu系统进程内存的实现方式
通过这种方法,我们可以很容易地监控Ubuntu系统中进程的内存使用情况,对于性能分析和资源管理具有很大的帮助。这只是 `psutil`库功能的冰山一角,`psutil`还能够提供更多关于系统和进程的详细信息,强烈推荐进一步探索这个强大的库。
18 1
|
9天前
|
Python
惊!Python进程间通信IPC,让你的程序秒变社交达人,信息畅通无阻
【9月更文挑战第13天】在编程的世界中,进程间通信(IPC)如同一场精彩的社交舞会,每个进程通过优雅的IPC机制交换信息,协同工作。本文将带你探索Python中的IPC奥秘,了解它是如何让程序实现无缝信息交流的。IPC如同隐形桥梁,连接各进程,使其跨越边界自由沟通。Python提供了多种IPC机制,如管道、队列、共享内存及套接字,适用于不同场景。通过一个简单的队列示例,我们将展示如何使用`multiprocessing.Queue`实现进程间通信,使程序如同社交达人般高效互动。掌握IPC,让你的程序在编程舞台上大放异彩。
12 3
|
11天前
|
安全 开发者 Python
Python IPC大揭秘:解锁进程间通信新姿势,让你的应用无界连接
【9月更文挑战第11天】在编程世界中,进程间通信(IPC)如同一座无形的桥梁,连接不同进程的信息孤岛,使应用无界而广阔。Python凭借其丰富的IPC机制,让开发者轻松实现进程间的无缝交流。本文将揭开Python IPC的神秘面纱,介绍几种关键的IPC技术:管道提供简单的单向数据传输,适合父子进程间通信;队列则是线程和进程安全的数据共享结构,支持多进程访问;共享内存允许快速读写大量数据,需配合锁机制确保一致性;套接字则能实现跨网络的通信,构建分布式系统。掌握这些技术,你的应用将不再受限于单个进程,实现更强大的功能。
27 5
|
11天前
|
消息中间件 Kafka 数据安全/隐私保护
Python IPC实战指南:构建高效稳定的进程间通信桥梁
【9月更文挑战第11天】在软件开发中,随着应用复杂度的提升,进程间通信(IPC)成为构建高效系统的关键。本文通过一个分布式日志处理系统的案例,介绍如何使用Python和套接字实现可靠的IPC。案例涉及定义通信协议、实现日志发送与接收,并提供示例代码。通过本教程,你将学会构建高效的IPC桥梁,并了解如何根据需求选择合适的IPC机制,确保系统的稳定性和安全性。
27 5
|
11天前
|
Java Serverless Python
探索Python中的并发编程与`concurrent.futures`模块
探索Python中的并发编程与`concurrent.futures`模块
15 4
|
13天前
|
消息中间件 网络协议 Python
工具人逆袭!掌握Python IPC,让你的进程从此告别单打独斗
【9月更文挑战第9天】你是否曾遇到多个Python程序像孤岛般无法通信,导致数据孤立、任务难协同的问题?掌握进程间通信(IPC)技术,可助你打破这一僵局。IPC是不同进程间传递数据或信号的机制,在Python中常用的方法有管道、消息队列、共享内存及套接字等。其中,管道适用于父子或兄弟进程间简单数据传递;套接字则不仅限于本地,还能在网络间实现复杂的数据交换。通过学习IPC,你将能设计更健壮灵活的系统架构,成为真正的编程高手。
15 3
|
14天前
|
安全 开发者 Python
揭秘Python IPC:进程间的秘密对话,让你的系统编程更上一层楼
【9月更文挑战第8天】在系统编程中,进程间通信(IPC)是实现多进程协作的关键技术。IPC机制如管道、队列、共享内存和套接字,使进程能在独立内存空间中共享信息,提升系统并发性和灵活性。Python提供了丰富的IPC工具,如`multiprocessing.Pipe()`和`multiprocessing.Queue()`,简化了进程间通信的实现。本文将从理论到实践,详细介绍各种IPC机制的特点和应用场景,帮助开发者构建高效、可靠的多进程应用。掌握Python IPC,让系统编程更加得心应手。
20 4
|
13天前
|
消息中间件 数据库 Python
深度剖析!Python IPC的奥秘,带你走进进程间通信的微观世界
【9月更文挑战第8天】在编程世界中,进程间通信(IPC)是连接不同程序或进程的关键技术,使数据在独立进程间自由流动,构建复杂软件系统。本文将深入探讨Python中的IPC机制,包括管道、消息队列、套接字等,并通过具体示例展示如何使用Socket实现网络IPC。Python的`multiprocessing`模块还提供了队列、管道和共享内存等多种高效IPC方式。通过本文,你将全面了解Python IPC的核心概念与应用技巧,助力开发高效协同的软件系统。
35 2
|
15天前
|
消息中间件 数据采集 数据库
庆祝吧!Python IPC让进程间的合作,比团队游戏还默契
【9月更文挑战第7天】在这个数字化时代,软件系统日益复杂,单进程已难以高效处理海量数据。Python IPC(进程间通信)技术应运而生,使多进程协作如同训练有素的电竞战队般默契。通过`multiprocessing`模块中的Pipe等功能,进程间可以直接传递数据,无需依赖低效的文件共享或数据库读写。此外,Python IPC还提供了消息队列、共享内存和套接字等多种机制,适用于不同场景,使进程间的合作更加高效、精准。这一技术革新让开发者能轻松应对复杂挑战,构建更健壮的软件系统。
26 1