JUC系列学习(六):ReentrantReadWriteLock的使用及源码解析

本文涉及的产品
全局流量管理 GTM,标准版 1个月
云解析 DNS,旗舰版 1个月
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介: `ReentrantReadWriteLock`是一种读写锁,跟`ReentrantLock`一样也是实现了`Lock`,区别在于`ReentrantLock`是独占锁,同一时刻只能有一个线程持有锁,`ReentrantLock`在某些场景下可能会有并发性能的问题。而**ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性**。当读锁存在时,其他线程仍然可以获取读锁并进行读操作,但是不能获得写锁进行写操作;当写锁存在时,其他线程的读锁、写锁都是不允许的。

ReentrantReadWriteLock的定义

ReentrantReadWriteLock是一种读写锁,跟ReentrantLock一样也是实现了Lock,区别在于ReentrantLock是独占锁,同一时刻只能有一个线程持有锁,ReentrantLock在某些场景下可能会有并发性能的问题。而ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)可以同时存在的一种读写锁,在读操作远大于写操作的场景中,能实现更好的并发性。当读锁存在时,其他线程仍然可以获取读锁并进行读操作,但是不能获得写锁进行写操作;当写锁存在时,其他线程的读锁、写锁都是不允许的。

使用举例

举个ReentrantReadWriteLock的使用例子:

public class ReentrantReadWriteLockDemo {
    private static final String THREAD_READ = "读线程";
    private static final String THREAD_WRITE = "写线程";

    public static void main(String[] args) {

        Resource resource = new Resource();
        //模拟三个线程去执行写操作
        for (int i = 0; i < 3; i++) {
            new Thread(new Task(resource), THREAD_WRITE + i).start();
        }
        //模拟10个线程去执行读操作
        for (int i = 0; i < 10; i++) {
            new Thread(new Task(resource), THREAD_READ + i).start();
        }
    }

    public static class Task implements Runnable {
        Resource resource;

        Task(Resource resource) {
            this.resource = resource;
        }

        @Override
        public void run() {
            String curThreadName = Thread.currentThread().getName();
            Person person = new Person(curThreadName, new Random().nextInt(100));
            if (curThreadName.startsWith(THREAD_READ)) {
                //读操作
                resource.get();
            } else if (Thread.currentThread().getName().startsWith(THREAD_WRITE)) {
                //写操作
                resource.put(person, person.rank);
            }
        }
    }

    public static class Resource<K extends Comparable, V> {
        TreeMap<K, V> rankMap = new TreeMap<>();
        final ReadWriteLock rwLock = new ReentrantReadWriteLock();
        final Lock readLock = rwLock.readLock(); // 读取锁
        final Lock writeLock = rwLock.writeLock(); // 写入锁

        //写入值
        void put(K key, V value) {
            try {
                writeLock.lock();
                System.out.println(Thread.currentThread().getName() + "准备写入数据");
                Thread.sleep(new Random().nextInt(500));
                System.out.println(Thread.currentThread().getName() + "写入数据完毕:" + key.toString());
                rankMap.put(key, value);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writeLock.unlock();
            }
        }

        //获取值
        public List<K> get() {
            try {
                readLock.lock();
                System.out.println(Thread.currentThread().getName() + "准备读取数据");
                Thread.sleep(new Random().nextInt(500));
                //treeMap中取出的数据是按rank从大到小排序的
                List<K> list = new ArrayList<>(rankMap.keySet());
                System.out.println(Thread.currentThread().getName() + "读取数据完毕:" + Arrays.toString(list.toArray()));
                return list;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                readLock.unlock();
            }
            return null;
        }

    }

    static class Person implements Comparable<Person> {
        public String name;//姓名
        public int rank;//得分

        Person(String name, int rank) {
            this.name = name;
            this.rank = rank;
        }

        @Override
        public int compareTo(Person person) {
            //分数少的在后面
            if (rank <= person.rank) {
                return 1;
            }
            return -1;
        }

        @Override
        public String toString() {
            return "name: " + name + ",rank: " + rank;
        }
    }

}

执行结果:

读线程4准备读取数据
读线程0准备读取数据
读线程3准备读取数据
读线程1准备读取数据
读线程2准备读取数据
读线程1读取数据完毕:[]
读线程4读取数据完毕:[]
读线程2读取数据完毕:[]
读线程3读取数据完毕:[]
读线程0读取数据完毕:[]
写线程1准备写入数据
写线程1写入数据完毕:name: 写线程1,rank: 83
写线程2准备写入数据
写线程2写入数据完毕:name: 写线程2,rank: 47
写线程0准备写入数据
写线程0写入数据完毕:name: 写线程0,rank: 55
读线程5准备读取数据
读线程6准备读取数据
读线程8准备读取数据
读线程9准备读取数据
读线程7准备读取数据
读线程8读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程9读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程6读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程7读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]
读线程5读取数据完毕:[name: 写线程1,rank: 83, name: 写线程0,rank: 55, name: 写线程2,rank: 47]

