概述
由於ConcurrentHashMap是一個高並發的集合,集合中增刪就比較頻繁,那計數就變成了一個問題,如果使用像AtomicInteger這樣類型的變量來計數,雖然可以保證原子性,但是太多線程去競爭CAS,自旋也挺浪費時間的,所以ConcurrentHashMap使用了一種類似LongAddr的數據結構去計數,其實LongAddr是繼承Striped64,有關於這個類的原理大家可以參考這篇文章:並發之STRIPED64(累加器)和 LONGADDER,大家了解了這個類的原理,理解ConcurrentHashMap計數就沒有一點壓力了,因為兩者在代碼實現上基本一樣。
ConcurrentHashMap計數原理
private transient volatile long baseCount; private transient volatile CounterCell[] counterCells; @sun.misc.Contended static final class CounterCell { volatile long value; CounterCell(long x) { value = x; } }
ConcurrentHashMap就是依托上面三個東東進行計數的,那下面就詳細解釋一下這三個東東。
- baseCount:最基礎的計數,比如只有一個線程put操作,只需要通過CAS修改baseCount就可以了。
- counterCells:這是一個數組,里面放着CounterCell對象,這個類里面就一個屬性,其使用方法是,在高並發的時候,多個線程都要進行計數,每個線程有一個探針hash值,通過這個hash值定位到數組桶的位置,如果這個位置有值就通過CAS修改CounterCell的value(如果修改失敗,就換一個再試),如果沒有,就創建一個CounterCell對象。
- 最后通過把桶中的所有對象的value值和baseCount求和得到總值,代碼如下。
final long sumCount() { CounterCell[] as = counterCells; CounterCell a; //baseCount作為基礎值 long sum = baseCount; if (as != null) { //遍歷數組 for (int i = 0; i < as.length; ++i) { if ((a = as[i]) != null) //對每個value累加 sum += a.value; } } return sum; }
通過上面的分析,相信大家已經了解了高並發計數的方法,在上面的介紹中提到一點,每個線程的探針hash值,大家先有個印象,一會分析代碼的時候會使用這個,其實這個值很有趣。
addCount()方法
又到了這個方法,在上篇文章中分析擴容的時候也分析過這個方法,不過分析的是一部分,現在分析另一部分。
private final void addCount(long x, int check) { CounterCell[] as; long b, s; //如果數組還沒有創建,就直接對baseCount進行CAS操作 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { CounterCell a; long v; int m; //設置沒有沖突標志位為true boolean uncontended = true; //第一個條件as == null成立說明,counterCells數組沒有創建,而且通過CAS修改baseCount失敗,說明有別的線程競爭CAS //a = as[ThreadLocalRandom.getProbe() & m]) == null,說明數組是創建了,但是通過探針hash定位的桶中沒有對象 //如果有對象,執行最后一個,進行CAS修改CounterCell對象,如果也失敗了,就要進入下面的方法 if (as == null || (m = as.length - 1) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { //進行更精確的處理 fullAddCount(x, uncontended); return; } if (check <= 1) return; s = sumCount(); } //省略部分代碼 }
上面整體過程還是很好理解的,就是我在上面介紹計數原理中說的步驟,但是上面有一個地方需要注意下就是:
a = as[ThreadLocalRandom.getProbe() & m]) == null
這里的ThreadLocalRandom.getProbe(),看着名字大家應該可以猜到應該是要獲取一個隨機值,因為有Random嘛,其實這里就是在獲取一個隨機值。那既然是要獲取一個隨機值,為什么要使用ThreadLocalRandom,而不直接使用Random,那看到ThreadLocal,大家也應該可以想到就是這個隨機值的獲取,線程之間是隔離的,每個線程獲取自己的隨機值,互相之間沒有影響,為什么要互相之間沒有影響呢?因為Random要實現隨機,有一個關鍵的東西就是種子(seed),具體過程如下:
- 初始化Random的時候,如果沒有傳seed,就根據時間戳進行一些計算初始化一個種子
- 如果某個線程需要獲取隨機數,先通過CAS更新種子seed1 = function1(seed)
- 根據seed1,計算隨機數 = function2(seed1)
- 上面的兩個function是固定的,就是說如果初始種子一樣,兩個不同的Random對象生成隨機數會完全一樣
上面的過程咋一看沒啥問題,其實最大問題就是第二步那個CAS,在高並發的時候效率會很差,所以這里才使用了ThreadLocalRandom,相當於每個線程都有一個Random,都有自己的種子,這樣就不會存在多線程競爭修改種子。想要詳細了解ThreadLocalRandom,參考:並發包中ThreadLocalRandom類原理剖析
fullAddCount()方法
其實這個方法沒什么好分析的,其實就是Striped64#longAccumulate()方法,據我的觀察好像一行不差,完全一樣,這里還是分析下吧。
1 private final void fullAddCount(long x, boolean wasUncontended) { 2 int h; 3 //上面我貼出來了介紹ThreadLocalRandom的文章,這里如果是首次獲取,其實就是0 4 if ((h = ThreadLocalRandom.getProbe()) == 0) { 5 //如果為0,就初始化,這里其實就是把種子和隨機數設置到(Thread)線程中 6 ThreadLocalRandom.localInit(); // force initialization 7 h = ThreadLocalRandom.getProbe(); 8 wasUncontended = true; 9 } 10 boolean collide = false; // True if last slot nonempty 11 12 //死循環,保證計數一定成功 13 for (;;) { 14 CounterCell[] as; CounterCell a; int n; long v; 15 //說明數組已經初始化,在后面有判斷數組沒有初始化的情況 16 if ((as = counterCells) != null && (n = as.length) > 0) { 17 //這里是不是和ConcurrentHashMap定位桶的位置很像,其實是一摸一樣的 18 //說明數組中這個位置沒有元素 19 if ((a = as[(n - 1) & h]) == null) { 20 //這個字段保證數組新增節點,擴容只有一個線程在進行,防止多線程並發 21 //這里限制一個線程處理只是在數組新增節點和擴容的時候,修改對象的值並不需要限制這個變量 22 if (cellsBusy == 0) { // Try to attach new Cell 23 CounterCell r = new CounterCell(x); // Optimistic create 24 25 //如果為0表示沒有別的線程在修改數組,通過CAS修改為1,表示當前線程在修改數組 26 if (cellsBusy == 0 && 27 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 28 boolean created = false; 29 try { // Recheck under lock 30 CounterCell[] rs; int m, j; 31 //再次校驗,確保數組沒有變化 32 //rs[j = (m - 1) & h] == null,再次確認該位置是否為null,防止別的線程插入了 33 if ((rs = counterCells) != null && 34 (m = rs.length) > 0 && 35 rs[j = (m - 1) & h] == null) { 36 //插入數組 37 rs[j] = r; 38 created = true; 39 } 40 } finally { 41 //釋放CAS鎖 42 cellsBusy = 0; 43 } 44 if (created) 45 //如果新節點插入成功,表示計數已經成功,這里直接break了 46 break; 47 //如果失敗會一直重試 48 continue; // Slot is now non-empty 49 } 50 } 51 collide = false; 52 } 53 else if (!wasUncontended) // CAS already known to fail 54 wasUncontended = true; // Continue after rehash 55 56 //定位到桶中有值,然后通過CAS修改其值 57 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) 58 break; 59 //下面的兩個elseif其實是為了防止數組一直擴容使用的,數組的最大容量就是CPU的核數 60 //因為核數就是並發數,數組太大沒有意義,沒有那么多線程可以同時操作 61 //就是說上面的新建節點或者CAS修改值事變了,就會到這里,然后攔截住,不讓執行擴容 62 else if (counterCells != as || n >= NCPU) 63 collide = false; // At max size or stale 64 else if (!collide) 65 collide = true; 66 //先競爭到CAS鎖,然后執行擴容 67 else if (cellsBusy == 0 && 68 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 69 try { 70 if (counterCells == as) {// Expand table unless stale 71 72 //每次擴容成原來的兩倍 73 CounterCell[] rs = new CounterCell[n << 1]; 74 //復制元素,看過ConcurrentHashMap的擴容,再看這個,簡直就跟一個大學生看小學數學題一樣,😄 75 for (int i = 0; i < n; ++i) 76 rs[i] = as[i]; 77 counterCells = rs; 78 } 79 } finally { 80 cellsBusy = 0; 81 } 82 collide = false; 83 continue; // Retry with expanded table 84 } 85 //這里是重新生成一個隨機數,換個位置試試,比如上面新增節點失敗了,換個位置試試,或者通過CAS修改值失敗,也換個位置再試試 86 h = ThreadLocalRandom.advanceProbe(h); 87 } 88 //這里就是判斷數組沒有初始化的情況,搞不明白沒啥放在這里,不放在開頭 89 else if (cellsBusy == 0 && counterCells == as && 90 U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { 91 boolean init = false; 92 try { // Initialize table 93 if (counterCells == as) { 94 //初始化的數組大小是2,非常小 95 CounterCell[] rs = new CounterCell[2]; 96 rs[h & 1] = new CounterCell(x); 97 counterCells = rs; 98 init = true; 99 } 100 } finally { 101 cellsBusy = 0; 102 } 103 if (init) 104 break; 105 } 106 //如果以上CAS修改,創建新節點都失敗了,這里還有一道防線,通過CAS修改baseCount 107 //這也是再addCount中,當判斷數組不為空,不先修改下baseCount試試,而是直接跳到這個方法中,因為在這個方法中也會修改baseCount 108 else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) 109 break; // Fall back on using base 110 } 111 }
方法流程梳理如下:
- 判斷數組是否為空,為空的話初始化數組
- 如果數組存在,通過探針hash定位桶中的位置,如果桶中為空,新建節點,通過CAS鎖插入數組,如果成功,結束,如果失敗轉到第5步
- 如果定位到桶中有值,通過CAS修改,如果成功,結束,如果失敗向下走
- 如果數組大小小於CPU核數,擴容數組
- 重新計算探針hash
總結
計數的原理就是概述中說的使用的是striped64,為啥不直接繼承striped64,不太懂,可能striped64出來晚一點,里面使用到ThreadLocalRandom,這個其實還是挺有意思的,總的來說計數過程並不復雜,是看ConcurrentHashMap源碼的時候比較愉快的部分。