前言
ReentrantLock是非常常用的锁,在前面【从入门到放弃-Java】并发编程-JUC-LinkedBlockingQueue在我们了解到,LinkedBlockingQueue入队、出队都是依赖ReentrantLock进行锁同步和线程唤醒、等待的。
本文来学习下ReentrantLock。
ReentrantLock
/**
* Creates an instance of {@code ReentrantLock}.
* This is equivalent to using {@code ReentrantLock(false)}.
*/
public ReentrantLock() {
sync = new NonfairSync();
}
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
通过构造函数,我们可以看到可以根据参数fair,生成公平的同步和不公平的同步模式。
接下来需要看下FairSync和NonfairSync到底是何方神圣
Sync
Sync是一个抽象类,FairSync和NonfairSync都继承自Sync并实现了tryAcquire方法,tryAcquire是在AbstractQueuedSynchronizer(AQS)中声明的。
AbstractQueuedSynchronizer中的方法非常多,我们通过ReentrantLock中各方法的调用来逐步熟悉它。
ReentrantLock::lock
public void lock() {
sync.acquire(1);
}
请求锁,如果加锁失败则一直等待。
ReentrantLock中加锁的方法非常简洁,直接调用sync的acquire方法
下面我们看下acquire的具体实现。
AbstractQueuedSynchronizer::acquire
/**
* Acquires in exclusive mode, ignoring interrupts. Implemented
* by invoking at least once {@link #tryAcquire},
* returning on success. Otherwise the thread is queued, possibly
* repeatedly blocking and unblocking, invoking {@link
* #tryAcquire} until success. This method can be used
* to implement method {@link Lock#lock}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
*/
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
//会一直等待,直到获取到锁为止
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- 首先尝试获取锁(具体实现下面分析),获取成功函数结束
- 获取失败,则加入等待队列一直自旋尝试获取锁直到获取成功或超时。
- 如果获取失败,则抛出中断异常
NonfairSync::tryAcquire
/**
* Sync object for non-fair locks
*/
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
/**
* Performs non-fair tryLock. tryAcquire is implemented in
* subclasses, but both need nonfair try for trylock method.
*/
@ReservedStackAccess
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果state = 0 即当前没加锁,则尝试通过CAS的方式加锁,加锁后将持有锁的线程设置为当前线程
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
- 如果没锁,则尝试获取锁
- 如果有锁,判断是否是当前线程持有的
- 是当前线程持有,则state值加1 返回加锁成功。即重入锁
- 不是当前线程持有,则加锁失败
FairSync::tryAcquire
/**
* Fair version of tryAcquire. Don't grant access unless
* recursive call or no waiters or is first.
*/
@ReservedStackAccess
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果没锁
if (c == 0) {
//如果队列中没有比当前线程等待更久的线程,则尝试通过CAS的方式获取锁
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//如果当前有锁,则判断是否是当前线程的锁,是的话state加一,即重入锁,不是的话 返回加锁失败
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
/**
* Queries whether any threads have been waiting to acquire longer
* than the current thread.
*
* <p>An invocation of this method is equivalent to (but may be
* more efficient than):
* <pre> {@code
* getFirstQueuedThread() != Thread.currentThread()
* && hasQueuedThreads()}</pre>
*
* <p>Note that because cancellations due to interrupts and
* timeouts may occur at any time, a {@code true} return does not
* guarantee that some other thread will acquire before the current
* thread. Likewise, it is possible for another thread to win a
* race to enqueue after this method has returned {@code false},
* due to the queue being empty.
*
* <p>This method is designed to be used by a fair synchronizer to
* avoid <a href="AbstractQueuedSynchronizer.html#barging">barging</a>.
* Such a synchronizer's {@link #tryAcquire} method should return
* {@code false}, and its {@link #tryAcquireShared} method should
* return a negative value, if this method returns {@code true}
* (unless this is a reentrant acquire). For example, the {@code
* tryAcquire} method for a fair, reentrant, exclusive mode
* synchronizer might look like this:
*
* <pre> {@code
* protected boolean tryAcquire(int arg) {
* if (isHeldExclusively()) {
* // A reentrant acquire; increment hold count
* return true;
* } else if (hasQueuedPredecessors()) {
* return false;
* } else {
* // try to acquire normally
* }
* }}</pre>
*
* @return {@code true} if there is a queued thread preceding the
* current thread, and {@code false} if the current thread
* is at the head of the queue or the queue is empty
* @since 1.7
*/
public final boolean hasQueuedPredecessors() {
Node h, s;
//如果队列不为空
if ((h = head) != null) {
//看当前线程是不是在队列的队首,即排队时间最长的队列
if ((s = h.next) == null || s.waitStatus > 0) {
s = null; // traverse in case of concurrent cancellation
//这里为啥从队列尾部开始向前遍历???可能是因为队列头部可能会有大量超时的节点,从后往前遍历更快?
for (Node p = tail; p != h && p != null; p = p.prev) {
if (p.waitStatus <= 0)
s = p;
}
}
if (s != null && s.thread != Thread.currentThread())
return true;
}
return false;
}
ReentrantLock::lockInterruptibly
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
/**
* Acquires in exclusive mode, aborting if interrupted.
* Implemented by first checking interrupt status, then invoking
* at least once {@link #tryAcquire}, returning on
* success. Otherwise the thread is queued, possibly repeatedly
* blocking and unblocking, invoking {@link #tryAcquire}
* until success or the thread is interrupted. This method can be
* used to implement method {@link Lock#lockInterruptibly}.
*
* @param arg the acquire argument. This value is conveyed to
* {@link #tryAcquire} but is otherwise uninterpreted and
* can represent anything you like.
* @throws InterruptedException if the current thread is interrupted
*/
public final void acquireInterruptibly(int arg)
throws InterruptedException {
//如果线程被重点,抛出异常
if (Thread.interrupted())
throw new InterruptedException();
//如果获取锁失败
if (!tryAcquire(arg))
doAcquireInterruptibly(arg);
}
/**
* Acquires in exclusive interruptible mode.
* @param arg the acquire argument
*/
private void doAcquireInterruptibly(int arg)
throws InterruptedException {
//new一个新节点
final Node node = addWaiter(Node.EXCLUSIVE);
try {
//轮询直到获取到锁或者线程被中断
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
请求锁,如果失败则一直阻塞等待 直到获取锁或线程中断
ReentrantLock::tryLock
public boolean tryLock() {
//尝试获取锁,获取失败的话 直接返回false,不会再等待
return sync.nonfairTryAcquire(1);
}
public boolean tryLock(long timeout, TimeUnit unit)
throws InterruptedException {
//尝试获取锁,如果失败的话,等待timeout时间后返回false,如果被中断则抛出异常
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public final boolean tryAcquireNanos(int arg, long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
return tryAcquire(arg) ||
doAcquireNanos(arg, nanosTimeout);
}
请求锁,如果请求失败,则返回false
ReentrantLock::unlock
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
@ReservedStackAccess
protected final boolean tryRelease(int releases) {
//释放锁 state数减releases
int c = getState() - releases;
//如果当前线程没有持有锁,则抛出异常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//当c=0时,锁完全释放,ownerThread设为null。
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
释放锁,直到state=0完全释放时,线程owner设置为null
ReentrantLock::newCondition
public Condition newCondition() {
return sync.newCondition();
}
final ConditionObject newCondition() {
return new ConditionObject();
}
ConditionObject::await
线程释放锁,阻塞挂起,直到被signal唤醒,则继续尝试获取锁
public final void await() throws InterruptedException {
//如果当前线程被中断、则抛出中断异常
if (Thread.interrupted())
throw new InterruptedException();
//新创建个节点,将当前线程加入等待队列
Node node = addConditionWaiter();
//完全释放锁
int savedState = fullyRelease(node);
int interruptMode = 0;
//阻塞直到node被唤醒
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//如果被中断,则直接break
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
ConditionObject::awaitNanos
线程释放锁,阻塞挂起一段时间,直到被signal唤醒或超时,则继续尝试获取锁
public final long awaitNanos(long nanosTimeout)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
// We don't check for nanosTimeout <= 0L here, to allow
// awaitNanos(0) as a way to "yield the lock".
final long deadline = System.nanoTime() + nanosTimeout;
long initialNanos = nanosTimeout;
//加入等待队列
Node node = addConditionWaiter();
//释放所有锁
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//等待超时
if (nanosTimeout <= 0L) {
transferAfterCancelledWait(node);
break;
}
if (nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
nanosTimeout = deadline - System.nanoTime();
}
//尝试获取锁
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
long remaining = deadline - System.nanoTime(); // avoid overflow
return (remaining <= initialNanos) ? remaining : Long.MIN_VALUE;
}
ConditionObject::awaitUntil
线程释放锁,阻塞挂起一段时间,直到被signal唤醒或到指定时间,则继续尝试获取锁
public final boolean awaitUntil(Date deadline)
throws InterruptedException {
long abstime = deadline.getTime();
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
boolean timedout = false;
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
if (System.currentTimeMillis() >= abstime) {
timedout = transferAfterCancelledWait(node);
break;
}
LockSupport.parkUntil(this, abstime);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null)
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
return !timedout;
}
ConditionObject::signal
把首节点的status设置为Node.SIGNAL 则阻塞的线程循环判断发现statue状态变了,则唤醒继续执行。如果设置status失败,则在此线程中调用LockSupport.unpark唤醒阻塞的线程
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
Node p = enq(node);
int ws = p.waitStatus;
//唤醒一个节点,把statue设置为Node.SIGNAL。如果设置失败了,则自己调用LockSupport.unpark唤醒线程
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
ConditionObject::signalAll
唤醒所有的节点。
public final void signalAll() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignalAll(first);
}
private void doSignalAll(Node first) {
lastWaiter = firstWaiter = null;
do {
Node next = first.nextWaiter;
first.nextWaiter = null;
transferForSignal(first);
first = next;
} while (first != null);
}
总结
在【从入门到放弃-Java】并发编程-锁-synchronized中,我们学习内置锁synchronized,与ReentrantLock对比
- 两者都是可重入的互斥锁。
- synchronized是隐式的加解锁,不需要手动解锁。而ReentrantLock需要显式的lock和unlock。lock加锁多少次,对应的就需要unlock多少次。因此一般都会在finally中unlock。避免因异常等情况导致锁无法释放
- ReentrantLock通过AQS(volatile state + CAS + CLH队列实现)加解锁。synchronized是通过monitor实现(存在偏向锁、轻量级锁、重量级锁等锁升级)。
- ReentrantLock可以使用lockInterruptibly响应中断,synchronized只能傻等、等到死
- ReentrantLock可以使用非公平锁和公平锁模式,可以通过非公平性减少CAS的竞争,提升性能。也可以通过公平锁减少线程饥饿情况发生
- ReentrantLock可以创造多个Condition,来实现线程等待通知机制(阻塞、唤醒)
更多文章
见我的博客:https://nc2era.com
written by AloofJr,转载请注明出处