每次执行读写线程的顺序及数据可能不一样,但有一些结果是固定的:当有写线程操作时,其他线程不能进行任何操作,只能等写入完成后其他线程才能继续执行;但是当有读线程时,其他读线程同样可以执行读操作,但是此时不能进行写操作。

源码解析

UML类图:
ReentrantReadWriteLock.png

如果熟悉ReentrantLock实现的话,看到上面的类图也会感觉很熟悉,没错,ReentrantReadWriteLock的底层也是通过AQS实现的,不同的是ReentrantLock只能用来做独占锁,而ReentrantReadWriteLock是独占锁(写锁)、共享锁(读锁)共存的一种锁,那么他是如何实现的呢?我们通过看其源码实现来一探究竟:

public ReentrantReadWriteLock() {
    this(false);
}

public ReentrantReadWriteLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
    readerLock = new ReadLock(this);
    writerLock = new WriteLock(this);
}

public ReentrantReadWriteLock.WriteLock writeLock() { return writerLock; }
public ReentrantReadWriteLock.ReadLock  readLock()  { return readerLock; }

abstract static class Sync extends AbstractQueuedSynchronizer {}

ReentrantReadWriteLock构造方法中可以传入一个的boolean类型参数fair,表示是否是公平锁,默认是非公平锁,这里跟ReentrantLock一样。在使用ReentrantReadWriteLock时,分别通过writeLock()、readLock()获取对应的写锁、读锁,他们对应于ReentrantReadWriteLock的内部静态类WriteLock、ReadLock,来看对应的实现:

public static class WriteLock implements Lock {
    private final ReentrantReadWriteLock.Sync sync;

    protected WriteLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquire(1);//独占锁
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);//独占锁
    }

    public boolean tryLock() {
        return sync.tryWriteLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.release(1);//释放独占锁
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isHeldByCurrentThread() {
        return sync.isHeldExclusively();
    }

    public int getHoldCount() {
        return sync.getWriteHoldCount();
    }
}

public static class ReadLock implements Lock {
    private final ReentrantReadWriteLock.Sync sync;

    protected ReadLock(ReentrantReadWriteLock lock) {
        sync = lock.sync;
    }

    public void lock() {
        sync.acquireShared(1);//共享锁
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//共享锁
    }

    public boolean tryLock() {
        return sync.tryReadLock();
    }

    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    public void unlock() {
        sync.releaseShared(1);//释放共享锁
    }

    public Condition newCondition() {
        throw new UnsupportedOperationException();
    }
}

WriteLock的实现类似于ReentrantLock,都是独占锁,通过state的0与大于等于1(大于1时是同一线程多次获取锁,即锁的重入性)来控制是否有线程占有锁;ReadLock的实现类似于Semaphore,都是共享锁,通过state的0与非0来控制多个线程的访问。既然ReentrantReadWriteLock既有独占锁,又有共享锁,那么ReentrantReadWriteLock又是如何管理两者的呢?

读锁与写锁的关系

我们知道了读锁、写锁都是通过AQS中的state来控制线程的访问,其中WriteLock通过SynctryAcquire()ReadLock通过SynctryAcquireShared()来尝试获取锁,我们直接看两者获取锁的实现:

static final int SHARED_SHIFT   = 16;
static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;

//共享锁持有的数量
static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
//独占锁持有的数量
static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        //1、如果读锁数量非空或者写锁数量非空并且持有者不是当前线程,直接返回,写锁获取失败,后续会加入到等待队列中
        // (Note: if c != 0 and w == 0 then shared count != 0)
        if (w == 0 || current != getExclusiveOwnerThread())
            return false;
        //2、如果当前持有数量超过最大值(65535),抛出异常
        if (w + exclusiveCount(acquires) > MAX_COUNT)
            throw new Error("Maximum lock count exceeded");
         //3、如果该线程是重入获取或队列策略允许获取,则该线程就会尝试获取锁并更新当前锁持有的线程
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() ||
        !compareAndSetState(c, c + acquires))
        return false;
    setExclusiveOwnerThread(current);
    return true;
}

