Python 进阶(二):多进程

简介: Python多进程

1. 简介

进程:通常一个运行着的应用程序就是一个进程,比如:我启动了一个音乐播放器,现在它就是一个进程。
线程:线程是进程的最小执行单元,比如:我在刚启动的音乐播放器上选了一首歌曲进行播放,这就是一个线程。

多线程一文中,我们说了因为 GIL 的原因,CPython 解释器下的多线程牺牲了并行性,为此 Python 提供了多进程模块 multiprocessing,该模块同时提供了本地和远程并发,使用子进程代替线程,可以有效的避免 GIL 带来的影响,能够充分发挥机器上的多核优势,可以实现真正的并行效果,并且它与 threading 模块的 API 基本类似,使用起来也比较方便。

2. 使用

2.1 Process 类

multiprocessing 模块通过创建一个 Process 对象然后调用它的 start() 方法来生成进程,Processthreading.Thread API 相同。

multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

进程对象,表示在单独进程中运行的活动。参数说明如下:

  • group:仅用于兼容 threading.Thread,应该始终是 None。

  • target:由 run() 方法调用的可调用对象。

  • name:进程名。

  • args:目标调用的参数元组。

  • kwargs:目标调用的关键字参数字典。

  • daemon:设置进程是否为守护进程,如果是默认值 None,则该标志将从创建的进程继承。

multiprocessing.Process 对象具有如下方法和属性。

  • run():进程具体执行的方法。

  • start():启动进程。

  • join([timeout]):如果可选参数 timeout 是默认值 None,则将阻塞至调用 join() 方法的进程终止;如果 timeout 是一个正数,则最多会阻塞 timeout 秒。

  • name:进程的名称。

  • is_alive():返回进程是否还活着。

  • daemon:进程的守护标志,是一个布尔值。

  • pid:返回进程 ID。

  • exitcode:子进程的退出代码。

  • authkey:进程的身份验证密钥。

  • sentinel:系统对象的数字句柄,当进程结束时将变为 ready。

  • terminate():终止进程。

  • kill():与 terminate() 相同,但在 Unix 上使用 SIGKILL 信号。

  • close():关闭 Process 对象,释放与之关联的所有资源。

看一个使用多进程的示例。

from multiprocessing import Process
import time, os

def target():
    time.sleep(2)
    print ('子进程ID:', os.getpid())

if __name__=='__main__':
    print ('主进程ID:', os.getpid())
    ps = []
    for i in range(10):
        p = Process(target=target)
        p.start()
        ps.append(p)
    for p in ps:
        p.join()

当进程数量比较多时,我们可以利用进程池方便、高效的对进程进行使用和管理。

multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

进程池对象。参数说明如下:

  • processes:工作进程数目,如果 processes 为 None,则使用 os.cpu_count() 返回的值。

  • initializer:如果 initializer 不为 None,则每个工作进程将会在启动时调用 initializer(*initargs)。

  • maxtasksperchild:一个工作进程在它退出或被一个新的工作进程代替之前能完成的任务数量,为了释放未使用的资源。

  • context:用于指定启动的工作进程的上下文。

有如下两种方式向进程池提交任务:

  • apply(func[, args[, kwds]]):阻塞方式。

  • apply_async(func[, args[, kwds[, callback[, error_callback]]]]):非阻塞方式。

import multiprocessing, time

def target(p):
    print('t')
    time.sleep(2)
    print(p)

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes = 5)
    for i in range(3):
        p = 'p%d'%(i)
        # 阻塞式
        pool.apply(target, (p, ))
        # 非阻塞式
        # pool.apply_async(target, (p, ))
    pool.close()
    pool.join()

2.2 进程间交换数据

2.2.1 管道

multiprocessing.Pipe([duplex])

返回一对 Connection 对象 (conn1, conn2) , 分别表示管道的两端;如果 duplex 被置为 True (默认值),那么该管道是双向的,否则管道是单向的。

