前言
上文【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock我们了解到,ReentrantLock是一个互斥排他的重入锁,读和读、读和写、写和写不能同时进行。但在很多场景下,读多写少,我们希望能并发读,这时候ReentrantReadWriteLock就派上用场了,是专门针对这种场景设计的。
接下来我们一起来学习下ReentrantReadWriteLock。
ReentrantReadWriteLock
/**
* Creates a new {@code ReentrantReadWriteLock} with
* default (nonfair) ordering properties.
*/
public ReentrantReadWriteLock() {
this(false);
}
/**
* Creates a new {@code ReentrantReadWriteLock} with
* the given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
我们可以看到和ReentrantLock一样,ReentrantReadWriteLock也使用了通过AQS实现的FairSync和NonfairSync模式
有两个成员变量锁ReadLock和WriteLock
ReadLock::lock
获取读锁,不死不休
public void lock() {
sync.acquireShared(1);
}
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
@ReservedStackAccess
protected final int tryAcquireShared(int unused) {
/*
* Walkthrough:
* 1. If write lock held by another thread, fail.
* 2. Otherwise, this thread is eligible for
* lock wrt state, so ask if it should block
* because of queue policy. If not, try
* to grant by CASing state and updating count.
* Note that step does not check for reentrant
* acquires, which is postponed to full version
* to avoid having to check hold count in
* the more typical non-reentrant case.
* 3. If step 2 fails either because thread
* apparently not eligible or CAS fails or count
* saturated, chain to version with full retry loop.
*/
Thread current = Thread.currentThread();
int c = getState();
//如果已经有写锁,且不是当前线程持有的,则加读锁失败
//如果当前线程已经持有写锁,则可以获取读锁,这就是锁降级
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c);
/**
* 判断读线程是否阻塞,取决于队列的策略
* 公平锁策略:如果当前同步队列不为空且当前线程不是队列的第一个节点,则阻塞。
* 非公平锁策略:如果当前队列的第一个节点时写锁,则需要阻塞。这样是为了防止写锁饥饿。
* 如果不需要阻塞,且读锁数未达到最大值 则尝试通过cas的方式获取锁
*/
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
//如果当前读锁为0,则当前线程获取锁
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
//如过第一个读锁的持有者是当前线程,则firstReaderHoldCount数量加一
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
//如果最后一个获取锁的线程不是当前线程
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
//获取当前线程的锁
cachedHoldCounter = rh = readHolds.get();
//如果当前最后一个线程获取锁数量为0,则将其设置为当前线程的holdcounter
else if (rh.count == 0)
readHolds.set(rh);
//读锁数+1
rh.count++;
}
return 1;
}
//尝试无限循环获取读锁
return fullTryAcquireShared(current);
}
final int fullTryAcquireShared(Thread current) {
/*
* This code is in part redundant with that in
* tryAcquireShared but is simpler overall by not
* complicating tryAcquireShared with interactions between
* retries and lazily reading hold counts.
*/
HoldCounter rh = null;
for (;;) {
int c = getState();
//如果已经有写锁,且不是当前线程持有的,返回-1
if (exclusiveCount(c) != 0) {
if (getExclusiveOwnerThread() != current)
return -1;
// else we hold the exclusive lock; blocking here
// would cause deadlock.
//如果需要阻塞
} else if (readerShouldBlock()) {
// Make sure we're not acquiring read lock reentrantly
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
} else {
if (rh == null) {
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current)) {
//如果当前线程持有的锁数为0,则移除
rh = readHolds.get();
if (rh.count == 0)
readHolds.remove();
}
}
if (rh.count == 0)
return -1;
}
}
if (sharedCount(c) == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (sharedCount(c) == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
if (rh == null)
rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
cachedHoldCounter = rh; // cache for release
}
return 1;
}
}
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED);
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor();
//无限循环,直到当前线程是队列的头结点,则尝试获取读锁
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//获取锁成功后,将当前线程从队列头结点移除
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
ReadLock::lockInterruptibly
获取读锁,直到成功或被中断
public void lockInterruptibly() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
//如果收到中断信号,则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//如果尝试获取锁失败,则循环等待获取锁
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
//无限循环,直到当前线程是队列的头结点,则尝试获取读锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
//获取锁失败的话则需要进行中断检测,检测到中断信号则抛出异常
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
ReadLock::tryLock
//尝试获取读锁,如果有写锁获取失败,则直接返回失败
public boolean tryLock() {
return sync.tryReadLock();
}
@ReservedStackAccess
final boolean tryReadLock() {
Thread current = Thread.currentThread();
for (;;) {
int c = getState();
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return false;
int r = sharedCount(c);
if (r == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
if (compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) {
firstReader = current;
firstReaderHoldCount = 1;
} else if (firstReader == current) {
firstReaderHoldCount++;
} else {
HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return true;
}
}
}
//尝试获取读锁,获取失败或者超时未获取到的话,则返回失败
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
private boolean doAcquireSharedNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout;
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
//排到当前线程的话则尝试获取锁
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);
p.next = null; // help GC
return true;
}
}
//超时返回false
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) {
cancelAcquire(node);
return false;
}
//阻塞当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
//如果被中断
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
ReadLock::unlock
释放锁
public void unlock() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected final boolean tryReleaseShared(int unused) {
Thread current = Thread.currentThread();
//如果当前线程是第一个持有读锁的
if (firstReader == current) {
// assert firstReaderHoldCount > 0;
//如果是唯一一个持有读锁的,则firstReader设置为null
if (firstReaderHoldCount == 1)
firstReader = null;
//firstReaderHoldCount减一,
else
firstReaderHoldCount--;
} else {
HoldCounter rh = cachedHoldCounter;
//如果不是最后一个持有读锁的线程
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
//从ThreadLocal获取readHolds
rh = readHolds.get();
int count = rh.count;
//如果小于等于1,则移除readHolds
if (count <= 1) {
readHolds.remove();
if (count <= 0)
throw unmatchedUnlockException();
}
//持有锁的数量减一
--rh.count;
}
for (;;) {
//将state设置为0,原因是在写锁降级为读锁后,释放读锁时,需要将state设为0,方便后续的写锁竞争。
int c = getState();
int nextc = c - SHARED_UNIT;
if (compareAndSetState(c, nextc))
// Releasing the read lock has no effect on readers,
// but it may allow waiting writers to proceed if
// both read and write locks are now free.
return nextc == 0;
}
}
private void doReleaseShared() {
/*
* Ensure that a release propagates, even if there are other
* in-progress acquires/releases. This proceeds in the usual
* way of trying to unparkSuccessor of head if it needs
* signal. But if it does not, status is set to PROPAGATE to
* ensure that upon release, propagation continues.
* Additionally, we must loop in case a new node is added
* while we are doing this. Also, unlike other uses of
* unparkSuccessor, we need to know if CAS to reset status
* fails, if so rechecking.
*/
for (;;) {
Node h = head;
//如果头结点不是null,并且队列不为空
if (h != null && h != tail) {
int ws = h.waitStatus;
//如果当前结点是SIGNAL信号
if (ws == Node.SIGNAL) {
//唤醒头结点
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
WriteLock::lock
获取写锁,如果获取失败,则加入等待队列
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock
public void lock() {
sync.acquire(1);
}
WriteLock::lockInterruptibly
获取写锁,如果获取失败,则加入等待队列,直到获取到或被中断
具体方法和ReentrantLock调用的方法相同,可参考【从入门到放弃-Java】并发编程-JUC-locks-ReentrantLock
WriteLock::tryLock
public boolean tryLock() {
return sync.tryWriteLock();
}
@ReservedStackAccess
final boolean tryWriteLock() {
Thread current = Thread.currentThread();
int c = getState();
//如果存在写锁,且写锁不是当前线程持有的,则返回false
if (c != 0) {
int w = exclusiveCount(c);
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w == MAX_COUNT)
throw new Error("Maximum lock count exceeded");
}
//如果不存在写锁或是当前线程获取的写锁,则尝试将state加一
if (!compareAndSetState(c, c + 1))
return false;
//设置持有写锁的线程为当前线程
setExclusiveOwnerThread(current);
return true;
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
//和ReentrantLock的调用方法一样,不再赘述
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
WriteLock::unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
//如果不是当前线程持有的写锁,抛出异常
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
int nextc = getState() - releases;
//判断持有的写锁是否释放完毕
boolean free = exclusiveCount(nextc) == 0;
//如果释放完毕,则将当前持有锁的线程设置为null
if (free)
setExclusiveOwnerThread(null);
//设置持有的锁数量减一
setState(nextc);
return free;
}
总结
通过源码分析,我们了解到,可以通过ReentrantReadWriteLock可以获取读锁和写锁。
- 写锁是互斥锁,只能一个线程持有,写锁和ReentrantLock类似
- 读锁是共享锁,可以多个线程同时持有。
- 读锁通过firstReader和cachedHoldCounter优化获取、释放锁的性能。使用ThreadLocal readHolds存放所有持有锁线程的tid和持有锁数量。
- 线程可以将自己持有的写锁降级为读锁,在释放读锁时,一起释放。
更多文章
见我的博客:https://nc2era.com
written by AloofJr,转载请注明出处