Python 网络编程(三)
join方法
该方法将等待,一直到它调用的线程终止. 它的名字表示调用的线程会一直等待,直到指定的线程加入它.
当一个进程启动之后,会默认产生一个主线程,因为线程是程序执行流的 小单元,当设置多线程时,主线程会创建多个子线程,在python中,默认情况下(其实就是setDaemon(False)),主线程执行完自己的任务以后,就退出了,此时子线程会继续执行自己的任务,直到自己的任务结束
而join所完成的工作就是线程同步,即主线程任务结束之后,进入阻塞状态,一直等待其他的子线程执行结束之后,主线程在终止,需要注意的是,不要启动线程后立即join(),很容易造成串行运行,导致并发失效
函数写法
# -*- coding: UTF-8 -*- # 文件名:join_a.py # 方法包装-启动多线程 # 导入模块 from threading import Thread from time import sleep, time def run(name): '''执行任务''' print("Threading:{} start".format(name)) sleep(3) print("Threading:{} end".format(name)) if __name__ == '__main__': '''入口''' # 开始时间 start = time() # 创建线程列表 t_list = [] # 循环创建线程 for i in range(10): t = Thread(target=run, args=('t{}'.format(i),)) t.start() t_list.append(t) # 等待线程结束 for t in t_list: t.join() # 计算使用时间 end = time() - start print(end)
类写法
# -*- coding: UTF-8 -*- # 文件名:join_b.py # 类包装-启动多线程 from threading import Thread from time import sleep, time class MyThread(Thread): def __init__(self, name): Thread.__init__(self) self.name = name def run(self): print("Threading:{} start".format(self.name)) sleep(3) print("Threading:{} end".format(self.name)) if __name__ == '__main__': '''入口''' # 开始时间 start = time() # 创建线程列表 t_list = [] # 循环创建线程 for i in range(10): t = MyThread(f"t{i}") t.start() t_list.append(t) # 等待线程结束 for t in t_list: t.join() # 计算时间 end = time() - start
我们可以看到,join方法下,我们等待子线程执行完后,结束的主线程。主线程在完成自己任务同时,阻塞状态下,等待子线程完成后结束进程。
setDaemon
将线程声明为守护线程,必须在start() 方法调用之前设置, 如果不设置为守护线程程序会被无限挂起。这个方法基本和join是相反的。当我们 在程序运行中,执行一个主线程,如果主线程又创建一个子线程,主线程和子线程 就分兵两路,分别运行,那么当主线程完成想退出时,会检验子线程是否完成。如 果子线程未完成,则主线程会等待子线程完成后再退出。但是有时候我们需要的是 只要主线程完成了,不管子线程是否完成,都要和主线程一起退出,这时就可以 用setDaemon方法
如果没有用户线程,那么守护线程也没有存活下去的意义了
from threading import Thread import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': t1 = Thread(target=foo) t2 = Thread(target=bar) t1.daemon = True t1.start() t2.start() print("main----------")
主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束。
主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。
这就是通过Thread中join方法,对每个线程都调用了join方法,让主线程等待子线程的完成
join方法还有一个可选的超时参数timeout。如果进程没有正常退出或者通过某个异常退出,且超时的情况下,主线程就不再等待子线程了。由于join的返回值始终是None,所以当在join方法中有超时参数的情况下,join返回后无法判断子线程是否已经结束。这个时候,则必须使用Thread类中的isAlive方法来判断是否发生了超时。
在join中,使用join方法的时候,还需要注意以下问题。
在超时参数不存在的情况下,join操作将会一直阻塞,直到线程终止。
一个线程可以多次使用join方法。
线程不能在自己的运行代码中调用join方法,否则会造成死锁。
在线程调用start方法之前使用join方法,将会出现错误。
下面给出threading模块中Thread类的常用方法。
总结
每个线程⼀定会有⼀个名字,尽管上⾯的例⼦中没有指定线程对象的 name,但是python会⾃动为线程指定
⼀个名字。
当线程的run()⽅法结束时该线程完成。
⽆法控制线程调度程序,但可以通过别的⽅式来影响线程调度的⽅式。
线程的⼏种状态:新建-> 就绪 -> 运行->等待(阻塞) ->中止(死亡)
同步
同步就是协同步调,按预定的先后次序进⾏运⾏。如:你先走,我再走。
同步的简单方法就是使用锁机制。这在低级thread模块和高级threading模块中都有提供。当然,threading模块中的锁机制也是基于thread模块实现的。在Python中,这是 低层次的数据同步原语。一个锁总是处于下面两种状态之中:“已锁”和“未锁”。为此提供了两种操作:“加锁”和“解锁”,分别用来改变锁的状态。对于一个锁来说,如果是未锁的状态,则线程在进入部分将此锁使用“加锁”操作将其状态变为“已锁”。
由于同一进程中的所有线程都是共享数据的,如果对线程中共享数据的并发访问不加以限制,结果将不可预期,在严重的情况下,还会产生死锁。
irom threading import Thread import time g_num = 100 def work1(): global g_num for i in range(3): g_num += 1 print("----in work1, g_num is %d---"%g_num) def work2(): global g_num print("----in work2, g_num is %d---"%g_num) print("---线程创建之前g_num is %d---"%g_num) t1 = Thread(target=work1) t1.start() #延时一会,保证t1线程中的事情做完 time.sleep(1) t2 = Thread(target=work2) t2.start()
看运行结果:
---线程创建之前g_num is 100--- ----in work1, g_num is 103--- ----in work2, g_num is 103--- Process finished with exit code 0
列表当做实参传递到线程中
from threading import Thread import time def work_1(nums): nums.append(44) print("----in work1---",nums) def work_2(nums): # 延时一会,保证t1线程中的事情做完 time.sleep(1) print("----in work2---",nums) g_nums = [11,22,33] t1 = Thread(target=work_1, args=(g_nums,)) t1.start() t2 = Thread(target=work_2, args=(g_nums,)) t2.start()
在一个进程内的所有线程共享全局变量,能够在不使用其他方式的前提下完成多线程之间的数据共享(这点要比多进程要好)
缺点就是,线程是对全局变量随意遂改可能造成多线程之间对全局变量的混乱(即线程非安全)
为了解决这个问题,需要允许线程独占地访问共享数据,这就是线程同步。需要注意的是,这些问题在进程中也是存在的,只是在多线程环境下更见常见而已。
有时候需要在每个线程中使用各自独立的变量,一个显而易见的方法就是每个线程都使用自己的私有变量。为了方便,Python中提供了一种简单的机制threading.local来解决这个问题。其使用方法也很简单,其成员变量就是在每个线程中不同的。看代码
import threading import time,random def func1(): local = threading.local() time.sleep(random.random()) # 随机休眠时间 local.number = [1] for _ in range(10): # 加入随机数 local.number.append(random.choice(range(10))) # 打印当前的线程对象,值 print(threading.currentThread(), local.number) if __name__ == '__main__': threads = [] for i in range(5): t = threading.Thread(target=func1) t.start() threads.append(t) for i in range(5): threads[i].join()
在ThreadLocal类中使用了threading.local()生成了类局部变量。此变量将在不同的线程中保存为不同的值。ThreadLocal类中的run方法主要是将10个随机数放到前面生成的局部变量中,并打印出来。
# 下面是这段代码执行的一种结果。 <Thread(Thread-4, started 19172)> [1, 0, 9, 6, 3, 7, 9, 2, 1, 9, 1] <Thread(Thread-3, started 13448)> [1, 1, 3, 4, 7, 7, 6, 7, 0, 1, 6] <Thread(Thread-5, started 10988)> [1, 9, 2, 9, 2, 3, 7, 7, 3, 2, 7] <Thread(Thread-1, started 13584)> [1, 2, 5, 6, 5, 3, 0, 0, 9, 2, 4] <Thread(Thread-2, started 3736)> [1, 0, 4, 6, 3, 9, 3, 3, 6, 6, 3] Process finished with exit code 0 # 从上面的输出结果中可以看到,每个线程都有自己不同的值。
同步锁与GIL的关系
GIL本质是一把互斥锁,但GIL锁住的是解释器级别的数据
同步锁,锁的是解释器以外的共享资源,例如:硬盘上的文件 控制台,对于这种不属于解释器的数据资源就应该自己加锁处理
Python的线程在GIL的控制之下,线程之间,对整个python解释器,对python提供的C API的访问都是互斥的,这可以看作是Python内核级的互斥机制。但是这种互斥是我们不能控制的,我们还需要另外一种可控的互斥机制 ———用户级互斥。内核级通过互斥保护了内核的共享资源,同样,用户级互斥保护了用户程序中的共享资源。
GIL 的作用是:对于一个解释器,只能有一个thread在执行bytecode。所以每时每刻只有一条bytecode在被执行一个thread。GIL保证了bytecode 这层面上是thread safe的。但是如果你有个操作比如 x += 1,这个操作需要多个bytecodes操作,在执行这个操作的多条bytecodes期间的时候可能中途就换thread了,这样就出现了data races的情况了。
假设两个线程t1和t2都要对num=0进行增1运算,t1和t2都各对num修改10次,num的 终的结果应该为20。但是由于是多线程访问,有可能出现下面情况:
在num=0时,t1取得num=0。此时系统把t1调度为”sleeping”状态,把t2转换为”running”状态,t2也获得 num=0。然后t2对得到的值进行加1并赋给num,使得num=1。然后系统又把t2调度为”sleeping”,把t1转为”running”。线程t1又把它之前得到的0加1后赋值给num。这样,明明t1和t2都完成了1次加1工作,但结果仍然是num=1。
from threading import Thread def func1(name): print('Threading:{} start'.format(name)) global num for i in range(50000000): # 有问题 # for i in range(5000): # 无问题 num += 1 print('Threading:{} end num={}'.format(name, num)) if __name__ == '__main__': num = 0 # 创建线程列表 t_list = [] # 循环创建线程 for i in range(5): t = Thread(target=func1, args=('t{}'.format(i),)) t.start() t_list.append(t) # 等待线程结束 for t in t_list: t.join()
运行结果(可能不一样,但是结果往往不是正确的结果):
数字设置小一些之后,再次运行结果就是正确的:
问题产生的原因就是没有控制多个线程对同一资源的访问,对数据造成破坏,使得线程运行的结果不可预期。这种现象称为“线程不安全”。
同步就是协同步调,按预定的先后次序进行运行。如:你说完,我再说。
"同"字从字面上容易理解为一起动作
其实不是,"同"字应是指协同、协助、互相配合。
如进程、线程同步,可理解为进程或线程A和B一块配合,A执行到一定程度时要依靠B的某个结果,于是停下来,示意B运行;B依言执行,再将结果给A;A再继续操作。这既是同步,
当线程间共享全局变量,多个线程对该变量执行不同的操作时,该变量 终的结果可能是不确定的(每次线程执行后的结果不同),如:对变量执行加减操作,变量的值是不确定的,要想变量的值是一个确定的需对线程执行的代码段加锁。
python对线程加锁主要有Lock和Rlock模块
看下面的代码:
from threading import Thread, Lock def func1(name): print('Threading:{} start'.format(name)) global num lock.acquire() for i in range(1000000): # 有问题 # for i in range(5000): # 无问题 num += 1 lock.release() print('Threading:{} end num={}'.format(name, num)) if __name__ == '__main__': # 创建锁 lock = Lock() num = 0 # 创建线程列表 t_list = [] # 循环创建线程 for i in range(5): t = Thread(target=func1, args=('t{}'.format(i),)) t.start() t_list.append(t) # 等待线程结束 # for t in t_list: # t.join()
注意:
加锁还可以使用with 效果一样
必须使用同一把锁
如果使用锁,程序会变成串行,因此应该是在适当的地方加锁 线程调度本质上是不确定的,因此,在多线程程序中错误地使用锁机制可能会导致随机数 据损坏或者其他的异常行为,我们称之为竞争条件。为了避免竞争条件, 好只在临界区(对 临界资源进行操作的那部分代码)使用锁
对于全局变量,在多线程中要格外小心,否则容易造成数据错乱的情况发生,那对非全局变量是否要加锁呢?我们来看看下面两段代码:
代码1:
import threading import time class MyThread(threading.Thread): # 重写 构造方法 def __init__(self, num, sleepTime): threading.Thread.__init__(self) self.num = num self.sleepTime = sleepTime def run(self): self.num += 1 time.sleep(self.sleepTime) print('线程(%s),num=%d' % (self.name, self.num)) if __name__ == '__main__': mutex = threading.Lock() t1 = MyThread(100, 5) t1.start() t2 = MyThread(200, 1) t2.start()
运行结果:
线程(Thread-2),num=201 线程(Thread-1),num=101 Process finished with exit code 0
代码2:
import threading from time import sleep def test(sleepTime): num = 1 sleep(sleepTime) num += 1 print('---(%s)--num=%d' % (threading.current_thread(), num)) t1 = threading.Thread(target=test, args=(5,)) t2 = threading.Thread(target=test, args=(1,)) t1.start() t2.start()
运行结果:
---(<Thread(Thread-2, started 9872)>)--num=2 ---(<Thread(Thread-1, started 2340)>)--num=2 Process finished with exit code 0
我们看到运行结果,与程序上的对比,实际上对于局部变量,是独立非共享的程序,每一个线程拿到的值,是相对独立的,在各自的线程内。所以运行的结果,在第二段代码上非常明显,不同的暂停时间上,运算num += 1结果是一致的。
在多线程开发中,全局变量是多个线程都共享的数据,而局部变量等是各自线程的,是非共享的
死锁
在多线程程序中,死锁问题很大一部分是由于线程同时获取多个锁造成的。
在线程间共享多个资源的时候,如果两个线程分别占有一部分资源并且同时等待对方的资源,就会造成死锁。
尽管死锁很少发生,但一旦发生就会造成应用的停止响应。看代码代码:
import threading import time class MyThread1(threading.Thread): def run(self): if mutexA.acquire(): print(self.name+'----do1---up----') time.sleep(1) if mutexB.acquire(): print(self.name+'----do1---down----') mutexB.release() mutexA.release() class MyThread2(threading.Thread): def run(self): if mutexB.acquire(): print(self.name+'----do2---up----') time.sleep(1) if mutexA.acquire(): print(self.name+'----do2---down----') mutexA.release() mutexB.release() mutexA = threading.Lock() mutexB = threading.Lock() if __name__ == '__main__': t1 = MyThread1() t2 = MyThread2() t1.start() t2.start()
看运行结果:
Thread-1----do1---up---- Thread-2----do2---up---- Process finished with exit code -1
此时已经进入死锁状态,命令行可以可以使用ctrl-z退出,
当我们执行线程t1与t2时候,我们的程序,第一步就是对双方所需占用的资源上锁,我们可以获取到上锁后的一部分信息,至此程序运转继续,等待对方的资源解锁,但是解锁操作无法完成,因为解锁条件就是对方完成解锁才能解锁。
产生死锁的四个必要条件:
互斥条件:一个资源每次只能被一个线程使用。
请求与保持条件:一个线程因请求资源而阻塞时,对已获得的资源保持不放。
不剥夺条件:线程已获得的资源,在末使用完之前,不能强行剥夺。
循环等待条件:若干线程之间形成一种头尾相接的循环等待资源关系。
代码:
from threading import Thread,Lock from time import sleep class Task1(Thread): def run(self): while True: if lock1.acquire(): print("------Task 1 -----") sleep(0.5) lock2.release() class Task2(Thread): def run(self): while True: if lock2.acquire(): print("------Task 2 -----") sleep(0.5) lock3.release() class Task3(Thread): def run(self): while True: if lock3.acquire(): print("------Task 3 -----") sleep(0.5) lock1.release() # 使用Lock创建出的锁默认没有“锁上” lock1 = Lock() # 创建另外一把锁,并且“锁上” lock2 = Lock() lock2.acquire() # 创建另外一把锁,并且“锁上” lock3 = Lock() lock3.acquire() t1 = Task1() t2 = Task2() t3 = Task3() t1.start() t2.start() t3.start()
可以使用互斥锁完成多个任务,有序的进程工作,这就是线程的同步,按照指定的顺序执行指定的任务。
线程间的通信
信号量
我们都知道在加锁的情况下,程序就变成了串行,也就是单线程,而有时,我们在不用考虑数据安全时,不用加锁,程序就变成了并行,也就是多线程。为了避免业务开启过多的线程时。我们就可以通过信号量(Semaphore)来设置指定个数的线程。举个简单例子:车站有3 个安检口,那么同时只能有3 个人安检,别人来了,只能等着别人安检完才可以过。
from threading import Thread, BoundedSemaphore from time import sleep def an_jian(num): semapshore.acquire() print('第{}个人安检完成!'.format(num)) sleep(2) semapshore.release() if __name__ == '__main__': semapshore = BoundedSemaphore(3) for i in range(20): thread = Thread(target=an_jian, args=(i,)) thread.start()
Queue
从一个线程向另一个线程发送数据 安全的方式可能就是使用queue 库中的队列了。创建一个被多个线程共享的 Queue 对象,这些线程通过使用put() 和get() 操作来向队列中添加或者删除元素。Queue 对象已经包含了必要的锁,所以你可以通过它在多个线程间多安全地共享数据。
Python 包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。 Queue 中包含以下方法:
Queue.qsize() 返回队列的大小
Queue.empty() 如果队列为空,返回True,反之False
Queue.full() 如果队列满了,返回True,反之False
Queue.full 与maxsize 大小对应
Queue.get([block[, timeout]])获取队列,timeout 等待时间
Queue.get_nowait() 相当Queue.get(False)
Queue.put(item) 写入队列,timeout 等待时间
Queue.put_nowait(item) 相当Queue.put(item, False)
Queue.taskdone() 在完成一项工作之后,Queue.taskdone()函数向任务已经完成的队列发送一个信号
Queue.join() 实际上意味着等到队列为空,再执行别的操作
Python的Queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语(可以理解为原子操作,即要么不做,要么就做完),能够在多线程中直接使用。可以使用队列来实现线程间的同步。
用FIFO队列实现上述生产者与消费者问题的代码如下:
import threading import time from queue import Queue class Producer(threading.Thread): def run(self): global queue count = 0 while True: if queue.qsize() < 1000: for i in range(100): count = count + 1 msg = '生成产品' + str(count) queue.put(msg) print(msg) time.sleep(0.5) class Consumer(threading.Thread): def run(self): global queue while True: if queue.qsize() > 100: for i in range(3): msg = self.name + '消费了 ' + queue.get() print(msg) time.sleep(1) if __name__ == '__main__': queue = Queue() for i in range(500): queue.put('初始产品' + str(i)) for i in range(2): p = Producer() p.start() for i in range(5): c = Consumer() c.start()
对于Queue,在多线程通信之间扮演重要的角色
添加数据到队列中,使用put()方法
从队列中取数据,使用get()方法
判断队列中是否还有数据,使用qsize()方法
在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
上面的代码如果不加以人工干预,本代码将会一直执行下去。从输出结果来看,除了 初还有部分剩余产品外,后面只要产品生产出来后就被消费了。这也是可以解释的,因为消费者消费产品的速度要快于生产者生产产品的速度。
事件Event
线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行
Event()可以创建一个事件管理标志,该标志(event)默认为False,event 对象主要有四种方法可以调用:
event.wait(timeout=None):调用该方法的线程会被阻塞,如果设置了timeout 参数,超时后,线程会停止阻塞继续执行;
event.set():将event 的标志设置为True,调用wait 方法的所有线程将被唤醒;
event.clear():将event 的标志设置为False,调用wait 方法的所有线程将被阻塞;
event.is_set():判断event 的标志是否为True。
import threading import time import random #线程1,门,一开始是打开的,每3秒需要自动关闭一下。如果有人通过,需要重新刷卡打开 #线程2,人,人通过门,如果门是打开的直接通过,如果没有打开需要刷卡。之后门就已经大开了,通知人继续进入 event = threading.Event() #创建一个事件 event.set() # 设置标志位真 ,门一开始就是打开的 status = 0 # status代表门的状态,如果是0~3代表打开,如果等于3,需要关闭 def door(): global status while True: print("当前门的status为:{}".format(status)) if status>=3: print("当门已经打开了3秒,需要自动关闭") event.clear() if event.is_set(): print('当前门是开着的,可以通行!') else: print('门已经关了,请用户自己刷卡!') event.wait() # 门的线程阻塞等待 continue time.sleep(1) status+=1 # status代表们开始的秒数 def person(): global status n =0 # 人的计数器,看看有多少人进入到门里面 while True: n+=1 if event.is_set(): print('门开着,{}号人进入门里面'.format(n)) else: print('门关着,{}号人刷卡之后,进入门里面'.format(n)) event.set() # 标志改为true status =0 time.sleep(random.randint(1,10)) if __name__ == '__main__': d = threading.Thread(target=door) p = threading.Thread(target=person) d.start() p.start()
异步
同步: 就是女儿叫你起床,你还没有起来,她一直在床边等着也不去早读,等你起来之后,再去早读。
异步:就是女儿叫你起床,你还没有起来,她对着你房间喊一声“起床了”,然后就不管了,自己去早读了。
异步应用
为完成某个任务,不同程序单元之间过程中无需通信协调,也能完成任务的方式。
不相关的程序单元之间可以是异步的。
例如,爬虫下载网页。调度程序调用下载程序后,即可调度其他任务,而无需与该下载任务保持通信以协调行为。不同网页的下载、保存等操作都是无关的,也无需相互通知协调。这些异步操作的完成时刻并不确定。异步意味着无序。
如果在某程序的运行时,能根据已经执行的指令准确判断它接下来要进行哪个具体操作,那它是同步程序,反之则为异步程序。(无序与有序的区别)
同步/异步、阻塞/非阻塞并非水火不容,要看讨论的程序所处的封装级别。例如购物程序在处理多个用户的浏览请求可以是异步的,而更新库存时必须是同步的
rom multiprocessing import Pool import time import os def test(): print("---进程池中的进程---pid=%d,ppid=%d--" % (os.getpid(), os.getppid())) for i in range(3): print("----%d---" % i) time.sleep(1) return "hahah" def test2(args): print("---callback func--pid=%d" % os.getpid()) print("---callback func--args=%s" % args) if __name__ == '__main__': # 创建进程池及制定数量 pool = Pool(3) # 非阻塞支持回调 pool.apply_async(func=test, callback=test2) time.sleep(5) print("----主进程-pid=%d----" % os.getpid()
我们看到,异步调用实际想要同时结束进程,但是线程池中的其他进程有需要执行任务,等待执行任务完成结束去与其他进程回合,这里的参数通过回调,直至程序结束。
协程
协程(coroutine),又称为微线程,纤程。(协程是一种用户态的轻量级线程)
作用:在执行A 函数的时候,可以随时中断,去执行B 函数,然后中断继续执行A 函数(可以自动切换),注意这一过程并不是函数调用(没有调用语句),过程很像多线程,然而协程只有一个线程在执行
对于单线程下,我们不可避免程序中出现io 操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io 阻塞时就将寄存器上下文和栈保存到其他地方,切换到另外一个任务去计算。在任务切回来的时候,恢复先前保存的寄存器上下文和栈,这样就保证了该线程能够 大限度地处于就绪态,即随时都可以被cpu 执行的状态,相当于我们在用户程序级别将自己的io 操作 大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io 比较少,从而更多的将cpu的执行权限分配给我们的线程(注意:线程是CPU 控制的,而协程是程序自身控制的)
协作的标准
必须在只有一个单线程里实现并发
修改共享数据不需加锁
用户程序里自己保存多个控制流的上下文栈
一个协程遇到 IO 操作自动切换到其它协程
由于自身带有上下文和栈,无需线程上下文切换的开销,属于程序级别的切换,操作系统完全感知不到,因而更加轻量级;无需原子操作的锁定及同步的开销;方便切换控制流,简化编程模型,单线程内就可以实现并发的效果,大限度地利用cpu,且可扩展性高,成本低(注:一个CPU 支持上万的协程都不是问题。所以很适合用于高并发处理)
他的缺点:无法利用多核资源:协程的本质是个单线程,它不能同时将单个CPU 的多个核用上,协程需要和进程配合,才能运行在多CPU 上.当然我们日常所编写的绝大部分应用都没有这个必要,除非是cpu 密集型应用。进行阻塞(Blocking)操作(如IO 时)会阻塞掉整个程序
greenlet
from greenlet import greenlet def attack(name): print(f'{name} :我要买包!') # 2 gree_b.switch('吕布') # 3 print(f'{name} :我要去学编程!') # 6 gree_b.switch() # 7 def player(name): print(f'{name} :买买买!! ') # 4 gree_a.switch() # 5 print(f'{name} :一定去马士兵教育!!!!') # 8 gree_a = greenlet(attack) gree_b = greenlet(player) gree_a.switch('貂蝉') # 可以在第一次switch 时传入参数,以后都不需要#1
Gevent 模块
Gevent 是一个第三方库,可以轻松通过gevent 实现并发同步或异步编程,在gevent中用到的主要模式是
Greenlet,它是以C 扩展模块形式接入Python 的轻量级协程。Greenlet 全部运行在主程序操作系统进程的内部,但他们被协作式地调度。
当一个greenlet遇到IO操作时,比如访问网络/睡眠等待,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就保证总有greenlet在运行,而不是等待IO。同时也因为只有一个线程在执行,会极大的减少上下文切换的成本。
安装模块
pip install gevent
代码:
import gevent def gf(name): print(f'{name} :我要买包!') # 2 gevent.sleep(2) # 3 print(f'{name} :我要去学编程!') # 6 def bf(name): print(f'{name} :买买买!! ') # 4 gevent.sleep(2) print(f'{name} :一定去马士兵教育!!!!') # 8 geven_a = gevent.spawn(gf, '小乔') geven_b = gevent.spawn(bf, name='周瑜') gevent.joinall([geven_a, geven_b])
注意:上例gevent.sleep(2)模拟的是gevent 可以识别的io 阻塞;
而time.sleep(2)或其他的阻塞,gevent 是不能直接识别的,需要加入一行代码monkey.patch_all(),这行代码需在 time,socket 模块之前。
async io 异步IO asyncio 是python3.4 之后的协程模块,是python 实现并发重要的包,这个包使用事件循环驱动实现并发。事件循环是一种处理多并发量的有效方式,在维基百科中它被描述为「一种等待程序分配事件或消息的编程架构」,我们可以定义事件循环来简化使用轮询方法来监控事件,通俗的说法就是「当A 发生时,执行B」。事件循环利用poller 对象,使得程序员不用控制任务的添加、删除和事件的控制。事件循环使用回调方法来知道事件的发生。
看代码
件名:asyn_b.py import asyncio @asyncio.coroutine # python3.5 之前 def func_a(): for i in range(5): print('协程——a!!') yield from asyncio.sleep(1) async def func_b(): # python3.5 之后 for i in range(5): print('协程——b!!!') await asyncio.sleep(2) # 创建协程对象 asy_a = func_a() asy_b= func_b() # 获取事件循环 loop = asyncio.get_event_loop() # 监听事件循环 loop.run_until_complete(asyncio.gather(asy_a, asy_b)) # 关闭事件 loop.close()
协程的嵌套:使用async可以定义协程,协程用于耗时的io操作,我们也可以封装更多的io操作过程,这样就实现了嵌套的协程,即一个协程中await了另外一个协程,如此连接起来。
import asyncio async def compute(x, y): print(f"compute: {x}+{y} ...") await asyncio.sleep(1) return x + y async def print_sum(x, y): result = await compute(x, y) print(f"{x}+{y}={result}") loop = asyncio.get_event_loop() loop.run_until_complete(print_sum(1, 2)) loop.close()
总结
串行、并行与并发的区别
- 并行:指的是任务数小于等于 cpu核数,即任务真的是一起执行的
- 并发:指的是任务数多余 cpu核数,通过操作系统的各种任务调度算法,实现用多个任务“一起”执行(实际上总有一些任务不在执行,因为切换任务的速度相当快,看上去一起执行而已)
进程与线程的区别
线程是程序执行的 小单位,而进程是操作系统分配资源的 小单位;
一个进程由一个或多个线程组成,线程是一个进程中代码的不同执行路线;
进程之间相互独立,但同一进程下的各个线程之间共享程序的内存空间(包括代码段、数据集、堆等)及一些进程级的资源(如打开文件和信号),某进程内的线程在其它进程不可见;
调度和切换:线程上下文切换比进程上下文切换要快得多。
有一个老板想要开个工厂进行生产某件商品(例如:手机)他需要花一些财力物力制作一条生产线,这个生产线上有很多的器件以及材料这些所有的为了能够生产手机而准备的资源称之为:进程。只有生产线是不能够进行生产的,所以老板的找个工人来进行生产,这个工人能够利用这些材料 终一步步的将手机做出来,这个来做事情的工人称之为:线程这个老板为了提高生产率,想到 3种办法:
在这条生产线上多招些工人,一起来做手机,这样效率是成倍増长,即单进程多线程方式
老板发现这条生产线上的工人不是越多越好,因为一条生产线的资源以及材料毕竟有限,所以老板又花了些财力物力购置了另外一条生产线,然后再招些工人这样效率又再一步提高了,即多进程多线程方式
老板发现,现在已经有了很多条生产线,并且每条生产线上已经有很多工人了(即程序是多进程的,每个进程中又有多个线程),为了再次提高效率,老板想了个损招,规定:如果某个员工在上班时临时没事或者再等待某些条件(比如等待另一个工人生产完谋道工序之后他才能再次工作),那么这个员工就利用这个时间去做其它的事情,那么也就是说:如果一个线程等待某些条件,可以充分利用这个时间去做其它事情,其实这就是:协程方式
进程:拥有自己独立的堆和栈,既不共享堆,也不共享栈,进程由操作系统调度;进程切换需要的资源很 大,效率很低 线程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,标准线程由操作系统调度;线程切换需要的资源一般,效率一般(当然了在不考虑 GIL的情况下) 协程:拥有自己独立的栈和共享的堆,共享堆,不共享栈,协程由程序员在协程的代码里显示调度;协程切换任务资源很小,效率高 多进程、多线程根据 cpu核数不一样可能是并行的,但是协程是在一个线程中所以是并发 选择技术考虑的因素:切换的效率、数据共享的问题、数据安全、是否需要并发