Striped64原理
通過前面的幾章關於原子類的同步數據結構分析,我們知道Java並發包提供的原子類都是采用volatile+CAS機制實現的,這種輕量級的實現方式比傳統的synchronize一般來說更加高效,但是在高並發下依然會導致CAS操作的大量競爭失敗自旋重試,這時候對性能的影響說不定還不如使用synchronize,幸運的是,從JDK8開始Java並發包新增了抽象類Striped64以及它的擴展類 LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator解決了高並發下的累加問題。
我們知道普通的原子類例如AtomicLong等,它們內部都維護了一個volatile類型的變量用於保存對應的值,由於所有的操作都最終要通過CAS作用到這個變量上,所以在高並發環境下競爭是無可避免的,而Striped64的原理很簡單,Striped64不再使用單個變量保存結果,而是包含一個基礎值base和一個單元哈希表cells其實就是一個數組。沒有競爭的情況下,要累加的數會累加到這個基礎值上;如果有競爭的話,會通過內部的分散計算將要累加的數累加到單元哈希表中的某個單元里面。所以整個Striped64的值包括基礎值和單元哈希表中所有單元的值的總和。顯然Striped64是一種以空間換時間的解決方案。
前面關於CPU緩存行“偽共享”和ThreadLocalRandom的分析其實都是為本文鋪路,因為這里都會用到這些知識。
Striped64源碼分析
先看看基本結構:
1 @SuppressWarnings("serial") 2 abstract class Striped64 extends Number { 3 4 //存放Cell的哈希表,大小為2的冪 5 transient volatile Cell[] cells; 6 7 //基礎值, 主要時當沒有競爭是直接更新這個值, 但也可以作為哈希表初始化競爭失敗的回退方案 8 //通過CAS的方式更新 9 transient volatile long base; 10 11 //自旋鎖(通過CAS方式),用於當需要擴展數組的容量或創建一個數組中的元素時加鎖. 12 transient volatile int cellsBusy; 13 //下面是Cell的結構,就是數組中每個元素的結構 14 //這其實就是一個AtomicLong的變種,還使用了Contended注解解決偽共享的問題 15 @sun.misc.Contended static final class Cell { 16 volatile long value; 17 Cell(long x) { value = x; }//構造方法 18 final boolean cas(long cmp, long val) {//CAS更新方法 19 return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); 20 } 21 22 // Unsafe mechanics 23 private static final sun.misc.Unsafe UNSAFE; 24 private static final long valueOffset; 25 static { 26 try { 27 UNSAFE = sun.misc.Unsafe.getUnsafe(); 28 Class<?> ak = Cell.class; 29 valueOffset = UNSAFE.objectFieldOffset 30 (ak.getDeclaredField("value")); 31 } catch (Exception e) { 32 throw new Error(e); 33 } 34 } 35 } 36 .... 37 }
從以上的結構看出,Striped64內部維護了一個基礎值base,一個存放高競爭時的分散哈希表,即數組cells,數組存放的元素類型Cell是一個類似AtomicLong的變種也是原子類型,另外還有一個自旋鎖標記cellsBusy,只用於對數組進行擴容或者創建一個新元素的時候加鎖。另外Cell類被Contened注解解決了偽共享的問題,這是因為數組中的元素更傾向於彼此相鄰的存放,因此將可能共享緩存行這將會對性能造成巨大的副作用。
Striped64主要提供了longAccumulate和doubleAccumulate方法來支持子類,這兩個方法也是Striped64最核心的方法,先看下longAccumulate:
1 final void longAccumulate(long x, LongBinaryOperator fn, boolean wasUncontended) { 2 int h; 3 if ((h = getProbe()) == 0) { //獲取當前線程的probe值作為hash值。 4 ThreadLocalRandom.current(); //如果probe值為0,強制初始化當前線程的probe值,這次初始化的probe值不會為0。 5 h = getProbe(); //再次獲取probe值作為hash值。 6 wasUncontended = true; //重新計算了hash值之后,將未競爭標識為true 7 } 8 boolean collide = false; // True if last slot nonempty 9 for (;;) { //CAS的標志性方式 10 Cell[] as; Cell a; int n; long v; 11 if ((as = cells) != null && (n = as.length) > 0) {//哈希表已經初始化過了 12 //通過(n - 1) & h 來定位當前線程被分散到的Cell數組中的位置 13 if ((a = as[(n - 1) & h]) == null) { //如果當前位置是空 14 if (cellsBusy == 0) { //並且自旋鎖標記為空閑 15 Cell r = new Cell(x); 16 if (cellsBusy == 0 && casCellsBusy()) { 17 //成功獲取自旋鎖標記之后, 18 boolean created = false; 19 try { 20 Cell[] rs; int m, j; 21 //再次檢查該位置是否為空 22 if ((rs = cells) != null && 23 (m = rs.length) > 0 && 24 rs[j = (m - 1) & h] == null) { 25 rs[j] = r; //將新建的代表x的cell放到指定位置。 26 created = true; 27 } 28 } finally { 29 cellsBusy = 0;//釋放cellsBusy鎖。 30 } 31 if (created) 32 break; //如果創建成功,直接跳出循環,退出方法。 33 continue; //說明上面指定的cell的位置上有cell了,繼續嘗試。 34 } 35 } 36 collide = false; //走到這里說明獲取cellsBusy鎖失敗 37 } 38 //到這里說明上面通過h選定的cell表的位置上已經有Cell了, 39 else if (!wasUncontended) // CAS already known to fail 40 //如果之前的CAS失敗,說明已經發生競爭, 41 //這里會設置未競爭標志位true,然后進入advanceProbe產生新的probe值,然后重試。 42 wasUncontended = true; // Continue after rehash 43 else if (a.cas(v = a.value, ((fn == null) ? v + x : 44 fn.applyAsLong(v, x)))) //如果還未發生競爭,則嘗試將x累加到該位置(a)上 45 //成功加x累加到該位置(a)上,退出方法,結束。 46 break; 47 else if (n >= NCPU || cells != as) //到這里說明該位置不為空,但是嘗試累加到該位置上失敗, 48 //如果哈希表即數組cells已經到最大或數組發生了變化 49 //這里設置沖突collide為false,然后進入advanceProbe產生新的probe值,然后重試。 50 collide = false; // At max size or stale 51 else if (!collide) 52 collide = true; //設置沖突標志,表示發生了沖突,重試。 53 else 54 //到這里說明該位置不為空,但是嘗試累加到該位置上失敗,並且數組的容量還未到最大值,數組也沒有發生變化,但是發生了沖突 55 if (cellsBusy == 0 && casCellsBusy()) { //嘗試獲取cellsBusy鎖。 56 57 try { 58 if (cells == as) { //再次確認數組無變化 59 //對數組進行擴容 60 Cell[] rs = new Cell[n << 1]; 61 for (int i = 0; i < n; ++i) 62 rs[i] = as[i]; 63 cells = rs; 64 } 65 } finally { 66 cellsBusy = 0;//釋放cellsBusy鎖。 67 } 68 collide = false; 69 continue; //擴容哈希表后,然后重試。 70 } 71 h = advanceProbe(h); //重新計算新的probe值以對應到不同的下標元素,然后重試。 72 }else if (cellsBusy == 0 && cells == as && casCellsBusy()) { 73 //哈希表還未創建,嘗試獲取cellsBusy鎖,成功 74 boolean init = false; 75 try { // Initialize table 76 if (cells == as) { 77 //初始化哈希表cells,初始容量為2 78 Cell[] rs = new Cell[2]; 79 rs[h & 1] = new Cell(x);//將當前操作數x放進該數組中 80 cells = rs; 81 init = true; 82 } 83 } finally { 84 cellsBusy = 0;//釋放cellsBusy鎖 85 } 86 if (init) 87 break; //初始化表成功,退出方法,結束。 88 } 89 //如果創建哈希表由於競爭導致失敗,嘗試將x累加到base上。 90 else if (casBase(v = base, ((fn == null) ? v + x : 91 fn.applyAsLong(v, x)))) // Fall back on using base 92 break; //成功累加到base上,退出方法,結束 93 } 94 }
以上方法的邏輯稍顯復雜,我簡單概括如下:
1. 在第一個if語句塊中,判斷當前線程的哈希探針值是否初始化,只有當前線程第一個進入這個方法的時候才是未初始化的(這種延遲初始化在上一文的ThreadLocalRandom中已經介紹過), 如果未初始化,執行ThreadLocalRandom.current(),該方法在上一文ThreadLocalRandom的分析中已經分析過了,它只要就是原子的初始化當前線程的哈希探針值和種子,它保證了不同的線程將具有不同的初始值,這也為后面進行取模映射時將不同線程盡量映射到不同的數組下標減少沖突,提高CAS的成功率從而提高並發效率提高了基礎。重新獲取到新的哈希值h之后,就要進入下面的核心代碼塊了。這里的wasUncontended表示之前沒有發生CAS競爭失敗,一般是為了當wasUncontended為false時重新產生哈希值從而重試定位不到不同的數組下標進行累加。這里已經產生了新的哈希值,所以就將wasUncontended設為了true。
2. 進入 核心代碼塊 for (;;),這是CAS機制的標志性使用方式,它的邏輯如下:
1 1. if 該哈希表即數組已經初始化過了 2 1.1 if 映射到的槽位(下標)是空的,即還沒有放置過元素 3 1.1.1 if 鎖空閑,加鎖后再次判斷,如果該槽位仍然是空的,初始化cell並放到該槽。成功后退出。 4 1.1.2 鎖已經被占用了,設置collide為false,會導致重新產生哈希重試。 5 1.2 else if (槽不為空)在槽上之前的CAS已經失敗,刷新哈希重試。 6 1.3 else if (槽不為空、且之前的CAS沒失敗,)在此槽的cell上嘗試更新,成功退出。 7 1.4 else if 表已達到容量上限或被擴容了,刷新哈希重試。 8 1.5 else if 如果不存在沖突,則設置為存在沖突,刷新哈希重試。 9 1.6 else if 如果成功獲取到鎖,則擴容。 10 1.7 else 刷新哈希值,嘗試其他槽。 11 2. else if (表未初始化)鎖空閑,且數組無變化,且成功獲取到鎖: 12 2.1 初始化哈希表的大小為2,根據取模(h & 1)將需累加的參數x放到對應的下標中。釋放鎖。 13 3. else if (表未初始化,鎖不空)嘗試直接在base上更新,成功返回,失敗回到步驟1重試。
LongAdder和DoubleAdder
上面對 Striped64的原理進行了分析,下面要理解它的子類就非常簡單了,Striped64的子類主要有LongAdder、LongAccumulator、DoubleAdder、DoubleAccumulator,它們常用於狀態采集、統計等場景。AtomicLong/AtomicDouble也可以用於這種場景,但在線程競爭激烈的情況下,LongAdder/DoubleAdder要比AtomicLong/AtomicDouble擁有更高的吞吐量,但會耗費更多的內存空間。
LongAdder很簡單,其中它的add方法最重要:
1 public class LongAdder extends Striped64 implements Serializable { 2 public LongAdder() { //只有一個無參構造方法 3 } 4 public void add(long x) { 5 Cell[] as; long b, v; int m; Cell a; 6 if ((as = cells) != null || !casBase(b = base, b + x)) { 7 boolean uncontended = true; 8 if (as == null || (m = as.length - 1) < 0 || 9 (a = as[getProbe() & m]) == null || 10 !(uncontended = a.cas(v = a.value, v + x))) 11 longAccumulate(x, null, uncontended); 12 } 13 }
- 這里如果是第一次執行getProbe(),返回值肯定為0,因為沒有通過ThreadLocalRandom.current()初始化哈希探針值,所以第一次將會在0槽位進行嘗試。
- 如果哈希表初始化之后,以后每一次都不會再在base上嘗試累加了,那么能不能將第一個if判斷中的條件進行調換呢?比如,先做casBase的判斷?結果是不調換可能更好,調換后每次都要CAS一下,在高並發時,失敗幾率非常高,並且是惡性循環,比起一次判斷,后者的開銷明顯小很多,還沒有副作用。因此,不調換可能會更好。
LongAdder的其他方法很簡單,列舉如下:
1 public void increment() //累加1 2 public void decrement() //減1 3 public long sum() //求和,即base值加上每個cell的值。 4 public void reset() //重置方法,將base和cells中的元素都置為0。 5 public long sumThenReset() //先求和,再重置 6 下面這幾個方法是重寫的父類Number的方法。 7 public long longValue() //求和 8 public int intValue() //求和之后強轉int 9 public float floatValue() //求和之后強轉float 10 public double doubleValue() //求和之后強轉double 11 還有序列號相關的方法....
DoubleAdder的實現與LongAdder一樣, 它是利用的父類Striped64的doubleAccumulate方法,只不過使用了Double.doubleToRawLongBits(double)和Double.longBitsToDouble(long)方法在double和longBits數據之間轉換,另外DoubleAdder也沒有像increment()/decrement()這種加減1的方法,因為它一般不是操作的整數,也就不必要了。
LongAccumulator和DoubleAccumulator以及它們的局限性
LongAccumulator和DoubleAccumulator的構造方法與LongAdder/DoubleAdder不同,LongAdder/DoubleAdder只有一個無參的構造方法,不能指定初始值,而它們的構造方法有兩個參數,第一個參數是一個需要被實現累加邏輯的函數接口,第二個參數就是初始值。
LongAccumulator/DoubleAccumulator的使用有很大的局限性,根據JDK8的doc描述:
The order of accumulation within or across threads is not guaranteed and cannot be depended upon, so this class is only applicable to functions for which the order of accumulation does not matter. The supplied accumulator function should be side-effect-free, since it may be re-applied when attempted updates fail due to contention among threads. The function is applied with the current value as its first argument, and the given update as the second argument.
也就是說,線程之間的累加順序無法保證,也不應該被依賴,它們僅僅適用於對累加順序不敏感的累加操作,構造方法的第一個參數指定的累加函數必須是無副作用的,例如(v*2+x)這樣的累加函數就不適用在這里,其實這很好理解,Striped64的思想是分散的將要累加的數據放到一個哈希表里面,當執行(v*2+x)這樣的函數時,第一個參數v要么是0(空槽位),要么是base(無競爭),要么是某個槽位中的值(無競爭的CAS某個非空槽位時),v不是當前數據的總和,而是根據線程的不同執行順序而對應到不同的值,所以它的計算結果也將發生偏差,所以上面的Doc中對於第一個參數的描述也是錯誤的(它描述的第一個參數是當前值),在分散計算的時候,第一個參數並不是累加器的當前值。
下面的例子是來至stackoverflow,很好的詮釋了這個局限性:
1 public static void main(String[] args) throws InterruptedException { 2 LongBinaryOperator op = (x, y) -> 2 * x + y; 3 LongAccumulator accumulator = new LongAccumulator(op, 1L); 4 5 ExecutorService executor = Executors.newFixedThreadPool(2); 6 7 //產生【0,9)的10個數字,用兩個線程去執行累加這10個數 8 IntStream.range(0, 10) 9 .forEach(i -> executor.submit(() -> accumulator.accumulate(i))); 10 11 stop(executor); 12 13 System.out.format("Add: %d\n", accumulator.getThenReset()); 14 } 15 16 public static void stop(ExecutorService executor) { 17 try { 18 executor.shutdown(); 19 executor.awaitTermination(60, TimeUnit.SECONDS); 20 } 21 catch (InterruptedException e) { 22 System.err.println("termination interrupted"); 23 } 24 finally { 25 if (!executor.isTerminated()) { 26 System.err.println("killing non-finished tasks"); 27 } 28 executor.shutdownNow(); 29 } 30 }
上例中的結果在多次運行情況下,結果是不一致的,一會是2539,一會是2037,一會是3050等等值,這就是因為運算函數 2 * x + y是對順序敏感的,第一個參數x的值是與線程執行順序相關的。這個問題被提交為一個JDK的bug,聽說在JDK10中被修復了,但經過我查看JDK10的源碼,其實JDK10中的LongAccumulator和DoubleAccumulator的邏輯並沒有有任何更改,JDK的開發者僅僅是修改了其Java Doc, 更加明確了它的使用局限性,並對第一個參數的不准確描述進行了修正:
JDK10的Doc修正
For predictable results, the accumulator function should be associative and commutative. The function is applied with an existing value (or identity) as one argument, and a given update as the other argument.
對於可預測的結果,累加器函數應該是可交換的,第一個參數是一個存在的中間值或者基礎值。由此可見,對 LongAccumulator和DoubleAccumulator的使用有着很大的局限性,這直到JDK10都一樣。所以使用的時候一定要注意。
LongAdder/DoubleAdder其實是LongAccumulator和DoubleAccumulator的特例,當第一個參數的累加函數式 x+y,第二個參數是0的時候,LongAccumulator和DoubleAccumulator就等價於LongAdder/DoubleAdder。
LongAccumulator和DoubleAccumulator還提供了獲取結果(get(),getThenReset)和重置(reset())等方法,就不再一一介紹了,關鍵是要對它們的使用局限性要明白,不要亂用。
總結
本文首先了分析了Striped64的分散計算方式解決了高競爭的累加問題,然而它是一個抽象類無法直接使用,我們只有使用它的實現類,或自己實現。它的實現類中LongAdder/DoubleAdder是初始值為0,只能進行累加/減1的簡單累加器,常用於狀態采集、統計等場景。LongAccumulator和DoubleAccumulator雖然比LongAdder/DoubleAdder更加強大,能夠指定初始值和計算函數,但是由於其不能依賴執行順序和必須是無副作用的函數局限性,所以使用起來也必須要非常小心。
最后如要問是否可以拋棄AtomicLong、AtomicDouble,直接使用LongAdder/DoubleAdder或LongAccumulator/DoubleAccumulator,我認為不能。首先,其實在非高並發的情況下,它們的執行效率相差不大,但是AtomicLong/AtomicDouble提供的方法更豐富,使用起來更方便,而Striped64的實現類們不但方法少,而且由於解決“偽共享”的問題可能導致空間消耗大。其次,它們的使用場景不一樣,AtomicLong/AtomicDouble適用於復雜的細粒度的同步控制,而Striped64的實現類們更多地用於邏輯簡單的收集統計數據。