LongAdder其实是AtomicLong的升级版,AtomicLong在多线程下会导致较多的自旋重试,主要原因还是因为多线程同时cas同一个变量的时候失败次数较多,那LongAdder的出现就是为了解决AtomicLong在多线程坏境下的痛点。

  首先分析源代码之前我们先抛出以下几个问题:

  1. LongAdder如何解决AtomicLong的痛点?
  2. LongAdder内部维护的cell数组在什么情况下初始化?
  3. LongAdder怎么确定当前线程访问内部cell数组的那个元素?
  4. LongAdder如何扩容,在什么情况下扩容?
  5. LongAdder访问cell冲突后怎么解决?

先来看下LongAdder类的继承关系
LongAdder类继承图
LongAdder继承自Striped64,Striped64内部维护的三个变量cells、base、cellsBusy,base其实是基础值,cells是一个Cell数组,LongAdder的值其实就等于base加上cells数组内的value值。cellsBusy是cells初始化,扩容以及创建新的Cell的标志位。

接下来我们分析源代码,LongAdder的源码我们主要看add方法就行。

public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {  //(1)
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null 
                || !(uncontended = a.cas(v = a.value, v + x)))   //(2)
                longAccumulate(x, null, uncontended);    //(3)
        }
    }

代码1:先看cells数组是否为空,如果为空进行cas原子操作,将新增的值赋值给原变量,如果cells数组不为空或者是cas失败进行下一步。

问题1:AtomicLong的痛点就是多线程同时cas一个变量时,导致失败后自旋重试次数特别多,所以LongAdder内部特意扩充了一个cell数组去减少线程之间变量的竞争关系,从而提高了效率。

代码2:别看条件比较多,其实总结起来就一句话,查找当前线程所属的cell元素,如果有进行cas值替换,如果cell为空且替换失败的话进行下一步,这里注意下getProbe方法,该方法是获取当前线程下的threadLocalRandomProbe的值,这个值一开始为0,初始化的逻辑在代码3中。
1、2失败都会进入3,3代码业务相对复杂我们单独拧出来分析

final void longAccumulate(long x, LongBinaryOperator fn,
                              boolean wasUncontended) {
        int h;
        if ((h = getProbe()) == 0) {
            ThreadLocalRandom.current(); //这里进行ThreadLocalRandom初始化
            h = getProbe();//获取当前线程下的threadLocalRandomProbe值
            wasUncontended = true;//该值为无竞争标志位置位,如果存在竞争该值则为false,无竞争则为true
        }
        boolean collide = false;               
        for (;;) {
            Cell[] as; Cell a; int n; long v;
            if ((as = cells) != null && (n = as.length) > 0) {
                if ((a = as[(n - 1) & h]) == null) { (3) //当前线程访问的cell元素为空进行cell初始化 
                    if (cellsBusy == 0) {//判断标志位,如果为零表示可以进行初始化操作
                        Cell r = new Cell(x);  //新建一个Cell类
                        if (cellsBusy == 0 && casCellsBusy()) { //再次判断并且对cellsBusy进行cas值替换
                            boolean created = false;
                            try {               
                                Cell[] rs; int m, j;
                                if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) {
                                    rs[j] = r;
                                    created = true;
                                }
                            } finally {
                                cellsBusy = 0;//清除初始化标志位
                            }
                            if (created)//如果创建Cell并且赋值成功直接结束本次add操作
                                break;
                            continue;       
                        }
                    }
                    collide = false;
                }
                else if (!wasUncontended)
                    wasUncontended = true;     
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                else if (n >= NCPU || cells != as)//
                    collide = false;            
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {(4)
                    try {
                        if (cells == as) {   
                            Cell[] rs = new Cell[n << 1];//进行cell数组扩容,每次都扩充为原来的二倍
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];//将老数据转移至新cell数组中
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   
                }
                h = advanceProbe(h); (5) //重新计算当前线程的threadLocalRandomProbe值,减少冲突的机会。
            }
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {  ----(2)
                boolean init = false;
                try {
                    if (cells == as) {
                        Cell[] rs = new Cell[2];//初始化cell数组长度为2
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
                break;                          
        }
    }
static final int advanceProbe(int probe) {
        probe ^= probe << 13;   // xorshift
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
        return probe;
    }

上述代码逻辑比较复杂,这里我们带着剩下的问题去找答案会更好一些,下面我们逐个分析。

问题2:LongAdder在什么情况下会进行数组初始化呢?当在对base值进行cas失败后,且cell数组为空的时候就会进行cell数组初始化操作,在源码中我标注了下,在代码2中对cell数组进行初始化,并且初始化的数组大小长度为2,并且新创建一个cell类并赋值在h & 1位置上。

问题3:在代码3中可以看到“getProbe()&n-1”与运算逻辑,由此我们知道是由线程的threadLocalRandomProbe值与数组最大下标进行与运算得出的,这个逻辑有点类似hashMap寻找散列桶下标的逻辑。

问题4:代码4逻辑里面可以看到对cell数组进行了扩容操作,那么进入代码4的条件结合前面if判断可以得出(当前线程所访问的cell元素不为空 & 对当前访问的cell类中的value值cas替换失败 & 数组长度不大于cpu的个数
),每次扩容都将数组扩充为原来的两倍,并且将老数据循环赋值至新数组中。

问题5:在代码5中我们可以看到对线程的threadLocalRandomProbe进行了重新计算,采用的是xorshift随机算法,以此来减小下次访问冲突的机会。

这里我给一个我对AtomicLong和LongAdder性能对比的小测试

@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 1)
@Measurement(iterations = 5, time = 1)
@Fork(1)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@BenchmarkMode(Mode.AverageTime)
public class TestMain {

    private LongAdder longAdder = new LongAdder();
    private AtomicLong atomicLong = new AtomicLong();

    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder().include(TestMain.class.getName()).build();

        new Runner(options).run();
    }

    @Benchmark
    @Threads(8)
    public void runLongAdder() {
        for (int i = 0; i < 1000; i++) {
            longAdder.add(i);
        }
    }

    @Benchmark
    @Threads(8)
    public void runAtomicLong() {
        for (int i = 0; i < 1000; i++) {
            atomicLong.addAndGet(i);
        }
    }
}

运行结果:
运行结果

最后给一个问题:既然有了高效率的LongAdder那AtomicLong是不是可以废弃?

参考文献:

Logo

瓜分20万奖金 获得内推名额 丰厚实物奖励 易参与易上手

更多推荐