DoubleAccumulator 源码详解
迪丽瓦拉
2024-05-30 01:37:22
0

DoubleAccumulator

简介

这个类是新增的并发统计工具,可以多线程安全计数。

他的构造方法有两个参数,分别是统计方法和初始值。所以具体的统计是加减乘除是由传入的操作方法决定的。

    public DoubleAccumulator(DoubleBinaryOperator accumulatorFunction,double identity) {this.function = accumulatorFunction;//identity参数是用来存储初始值的 例如identity=1 那么新创建完对象 get() 方法获得的值就是1base = this.identity = Double.doubleToRawLongBits(identity);}

DoubleAccumulator 继承了Striped64,Striped64的核心存储结构是Cell数组

 @sun.misc.Contended static final class Cell {volatile long value;Cell(long x) { value = x; }final boolean cas(long cmp, long val) {return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);}// Unsafe mechanicsprivate static final sun.misc.Unsafe UNSAFE;private static final long valueOffset;static {try {UNSAFE = sun.misc.Unsafe.getUnsafe();Class ak = Cell.class;//获取value的地址valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));} catch (Exception e) {throw new Error(e);}}}

方法详解

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-tWJaBGrX-1678182390812)(/Users/cool/Documents/门票md/学习/java/concurrent/atomic/DoubleAccumulator & LongAccumulator.assets/image-20230306193305640.png)]

accumulate()

统计计算,核心计算方法。

    public void accumulate(double x) {Cell[] as; long b, v, r; int m; Cell a;if (//首先判断cells是不是null 如果不是null就可以继续进行(as = cells) != null ||//如果是cells是null //判断fun.apply(x)之后 是不是数据没有变化 例如 1+0 还是等于1 因为数值属于无效操作,所以不进行初始化//否则进行cas替换数据 如果替换成功 则进行初始化操作, //   所以这个位置存在一个问题如果cas替换失败 本次操作就失败了 也不会有任何提示(r = Double.doubleToRawLongBits(function.applyAsDouble(Double.longBitsToDouble(b = base), x))) != b  && !casBase(b, r)) {//标记数据是否是无效操作boolean uncontended = true;if (as == null || (m = as.length - 1) < 0 ||//看到这个方法大概就知道是怎么实现线程安全的了//getProbe()获取当前线程hash值 然后&上数组长度len-1 获取slot的位置//也就是每个线程单独操作一个cell 这样就不会多线程冲突了,java中还有很多这样的例子 例如ConcurrentHashMap(a = as[getProbe() & m]) == null ||//下面这个判断和上面的判断逻辑一样 就是看操作x是不是无效操作 //重新对uncontended 进行赋值!(uncontended =(r = Double.doubleToRawLongBits(function.applyAsDouble(Double.longBitsToDouble(v = a.value), x))) == v ||//如果不是无效操作 就替换v(原始值)和r(算后值)的值 如果替换失败了 就会执行doubleAccumulate方法a.cas(v, r)))doubleAccumulate(x, function, uncontended);}}
    final void doubleAccumulate(double x, DoubleBinaryOperator fn,boolean wasUncontended) {int h;//getProbe()获取为0说明还没有初始化线程变量if ((h = getProbe()) == 0) {//初始化线程随机数变量ThreadLocalRandom.current(); // force initializationh = getProbe();//线程都未初始化 所以强制true  //正常这个字段 是 调用方法cas失败了 才会进入 这个值会固定式false 是true的时候走不到这个方法wasUncontended = true;}//官方备注 如果最后一个slot不为空 则改为trueboolean collide = false;                // True if last slot nonempty//这里就开始标准的cas操作 无限for循环for (;;) {Cell[] as; Cell a; int n; long v;//cells不为空的情况if ((as = cells) != null && (n = as.length) > 0) {//如果线程没有初始化过cellif ((a = as[(n - 1) & h]) == null) {//cellsBusy是一个锁 判断当前是否存在cell创建 0代表当前没有创建竞争if (cellsBusy == 0) {       // Try to attach new CellCell r = new Cell(Double.doubleToRawLongBits(x));//double-check 并且进行cas加锁 cellsBusy 设置成1 //加锁的目的就是为了防止并发创建cellif (cellsBusy == 0 && casCellsBusy()) {boolean created = false;try {               // Recheck under lockCell[] rs; int m, j;//cells没有满就进行创建if ((rs = cells) != null &&(m = rs.length) > 0 &&rs[j = (m - 1) & h] == null) {rs[j] = r;created = true;}} finally {//释放锁cellsBusy = 0;}//创建成功就跳出if (created)break;//失败就继续// 一般失败的情况就是cells[]满了 需要扩容了continue;           // Slot is now non-empty}}collide = false;}//上面的调用链路中 cas执行失败才会进入doubleAccumulate//失败的话wasUncontended就是falseelse if (!wasUncontended)       // CAS already known to failwasUncontended = true;      // Continue after rehash//cell存在的情况下进行数据cas替换else if (a.cas(v = a.value,((fn == null) ?Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x) :Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))break;//如果长度n>cpu的个数 或者 cells已经被扩容了else if (n >= NCPU || cells != as)collide = false;            // At max size or staleelse if (!collide)collide = true;//进行扩容else if (cellsBusy == 0 && casCellsBusy()) {try {if (cells == as) {      // Expand table unless stale//每次扩容 容量翻倍Cell[] rs = new Cell[n << 1];for (int i = 0; i < n; ++i)rs[i] = as[i];cells = rs;}} finally {cellsBusy = 0;}collide = false;continue;                   // Retry with expanded table}//对线程的随机值进行修改 尝试修改当前线程slot的位置 看是否存在空缺h = advanceProbe(h);}//如果数据为空进行初始化else if (cellsBusy == 0 && cells == as && casCellsBusy()) {boolean init = false;try {                           // Initialize tableif (cells == as) {//初始化容量是2Cell[] rs = new Cell[2];rs[h & 1] = new Cell(Double.doubleToRawLongBits(x));cells = rs;init = true;}} finally {cellsBusy = 0;}if (init)break;}//尝试进行替换else if (casBase(v = base,((fn == null) ?Double.doubleToRawLongBits(Double.longBitsToDouble(v) + x) :Double.doubleToRawLongBits(fn.applyAsDouble(Double.longBitsToDouble(v), x)))))break;                          // Fall back on using base}}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-guXrQeGa-1678182390813)(/Users/cool/Documents/门票md/学习/java/concurrent/atomic/DoubleAccumulator & LongAccumulator.assets/image-20230307161018265.png)]

get()

    public double get() {Cell[] as = cells; Cell a;double result = Double.longBitsToDouble(base);if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)result = function.applyAsDouble(result, Double.longBitsToDouble(a.value));}}return result;}

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-ea0vgNKO-1678182390814)(/Users/cool/Documents/门票md/学习/java/concurrent/atomic/DoubleAccumulator & LongAccumulator.assets/image-20230307173115829.png)]