protected final int tryAcquireShared(int unused) {
    Thread current = Thread.currentThread();
    int c = getState();
    //1、如果当前锁被其他线程的写锁持有,直接返回,获取读锁失败。
    if (exclusiveCount(c) != 0 &&
        getExclusiveOwnerThread() != current)
        return -1;
    int r = sharedCount(c);//获取当前读锁的数量
    if (!readerShouldBlock() &&
        r < MAX_COUNT &&
        compareAndSetState(c, c + SHARED_UNIT)) {
        //2、当前读锁不阻塞,且小于最大读锁数量,通过CAS尝试获取读锁
        if (r == 0) {//当前线程第一个并且第一次获取读锁,
            firstReader = current;
            firstReaderHoldCount = 1;
        } else if (firstReader == current) { //当前线程是第一次获取读锁的线程
            firstReaderHoldCount++;
        } else {
            // 当前线程不是第一个获取读锁的线程,放入线程本地变量
            HoldCounter rh = cachedHoldCounter;
            if (rh == null || rh.tid != getThreadId(current))
                cachedHoldCounter = rh = readHolds.get();
            else if (rh.count == 0)
                readHolds.set(rh);
            rh.count++;
        }
        //3、获取读锁成功后,会通过readHolds(ThreadLocalHoldCounter)来记录当前读锁所在线程的锁获取次数信息,本质上是通过ThreadLocal来保存一个Int变量来统计的。
        return 1;
    }
    return fullTryAcquireShared(current);
}

写锁WriteLock在尝试获取锁时,首先通过AQS中的getState()获取state值,然后通过exclusiveCount(int)state做了一次操作:

static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }

EXCLUSIVE_MASK的值是65535(2的16次方减1),即state与低16位1做与操作,结果是state的高16位都会变成0,低16位的值作为其返回值,代表独占锁持有的数量。既然写锁用了state的低16位,那么读锁是不是就用了state的高16位呢?来看下读锁ReadLock通过sharedCount(int)操作的state

static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }

果然,读锁中对state做右移16位的操作,即去掉了低16位,高16位的十进制数作为其返回值,代表共享读锁持有的数量。最终通过一个state变量实现了对读锁和写锁的控制。

ReentrantReadWriteLock的写锁饥饿问题

ReentrantReadWriteLock适用于读多写少的场景,我们知道当读锁存在的时候,写锁只能进入队列等待,那么如果队列前面有大量的读操作等待时,后面的写操作也只能等待前面的读操作都执行完才能执行写操作,所以可能会产生写操作很久得不到执行,数据不能更新,发生写锁饥饿的情况。

如何优化呢?ReadLock可以认为是一个悲观读锁,这里的悲观是针对WriteLock来说的,即ReadLock存在时不允许WriteLock进行写操作,因为写操作会改变数据源,进而影响读操作。那么能不能换个思路来想这个问题,读操作可以分为乐观读锁和悲观读锁,乐观读锁认为读操作时不会有写操作来改变数据,所以乐观读锁在读操作时并不会真正的去加锁,读操作时允许写操作执行,等读操作执行完再去校验数据的一致性;悲观读锁恰恰相反,一开始就会进行加锁,不允许读操作和写操作同时进行

首先读操作先采用乐观读锁,即开始读操作不进行加锁,只是在读之前先获取数据对应的版本号,然后将数据copy一份到读线程中,读操作结束后,通过数据版本判断当前读数据是否有效(在读操作进行时可能会有写线程去改变数据),如果有效,可以直接使用;否则说明在乐观读操作时有写操作改变了数据,那么使用悲观读锁ReadLock进行加锁,再重新去读数据,此时拿到的一定是最新的数据。上述的思路已经在JDK1.8 引入的StampedLock实现了,其执行多操作流程大致如下:

StampedLock lock = new StampedLock();

long stamp = lock.tryOptimisticRead(); //非阻塞获取版本信息
copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
if(!lock.validate(stamp)){ // 校验
    long stamp = lock.readLock();//获取读锁
    try {
        copyVaraibale2ThreadMemory();//拷贝变量到线程本地堆栈
     } finally {
       lock.unlock(stamp);//释放悲观锁
    }
}

因为读操作远大于写操作,StampedLock中不加锁的读操作效率会更高,同时也能避免WriteLock长时间得不到执行、发生写锁饥饿的情况。

总结

ReentrantReadWriteLock可以使一个资源同一时间被多个读线程访问,或者被一个写线程访问,但是两者不能同时进行。内部通过ReadLock实现读锁,通过WriteLock实现写锁

readLock.lock():

  • 当有其他写线程在执行时(持有写锁),读锁获取会失败直到其他写线程释放了写锁;
  • 本线程一旦获取了读锁,其他线程的写锁都不能获取只能等待所有的读锁都释放后才能尝试获取写锁