from multiprocessing import Pipe, Process

def setData(conn, data):
    conn.send(data)

def printData(conn):
    print(conn.recv())

if __name__ == "__main__":
    data = '程序之间'
    # 创建管道返回管道的两端
    conn1, conn2 = Pipe()
    p1 = Process(target=setData, args=(conn1, data,))
    p2 = Process(target=printData, args=(conn2,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.2.2 队列

multiprocessing.Queue([maxsize])

返回一个共享队列实例。具有如下方法:

  • qsize():返回队列的大致长度。

  • empty():如果队列是空的,返回 True,反之返回 False。

  • full():如果队列是满的,返回 True,反之返回 False。

  • put(obj[, block[, timeout]]):将 obj 放入队列。

  • put_nowait(obj):相当于 put(obj, False)。

  • get([block[, timeout]]):从队列中取出并返回对象。

  • get_nowait():相当于 get(False)。

  • close():指示当前进程将不会再往队列中放入对象。

  • join_thread():等待后台线程。

  • cancel_join_thread():防止进程退出时自动等待后台线程退出。

from multiprocessing import Queue, Process

def setData(q, data):
    q.put(data)

def printData(q):
    print(q.get())

if __name__ == "__main__":
    data = '程序之间'
    q = Queue()
    p1 = Process(target=setData, args=(q, data,))
    p2 = Process(target=printData, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

2.3 进程间同步

多进程之间不共享数据,但共享同一套文件系统,像访问同一个文件、同一终端打印,如果不进行同步操作,就会出现错乱的现象。

所有在 threading 存在的同步方式,multiprocessing 中都有类似的等价物,如:锁、信号量等。以锁的方式为例,我们来看一个终端打印例子。

不加锁

from multiprocessing import Process
import os,time

def target():
    print('p%s is start' %os.getpid())
    time.sleep(2)
    print('p%s is end' %os.getpid())

if __name__ == '__main__':
    for i in range(3):
        p=Process(target=target)
        p.start()

执行结果:

p7996 is start
p10404 is start
p10744 is start
p7996 is end
p10404 is end
p10744 is end

加锁

from multiprocessing import Process, Lock
import os,time

def target(lock):
    lock.acquire()
    print('p%s is start' %os.getpid())
    time.sleep(2)
    print('p%s is end' %os.getpid())
    lock.release()

if __name__ == '__main__':
    lock = Lock()
    for i in range(3):
        p=Process(target=target, args=(lock,))
        p.start()

执行结果:

p11064 is start
p11064 is end
p1532 is start
p1532 is end
p11620 is start
p11620 is end

2.4 进程间共享状态

并发编程时,通常尽量避免使用共享状态,但如果有一些数据确实需要在进程之间共享怎么办呢?对于这种情况,multiprocessing 模块提供了两种方式。

2.4.1 共享内存

multiprocessing.Value(typecode_or_type, *args, lock=True)

返回一个从共享内存上创建的对象。参数说明如下:

  • typecode_or_type:返回的对象类型。

  • *args:传给类的构造函数。

  • lock:如果 lock 值是 True(默认值),将会新建一个递归锁用于同步此值的访问操作;如果 lock 值是 Lock、RLock 对象,那么这个传入的锁将会用于同步这个值的访问操作;如果 lock 是 False,那么对这个对象的访问将没有锁保护,也就是说这个变量不是进程安全的。

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

从共享内存中申请并返回一个数组对象。

  • typecode_or_type:返回的数组中的元素类型。

  • size_or_initializer:如果参数值是一个整数,则会当做数组的长度;否则参数会被当成一个序列用于初始化数组中的每一个元素,并且会根据元素个数自动判断数组的长度。

  • lock:说明同上。

使用 Value 或 Array 将数据存储在共享内存映射中。

from multiprocessing import Process, Value, Array

def setData(n, a):
    n.value = 1024
    for i in range(len(a)):
        a[i] = -a[i]

def printData(n, a):
    print(num.value)
    print(arr[:])

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(5))
    print(num.value)
    print(arr[:])
    print('-----------------------')
    p = Process(target=setData, args=(num, arr))
    p.start()
    p.join()
    print(num.value)
    print(arr[:])

2.4.2 服务进程

由 Manager() 返回的管理器对象控制一个服务进程,该进程保存 Python 对象并允许其他进程使用代理操作它们。

Manager() 返回的管理器支持类型包括:list、dict、Namespace、Lock、RLock、Semaphore、BoundedSemaphore、Condition、Event、Barrier、Queue、Value 和 Array。

from multiprocessing import Process, Manager

def setData(d, l):
    d[1] = '1'
    d[0.5] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(5))
        print(d)
        print(l)
        print('-----------------------')
        p = Process(target=setData, args=(d, l))
        p.start()
        p.join()
        print(d)
        print(l)
相关文章
|
8天前
|
消息中间件 安全 Kafka
Python IPC机制全攻略:让进程间通信变得像呼吸一样自然
【9月更文挑战第12天】在编程领域,进程间通信(IPC)是连接独立执行单元的关键技术。Python凭借简洁的语法和丰富的库支持,提供了多种IPC方案。本文将对比探讨Python的IPC机制,包括管道与消息队列、套接字与共享内存。管道适用于简单场景,而消息队列更灵活,适合高并发环境。套接字广泛用于网络通信,共享内存则在本地高效传输数据。通过示例代码展示`multiprocessing.Queue`的使用,帮助读者理解IPC的实际应用。希望本文能让你更熟练地选择和运用IPC机制。
31 10
|
24天前
|
数据采集 存储 安全
如何确保Python Queue的线程和进程安全性:使用锁的技巧
本文探讨了在Python爬虫技术中使用锁来保障Queue(队列)的线程和进程安全性。通过分析`queue.Queue`及`multiprocessing.Queue`的基本线程与进程安全特性,文章指出在特定场景下使用锁的重要性。文中还提供了一个综合示例,该示例利用亿牛云爬虫代理服务、多线程技术和锁机制,实现了高效且安全的网页数据采集流程。示例涵盖了代理IP、User-Agent和Cookie的设置,以及如何使用BeautifulSoup解析HTML内容并将其保存为文档。通过这种方式,不仅提高了数据采集效率,还有效避免了并发环境下的数据竞争问题。
如何确保Python Queue的线程和进程安全性:使用锁的技巧
|
4天前
|
监控 Ubuntu API
Python脚本监控Ubuntu系统进程内存的实现方式
通过这种方法,我们可以很容易地监控Ubuntu系统中进程的内存使用情况,对于性能分析和资源管理具有很大的帮助。这只是 `psutil`库功能的冰山一角,`psutil`还能够提供更多关于系统和进程的详细信息,强烈推荐进一步探索这个强大的库。
16 1
|
7天前
|
Python
惊!Python进程间通信IPC,让你的程序秒变社交达人,信息畅通无阻
【9月更文挑战第13天】在编程的世界中,进程间通信(IPC)如同一场精彩的社交舞会,每个进程通过优雅的IPC机制交换信息,协同工作。本文将带你探索Python中的IPC奥秘,了解它是如何让程序实现无缝信息交流的。IPC如同隐形桥梁,连接各进程,使其跨越边界自由沟通。Python提供了多种IPC机制,如管道、队列、共享内存及套接字,适用于不同场景。通过一个简单的队列示例,我们将展示如何使用`multiprocessing.Queue`实现进程间通信,使程序如同社交达人般高效互动。掌握IPC,让你的程序在编程舞台上大放异彩。
12 3
|
9天前
|
安全 开发者 Python
Python IPC大揭秘:解锁进程间通信新姿势,让你的应用无界连接
【9月更文挑战第11天】在编程世界中,进程间通信(IPC)如同一座无形的桥梁,连接不同进程的信息孤岛,使应用无界而广阔。Python凭借其丰富的IPC机制,让开发者轻松实现进程间的无缝交流。本文将揭开Python IPC的神秘面纱,介绍几种关键的IPC技术:管道提供简单的单向数据传输,适合父子进程间通信;队列则是线程和进程安全的数据共享结构,支持多进程访问;共享内存允许快速读写大量数据,需配合锁机制确保一致性;套接字则能实现跨网络的通信,构建分布式系统。掌握这些技术,你的应用将不再受限于单个进程,实现更强大的功能。
26 5
|
9天前
|
消息中间件 Kafka 数据安全/隐私保护
Python IPC实战指南:构建高效稳定的进程间通信桥梁
【9月更文挑战第11天】在软件开发中,随着应用复杂度的提升,进程间通信(IPC)成为构建高效系统的关键。本文通过一个分布式日志处理系统的案例,介绍如何使用Python和套接字实现可靠的IPC。案例涉及定义通信协议、实现日志发送与接收,并提供示例代码。通过本教程,你将学会构建高效的IPC桥梁,并了解如何根据需求选择合适的IPC机制,确保系统的稳定性和安全性。
25 5
|
11天前
|
消息中间件 网络协议 Python
工具人逆袭!掌握Python IPC,让你的进程从此告别单打独斗
【9月更文挑战第9天】你是否曾遇到多个Python程序像孤岛般无法通信,导致数据孤立、任务难协同的问题?掌握进程间通信(IPC)技术,可助你打破这一僵局。IPC是不同进程间传递数据或信号的机制,在Python中常用的方法有管道、消息队列、共享内存及套接字等。其中,管道适用于父子或兄弟进程间简单数据传递;套接字则不仅限于本地,还能在网络间实现复杂的数据交换。通过学习IPC,你将能设计更健壮灵活的系统架构,成为真正的编程高手。
15 3
|
12天前
|
安全 开发者 Python
揭秘Python IPC:进程间的秘密对话,让你的系统编程更上一层楼
【9月更文挑战第8天】在系统编程中,进程间通信(IPC)是实现多进程协作的关键技术。IPC机制如管道、队列、共享内存和套接字,使进程能在独立内存空间中共享信息,提升系统并发性和灵活性。Python提供了丰富的IPC工具,如`multiprocessing.Pipe()`和`multiprocessing.Queue()`,简化了进程间通信的实现。本文将从理论到实践,详细介绍各种IPC机制的特点和应用场景,帮助开发者构建高效、可靠的多进程应用。掌握Python IPC,让系统编程更加得心应手。
18 4
|
12天前
|
消息中间件 数据库 Python
深度剖析!Python IPC的奥秘,带你走进进程间通信的微观世界
【9月更文挑战第8天】在编程世界中,进程间通信(IPC)是连接不同程序或进程的关键技术,使数据在独立进程间自由流动,构建复杂软件系统。本文将深入探讨Python中的IPC机制,包括管道、消息队列、套接字等,并通过具体示例展示如何使用Socket实现网络IPC。Python的`multiprocessing`模块还提供了队列、管道和共享内存等多种高效IPC方式。通过本文,你将全面了解Python IPC的核心概念与应用技巧,助力开发高效协同的软件系统。
30 2
|
13天前
|
消息中间件 数据采集 数据库
庆祝吧!Python IPC让进程间的合作,比团队游戏还默契
【9月更文挑战第7天】在这个数字化时代,软件系统日益复杂,单进程已难以高效处理海量数据。Python IPC(进程间通信)技术应运而生,使多进程协作如同训练有素的电竞战队般默契。通过`multiprocessing`模块中的Pipe等功能,进程间可以直接传递数据,无需依赖低效的文件共享或数据库读写。此外,Python IPC还提供了消息队列、共享内存和套接字等多种机制,适用于不同场景,使进程间的合作更加高效、精准。这一技术革新让开发者能轻松应对复杂挑战,构建更健壮的软件系统。
26 1