Pthreads 中的互斥量
Pthreads 提供了一些功能用来同步线程。最基本的机制是使用互斥量变量,可以锁定和解锁,用来保护每个关键区域。希望进入关键区域的线程首先要尝试获取 mutex。如果 mutex 没有加锁,线程能够马上进入并且互斥量能够自动锁定,从而阻止其他线程进入。如果 mutex 已经加锁,调用线程会阻塞,直到 mutex 解锁。如果多个线程在相同的互斥量上等待,当互斥量解锁时,只有一个线程能够进入并且重新加锁。这些锁并不是必须的,程序员需要正确使用它们。
下面是与互斥量有关的函数调用
向我们想象中的一样,mutex 能够被创建和销毁,扮演这两个角色的分别是 Phread_mutex_init
和 Pthread_mutex_destroy
。mutex 也可以通过 Pthread_mutex_lock
来进行加锁,如果互斥量已经加锁,则会阻塞调用者。还有一个调用Pthread_mutex_trylock
用来尝试对线程加锁,当 mutex 已经被加锁时,会返回一个错误代码而不是阻塞调用者。这个调用允许线程有效的进行忙等。最后,Pthread_mutex_unlock
会对 mutex 解锁并且释放一个正在等待的线程。
除了互斥量以外,Pthreads
还提供了第二种同步机制:条件变量(condition variables)
。mutex 可以很好的允许或阻止对关键区域的访问。条件变量允许线程由于未满足某些条件而阻塞。绝大多数情况下这两种方法是一起使用的。下面我们进一步来研究线程、互斥量、条件变量之间的关联。
下面再来重新认识一下生产者和消费者问题:一个线程将东西放在一个缓冲区内,由另一个线程将它们取出。如果生产者发现缓冲区没有空槽可以使用了,生产者线程会阻塞起来直到有一个线程可以使用。生产者使用 mutex 来进行原子性检查从而不受其他线程干扰。但是当发现缓冲区已经满了以后,生产者需要一种方法来阻塞自己并在以后被唤醒。这便是条件变量做的工作。
下面是一些与条件变量有关的最重要的 pthread 调用
上表中给出了一些调用用来创建和销毁条件变量。条件变量上的主要属性是 Pthread_cond_wait
和 Pthread_cond_signal
。前者阻塞调用线程,直到其他线程发出信号为止(使用后者调用)。阻塞的线程通常需要等待唤醒的信号以此来释放资源或者执行某些其他活动。只有这样阻塞的线程才能继续工作。条件变量允许等待与阻塞原子性的进程。Pthread_cond_broadcast
用来唤醒多个阻塞的、需要等待信号唤醒的线程。
“需要注意的是,条件变量(不像是信号量)不会存在于内存中。如果将一个信号量传递给一个没有线程等待的条件变量,那么这个信号就会丢失,这个需要注意
下面是一个使用互斥量和条件变量的例子
#include <stdio.h> #include <pthread.h> #define MAX 1000000000 /* 需要生产的数量 */ pthread_mutex_t the_mutex; pthread_cond_t condc,condp; /* 使用信号量 */ int buffer = 0; void *producer(void *ptr){ /* 生产数据 */ int i; for(int i = 0;i <= MAX;i++){ pthread_mutex_lock(&the_mutex); /* 缓冲区独占访问,也就是使用 mutex 获取锁 */ while(buffer != 0){ pthread_cond_wait(&condp,&the_mutex); } buffer = i; /* 把他们放在缓冲区中 */ pthread_cond_signal(&condc); /* 唤醒消费者 */ pthread_mutex_unlock(&the_mutex); /* 释放缓冲区 */ } pthread_exit(0); } void *consumer(void *ptr){ /* 消费数据 */ int i; for(int i = 0;i <= MAX;i++){ pthread_mutex_lock(&the_mutex); /* 缓冲区独占访问,也就是使用 mutex 获取锁 */ while(buffer == 0){ pthread_cond_wait(&condc,&the_mutex); } buffer = 0; /* 把他们从缓冲区中取出 */ pthread_cond_signal(&condp); /* 唤醒生产者 */ pthread_mutex_unlock(&the_mutex); /* 释放缓冲区 */ } pthread_exit(0); }
管程
为了能够编写更加准确无误的程序,Brinch Hansen 和 Hoare 提出了一个更高级的同步原语叫做 管程(monitor)
。他们两个人的提案略有不同,通过下面的描述你就可以知道。管程是程序、变量和数据结构等组成的一个集合,它们组成一个特殊的模块或者包。进程可以在任何需要的时候调用管程中的程序,但是它们不能从管程外部访问数据结构和程序。下面展示了一种抽象的,类似 Pascal 语言展示的简洁的管程。不能用 C 语言进行描述,因为管程是语言概念而 C 语言并不支持管程。
monitor example integer i; condition c; procedure producer(); . . . end; procedure consumer(); . end; end monitor;
管程有一个很重要的特性,即在任何时候管程中只能有一个活跃的进程,这一特性使管程能够很方便的实现互斥操作。管程是编程语言的特性,所以编译器知道它们的特殊性,因此可以采用与其他过程调用不同的方法来处理对管程的调用。通常情况下,当进程调用管程中的程序时,该程序的前几条指令会检查管程中是否有其他活跃的进程。如果有的话,调用进程将被挂起,直到另一个进程离开管程才将其唤醒。如果没有活跃进程在使用管程,那么该调用进程才可以进入。
进入管程中的互斥由编译器负责,但是一种通用做法是使用 互斥量(mutex)
和 二进制信号量(binary semaphore)
。由于编译器而不是程序员在操作,因此出错的几率会大大降低。在任何时候,编写管程的程序员都无需关心编译器是如何处理的。他只需要知道将所有的临界区转换成为管程过程即可。绝不会有两个进程同时执行临界区中的代码。
即使管程提供了一种简单的方式来实现互斥,但在我们看来,这还不够。因为我们还需要一种在进程无法执行被阻塞。在生产者-消费者问题中,很容易将针对缓冲区满和缓冲区空的测试放在管程程序中,但是生产者在发现缓冲区满的时候该如何阻塞呢?
解决的办法是引入条件变量(condition variables)
以及相关的两个操作 wait
和 signal
。当一个管程程序发现它不能运行时(例如,生产者发现缓冲区已满),它会在某个条件变量(如 full)上执行 wait
操作。这个操作造成调用进程阻塞,并且还将另一个以前等在管程之外的进程调入管程。在前面的 pthread 中我们已经探讨过条件变量的实现细节了。另一个进程,比如消费者可以通过执行 signal
来唤醒阻塞的调用进程。
“Brinch Hansen 和 Hoare 在对进程唤醒上有所不同,Hoare 建议让新唤醒的进程继续运行;而挂起另外的进程。而 Brinch Hansen 建议让执行 signal 的进程必须退出管程,这里我们采用 Brinch Hansen 的建议,因为它在概念上更简单,并且更容易实现。
如果在一个条件变量上有若干进程都在等待,则在对该条件执行 signal 操作后,系统调度程序只能选择其中一个进程恢复运行。
顺便提一下,这里还有上面两位教授没有提出的第三种方式,它的理论是让执行 signal 的进程继续运行,等待这个进程退出管程时,其他进程才能进入管程。
条件变量不是计数器。条件变量也不能像信号量那样积累信号以便以后使用。所以,如果向一个条件变量发送信号,但是该条件变量上没有等待进程,那么信号将会丢失。也就是说,wait 操作必须在 signal 之前执行。
下面是一个使用 Pascal
语言通过管程实现的生产者-消费者问题的解法
monitor ProducerConsumer condition full,empty; integer count; procedure insert(item:integer); begin if count = N then wait(full); insert_item(item); count := count + 1; if count = 1 then signal(empty); end; function remove:integer; begin if count = 0 then wait(empty); remove = remove_item; count := count - 1; if count = N - 1 then signal(full); end; count := 0; end monitor; procedure producer; begin while true do begin item = produce_item; ProducerConsumer.insert(item); end end; procedure consumer; begin while true do begin item = ProducerConsumer.remove; consume_item(item); end end;
读者可能觉得 wait 和 signal 操作看起来像是前面提到的 sleep 和 wakeup ,而且后者存在严重的竞争条件。它们确实很像,但是有个关键的区别:sleep 和 wakeup 之所以会失败是因为当一个进程想睡眠时,另一个进程试图去唤醒它。使用管程则不会发生这种情况。管程程序的自动互斥保证了这一点,如果管程过程中的生产者发现缓冲区已满,它将能够完成 wait 操作而不用担心调度程序可能会在 wait 完成之前切换到消费者。甚至,在 wait 执行完成并且把生产者标志为不可运行之前,是不会允许消费者进入管程的。
尽管类 Pascal 是一种想象的语言,但还是有一些真正的编程语言支持,比如 Java (终于轮到大 Java 出场了),Java 是能够支持管程的,它是一种 面向对象
的语言,支持用户级线程,还允许将方法划分为类。只要将关键字 synchronized
关键字加到方法中即可。Java 能够保证一旦某个线程执行该方法,就不允许其他线程执行该对象中的任何 synchronized 方法。没有关键字 synchronized ,就不能保证没有交叉执行。
下面是 Java 使用管程解决的生产者-消费者问题
public class ProducerConsumer { static final int N = 100; // 定义缓冲区大小的长度 static Producer p = new Producer(); // 初始化一个新的生产者线程 static Consumer c = new Consumer(); // 初始化一个新的消费者线程 static Our_monitor mon = new Our_monitor(); // 初始化一个管程 static class Producer extends Thread{ public void run(){ // run 包含了线程代码 int item; while(true){ // 生产者循环 item = produce_item(); mon.insert(item); } } private int produce_item(){...} // 生产代码 } static class consumer extends Thread { public void run( ) { // run 包含了线程代码 int item; while(true){ item = mon.remove(); consume_item(item); } } private int produce_item(){...} // 消费代码 } static class Our_monitor { // 这是管程 private int buffer[] = new int[N]; private int count = 0,lo = 0,hi = 0; // 计数器和索引 private synchronized void insert(int val){ if(count == N){ go_to_sleep(); // 如果缓冲区是满的,则进入休眠 } buffer[hi] = val; // 向缓冲区插入内容 hi = (hi + 1) % N; // 找到下一个槽的为止 count = count + 1; // 缓冲区中的数目自增 1 if(count == 1){ notify(); // 如果消费者睡眠,则唤醒 } } private synchronized void remove(int val){ int val; if(count == 0){ go_to_sleep(); // 缓冲区是空的,进入休眠 } val = buffer[lo]; // 从缓冲区取出数据 lo = (lo + 1) % N; // 设置待取出数据项的槽 count = count - 1; // 缓冲区中的数据项数目减 1 if(count = N - 1){ notify(); // 如果生产者睡眠,唤醒它 } return val; } private void go_to_sleep() { try{ wait( ); }catch(Interr uptedExceptionexc) {}; } } }
上面的代码中主要设计四个类,外部类(outer class)
ProducerConsumer 创建并启动两个线程,p 和 c。第二个类和第三个类 Producer
和 Consumer
分别包含生产者和消费者代码。最后,Our_monitor
是管程,它有两个同步线程,用于在共享缓冲区中插入和取出数据。
在前面的所有例子中,生产者和消费者线程在功能上与它们是相同的。生产者有一个无限循环,该无限循环产生数据并将数据放入公共缓冲区中;消费者也有一个等价的无限循环,该无限循环用于从缓冲区取出数据并完成一系列工作。
程序中比较耐人寻味的就是 Our_monitor
了,它包含缓冲区、管理变量以及两个同步方法。当生产者在 insert 内活动时,它保证消费者不能在 remove 方法中运行,从而保证更新变量以及缓冲区的安全性,并且不用担心竞争条件。变量 count 记录在缓冲区中数据的数量。变量 lo
是缓冲区槽的序号,指出将要取出的下一个数据项。类似地,hi
是缓冲区中下一个要放入的数据项序号。允许 lo = hi,含义是在缓冲区中有 0 个或 N 个数据。
Java 中的同步方法与其他经典管程有本质差别:Java 没有内嵌的条件变量。然而,Java 提供了 wait 和 notify 分别与 sleep 和 wakeup 等价。
通过临界区自动的互斥,管程比信号量更容易保证并行编程的正确性。但是管程也有缺点,我们前面说到过管程是一个编程语言的概念,编译器必须要识别管程并用某种方式对其互斥作出保证。C、Pascal 以及大多数其他编程语言都没有管程,所以不能依靠编译器来遵守互斥规则。
与管程和信号量有关的另一个问题是,这些机制都是设计用来解决访问共享内存的一个或多个 CPU 上的互斥问题的。通过将信号量放在共享内存中并用 TSL
或 XCHG
指令来保护它们,可以避免竞争。但是如果是在分布式系统中,可能同时具有多个 CPU 的情况,并且每个 CPU 都有自己的私有内存呢,它们通过网络相连,那么这些原语将会失效。因为信号量太低级了,而管程在少数几种编程语言之外无法使用,所以还需要其他方法。
消息传递
上面提到的其他方法就是 消息传递(messaage passing)
。这种进程间通信的方法使用两个原语 send
和 receive
,它们像信号量而不像管程,是系统调用而不是语言级别。示例如下
send(destination, &message); receive(source, &message);
send 方法用于向一个给定的目标发送一条消息,receive 从一个给定的源接受一条消息。如果没有消息,接受者可能被阻塞,直到接受一条消息或者带着错误码返回。
消息传递系统的设计要点
消息传递系统现在面临着许多信号量和管程所未涉及的问题和设计难点,尤其对那些在网络中不同机器上的通信状况。例如,消息有可能被网络丢失。为了防止消息丢失,发送方和接收方可以达成一致:一旦接受到消息后,接收方马上回送一条特殊的 确认(acknowledgement)
消息。如果发送方在一段时间间隔内未收到确认,则重发消息。
现在考虑消息本身被正确接收,而返回给发送着的确认消息丢失的情况。发送者将重发消息,这样接受者将收到两次相同的消息。
对于接收者来说,如何区分新的消息和一条重发的老消息是非常重要的。通常采用在每条原始消息中嵌入一个连续的序号来解决此问题。如果接受者收到一条消息,它具有与前面某一条消息一样的序号,就知道这条消息是重复的,可以忽略。
消息系统还必须处理如何命名进程的问题,以便在发送或接收调用中清晰的指明进程。身份验证(authentication)
也是一个问题,比如客户端怎么知道它是在与一个真正的文件服务器通信,从发送方到接收方的信息有可能被中间人所篡改。
用消息传递解决生产者-消费者问题
现在我们考虑如何使用消息传递来解决生产者-消费者问题,而不是共享缓存。下面是一种解决方式
#define N 100 /* buffer 中槽的数量 */ void producer(void){ int item; message m; /* buffer 中槽的数量 */ while(TRUE){ item = produce_item(); /* 生成放入缓冲区的数据 */ receive(consumer,&m); /* 等待消费者发送空缓冲区 */ build_message(&m,item); /* 建立一个待发送的消息 */ send(consumer,&m); /* 发送给消费者 */ } } void consumer(void){ int item,i; message m; for(int i = 0;i < N;i++){ /* 循环N次 */ send(producer,&m); /* 发送N个缓冲区 */ } while(TRUE){ receive(producer,&m); /* 接受包含数据的消息 */ item = extract_item(&m); /* 将数据从消息中提取出来 */ send(producer,&m); /* 将空缓冲区发送回生产者 */ consume_item(item); /* 处理数据 */ } }
假设所有的消息都有相同的大小,并且在尚未接受到发出的消息时,由操作系统自动进行缓冲。在该解决方案中共使用 N 条消息,这就类似于一块共享内存缓冲区的 N 个槽。消费者首先将 N 条空消息发送给生产者。当生产者向消费者传递一个数据项时,它取走一条空消息并返回一条填充了内容的消息。通过这种方式,系统中总的消息数量保持不变,所以消息都可以存放在事先确定数量的内存中。
如果生产者的速度要比消费者快,则所有的消息最终都将被填满,等待消费者,生产者将被阻塞,等待返回一条空消息。如果消费者速度快,那么情况将正相反:所有的消息均为空,等待生产者来填充,消费者将被阻塞,以等待一条填充过的消息。
消息传递的方式有许多变体,下面先介绍如何对消息进行 编址
。
- 一种方法是为每个进程分配一个唯一的地址,让消息按进程的地址编址。
- 另一种方式是引入一个新的数据结构,称为
信箱(mailbox)
,信箱是一个用来对一定的数据进行缓冲的数据结构,信箱中消息的设置方法也有多种,典型的方法是在信箱创建时确定消息的数量。在使用信箱时,在 send 和 receive 调用的地址参数就是信箱的地址,而不是进程的地址。当一个进程试图向一个满的信箱发送消息时,它将被挂起,直到信箱中有消息被取走,从而为新的消息腾出地址空间。
屏障
最后一个同步机制是准备用于进程组而不是进程间的生产者-消费者情况的。在某些应用中划分了若干阶段,并且规定,除非所有的进程都就绪准备着手下一个阶段,否则任何进程都不能进入下一个阶段,可以通过在每个阶段的结尾安装一个 屏障(barrier)
来实现这种行为。当一个进程到达屏障时,它会被屏障所拦截,直到所有的屏障都到达为止。屏障可用于一组进程同步,如下图所示
在上图中我们可以看到,有四个进程接近屏障,这意味着每个进程都在进行运算,但是还没有到达每个阶段的结尾。过了一段时间后,A、B、D 三个进程都到达了屏障,各自的进程被挂起,但此时还不能进入下一个阶段呢,因为进程 B 还没有执行完毕。结果,当最后一个 C 到达屏障后,这个进程组才能够进入下一个阶段。
避免锁:读-复制-更新
最快的锁是根本没有锁。问题在于没有锁的情况下,我们是否允许对共享数据结构的并发读写进行访问。答案当然是不可以。假设进程 A 正在对一个数字数组进行排序,而进程 B 正在计算其平均值,而此时你进行 A 的移动,会导致 B 会多次读到重复值,而某些值根本没有遇到过。
然而,在某些情况下,我们可以允许写操作来更新数据结构,即便还有其他的进程正在使用。窍门在于确保每个读操作要么读取旧的版本,要么读取新的版本,例如下面的树
上面的树中,读操作从根部到叶子遍历整个树。加入一个新节点 X 后,为了实现这一操作,我们要让这个节点在树中可见之前使它"恰好正确":我们对节点 X 中的所有值进行初始化,包括它的子节点指针。然后通过原子写操作,使 X 称为 A 的子节点。所有的读操作都不会读到前后不一致的版本
在上面的图中,我们接着移除 B 和 D。首先,将 A 的左子节点指针指向 C 。所有原本在 A 中的读操作将会后续读到节点 C ,而永远不会读到 B 和 D。也就是说,它们将只会读取到新版数据。同样,所有当前在 B 和 D 中的读操作将继续按照原始的数据结构指针并且读取旧版数据。所有操作均能正确运行,我们不需要锁住任何东西。而不需要锁住数据就能够移除 B 和 D 的主要原因就是 读-复制-更新(Ready-Copy-Update,RCU)
,将更新过程中的移除和再分配过程分离开。