ConcurrentHashMap原理分析(三)-計數


概述

  由於ConcurrentHashMap是一個高並發的集合,集合中增刪就比較頻繁,那計數就變成了一個問題,如果使用像AtomicInteger這樣類型的變量來計數,雖然可以保證原子性,但是太多線程去競爭CAS,自旋也挺浪費時間的,所以ConcurrentHashMap使用了一種類似LongAddr的數據結構去計數,其實LongAddr是繼承Striped64,有關於這個類的原理大家可以參考這篇文章:並發之STRIPED64(累加器)和 LONGADDER,大家了解了這個類的原理,理解ConcurrentHashMap計數就沒有一點壓力了,因為兩者在代碼實現上基本一樣。

ConcurrentHashMap計數原理

    private transient volatile long baseCount;

    private transient volatile CounterCell[] counterCells;

    @sun.misc.Contended static final class CounterCell {
        volatile long value;
        CounterCell(long x) { value = x; }
    }

ConcurrentHashMap就是依托上面三個東東進行計數的,那下面就詳細解釋一下這三個東東。

  • baseCount:最基礎的計數,比如只有一個線程put操作,只需要通過CAS修改baseCount就可以了。
  • counterCells:這是一個數組,里面放着CounterCell對象,這個類里面就一個屬性,其使用方法是,在高並發的時候,多個線程都要進行計數,每個線程有一個探針hash值,通過這個hash值定位到數組桶的位置,如果這個位置有值就通過CAS修改CounterCell的value(如果修改失敗,就換一個再試),如果沒有,就創建一個CounterCell對象。
  • 最后通過把桶中的所有對象的value值和baseCount求和得到總值,代碼如下。
    final long sumCount() {
        CounterCell[] as = counterCells; CounterCell a;
        //baseCount作為基礎值
        long sum = baseCount;
        if (as != null) {
            //遍歷數組
            for (int i = 0; i < as.length; ++i) {
                if ((a = as[i]) != null)
                    //對每個value累加
                    sum += a.value;
            }
        }
        return sum;
    }

通過上面的分析,相信大家已經了解了高並發計數的方法,在上面的介紹中提到一點,每個線程的探針hash值,大家先有個印象,一會分析代碼的時候會使用這個,其實這個值很有趣。

addCount()方法

又到了這個方法,在上篇文章中分析擴容的時候也分析過這個方法,不過分析的是一部分,現在分析另一部分。

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;
        //如果數組還沒有創建,就直接對baseCount進行CAS操作
        if ((as = counterCells) != null ||
            !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
            CounterCell a; long v; int m;
            //設置沒有沖突標志位為true
            boolean uncontended = true;
            //第一個條件as == null成立說明,counterCells數組沒有創建,而且通過CAS修改baseCount失敗,說明有別的線程競爭CAS
            //a = as[ThreadLocalRandom.getProbe() & m]) == null,說明數組是創建了,但是通過探針hash定位的桶中沒有對象
            //如果有對象,執行最后一個,進行CAS修改CounterCell對象,如果也失敗了,就要進入下面的方法
            if (as == null || (m = as.length - 1) < 0 ||
                (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
                !(uncontended =
                  U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
                //進行更精確的處理
                fullAddCount(x, uncontended);
                return;
            }
            if (check <= 1)
                return;
            s = sumCount();
        }
        //省略部分代碼
}

上面整體過程還是很好理解的,就是我在上面介紹計數原理中說的步驟,但是上面有一個地方需要注意下就是:

a = as[ThreadLocalRandom.getProbe() & m]) == null 

這里的ThreadLocalRandom.getProbe(),看着名字大家應該可以猜到應該是要獲取一個隨機值,因為有Random嘛,其實這里就是在獲取一個隨機值。那既然是要獲取一個隨機值,為什么要使用ThreadLocalRandom,而不直接使用Random,那看到ThreadLocal,大家也應該可以想到就是這個隨機值的獲取,線程之間是隔離的,每個線程獲取自己的隨機值,互相之間沒有影響,為什么要互相之間沒有影響呢?因為Random要實現隨機,有一個關鍵的東西就是種子(seed),具體過程如下:

  • 初始化Random的時候,如果沒有傳seed,就根據時間戳進行一些計算初始化一個種子
  • 如果某個線程需要獲取隨機數,先通過CAS更新種子seed1 = function1(seed)
  • 根據seed1,計算隨機數 = function2(seed1)
  • 上面的兩個function是固定的,就是說如果初始種子一樣,兩個不同的Random對象生成隨機數會完全一樣

上面的過程咋一看沒啥問題,其實最大問題就是第二步那個CAS,在高並發的時候效率會很差,所以這里才使用了ThreadLocalRandom,相當於每個線程都有一個Random,都有自己的種子,這樣就不會存在多線程競爭修改種子。想要詳細了解ThreadLocalRandom,參考:並發包中ThreadLocalRandom類原理剖析

fullAddCount()方法

