LongAdder ,这哥们劲儿大(二)

简介: 我们在之前的文章中介绍到了 AtomicLong ,如果你还不了解,我建议你阅读一下这篇文章

最关键的 longAccumulate

先贴出来 longAccumulate 的完整代码,然后我们再进行分析:

final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    // 获取线程的哈希值
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) { // cells 已经初始化了
            if ((a = as[(n - 1) & h]) == null) { // 对应的 cell 不存在,需要新建
                if (cellsBusy == 0) {       // 只有在 cells 没上锁时才尝试新建
                    Cell r = new Cell(x);
                    if (cellsBusy == 0 && casCellsBusy()) { // 上锁
                        boolean created = false;
                        try {               // 上锁后判断 cells 对应元素是否被占用
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)        // cell 创建完毕,可以退出
                            break;
                        continue;           // 加锁后发现 cell 元素已经不再为空,轮询重试
                    }
                }
                collide = false;
            }
            // 下面这些 else 在尝试检测当前竞争度大不大,如果大则尝试扩容,如
            // 果扩容已经没用了,则尝试 rehash 来分散并发到不同的 cell 中
            else if (!wasUncontended)       // 已知 CAS 失败,说明并发度大
                wasUncontended = true;      // rehash 后重试
            else if (a.cas(v = a.value, ((fn == null) ? v + x :   // 尝试 CAS 将值更新到 cell 中
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as) // cells 数组已经够大,rehash
                collide = false;               // At max size or stale
            else if (!collide)                 // 到此说明其它竞争已经很大,rehash
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) { // rehash 都没用,尝试扩容
                try {
                    if (cells == as) {      // 加锁过程中可能有其它线程在扩容,需要排除该情形
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);            // rehash
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells 未初始化
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break; // 其它线程在初始化 cells 或在扩容,尝试更新 base
    }
}

先别忙着惊讶,整理好心情慢慢看。

首先,在 Striped64 中,会先计算哈希,哈希值用于分发线程到 cells 数组。Striped64 中利用了 Thread 类中用来做伪随机数的 threadLocalRandomProbe

public class Thread implements Runnable {
  @sun.misc.Contended("tlr")
  int threadLocalRandomProbe;
}

Striped64 中复制(直接拿来用)了 ThreadLocalRandom 中的一些方法,使用 unsafe 来获取和修改字段值。

微信图片_20220417153735.jpg

可以理解为 getProbe 用来获取哈希值,advanceProbe 用来更新哈希值。

而其中的 PROBE 常量是在类加载的时候从类加载器提取的 threadLocalRandomProbe 的常量值。

微信图片_20220417153740.jpg

然后是一系列的循环判断向 cell 数组映射的操作,因为 Cells 类占用比较多的空间,所以它的初始化按需进行的,开始时为空,需要时先创建两个元素,不够用时再扩展成两倍大小。在修改 cells 数组(如扩展)时需要加锁,这也就是 cellsBusy 的作用。

微信图片_20220417153746.jpg

释放锁只需要将 cellsBusy 从 0 -> 1 即可。

cellsBusy = 0;

另外,这个方法虽然代码行很多,使用了很多 if else ,但其实代码设计使用了双重检查锁,也就是下面这种模式

if (condition_met) {       // 只在必要时进入
  lock();                  // 加锁
  done = false;            // 因为外层有轮询,需要记录任务是否需要继续
  try {
    if (condition_met) {   // 前面的 if 到加锁间状态可能变化,需要重新判断
      // ...
      done = true;         // 任务完成
    }
  } finally {
    unlock();              // 确保锁释放
  }
  if (done)                // 任务完成,可以退出轮询
    break;
}

doubleAccumulate 的整体逻辑与 longAccumulate 几乎一样,区别在于将 double 存储成  long 时需要转换。例如在创建 cell 时,需要

Cell r = new Cell(Double.doubleToRawLongBits(x));

doubleToRawLongBits 是一个 native 方法,将 double 转成 long。在累加时需要再转来回:

else if (a.cas(v = a.value,
               ((fn == null) ?
                Double.doubleToRawLongBits
                (Double.longBitsToDouble(v) + x) : // 转回 double 做累加
                Double.doubleToRawLongBits
                (fn.applyAsDouble
                 (Double.longBitsToDouble(v), x)))))

上面的流程我们只是高度概括了下,实际的分支要远比我们概括的更多,longAccumulate 会根据不同的状态来执行不同的分支,比如在线程竞争非常激烈的情况下,还会通过对 cells 进行扩容或者重新计算哈希值来重新分散线程,这些做法的目的是将多个线程的计数请求分散到不同的 cell 中的 index 上,这其实和 ConcurrentHashMap 的设计思路一样,只不过 Java7 中的 ConcurrentHashMap 实现 segment 加锁使用了比较重的 synchronized 锁,而 Striped64 使用了轻量级的 unsafe CAS 来进行并发操作。

一口气终于讲完一个段落了,累屁我了,歇会继续肝下面

微信图片_20220417153751.jpg

下面再说回 LongAdder 这个类。

LongAdder 的再认识

所以,LongAdder 的原理就是,在最初无竞争时,只更新 base 值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的 LongAdder 存储的值,下面我画个图帮助你理解一下。

微信图片_20220417153755.jpg

如果你理解了上面 Striped64 的描述和上面这幅图之后,LongAdder 你就理解的差不多了,最后还有一个 LongAdder 中的 sum 方法需要强调下:

微信图片_20220417153758.jpg

sum 方法会返回当前总和,在没有并发的情况下会返回一个准确的结果,也就是把所有的 base 值相加求和之后的结果,那么,现在有一个问题,如果前面已经累加到 sum 上的 Cell 的 value 值有修改,不就没法计算了吗?

这里的结论就是,LongAdder 不是强一致性的,它是最终一致性。

后记

这篇我和你聊了一下为什么引入 LongAdder 以及 AtomicLong 有哪些缺陷,然后带你了解了一下 LongAdder 的源码和它的底层实现,如果这篇文章对你有帮助的话,可以给我个三连,你的支持是我更新最大的动力!

相关文章
|
3月前
|
监控 安全 IDE
别再瞎用了!synchronized的正确使用姿势在这里!
别再瞎用了!synchronized的正确使用姿势在这里!
61 4
|
5月前
|
安全 Java 程序员
惊呆了!Java多线程里的“synchronized”竟然这么神奇!
【6月更文挑战第20天】Java的`synchronized`关键字是解决线程安全的关键,它确保同一时间只有一个线程访问同步代码。在案例中,`Counter`类的`increment`方法如果不加同步,可能会导致竞态条件。通过使用`synchronized`方法或语句块,可以防止这种情况,确保线程安全。虽然同步会带来性能影响,但它是构建并发应用的重要工具,平衡同步与性能是使用时需考虑的。了解并恰当使用`synchronized`,能有效应对多线程挑战。
20 1
|
6月前
|
存储 缓存 Oracle
Java线程池,白话文vs八股文,原来是这么回事!
一、线程池原理 1、白话文篇 1.1、正式员工(corePoolSize) 正式员工:这些是公司最稳定和最可靠的长期员工,他们一直在工作,不会被解雇或者辞职。他们负责处理公司的核心业务,比如生产、销售、财务等。在Java线程池中,正式员工对应于核心线程(corePoolSize),这些线程会一直存在于线程池中。他们负责执行线程池中的任务,如果没有任务,他们会等待新的任务到来。 1.2、所有员工(maximumPoolSize) 所有员工:这些是公司所有的员工,包括正式员工和外包员工。他们共同组成了公司的团队,协作完成公司的各种业务。在Java线程池中,所有员工对应于所有线程(maxim
|
传感器 缓存 安全
JUC第六讲:二面阿里竟然败在了 volatile 关键字上
JUC第六讲:二面阿里竟然败在了 volatile 关键字上
|
监控 安全 算法
这次锁面试题的连环16问,差点就跪了
这次锁面试题的连环16问,差点就跪了
206 0
面试又被问懵了吗?不如把ThreadLocal拆开了揉碎看看
1.为什么用 ThreadLocal? 所谓并发,就是有限资源需要应对远超资源的访问。解决问题的方法,要么增加资源应对访问;要么增加资源的利用率。 所以,相信这年头做开发的多多少少,都会那么几个“线程二三招”、“用锁五六式”。 那所带来的就是多线程访问下的并发安全问题。 共享变量的访问域跨越了原始的单线程,进入了千家万户的线程眼里。谁都可以用,谁都可以改,那不就打起来了吗? 因此,防止并发问题的最好办法,就是不要多线程访问(这科技水平倒退二十年~)。ThreadLocal 顾名思义,将一个变量限制为“线程封闭”:对象只被一个线程持有、访问、修改。
AtomicXXX 用得好好的,阿里为什么推荐使用 LongAdder?面试必问
面试连环炮 先来一连炮简单的面试,看你能顶住几轮? 栈长: 1、多线程情况下,进行数字累加(count++)要注意什么? 张三: 要注意给累加方法加同步锁,不然会出现变量可见性问题,变量值被其他线程覆盖出现不一致的情况
|
安全 容器
面试阿里被P8质问:ConcurrentHashMap真的线程安全吗?(上)
面试阿里被P8质问:ConcurrentHashMap真的线程安全吗?
203 1
面试阿里被P8质问:ConcurrentHashMap真的线程安全吗?(上)
|
存储 缓存 Java
LongAdder ,这哥们劲儿大(一)
我们在之前的文章中介绍到了 AtomicLong ,如果你还不了解,我建议你阅读一下这篇文章
LongAdder ,这哥们劲儿大(一)
|
安全
当Synchronized遇到这玩意儿,有个大坑,要注意! (上)
当Synchronized遇到这玩意儿,有个大坑,要注意! (上)
197 0
当Synchronized遇到这玩意儿,有个大坑,要注意! (上)