多线程引言
多线程处理,是 Python 乃至很多编程语言中比较复杂的概念,随着 CPU 的多核心、计算速度越来越快、各类网络应用等的出现,对于多个线程的运用,可以有效的提高程序的处理性能和速度。
有很多讨论 Python 线程、进程和协程的书和资料,有的概念说的不太清楚,有的例子举得太复杂,因此在研究和实践之后,斗胆也讨论一下这个略有复杂的话题,希望不要误人子弟。
线程
线程的标准定义如下:
线程(thread)是操作系统能够进行运算调度的最小单位。它被包含在进程之中,是进程中的实际运作单位。一条线程指的是进程中一个单一顺序的控制流,一个进程中可以并发多个线程,每条线程并行执行不同的任务。
在多核或多CPU,或支持 Hyper-threading 的CPU上使用多线程程序设计的好处是显而易见,即提高了程序的执行吞吐率。在单 CPU 单核的计算机上,使用多线程技术,也可以把进程中负责 IO 处理、人机交互而常被阻塞的部分与密集计算的部分分开来执行,编写专门的 workhorse 线程执行密集计算,从而提高了程序的执行效率。
Python 是解释性语言
像 C/C++这样的语言是编译型语言,程序输入到编译器,编译器再根据语言的语法进行解析,然后翻译成语言独立的中间表示,最终链接成具有高度优化的机器码的可执行程序。编译器之所以可以深层次的对代码进行优化,是因为它可以看到整个程序(或者一大块独立的部分)。这使得它可以对不同的语言指令之间的交互进行推理,从而给出更有效的优化手段。
Python 程序的执行是解释型的,检查语法、翻译成中间状态等也会做,但是不会把整个程序翻译成机器码,可以理解为一行行去执行代码。目前的全栈语言 JavaScript 以及非常适合开发网站的 PHP 都是解释型语言。
现在的 CPU 4核、8核都是常规了,要想利用多核系统,Python必须支持多线程运行。作为解释型语言,Python的解释器必须做到既安全又高效。多线程编程会遇到的问题是解释器要避免在不同的线程操作内部共享的数据。同时它还要保证在管理用户线程时保证总是有最大化的计算资源
Python 线程切换机制
Python 支持多线程,有两种模式,一种是协作多任务(cooperative multitasking),另一种是抢占式多任务(preemptive multitasking)。
Python 的协作多任务机制是当一个线程开始 sleep 或者进行 I/O 操作时,另一个线程就有机会拿到GIL锁,开始执行它的代码。 Python 的抢占式多任务机制是每隔 15ms 进行监测,尝试收回 GIL。
由于多线程执行时,存在线程的切换,当多个线程同时运行时,如果能保证运行结果符合预期,就是线程安全的。
和操作系统进行进程调度类似,当进程执行一段时间之后,发生时钟中断,操作系统响应时钟中断,并在这时进行进程调度。而 Python 中也是通过软件模拟了这种时钟中断,来激活线程调度。
下面是一个重要的概念,关于线程安全,在说这个之前,先来看这个例子:
# 导入需要的库
>>> import threading
>>> count = 0
>>> def run():
... global count
... for i in range(1000000):
... count = count + 1
>>> t1 = threading.Thread(target=run)
>>> t2 = threading.Thread(target=run)
>>> t1.start()
>>> t2.start()
>>> t1.join()
>>> t2.join()
>>> print(count)
1373573
两个线程轮流执行一个加法程序,感觉答案应该是2000000,可以,你会发现每次都小于2000000。这是为什么呢?
比如在 count 是 20 的时候,线程 t1 读取了 count,t1 读到的是 20,这时候 CPU 将控制权给了另一个线程 t2。 t2线程读到的 count 也是 20,然后 t2 加1,写回21。线程回到 t1的时候,t1 将前面读到的20也加1,还是21写回。本来应该连个线程各加1次,等于22的,现在成了21。
所以说在这个例子里,只要 CPU 从线程拿走控制权的时候正好是在读完值的时候,就会发生这样的情况。这就是多线程下对全局变量的写操作不是线程安全的现象和原因。
Python 线程安全
因为线程被切换时候,线程的写操作会被中断,所以我们要考虑线程安全这个问题,否则多线程的程序的运行结果就会出错。
天生线程安全
天生线程安全,就是线程代码中只对全局对象进行读操作,而不存在写操作。这种情况下,不论线程在何处中断,都不会影响各个线程本来的执行逻辑。
实现原子操作
在一个线程中,有时需要保证某一行或者某一段代码的逻辑是不可中断的,也就是说要保证这段代码执行的原子性。
Python 内建的数据类型(list,dict等)的共享变量进行操作,就是原子操作。
比如下面这些操作都是原子的,不用担心多线程切换时候的问题
- list.append(x)
- list1.extend(list2)
- x = list[i]
- x = list.pop()
- list.sort()
- x = y
执行代码的前后加互斥锁
我们修改一下刚才的两个进程的加法例子:
最简单的办法就是引入 threading 模块中的 Lock(),然后在 count 计算这里前面加上锁,后面加上释放。
>>> import threading
>>> lock = threading.Lock()
>>> count = 0
>>> def run():
... global count
... for i in range(1000000):
... # 加锁
... lock.acquire()
... count += 1
... lock.release()
>>> t1 = threading.Thread(target=run)
>>> t2 = threading.Thread(target=run)
>>> t1.start()
>>> t2.start()
>>> t1.join()
>>> t2.join()
>>> print(count)
2000000
也可以使用with语句来实现同样功能。在使用锁的时候,with语句会在进入语句块之前自动的获取到该锁对象,然后在语句块执行完成后自动释放掉锁。如同在打开文件时候的 with 语句一样,这样比较简洁也安全。
>>> import threading
>>> lock = threading.Lock()
>>> count = 0
>>> def run():
... global count
... for i in range(1000000):
... # 使用 with 来进行加锁
... with lock:
... count += 1
>>> t1 = threading.Thread(target=run)
>>> t2 = threading.Thread(target=run)
>>> t1.start()
>>> t2.start()
>>> t1.join()
>>> t2.join()
>>> print(count)
2000000
锁的操作还是略复杂的,除了简单的直接锁以外,还有RLock,简单锁即便是线程本身也会发生阻塞,RLock 只有在其他线程访问时才会发生阻塞。
信号量 (Semaphores) 是一个更高级的锁机制。信号量内部有一个计数器而不像锁对象内部有锁标识,而且只有当占用信号量的线程数超过信号量时线程才阻塞。这允许了多个线程可以同时访问相同的代码区。
当信号量被获取的时候,计数器减小;当信号量被释放的时候,计数器增大。当获取信号量的时候,如果计数器值为0,则该进程将阻塞。当某一信号量被释放,counter值增加为1时,被阻塞的线程(如果有的话)中会有一个得以继续运行。
信号量通常被用来限制对容量有限的资源的访问,比如一个网络连接或者数据库服务器。在这类场景中,只需要将计数器初始化为最大值,信号量的实现将为你完成剩下的事情。
用 Semaphores 可以实现类似线程池的功能。当然我们其实有更简单的办法来实现线程池,后面会说到。
实现线程同步
线程同步是在锁的基础来实现的。通过锁来对各个线程的执行顺序进行控制。一个线程需要等待其它线程完成特定任务之后,才能执行。多个线程之间有依赖关系。比如抓取网站数据,然后分析处理,写入数据库,就可以通过线程同步来实现。
待续
摘自本人与同事所著《Python 机器学习实战》一书