其實這個方法沒什么好分析的,其實就是Striped64#longAccumulate()方法,據我的觀察好像一行不差,完全一樣,這里還是分析下吧。

  1 private final void fullAddCount(long x, boolean wasUncontended) {
  2         int h;
  3          //上面我貼出來了介紹ThreadLocalRandom的文章,這里如果是首次獲取,其實就是0 
  4          if ((h = ThreadLocalRandom.getProbe()) == 0) {
  5             //如果為0,就初始化,這里其實就是把種子和隨機數設置到(Thread)線程中
  6             ThreadLocalRandom.localInit();      // force initialization
  7             h = ThreadLocalRandom.getProbe();
  8             wasUncontended = true;
  9         }
 10         boolean collide = false;                // True if last slot nonempty
 11      
 12         //死循環,保證計數一定成功
 13         for (;;) {
 14             CounterCell[] as; CounterCell a; int n; long v;
 15             //說明數組已經初始化,在后面有判斷數組沒有初始化的情況
 16             if ((as = counterCells) != null && (n = as.length) > 0) {
 17                 //這里是不是和ConcurrentHashMap定位桶的位置很像,其實是一摸一樣的
 18                //說明數組中這個位置沒有元素 
 19                if ((a = as[(n - 1) & h]) == null) {
 20                     //這個字段保證數組新增節點,擴容只有一個線程在進行,防止多線程並發
 21                     //這里限制一個線程處理只是在數組新增節點和擴容的時候,修改對象的值並不需要限制這個變量
 22                     if (cellsBusy == 0) {            // Try to attach new Cell
 23                         CounterCell r = new CounterCell(x); // Optimistic create
 24 
 25                         //如果為0表示沒有別的線程在修改數組,通過CAS修改為1,表示當前線程在修改數組
 26                         if (cellsBusy == 0 &&
 27                             U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 28                             boolean created = false;
 29                             try {               // Recheck under lock
 30                                 CounterCell[] rs; int m, j;
 31                                 //再次校驗,確保數組沒有變化
 32                                 //rs[j = (m - 1) & h] == null,再次確認該位置是否為null,防止別的線程插入了
 33                                 if ((rs = counterCells) != null &&
 34                                     (m = rs.length) > 0 &&
 35                                     rs[j = (m - 1) & h] == null) {
 36                                     //插入數組
 37                                     rs[j] = r;
 38                                     created = true;
 39                                 }
 40                             } finally {
 41                                //釋放CAS鎖
 42                                 cellsBusy = 0;
 43                             }
 44                             if (created)
 45                                 //如果新節點插入成功,表示計數已經成功,這里直接break了
 46                                 break;
 47                            //如果失敗會一直重試
 48                             continue;           // Slot is now non-empty
 49                         }
 50                     }
 51                     collide = false;
 52                 }
 53                 else if (!wasUncontended)       // CAS already known to fail
 54                     wasUncontended = true;      // Continue after rehash
 55               
 56                 //定位到桶中有值,然后通過CAS修改其值
 57                 else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
 58                     break;
 59                //下面的兩個elseif其實是為了防止數組一直擴容使用的,數組的最大容量就是CPU的核數
 60                //因為核數就是並發數,數組太大沒有意義,沒有那么多線程可以同時操作
 61                //就是說上面的新建節點或者CAS修改值事變了,就會到這里,然后攔截住,不讓執行擴容
 62                 else if (counterCells != as || n >= NCPU)
 63                     collide = false;            // At max size or stale
 64                 else if (!collide)
 65                     collide = true;
 66                 //先競爭到CAS鎖,然后執行擴容
 67                 else if (cellsBusy == 0 &&
 68                          U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 69                     try {
 70                         if (counterCells == as) {// Expand table unless stale
 71                            
 72                             //每次擴容成原來的兩倍
 73                             CounterCell[] rs = new CounterCell[n << 1];
 74                             //復制元素,看過ConcurrentHashMap的擴容,再看這個,簡直就跟一個大學生看小學數學題一樣,😄
 75                             for (int i = 0; i < n; ++i)
 76                                 rs[i] = as[i];
 77                             counterCells = rs;
 78                         }
 79                     } finally {
 80                         cellsBusy = 0;
 81                     }
 82                     collide = false;
 83                     continue;                   // Retry with expanded table
 84                 }
 85                 //這里是重新生成一個隨機數,換個位置試試,比如上面新增節點失敗了,換個位置試試,或者通過CAS修改值失敗,也換個位置再試試
 86                 h = ThreadLocalRandom.advanceProbe(h);
 87             }
 88             //這里就是判斷數組沒有初始化的情況,搞不明白沒啥放在這里,不放在開頭
 89             else if (cellsBusy == 0 && counterCells == as &&
 90                      U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
 91                 boolean init = false;
 92                 try {                           // Initialize table
 93                     if (counterCells == as) {
 94                         //初始化的數組大小是2,非常小
 95                         CounterCell[] rs = new CounterCell[2];
 96                         rs[h & 1] = new CounterCell(x);
 97                         counterCells = rs;
 98                         init = true;
 99                     }
100                 } finally {
101                     cellsBusy = 0;
102                 }
103                 if (init)
104                     break;
105             }
106             //如果以上CAS修改,創建新節點都失敗了,這里還有一道防線,通過CAS修改baseCount
107             //這也是再addCount中,當判斷數組不為空,不先修改下baseCount試試,而是直接跳到這個方法中,因為在這個方法中也會修改baseCount
108             else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
109                 break;                          // Fall back on using base
110         }
111     }

方法流程梳理如下:

  1. 判斷數組是否為空,為空的話初始化數組
  2. 如果數組存在,通過探針hash定位桶中的位置,如果桶中為空,新建節點,通過CAS鎖插入數組,如果成功,結束,如果失敗轉到第5步
  3. 如果定位到桶中有值,通過CAS修改,如果成功,結束,如果失敗向下走
  4. 如果數組大小小於CPU核數,擴容數組
  5. 重新計算探針hash

總結

  計數的原理就是概述中說的使用的是striped64,為啥不直接繼承striped64,不太懂,可能striped64出來晚一點,里面使用到ThreadLocalRandom,這個其實還是挺有意思的,總的來說計數過程並不復雜,是看ConcurrentHashMap源碼的時候比較愉快的部分。

    


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM