一.LongAdder原理
LongAdder類是JDK1.8新增的一個原子性操作類。AtomicLong通過CAS算法提供了非阻塞的原子性操作,相比受用阻塞算法的同步器來說性能已經很好了,但是JDK開發組並不滿足於此,因為非常搞並發的請求下AtomicLong的性能是不能讓人接受的。
如下AtomicLong 的incrementAndGet的代碼,雖然AtomicLong使用CAS算法,但是CAS失敗后還是通過無限循環的自旋鎖不多的嘗試,這就是高並發下CAS性能低下的原因所在。源碼如下:
public final long incrementAndGet() { for (;;) { long current = get(); long next = current + 1; if (compareAndSet(current, next)) return next; } }
在高並發下N多線程同時去操作一個變量會造成大量線程CAS失敗,然后處於自旋狀態,這樣導致大大浪費CPU資源,降低了並發性。
既然AtomicLong性能問題是由於過多線程同時去競爭同一個變量的更新而降低的,那么如果把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源,那么性能問題不久迎刃而解了嗎?
沒錯,因此,JDK8 提供的LongAdder就是這個思路。下面通過圖形來標示兩者的不同,如下圖:
如上圖 AtomicLong 是多個線程同時競爭同一個變量情景。
如上圖所示,LongAdder則是內部維護多個Cell變量,每個Cell里面有一個初始值為0的long型變量,在同等並發量的情況下,爭奪單個變量的線程會減少,這是變相的減少了爭奪共享資源的並發量,另外多個線程在爭奪同一個原子變量時候,
如果失敗並不是自旋CAS重試,而是嘗試獲取其他原子變量的鎖,最后當獲取當前值時候是把所有變量的值累加后再加上base的值返回的。
LongAdder維護了要給延遲初始化的原子性更新數組和一個基值變量base數組的大小保持是2的N次方大小,數組表的下標使用每個線程的hashcode值的掩碼表示,數組里面的變量實體是Cell類型。
Cell 類型是Atomic的一個改進,用來減少緩存的爭用,對於大多數原子操作字節填充是浪費的,因為原子操作都是無規律的分散在內存中進行的,多個原子性操作彼此之間是沒有接觸的,但是原子性數組元素彼此相鄰存放將能經常共享緩存行,也就是偽共享。所以這在性能上是一個提升。
另外由於Cells占用內存是相對比較大的,所以一開始並不創建,而是在需要時候再創建,也就是惰性加載,當一開始沒有空間時候,所有的更新都是操作base變量。
接下來進行LongAdder代碼簡單分析
這里我只是簡單的介紹一下代碼的實現,詳細實現,大家可以翻看代碼去研究。為了降低高並發下多線程對一個變量CAS爭奪失敗后大量線程會自旋而造成降低並發性能問題,LongAdder內部通過根據並發請求量來維護多個Cell元素(一個動態的Cell數組)來分擔對單個變量進行爭奪資源。
首先我們先看LongAdder的構造類圖,如下圖:
可以看到LongAdder繼承自Striped64類,Striped64內部維護着三個變量,LongAdder的真實值其實就是base的值與Cell數組里面所有Cell元素值的累加,base是個基礎值,默認是0,cellBusy用來實現自旋鎖,當創建Cell元素或者擴容Cell數組時候用來進行線程間的同步。
接下來進去源碼如看Cell的構造,源碼如下:
@sun.misc.Contended static final class Cell { volatile long value; Cell(long x) { value = x; } final boolean cas(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); } // Unsafe 技術 private static final sun.misc.Unsafe UNSAFE; private static final long valueOffset; static { try { UNSAFE = sun.misc.Unsafe.getUnsafe(); Class<?> ak = Cell.class; valueOffset = UNSAFE.objectFieldOffset (ak.getDeclaredField("value")); } catch (Exception e) { throw new Error(e); } } }
正如上面的代碼可以知道Cell的構造很簡單,內部維護一個聲明volatile的變量,這里聲明為volatile是因為線程操作value變量時候沒有使用鎖,為了保證變量的內存可見性這里只有聲明為volatile。另外這里就是先前文件所說的使用Unsafe類的方法來設置value的值
接下來進入LongAdder的源碼里面去看幾個重要的方法,如下:
1.long sum() 方法:返回當前的值,內部操作是累加所有 Cell 內部的 value 的值后累加 base,如下代碼,由於計算總和時候沒有對 Cell 數組進行加鎖,所以在累加過程中可能有其它線程對 Cell 中的值進行了修改,也有可能數組進行了擴容,所以 sum 返回的值並不是非常精確的,
返回值並不是一個調用 sum 方法時候的一個原子快照值。
源碼如下:
public long sum() { Cell[] as = cells; Cell a; long sum = base; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) sum += a.value; } } return sum; }
2.void reset() 方法:重置操作,如下代碼把 base 置為 0,如果 Cell 數組有元素,則元素值重置為 0。源碼如下:
public void reset() { Cell[] as = cells; Cell a; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) a.value = 0L; } } }
3.long sumThenReset() 方法:是sum 的改造版本,如下代碼,在計算 sum 累加對應的 cell 值后,把當前 cell 的值重置為 0,base 重置為 0。 當多線程調用該方法時候會有問題,比如考慮第一個調用線程會清空 Cell 的值,后一個線程調用時候累加時候累加的都是 0 值。
源碼如下:
public long sumThenReset() { Cell[] as = cells; Cell a; long sum = base; base = 0L; if (as != null) { for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) { sum += a.value; a.value = 0L; } } } return sum; }
4.long longValue() 等價於 sum(),源碼如下:
public long longValue() { return this.sum(); }
5.void add(long x) 累加增量 x 到原子變量,這個過程是原子性的。源碼如下:
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) {//(1) boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 ||//(2) (a = as[getProbe() & m]) == null ||//(3) !(uncontended = a.cas(v = a.value, v + x)))//(4) longAccumulate(x, null, uncontended);//(5) } } final boolean casBase(long cmp, long val) { return UNSAFE.compareAndSwapLong(this, BASE, cmp, val); }
可以看到上面代碼,當第一個線程A執行add時候,代碼(1)會執行casBase方法,通過CAS設置base為 X, 如果成功則直接返回,這時候base的值為1。
假如多個線程同時執行add時候,同時執行到casBase則只有一個線程A成功返回,其他線程由於CAS失敗執行代碼(2),代碼(2)是獲取cells數組的長度,如果數組長度為0,則執行代碼(5),否則cells長度不為0,說明cells數組有元素則執行代碼(3),
代碼(3)首先計算當前線程在數組中下標,然后獲取當前線程對應的cell值,如果獲取到則執行(4)進行CAS操作,CAS失敗則執行代碼(5)。
代碼(5)里面是具體進行數組擴充和初始化,這個代碼比較復雜,這里就不講解了,有興趣的可以進去看看。
二.LongAccumulator類源碼分析
LongAdder類是LongAccumulator的一個特例,LongAccumulator提供了比LongAdder更強大的功能,如下構造函數,其中accumulatorFunction是一個雙目運算器接口,根據輸入的兩個參數返回一個計算值,identity則是LongAccumulator累加器的初始值。
public LongAccumulator(LongBinaryOperator accumulatorFunction,long identity) { this.function = accumulatorFunction; base = this.identity = identity; } public interface LongBinaryOperator { //根據兩個參數計算返回一個值 long applyAsLong(long left, long right); }
上面提到LongAdder 其實就是LongAccumulator 的一個特例,調用LongAdder 相當使用下面的方式調用 LongAccumulator。
LongAdder adder = new LongAdder(); LongAccumulator accumulator = new LongAccumulator(new LongBinaryOperator() { @Override public long applyAsLong(long left, long right) { return left + right; } }, 0);
LongAccumulator相比LongAdder 可以提供累加器初始非0值,后者只能默認為0,另外前者還可以指定累加規則,比如不是累加而相乘,只需要構造LongAccumulator 時候傳入自定義雙目運算器即可,后者則內置累加規則。
從下面代碼知道LongAccumulator相比於LongAdde的不同在於casBase的時候,后者傳遞的是b+x,而前者則是調用了r=function.applyAsLong(b=base.x)來計算。
LongAdder類的add源碼如下:
public void add(long x) { Cell[] as; long b, v; int m; Cell a; if ((as = cells) != null || !casBase(b = base, b + x)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = a.cas(v = a.value, v + x))) longAccumulate(x, null, uncontended); } }
LongAccumulator的accumulate方法的源碼如下:
public void accumulate(long x) { Cell[] as; long b, v, r; int m; Cell a; if ((as = cells) != null || (r = function.applyAsLong(b = base, x)) != b && !casBase(b, r)) { boolean uncontended = true; if (as == null || (m = as.length - 1) < 0 || (a = as[getProbe() & m]) == null || !(uncontended = (r = function.applyAsLong(v = a.value, x)) == v || a.cas(v, r))) longAccumulate(x, function, uncontended); } }
另外LongAccumulator調用longAccumulate時候傳遞的是function,而LongAdder是null,從下面代碼可以知道當fn為null,時候就是使用v+x 加法運算,這時候就等價於LongAdder,fn不為null的時候則使用傳遞的fn函數計算,如果fn為加法則等價於LongAdder;
else if (casBase(v = base, ((fn == null) ? v + x :fn.applyAsLong(v, x)))) // Fall back on using base
break;