最关键的 longAccumulate
先贴出来 longAccumulate 的完整代码,然后我们再进行分析:
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { // 获取线程的哈希值 int h; if ((h = getProbe()) == 0) { ThreadLocalRandom.current(); // force initialization h = getProbe(); wasUncontended = true; } boolean collide = false; // True if last slot nonempty for (;;) { Cell[] as; Cell a; int n; long v; if ((as = cells) != null && (n = as.length) > 0) { // cells 已经初始化了 if ((a = as[(n - 1) & h]) == null) { // 对应的 cell 不存在,需要新建 if (cellsBusy == 0) { // 只有在 cells 没上锁时才尝试新建 Cell r = new Cell(x); if (cellsBusy == 0 && casCellsBusy()) { // 上锁 boolean created = false; try { // 上锁后判断 cells 对应元素是否被占用 Cell[] rs; int m, j; if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true; } } finally { cellsBusy = 0; } if (created) // cell 创建完毕,可以退出 break; continue; // 加锁后发现 cell 元素已经不再为空,轮询重试 } } collide = false; } // 下面这些 else 在尝试检测当前竞争度大不大,如果大则尝试扩容,如 // 果扩容已经没用了,则尝试 rehash 来分散并发到不同的 cell 中 else if (!wasUncontended) // 已知 CAS 失败,说明并发度大 wasUncontended = true; // rehash 后重试 else if (a.cas(v = a.value, ((fn == null) ? v + x : // 尝试 CAS 将值更新到 cell 中 fn.applyAsLong(v, x)))) break; else if (n >= NCPU || cells != as) // cells 数组已经够大,rehash collide = false; // At max size or stale else if (!collide) // 到此说明其它竞争已经很大,rehash collide = true; else if (cellsBusy == 0 && casCellsBusy()) { // rehash 都没用,尝试扩容 try { if (cells == as) { // 加锁过程中可能有其它线程在扩容,需要排除该情形 Cell[] rs = new Cell[n << 1]; for (int i = 0; i < n; ++i) rs[i] = as[i]; cells = rs; } } finally { cellsBusy = 0; } collide = false; continue; // Retry with expanded table } h = advanceProbe(h); // rehash } else if (cellsBusy == 0 && cells == as && casCellsBusy()) { // cells 未初始化 boolean init = false; try { // Initialize table if (cells == as) { Cell[] rs = new Cell[2]; rs[h & 1] = new Cell(x); cells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; } else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))) break; // 其它线程在初始化 cells 或在扩容,尝试更新 base } }
先别忙着惊讶,整理好心情慢慢看。
首先,在 Striped64 中,会先计算哈希,哈希值用于分发线程到 cells 数组。Striped64 中利用了 Thread 类中用来做伪随机数的 threadLocalRandomProbe
:
public class Thread implements Runnable { @sun.misc.Contended("tlr") int threadLocalRandomProbe; }
Striped64 中复制(直接拿来用)了 ThreadLocalRandom
中的一些方法,使用 unsafe 来获取和修改字段值。
可以理解为 getProbe
用来获取哈希值,advanceProbe
用来更新哈希值。
而其中的 PROBE
常量是在类加载的时候从类加载器提取的 threadLocalRandomProbe 的常量值。
然后是一系列的循环判断向 cell 数组映射的操作,因为 Cells 类占用比较多的空间,所以它的初始化按需进行的,开始时为空,需要时先创建两个元素,不够用时再扩展成两倍大小。在修改 cells 数组(如扩展)时需要加锁,这也就是 cellsBusy
的作用。
释放锁只需要将 cellsBusy 从 0 -> 1 即可。
cellsBusy = 0;
另外,这个方法虽然代码行很多,使用了很多 if else ,但其实代码设计使用了双重检查锁,也就是下面这种模式
if (condition_met) { // 只在必要时进入 lock(); // 加锁 done = false; // 因为外层有轮询,需要记录任务是否需要继续 try { if (condition_met) { // 前面的 if 到加锁间状态可能变化,需要重新判断 // ... done = true; // 任务完成 } } finally { unlock(); // 确保锁释放 } if (done) // 任务完成,可以退出轮询 break; }
而 doubleAccumulate
的整体逻辑与 longAccumulate 几乎一样,区别在于将 double 存储成 long 时需要转换。例如在创建 cell 时,需要
Cell r = new Cell(Double.doubleToRawLongBits(x));
doubleToRawLongBits 是一个 native 方法,将 double
转成 long
。在累加时需要再转来回:
else if (a.cas(v = a.value, ((fn == null) ? Double.doubleToRawLongBits (Double.longBitsToDouble(v) + x) : // 转回 double 做累加 Double.doubleToRawLongBits (fn.applyAsDouble (Double.longBitsToDouble(v), x)))))
上面的流程我们只是高度概括了下,实际的分支要远比我们概括的更多,longAccumulate 会根据不同的状态来执行不同的分支,比如在线程竞争非常激烈的情况下,还会通过对 cells 进行扩容或者重新计算哈希值来重新分散线程,这些做法的目的是将多个线程的计数请求分散到不同的 cell 中的 index 上,这其实和 ConcurrentHashMap
的设计思路一样,只不过 Java7 中的 ConcurrentHashMap 实现 segment 加锁使用了比较重的 synchronized
锁,而 Striped64 使用了轻量级的 unsafe CAS 来进行并发操作。
一口气终于讲完一个段落了,累屁我了,歇会继续肝下面。
下面再说回 LongAdder 这个类。
LongAdder 的再认识
所以,LongAdder 的原理就是,在最初无竞争时,只更新 base 值,当有多线程竞争时通过分段的思想,让不同的线程更新不同的段,最后把这些段相加就得到了完整的 LongAdder 存储的值,下面我画个图帮助你理解一下。
如果你理解了上面 Striped64 的描述和上面这幅图之后,LongAdder 你就理解的差不多了,最后还有一个 LongAdder 中的 sum 方法需要强调下:
sum
方法会返回当前总和,在没有并发的情况下会返回一个准确的结果,也就是把所有的 base 值相加求和之后的结果,那么,现在有一个问题,如果前面已经累加到 sum 上的 Cell 的 value 值有修改,不就没法计算了吗?
这里的结论就是,LongAdder 不是强一致性的,它是最终一致性。
后记
这篇我和你聊了一下为什么引入 LongAdder 以及 AtomicLong 有哪些缺陷,然后带你了解了一下 LongAdder 的源码和它的底层实现,如果这篇文章对你有帮助的话,可以给我个三连,你的支持是我更新最大的动力!