LongAdder是JDK1.8在java.util.concurrent.atomic包下新引入的 為了 高並發下實現高性能統計的類。
1.背景
AtomicLong是在高並發下對單一變量進行CAS操作,從而保證其原子性。
public final long getAndAdd(long delta) {
return unsafe.getAndAddLong(this, valueOffset, delta);
}
在Unsafe類中,如果有多個線程進入,只有一個線程能成功CAS,其他線程都失敗。失敗的線程會重復進行下一輪的CAS,但是下一輪還是只有一個線程成功。
public final long getAndAddLong(Object o, long offset, long delta) {
long v;
do {
v= this.getLongVolatile(o,offset);
} while(!this.compareAndSwapLong(o,offset, v, v+delta));
return v;
}
即在高並發下,AtomicLong的性能會越來越差勁。
因此,引入了替代方案,LongAdder。
2.LongAdder
LongAdder是一種以空間換時間的解決方案。其內部維護了一個值base,和一個cell數組,當線程寫base有沖突時,將其寫入數組的一個cell中。將base和所有cell中的值求和就得到最終LongAdder的值了。
Method sum() (or, equivalently, longValue()) returns the current total combined across the variables maintaining the sum.

public long longValue() {
return sum();
}
public long sum() {
Cell[] as = cells; Cell a;
long sum = base;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
3.Striped64內部結構
LongAdder類繼承了Striped64類,其中,class Striped64維護有有 Cell的內部類,Base,Cell數組等相關成員變量。
NCPU:表示當前計算機CPU數量,用於控制cells數組長度。因為一個CPU同一時間只能執行一個線程,如果cells數組長度 大於 CPU數量,並不能提高並發數,且造成空間的浪費。
cells:存放Cell的數組。
base:在沒有發生過競爭時,數據會累加到base上。 或者,當cells擴容時,是需要將數據寫到base中的。
cellsBusy:鎖。0表示無鎖狀態,1表示其他線程已經持有鎖。初始化cells,創建Cell,擴容cells都需要獲取鎖。
@sun.misc.Contended static final class Cell {
volatile long value;
Cell(long x) { value = x; }
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset; // 當前value基於當前對象的內存偏移
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
//表示當前計算機CPU數量,控制cells數組長度
static final int NCPU = Runtime.getRuntime().availableProcessors();
transient volatile Cell[] cells;
transient volatile long base; //在沒有發生過競爭時,數據會累加到base上, 或者 當cells擴容時,需要將數據寫到base中
transient volatile int cellsBusy; // 初始化cells或者擴容cells都需要獲取鎖,0表示無鎖狀態,1表示其他線程已經持有鎖
4.LongAdder的add方法解析
add(long x):加上給定的x。
1.一開始只加給base,那么此時cells一定沒有初始化,此時只會casBase,成功則返回。
2.casBase失敗,意味着多線程寫base發生競爭,進入longAccumulate(x, null, uncontended = true)重試或者初始化cells。
3.如果cells已經初始化過了,但是,當前線程對應下標的cell為空,需要創建。進入longAccumulate(x, null, uncontended = true)創建對應cell。
4.如果cells已經初始化過了,同時,當前線程對應的cell 不為空,cas給當前cell賦值,成功則返回。失敗,意味着當前線程對應的cell 有競爭,進入longAccumulate(x, null, uncontended = false) 重試或者擴容cells。
public void add(long x) {
//as 表示cells引用
//b 表示獲取的base值
//v 表示 期望值
//m 表示 cells 數組的長度
//a 表示當前線程命中的cell單元格
Cell[] as; long b, v; int m; Cell a;
//條件一:true->表示cells已經初始化過了,當前線程應該將數據寫入到對應的cell中
// false->表示cells未初始化,當前所有線程應該將數據寫到base中
//條件二:false->表示當前線程cas替換數據成功,
// true->表示發生競爭了,可能需要重試 或者 擴容
if ((as = cells) != null || !casBase(b = base, b + x)) {
//什么時候會進來?
//1.true->表示cells已經初始化過了,當前線程應該將數據寫入到對應的cell中
//2.true->表示發生競爭了,可能需要重試 或者 擴容
boolean uncontended = true; //true -> 未競爭 false->發生競爭
//條件一:true->說明 cells 未初始化,也就是多線程寫base發生競爭了
// false->說明 cells 已經初始化了,當前線程應該是 找自己的cell 寫值
//條件二:getProbe() 獲取當前線程的hash值 m表示cells長度-1 cells長度 一定是2的次方數 15= b1111
// true-> 說明當前線程對應下標的cell為空,需要創建 longAccumulate 支持
// false-> 說明當前線程對應的cell 不為空,說明 下一步想要將x值 添加到cell中。
//條件三:true->表示cas失敗,意味着當前線程對應的cell 有競爭
// false->表示cas成功
if (as == null || (m = as.length - 1) < 0 ||
(a = as[getProbe() & m]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
//都有哪些情況會調用?
//1.true->說明 cells 未初始化,也就是多線程寫base發生競爭了[重試|初始化cells]
//2.true-> 說明當前線程對應下標的cell為空,需要創建 longAccumulate 支持
//3.true->表示cas失敗,意味着當前線程對應的cell 有競爭[重試|擴容]
longAccumulate(x, null, uncontended);
}
}
5.Striped64的longAccumulate方法解析
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended)
根據LongAdder的add方法可知,參數x是add函數的傳入參數,即要增加的數;
LongBinaryOperator是一個接口可擴展,重寫applyAsLong方法用於處理cell中值與參數x的關系,此處傳null;
wasUncontended只有在 【cells已經初始化過了,同時,當前線程對應的cell 不為空,cas給當前cell賦值,競爭修改失敗】的情況下為false,其他為true。
第一種情況:寫base發生競爭,此時cells沒有初始化,所以才會寫到base,不走CASE1;
走Case2,判斷有沒有鎖,沒有鎖的話,嘗試加鎖,成功加鎖后執行初始化cells的邏輯。如果沒有拿到鎖,表示其它線程正在初始化cells,所以當前線程將值累加到base。
第二種情況:當前線程對應下標的cell為空,滿足CASE1,到達CASE1.1中,創建一個Cell,加鎖,如果成功,對應的位置其他線程沒有設置過cell,將創建的cell插入相應位置。
第三種情況:當前線程對應下標的cell已經創建成功,但寫入cell時發生競爭,到達CASE1.2,wasUncontended = true,把發生競爭線程的hash值rehash。
重置后走若CASE1.1,CASE1.2均不滿足,到達CASE1.3【當前線程rehash過hash值,然后新命中的cell不為空】重試cas賦值+x一次,成功則退出。失敗,擴容意向設置成true,rehash當前線程的hash值,再到1.3重試,還失敗走CASE1.6擴容。
注意:CASE1.4要求cells數組長度不能超過cpu數量,因為一個CPU同一時間只能執行一個線程,如果cells數組長度 大於 CPU數量,並不能提高並發數,且造成空間的浪費。
final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) {
//h 表示線程hash值
int h;
//條件成立:說明當前線程 還未分配hash值; getProbe()獲取當前線程的Hash值
if ((h = getProbe()) == 0) {
//給當前線程分配hash值
ThreadLocalRandom.current(); // force initialization
//取出當前線程的hash值 賦值給h
h = getProbe();
//為什么? 因為默認情況下 當前線程hash為0, 肯定是寫入到了 cells[0] 位置。 不把它當做一次真正的競爭
wasUncontended = true;
}
//表示擴容意向 false 一定不會擴容,true 可能會擴容。
boolean collide = false; // True if last slot nonempty
//自旋
for (;;) {
//as 表示cells引用
//a 表示當前線程命中的cell
//n 表示cells數組長度
//v 表示 期望值
Cell[] as; Cell a; int n; long v;
//CASE1: 表示cells已經初始化了,當前線程應該將數據寫入到對應的cell中
if ((as = cells) != null && (n = as.length) > 0) {
// 以下兩種情況會進入Case1:
//2.true-> 說明當前線程對應下標的cell為空,需要創建 longAccumulate 支持
//3.true->表示cas失敗,意味着當前線程對應的cell 有競爭[重試|擴容]
//CASE1.1:true->表示當前線程對應的下標位置的cell為null,需要創建new Cell
if ((a = as[(n - 1) & h]) == null) {
//true->表示當前鎖 未被占用 false->表示鎖被占用
if (cellsBusy == 0) { // Try to attach new Cell
//拿當前的x創建Cell
Cell r = new Cell(x); // Optimistically create
//條件一:true->表示當前鎖 未被占用 false->表示鎖被占用
//條件二:true->表示當前線程獲取鎖成功 false->當前線程獲取鎖失敗..
if (cellsBusy == 0 && casCellsBusy()) {
//是否創建成功 標記
boolean created = false;
try { // Recheck under lock
//rs 表示當前cells 引用
//m 表示cells長度
//j 表示當前線程命中的下標
Cell[] rs; int m, j;
//條件一 條件二 恆成立
//rs[j = (m - 1) & h] == null 為了防止其它線程初始化過該位置,然后當前線程再次初始化該位置
//導致丟失數據
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
//擴容意向 強制改為了false
collide = false;
}
// CASE1.2:
// wasUncontended:只有cells初始化之后,並且當前線程 競爭修改失敗,才會是false
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
//CASE 1.3:當前線程rehash過hash值,然后新命中的cell不為空
//true -> 寫成功,退出循環
//false -> 表示rehash之后命中的新的cell 也有競爭 重試1次 再重試1次
else if (a.cas(v = a.value, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break;
//CASE 1.4:
//條件一:n >= NCPU true->擴容意向 改為false,表示不擴容了 false-> 說明cells數組還可以擴容
//條件二:cells != as true->其它線程已經擴容過了,當前線程rehash之后重試即可
else if (n >= NCPU || cells != as)
//擴容意向 改為false,表示不擴容了
collide = false; // At max size or stale
//CASE 1.5:
//!collide = true 設置擴容意向 為true 但是不一定真的發生擴容
else if (!collide)
collide = true;
//CASE 1.6:真正擴容的邏輯
//條件一:cellsBusy == 0 true->表示當前無鎖狀態,當前線程可以去競爭這把鎖
//條件二:casCellsBusy true->表示當前線程 獲取鎖 成功,可以執行擴容邏輯
// false->表示當前時刻有其它線程在做擴容相關的操作。
else if (cellsBusy == 0 && casCellsBusy()) {
try {
//cells == as
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
//釋放鎖
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
//重置當前線程Hash值
h = advanceProbe(h);
}
//CASE2:前置條件cells還未初始化 as 為null
//條件一:true 表示當前未加鎖
//條件二:cells == as?因為其它線程可能會在你給as賦值之后修改了 cells
//條件三:true 表示獲取鎖成功 會把cellsBusy = 1,false 表示其它線程正在持有這把鎖
else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
boolean init = false;
try { // Initialize table
//cells == as? 防止其它線程已經初始化了,當前線程再次初始化 導致丟失數據
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
//CASE3:
//1.當前cellsBusy加鎖狀態,表示其它線程正在初始化cells,所以當前線程將值累加到base
//2.cells被其它線程初始化后,當前線程需要將數據累加到base
else if (casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x))))
break; // Fall back on using base
}
}
6.總結
官方文檔是這樣介紹的
This class is usually preferable to AtomicLong when multiple threads update a common sum that is used for purposes such as collecting statistics, not for fine-grained synchronization control. Under low update contention, the two classes have similar characteristics. But under high contention, expected throughput of this class is significantly higher, at the expense of higher space consumption.
LongAdder在多個線程更新一個用於收集統計信息的而不是追求同步的公共和的情況下,是優於AtomicLong類的。在並發度小,低競爭情況下,兩個類具有相似的性能。但是在高爭用情況下,LongAdder的預期吞吐量要高得多,代價是更高的空間消耗。
最后,我們再來看一下sum方法的注釋
Returns the current sum. The returned value is NOT an atomic snapshot; invocation in the absence of concurrent updates returns an accurate result, but concurrent updates that occur while the sum is being calculated might not be incorporated.
sum方法返回值只是一個接近值,並不是一個准確值。它在計算總和時,並發的更新並不會被合並在內。
總結:
- LongAdder是一種以空間換時間的解決方案,其在高並發,競爭大的情況下性能更優。
- 但是,sum方法拿到的只是接近值,追求最終一致性。如果業務場景追求高精度,高准確性,用AtomicLong。