Java並發編程筆記之LongAdder和LongAccumulator源碼探究


一.LongAdder原理

LongAdder類是JDK1.8新增的一個原子性操作類。AtomicLong通過CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器來說性能已經很好了,但是JDK開發組並不滿足於此,因為非常搞並發的請求下AtomicLong的性能是不能讓人接受的。

如下AtomicLong 的incrementAndGet的代碼,雖然AtomicLong使用CAS算法,但是CAS失敗后還是通過無限循環的自旋鎖不多的嘗試,這就是高並發下CAS性能低下的原因所在。源碼如下:

  public final long incrementAndGet() {
        for (;;) {
            long current = get();
            long next = current + 1;
            if (compareAndSet(current, next))
                return next;
        }
    }

在高並發下N多線程同時去操作一個變量會造成大量線程CAS失敗,然后處於自旋狀態,這樣導致大大浪費CPU資源,降低了並發性。

既然AtomicLong性能問題是由於過多線程同時去競爭同一個變量的更新而降低的,那么如果把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源,那么性能問題不久迎刃而解了嗎?

沒錯,因此,JDK8 提供的LongAdder就是這個思路。下面通過圖形來標示兩者的不同,如下圖:

如上圖 AtomicLong 是多個線程同時競爭同一個變量情景。

 

如上圖所示,LongAdder則是內部維護多個Cell變量,每個Cell里面有一個初始值為0的long型變量,在同等並發量的情況下,爭奪單個變量的線程會減少,這是變相的減少了爭奪共享資源的並發量,另外多個線程在爭奪同一個原子變量時候,

如果失敗並不是自旋CAS重試,而是嘗試獲取其他原子變量的鎖,最后當獲取當前值時候是把所有變量的值累加后再加上base的值返回的。

 

LongAdder維護了要給延遲初始化的原子性更新數組和一個基值變量base數組的大小保持是2的N次方大小,數組表的下標使用每個線程的hashcode值的掩碼表示,數組里面的變量實體是Cell類型。

Cell 類型是Atomic的一個改進,用來減少緩存的爭用,對於大多數原子操作字節填充是浪費的,因為原子操作都是無規律的分散在內存中進行的,多個原子性操作彼此之間是沒有接觸的,但是原子性數組元素彼此相鄰存放將能經常共享緩存行,也就是偽共享。所以這在性能上是一個提升。

另外由於Cells占用內存是相對比較大的,所以一開始並不創建,而是在需要時候再創建,也就是惰性加載,當一開始沒有空間時候,所有的更新都是操作base變量。

 

接下來進行LongAdder代碼簡單分析

這里我只是簡單的介紹一下代碼的實現,詳細實現,大家可以翻看代碼去研究。為了降低高並發下多線程對一個變量CAS爭奪失敗后大量線程會自旋而造成降低並發性能問題,LongAdder內部通過根據並發請求量來維護多個Cell元素(一個動態的Cell數組)來分擔對單個變量進行爭奪資源。

首先我們先看LongAdder的構造類圖,如下圖:

 

 可以看到LongAdder繼承自Striped64類,Striped64內部維護着三個變量,LongAdder的真實值其實就是base的值與Cell數組里面所有Cell元素值的累加,base是個基礎值,默認是0,cellBusy用來實現自旋鎖,當創建Cell元素或者擴容Cell數組時候用來進行線程間的同步。

 