这个模式和ConcurrentHashMap的size()方法非常类似

public int size() {long n = sumCount();return ((n < 0L) ? 0 :(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :(int)n);}
//size方法调用了此方法
final long sumCount() {CounterCell[] as = counterCells; CounterCell a;long sum = baseCount;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)sum += a.value;}}return sum;}

大家注意一点哈 volatile 只是保证了可见性 而不是保证原子性,所以如果在get的时候存在并发修改,最终的结果可能是不同步的。

reset()

    public void reset() {Cell[] as = cells; Cell a;//进行重置  为什么base需要重置 看下面的序列化介绍base = identity;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null)//进行重置a.value = identity;}}}

getThenReset()

这个就是get和reset的结合体

 public double getThenReset() {Cell[] as = cells; Cell a;double result = Double.longBitsToDouble(base);base = identity;if (as != null) {for (int i = 0; i < as.length; ++i) {if ((a = as[i]) != null) {double v = Double.longBitsToDouble(a.value);a.value = identity;result = function.applyAsDouble(result, v);}}}return result;}

关于序列化

此类单独实现了一个序列化代理SerializationProxy,重写序列化的目的 主要是为了节省序列化的大小,我们看代码可以看到Striped64类中的字段都被transient修饰了,被transient修饰的数据是不参与序列化的,类似的还有常见的HashTable。

    private static class SerializationProxy implements Serializable {private static final long serialVersionUID = 7249069246863182397L;//真正序列化 只会序列化下面的几个字段private final double value;private final DoubleBinaryOperator function;private final long identity;SerializationProxy(DoubleAccumulator a) {function = a.function;identity = a.identity;value = a.get();}private Object readResolve() {double d = Double.longBitsToDouble(identity);DoubleAccumulator a = new DoubleAccumulator(function, d);//最终读取来的数据最终集复制给了base 所以上面reset的时候需要重新初始化base字段//这个位置赋值 不用去重新创建大量的cell了起到了节省内存的目的,同时cell变少了 get()方法的速度 对应的就变快了a.base = Double.doubleToRawLongBits(value);return a;}}

总结

  • 初始cells size大小为2,创建cell会加锁
  • Cells最大size受到cpu核数限制,最大为N(cpu核数)向上取2的n次幂
  • get方法是实时计算总值,如果存在并发修改可能感知不到。
  • volatile只是保证了可见性,不保证原子性

相关内容