writeLock.lock():

  • 当其他任何线程如果持有读锁或写锁时,本线程获取写锁失败直到其他线程释放了所有的读锁和写锁,本线程才有机会尝试获取写锁
  • 一旦本线程获取了写锁,其他线程将不被允许获取任何读锁和写锁,直到本线程释放了写锁。

参考

【1】https://stackoverflow.com/questions/18354339/reentrantreadwritelock-whats-the-difference-between-readlock-and-writelock

【2】你真的了解 ReentrantReadWriteLock 吗?

【3】【Java并发工具类】ReadWriteLock

【4】死磕 java同步系列之ReentrantReadWriteLock源码解析

【5】J.U.C之读写锁:ReentrantReadWriteLock

相关文章
|
12天前
|
存储 设计模式 算法
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
行为型模式用于描述程序在运行时复杂的流程控制,即描述多个类或对象之间怎样相互协作共同完成单个对象都无法单独完成的任务,它涉及算法与对象间职责的分配。行为型模式分为类行为模式和对象行为模式,前者采用继承机制来在类间分派行为,后者采用组合或聚合在对象间分配行为。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象行为模式比类行为模式具有更大的灵活性。 行为型模式分为: • 模板方法模式 • 策略模式 • 命令模式 • 职责链模式 • 状态模式 • 观察者模式 • 中介者模式 • 迭代器模式 • 访问者模式 • 备忘录模式 • 解释器模式
【23种设计模式·全精解析 | 行为型模式篇】11种行为型模式的结构概述、案例实现、优缺点、扩展对比、使用场景、源码解析
|
12天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
结构型模式描述如何将类或对象按某种布局组成更大的结构。它分为类结构型模式和对象结构型模式,前者采用继承机制来组织接口和类,后者釆用组合或聚合来组合对象。由于组合关系或聚合关系比继承关系耦合度低,满足“合成复用原则”,所以对象结构型模式比类结构型模式具有更大的灵活性。 结构型模式分为以下 7 种: • 代理模式 • 适配器模式 • 装饰者模式 • 桥接模式 • 外观模式 • 组合模式 • 享元模式
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
12天前
|
设计模式 存储 安全
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
创建型模式的主要关注点是“怎样创建对象?”,它的主要特点是"将对象的创建与使用分离”。这样可以降低系统的耦合度,使用者不需要关注对象的创建细节。创建型模式分为5种:单例模式、工厂方法模式抽象工厂式、原型模式、建造者模式。
【23种设计模式·全精解析 | 创建型模式篇】5种创建型模式的结构概述、实现、优缺点、扩展、使用场景、源码解析
|
1月前
|
PyTorch Shell API
Ascend Extension for PyTorch的源码解析
本文介绍了Ascend对PyTorch代码的适配过程,包括源码下载、编译步骤及常见问题,详细解析了torch-npu编译后的文件结构和三种实现昇腾NPU算子调用的方式:通过torch的register方式、定义算子方式和API重定向映射方式。这对于开发者理解和使用Ascend平台上的PyTorch具有重要指导意义。
|
13天前
|
安全 搜索推荐 数据挖掘
陪玩系统源码开发流程解析,成品陪玩系统源码的优点
我们自主开发的多客陪玩系统源码,整合了市面上主流陪玩APP功能,支持二次开发。该系统适用于线上游戏陪玩、语音视频聊天、心理咨询等场景,提供用户注册管理、陪玩者资料库、预约匹配、实时通讯、支付结算、安全隐私保护、客户服务及数据分析等功能,打造综合性社交平台。随着互联网技术发展,陪玩系统正成为游戏爱好者的新宠,改变游戏体验并带来新的商业模式。
|
2月前
|
监控 Java 应用服务中间件
高级java面试---spring.factories文件的解析源码API机制
【11月更文挑战第20天】Spring Boot是一个用于快速构建基于Spring框架的应用程序的开源框架。它通过自动配置、起步依赖和内嵌服务器等特性,极大地简化了Spring应用的开发和部署过程。本文将深入探讨Spring Boot的背景历史、业务场景、功能点以及底层原理,并通过Java代码手写模拟Spring Boot的启动过程,特别是spring.factories文件的解析源码API机制。
87 2
|
3月前
|
缓存 Java 程序员
Map - LinkedHashSet&Map源码解析
Map - LinkedHashSet&Map源码解析
87 0
|
3月前
|
算法 Java 容器
Map - HashSet & HashMap 源码解析
Map - HashSet & HashMap 源码解析
69 0
|
3月前
|
存储 Java C++
Collection-PriorityQueue源码解析
Collection-PriorityQueue源码解析
75 0
|
3月前
|
安全 Java 程序员
Collection-Stack&Queue源码解析
Collection-Stack&Queue源码解析
96 0