接下來進去源碼如看Cell的構造,源碼如下:

  @sun.misc.Contended static final class Cell {
        volatile long value;
        Cell(long x) { value = x; }
        final boolean cas(long cmp, long val) {
            return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
        }

        // Unsafe 技術
        private static final sun.misc.Unsafe UNSAFE;
        private static final long valueOffset;
        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class<?> ak = Cell.class;
                valueOffset = UNSAFE.objectFieldOffset
                    (ak.getDeclaredField("value"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

正如上面的代碼可以知道Cell的構造很簡單,內部維護一個聲明volatile的變量,這里聲明為volatile是因為線程操作value變量時候沒有使用鎖,為了保證變量的內存可見性這里只有聲明為volatile。另外這里就是先前文件所說的使用Unsafe類的方法來設置value的值

 

接下來進入LongAdder的源碼里面去看幾個重要的方法,如下:

  1.long sum() 方法:返回當前的值,內部操作是累加所有 Cell 內部的 value 的值后累加 base,如下代碼,由於計算總和時候沒有對 Cell 數組進行加鎖,所以在累加過程中可能有其它線程對 Cell 中的值進行了修改,也有可能數組進行了擴容,所以 sum 返回的值並不是非常精確的,

返回值並不是一個調用 sum 方法時候的一個原子快照值。

  源碼如下:

  

public long sum() {
        Cell[] as = cells; Cell a;
        long sum = base;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    sum += a.value;
            }
        }
        return sum;
}

 

  2.void reset() 方法:重置操作,如下代碼把 base 置為 0,如果 Cell 數組有元素,則元素值重置為 0。源碼如下:

  

  public void reset() {
        Cell[] as = cells; Cell a;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    a.value = 0L;
            }
        }
   }

 

  3.long sumThenReset() 方法:是sum 的改造版本,如下代碼,在計算 sum 累加對應的 cell 值后,把當前 cell 的值重置為 0,base 重置為 0。 當多線程調用該方法時候會有問題,比如考慮第一個調用線程會清空 Cell 的值,后一個線程調用時候累加時候累加的都是 0 值。

  源碼如下:

  

  public long sumThenReset() {
        Cell[] as = cells; Cell a;
        long sum = base;
        base = 0L;
        if (as != null) {
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null) {
                    sum += a.value;
                    a.value = 0L;
                }
            }
        }
        return sum;
    }

 

  4.long longValue() 等價於 sum(),源碼如下:

  

    public long longValue() {
        return this.sum();
    }

 

  5.void add(long x) 累加增量 x 到原子變量,這個過程是原子性的。源碼如下:

  

  public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {//(1)
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||//(2)
                (a = as[getProbe() & m]) == null ||//(3)
                !(uncontended = a.cas(v = a.value, v + x)))//(4)
                longAccumulate(x, null, uncontended);//(5)
        }
    }

    final boolean casBase(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
    }

可以看到上面代碼,當第一個線程A執行add時候,代碼(1)會執行casBase方法,通過CAS設置base為 X, 如果成功則直接返回,這時候base的值為1。

假如多個線程同時執行add時候,同時執行到casBase則只有一個線程A成功返回,其他線程由於CAS失敗執行代碼(2),代碼(2)是獲取cells數組的長度,如果數組長度為0,則執行代碼(5),否則cells長度不為0,說明cells數組有元素則執行代碼(3),

代碼(3)首先計算當前線程在數組中下標,然后獲取當前線程對應的cell值,如果獲取到則執行(4)進行CAS操作,CAS失敗則執行代碼(5)。

代碼(5)里面是具體進行數組擴充和初始化,這個代碼比較復雜,這里就不講解了,有興趣的可以進去看看。

 

二.LongAccumulator類源碼分析

LongAdder類是LongAccumulator的一個特例,LongAccumulator提供了比LongAdder更強大的功能,如下構造函數,其中accumulatorFunction是一個雙目運算器接口,根據輸入的兩個參數返回一個計算值,identity則是LongAccumulator累加器的初始值。

public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) {
        this.function = accumulatorFunction;
        base = this.identity = identity;
}
public interface LongBinaryOperator {

       //根據兩個參數計算返回一個值
       long applyAsLong(long left, long right);
}

上面提到LongAdder 其實就是LongAccumulator 的一個特例,調用LongAdder 相當使用下面的方式調用 LongAccumulator。

   LongAdder adder = new LongAdder();
    LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() {

        @Override
        public long applyAsLong(long left, long right) {
            return left + right;
        }
    }, 0);

LongAccumulator相比LongAdder 可以提供累加器初始非0值,后者只能默認為0,另外前者還可以指定累加規則,比如不是累加而相乘,只需要構造LongAccumulator 時候傳入自定義雙目運算器即可,后者則內置累加規則。

 

從下面代碼知道LongAccumulator相比於LongAdde的不同在於casBase的時候,后者傳遞的是b+x,而前者則是調用了r=function.applyAsLong(b=base.x)來計算。

LongAdder類的add源碼如下:

  public void add(long x) {
        Cell[] as; long b, v; int m; Cell a;
        if ((as = cells) != null || !casBase(b = base, b + x)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended = a.cas(v = a.value, v + x)))
                longAccumulate(x, null, uncontended);
        }
    }

 

LongAccumulator的accumulate方法的源碼如下:

  public void accumulate(long x) {
        Cell[] as; long b, v, r; int m; Cell a;
        if ((as = cells) != null ||
            (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) {
            boolean uncontended = true;
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[getProbe() & m]) == null ||
                !(uncontended =
                  (r = function.applyAsLong(v = a.value, x)) == v ||
                  a.cas(v, r)))
                longAccumulate(x, function, uncontended);
        }
    }

 

另外LongAccumulator調用longAccumulate時候傳遞的是function,而LongAdder是null,從下面代碼可以知道當fn為null,時候就是使用v+x  加法運算,這時候就等價於LongAdder,fn不為null的時候則使用傳遞的fn函數計算,如果fn為加法則等價於LongAdder;

else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x))))
       // Fall back on using base
      
break;
 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM