ReentrantReadWriteLock的定义
ReentrantReadWriteLock
是一种读写锁,跟ReentrantLock
一样也是实现了Lock
,区别在于ReentrantLock
是独占锁,同一时刻只能有一个线程持有锁,ReentrantLock
在某些场景下可能会有并发性能的问题。而ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性。当读锁存在时,其他线程仍然可以获取读锁并进行读操作,但是不能获得写锁进行写操作;当写锁存在时,其他线程的读锁、写锁都是不允许的。
使用举例
举个ReentrantReadWriteLock
的使用例子:
public class ReentrantReadWriteLockDemo {
private static final String THREAD_READ = "读线程";
private static final String THREAD_WRITE = "写线程";
public static void main(String[] args) {
Resource resource = new Resource();
//模拟三个线程去执行写操作
for (int i = 0; i < 3; i++) {
new Thread(new Task(resource), THREAD_WRITE + i).start();
}
//模拟10个线程去执行读操作
for (int i = 0; i < 10; i++) {
new Thread(new Task(resource), THREAD_READ + i).start();
}
}
public static class Task implements Runnable {
Resource resource;
Task(Resource resource) {
this.resource = resource;
}
@Override
public void run() {
String curThreadName = Thread.currentThread().getName();
Person person = new Person(curThreadName, new Random().nextInt(100));
if (curThreadName.startsWith(THREAD_READ)) {
//读操作
resource.get();
} else if (Thread.currentThread().getName().startsWith(THREAD_WRITE)) {
//写操作
resource.put(person, person.rank);
}
}
}
public static class Resource<K extends Comparable, V> {
TreeMap<K, V> rankMap = new TreeMap<>();
final ReadWriteLock rwLock = new ReentrantReadWriteLock();
final Lock readLock = rwLock.readLock(); // 读取锁
final Lock writeLock = rwLock.writeLock(); // 写入锁
//写入值
void put(K key, V value) {
try {
writeLock.lock();
System.out.println(Thread.currentThread().getName() + "准备写入数据");
Thread.sleep(new Random().nextInt(500));
System.out.println(Thread.currentThread().getName() + "写入数据完毕:" + key.toString());
rankMap.put(key, value);
} catch (Exception e) {
e.printStackTrace();
} finally {
writeLock.unlock();
}
}
//获取值
public List<K> get() {
try {
readLock.lock();
System.out.println(Thread.currentThread().getName() + "准备读取数据");
Thread.sleep(new Random().nextInt(500));
//treeMap中取出的数据是按rank从大到小排序的
List<K> list = new ArrayList<>(rankMap.keySet());
System.out.println(Thread.currentThread().getName() + "读取数据完毕:" + Arrays.toString(list.toArray()));
return list;
} catch (Exception e) {
e.printStackTrace();
} finally {
readLock.unlock();
}
return null;
}
}
static class Person implements Comparable<Person> {
public String name;//姓名
public int rank;//得分
Person(String name, int rank) {
this.name = name;
this.rank = rank;
}
@Override
public int compareTo(Person person) {
//分数少的在后面
if (rank <= person.rank) {
return 1;
}
return -1;
}
@Override
public String toString() {
return "name: " + name + ",rank: " + rank;
}
}
}
执行结果:
读线程4准备读取数据
读线程0准备读取数据
读线程3准备读取数据
读线程1准备读取数据
读线程2准备读取数据
读线程1读取数据完毕:[]
读线程4读取数据完毕:[]
读线程2读取数据完毕:[]
读线程3读取数据完毕:[]
读线程0读取数据完毕:[]
写线程1准备写入数据
写线程1写入数据完毕:name: 写线程1,rank: 83
写线程2准备写入数据
写线程2写入数据完毕:name: 写线程2,rank: 47
写线程0准备写入数据
写线程0写入数据完毕:name: 写线程0,rank: 55
读线程5准备读取数据
读线程6准备读取数据
读线程8准备读取数据
读线程9准备读取数据
读线程7准备读取数据
读线程8读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程9读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程6读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程7读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程5读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
每次执行读写线程的顺序及数据可能不一样,但有一些结果是固定的:当有写线程操作时,其他线程不能进行任何操作,只能等写入完成后其他线程才能继续执行;但是当有读线程时,其他读线程同样可以执行读操作,但是此时不能进行写操作。
源码解析
UML类图:
如果熟悉ReentrantLock
实现的话,看到上面的类图也会感觉很熟悉,没错,ReentrantReadWriteLock
的底层也是通过AQS
实现的,不同的是ReentrantLock
只能用来做独占锁,而ReentrantReadWriteLock
是独占锁(写锁)、共享锁(读锁)共存的一种锁,那么他是如何实现的呢?我们通过看其源码实现来一探究竟:
public ReentrantReadWriteLock() {
this(false);
}
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock readLock() { return readerLock; }
abstract static class Sync extends AbstractQueuedSynchronizer {}
ReentrantReadWriteLock
构造方法中可以传入一个的boolean
类型参数fair
,表示是否是公平锁,默认是非公平锁,这里跟ReentrantLock
一样。在使用ReentrantReadWriteLock
时,分别通过writeLock()、readLock()
获取对应的写锁、读锁,他们对应于ReentrantReadWriteLock
的内部静态类WriteLock、ReadLock
,来看对应的实现:
public static class WriteLock implements Lock {
private final ReentrantReadWriteLock.Sync sync;
protected WriteLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquire(1);//独占锁
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);//独占锁
}
public boolean tryLock() {
return sync.tryWriteLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.release(1);//释放独占锁
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isHeldByCurrentThread() {
return sync.isHeldExclusively();
}
public int getHoldCount() {
return sync.getWriteHoldCount();
}
}
public static class ReadLock implements Lock {
private final ReentrantReadWriteLock.Sync sync;
protected ReadLock(ReentrantReadWriteLock lock) {
sync = lock.sync;
}
public void lock() {
sync.acquireShared(1);//共享锁
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//共享锁
}
public boolean tryLock() {
return sync.tryReadLock();
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
public void unlock() {
sync.releaseShared(1);//释放共享锁
}
public Condition newCondition() {
throw new UnsupportedOperationException();
}
}
WriteLock
的实现类似于ReentrantLock
,都是独占锁,通过state
的0与大于等于1(大于1时是同一线程多次获取锁,即锁的重入性)来控制是否有线程占有锁;ReadLock
的实现类似于Semaphore
,都是共享锁,通过state
的0与非0来控制多个线程的访问。既然ReentrantReadWriteLock
既有独占锁,又有共享锁,那么ReentrantReadWriteLock
又是如何管理两者的呢?
读锁与写锁的关系
我们知道了读锁、写锁都是通过AQS
中的state
来控制线程的访问,其中WriteLock
通过Sync
的tryAcquire()
、ReadLock
通过Sync
的tryAcquireShared()
来尝试获取锁,我们直接看两者获取锁的实现:
static final int SHARED_SHIFT = 16;
static final int SHARED_UNIT = (1 << SHARED_SHIFT);
static final int MAX_COUNT = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
//共享锁持有的数量
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
//独占锁持有的数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
//1、如果读锁数量非空或者写锁数量非空并且持有者不是当前线程,直接返回,写锁获取失败,后续会加入到等待队列中
// (Note: if c != 0 and w == 0 then shared count != 0)
if (w == 0 || current != getExclusiveOwnerThread())
return false;
//2、如果当前持有数量超过最大值(65535),抛出异常
if (w + exclusiveCount(acquires) > MAX_COUNT)
throw new Error("Maximum lock count exceeded");
//3、如果该线程是重入获取或队列策略允许获取,则该线程就会尝试获取锁并更新当前锁持有的线程
setState(c + acquires);
return true;
}
if (writerShouldBlock() ||
!compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current);
return true;
}
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
//1、如果当前锁被其他线程的写锁持有,直接返回,获取读锁失败。
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);//获取当前读锁的数量
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//2、当前读锁不阻塞,且小于最大读锁数量,通过CAS尝试获取读锁
if (r == 0) {//当前线程第一个并且第一次获取读锁,
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) { //当前线程是第一次获取读锁的线程
firstReaderHoldCount++;
} else {
// 当前线程不是第一个获取读锁的线程,放入线程本地变量
HoldCounter rh = cachedHoldCounter;
if (rh == null || rh.tid != getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
//3、获取读锁成功后,会通过readHolds(ThreadLocalHoldCounter)来记录当前读锁所在线程的锁获取次数信息,本质上是通过ThreadLocal来保存一个Int变量来统计的。
return 1;
}
return fullTryAcquireShared(current);
}
写锁WriteLock
在尝试获取锁时,首先通过AQS
中的getState()
获取state
值,然后通过exclusiveCount(int)
对state
做了一次操作:
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
EXCLUSIVE_MASK
的值是65535(2的16次方减1)
,即state与低16位1做与操作
,结果是state的高16位都会变成0
,低16位的值作为其返回值,代表独占锁持有的数量。既然写锁用了state
的低16位,那么读锁是不是就用了state
的高16位呢?来看下读锁ReadLock
通过sharedCount(int)
操作的state
:
static int sharedCount(int c) { return c >>> SHARED_SHIFT; }
果然,读锁中对state
做右移16位的操作,即去掉了低16位,高16位的十进制数作为其返回值,代表共享读锁持有的数量。最终通过一个state
变量实现了对读锁和写锁的控制。
ReentrantReadWriteLock的写锁饥饿问题
ReentrantReadWriteLock
适用于读多写少的场景,我们知道当读锁存在的时候,写锁只能进入队列等待,那么如果队列前面有大量的读操作等待时,后面的写操作也只能等待前面的读操作都执行完才能执行写操作,所以可能会产生写操作很久得不到执行,数据不能更新,发生写锁饥饿的情况。
如何优化呢?ReadLock
可以认为是一个悲观读锁,这里的悲观是针对WriteLock
来说的,即ReadLock
存在时不允许WriteLock
进行写操作,因为写操作会改变数据源,进而影响读操作。那么能不能换个思路来想这个问题,读操作可以分为乐观读锁和悲观读锁,乐观读锁认为读操作时不会有写操作来改变数据,所以乐观读锁在读操作时并不会真正的去加锁,读操作时允许写操作执行,等读操作执行完再去校验数据的一致性;悲观读锁恰恰相反,一开始就会进行加锁,不允许读操作和写操作同时进行。
首先读操作先采用乐观读锁,即开始读操作不进行加锁,只是在读之前先获取数据对应的版本号,然后将数据copy
一份到读线程中,读操作结束后,通过数据版本判断当前读数据是否有效(在读操作进行时可能会有写线程去改变数据),如果有效,可以直接使用;否则说明在乐观读操作时有写操作改变了数据,那么使用悲观读锁ReadLock
进行加锁,再重新去读数据,此时拿到的一定是最新的数据。上述的思路已经在JDK1.8
引入的StampedLock
实现了,其执行多操作流程大致如下:
StampedLock lock = new StampedLock();
long stamp = lock.tryOptimisticRead(); //非阻塞获取版本信息
copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
long stamp = lock.readLock();//获取读锁
try {
copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
} finally {
lock.unlock(stamp);//释放悲观锁
}
}
因为读操作远大于写操作,StampedLock
中不加锁的读操作效率会更高,同时也能避免WriteLock
长时间得不到执行、发生写锁饥饿的情况。
总结
ReentrantReadWriteLock可以使一个资源同一时间被多个读线程访问,或者被一个写线程访问,但是两者不能同时进行。内部通过ReadLock
实现读锁,通过WriteLock
实现写锁
readLock.lock():
- 当有其他写线程在执行时(持有写锁),读锁获取会失败直到其他写线程释放了写锁;
- 本线程一旦获取了读锁,其他线程的写锁都不能获取只能等待所有的读锁都释放后才能尝试获取写锁
writeLock.lock():
- 当其他任何线程如果持有读锁或写锁时,本线程获取写锁失败直到其他线程释放了所有的读锁和写锁,本线程才有机会尝试获取写锁
- 一旦本线程获取了写锁,其他线程将不被允许获取任何读锁和写锁,直到本线程释放了写锁。
参考
【1】https://stackoverflow.com/questions/18354339/reentrantreadwritelock-whats-the-difference-between-readlock-and-writelock
【2】你真的了解 ReentrantReadWriteLock 吗?
【3】【Java并发工具类】ReadWriteLock
【4】死磕 java同步系列之ReentrantReadWriteLock源码解析
【5】J.U.C之读写锁:ReentrantReadWriteLock