ConcurrentHashMap源碼解讀二


接下來就講解put里面的三個方法,分別是

1、數組初始化方法initTable()
2、線程協助擴容方法helpTransfer()
3、計數方法addCount()

首先是數組初始化,再將源碼之前,先得搞懂里面的一個重要參數,那就是sizeCtl。

sizeCtl默認為0,代表數組未初始化。

sizeCtl為正數,如果數組未初始化,那么其記錄的是數組的初始容量,如果數組已經初始化,那么其記錄的是數組的擴容閾值。

sizeCtl為-1,表示數組正在進行初始化。

sizeCtl小於0,並且不是-1,表示數組正在擴容,-(1+n),表示此時有n個線程共同完成數組的擴容操作。

接下來講解initTable()方法

private final Node<K,V>[] initTable() {
        Node<K,V>[] tab; int sc;
    //第一次put的時候,table還沒被初始化,所以能進入while。sizeCtl默認值是0,當有兩個線程都想進行初始化時,線程A CAS成功,也就是else if為true,繼續執行下面,而另一個線程cas就是false,就重新進行while循環,而這時sizeCtl為-1.所以這個線程就
    放棄cpu的控制權,說白了就是在多線程下保證初始化只執行一次。
while ((tab = table) == null || tab.length == 0) {//tab在這里賦值,是table的引用 if ((sc = sizeCtl) < 0)//sc在這里賦值。是sizeCtl的引用。 Thread.yield(); // lost initialization race; just spin//線程放棄cpu的控制權。
        //SIZECTL:表示當前對象的內存偏移量,sc表示期望值,-1表示要替換的值,設定為-1表示要初始化表了
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//SIZECTL是地址偏移量,如果SIZECTL對應地址的值與sc相等,說明當前的線程是第一個到達這條語句的線程,那么就會將SIZECTL地址所對應的值替換成-1,而SIZECTL地址偏移量對應的對象就是sizeCtl try { if ((tab = table) == null || tab.length == 0) {//再次進行判斷,防止在進行U.compareAndSwapInt(this, SIZECTL, sc, -1)的時候有其他線程並發進入方法,導致出錯,使用雙重鎖 int n = (sc > 0) ? sc : DEFAULT_CAPACITY;//sc在前邊就賦值了,如果有初值,那么這里n就是設定的初始值,否則n就是默認容量。16 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];//基於初始長度,構建數組對象 table = tab = nt; sc = n - (n >>> 2);//這里就是計算擴容閾值,並賦值給sc。也就是n-0.25n = 0.75n } } finally { sizeCtl = sc;//將擴容閾值,賦值給sizeCtl,第一次初始化后。 } break; } } return tab; }

所以在這個方法中,sizeCtl為正數,如果數組未初始化,那么其記錄的是數組的初始容量,如果數組已經初始化,那么其記錄的是數組的擴容閾值。就是這個含義。

第二個要講解的方法是addCount方法。

 這段代碼分為兩個部分,一個是計數部分,另一個是擴容部分。

<用兩種方法進行計數,一個是用cas對baseCount<baseCount是一個全局屬性,volatile的>進行加法計數,另一個是用CounterCell數組,其實CounterCell對象就有一個屬性是value,用CounterCell構建一個數組,然后哪個線程要進行加法,就用這個線程產生一個hash值,並用這個數與CounterCell數組的長度減一做與運算,得到的結果就是CounterCell數組的index,然后對這個index對應的CounterCell對象的value做加法。在最后統計計數的時候是用:baseCount+每個CounterCell的value>
countcell就是通過分散計算來減少競爭。其內部有一個基礎值和一個哈希表。當沒有競爭的時候在基礎值上計數。有競爭的時候通過哈希表計數(每個線程有一個哈希值,通過哈希值確定在哈希表的位置,在這個位置進行計數,不同位置互不影響)

計數部分:通過baseCount和CounterCell數組,二選一的參與計數
擴容部分:當大於擴容閾值的時候進行擴容,當滿足擴容條件時才能擴容
如果有別的線程正在進行擴容,那么就存在nextTable(一個全局屬性,表示正在擴容的線程),就把nextTable作為參數傳入transfer方法,就是讓這個線程幫忙擴容nextTable
如果沒有別的線程正在擴容,那么就把null傳入transfer方法,讓transfer方法創建一個nextTable

private final void addCount(long x, int check) {
        CounterCell[] as; long b, s;//as表示counterCells引用, b表示獲取的baseCount值, s應該是表示元素個數。
 //當CounterCell數組不為空,則優先利用數組中的CounterCell記錄數量
    //或者當baseCount的累加操作失敗,會利用數組中的CounterCell記錄數量
    //條件一:as!=null true:表示counterCells以及初始化過了,當前線程應該將數據寫入到對應的counterCell中。
            //as==null 也就是條件1為false,那么表示counterCells未初始化,當前所有線程應該將數據寫到baseCount中。
if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {//條件二:true:表示當前線程cas替換數據成功  false表示發生競爭了,可能需要重試或者擴容。//因為這里有個!,所以應該false才能進入
    //什么時候會進入到if判斷里面
         //1.條件一:as!=null true:表示counterCells以及初始化過了,當前線程應該將數據寫入到對應的counterCell中。
        //2.條件二:false表示發生競爭了,可能需要重試或者擴容 CounterCell a;
long v; int m;//a表示當前線程命中的CounterCell單元格 v表示期望值,m表示as數組的長度。 boolean uncontended = true; //標識是否有多線程競爭 ,//true表示未發生競爭,false表示發生競爭。
      //當as數組為空
        //或者當as長度為0
        //或者當前線程對應的as數組桶位的元素為空
        //或者當前線程對應的as數組桶位不為空,但是累加失敗
        //條件一:as==null 為true,說明counterCells未初始化,那么上面就是根據多線程寫base發生競爭進入到這里的。
                //as!=null為false,說明counterCells已經初始化了。當前線程應該是找自己的counterCells寫值。
if (as == null || (m = as.length - 1) < 0 ||
        //ThreadLocalRandom.getProbe()表示當前線程的hash值。 m是CountCell長度-1,as.length一定是2的次方數,比如長度為16,那么減一就是15
            //也就是b1111,那么與上之后肯定是小於等於當前長度的值,也就是下標。
            //如果a==null,也就是條件為true,說明當前線程對應下標的CountCell為空,那么就需要創建
            //如果是false,不為空,說明下一步想要將x的值添加到CountCell中。 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
         //條件三:true:表示cas失敗,意味着當前線程對應的CountCell有競爭,
                    //false,表示cas成功 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
       //都有哪些情況會調用下面的方法:
            //1.as==null 為true,說明counterCells未初始化,那么上面就是根據多線程寫base發生競爭進入到這里的,那么初始化。
            //2.如果a==null,也就是條件為true,說明當前線程對應下標的CountCell為空,那么就需要創建
            //3.true:表示cas失敗,意味着當前線程對應的CountCell有競爭,那么就可能是重試或者擴容。
            //以上任何一種情況成立,都會進入該方法,傳入的uncontended是false fullAddCount(x, uncontended);//也就是countCells是一個全局得volatile的CountCell數組,創建實在fullAddCount方法中。
return; } if (check <= 1) return; s = sumCount();//計算元素個數 } if (check >= 0) { Node<K,V>[] tab, nt; int n, sc;
       //當元素個數達到擴容閾值
        //並且數組不為空
        //並且數組長度小於限定的最大值
        //滿足以上所有條件,執行擴容
while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0) { //sc小於0,說明有線程正在擴容,那么會協助擴容
      //擴容結束或者擴容線程數達到最大值或者擴容后的數組為null或者沒有更多的桶位需要轉移,結束操作
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break;
            //擴容線程加1,成功后,進行協助擴容操作
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt);//協助擴容,newTable不為null }
          //否則sc>=0,說明是首個擴容的線程,所以transfer傳入的參數是null
else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) transfer(tab, null); s = sumCount(); } } }

接下來就是講解上面的fullAddCount方法

① 當CounterCell數組不為空,優先對CounterCell數組中的CounterCell的value累加

② 當CounterCell數組為空,會去創建CounterCell數組,默認長度為2,並對數組中的CounterCell的value累加

③ 當數組為空,並且此時有別的線程正在創建數組,那么嘗試對baseCount做累加,成功即返回,否則自旋

private final void fullAddCount(long x, boolean wasUncontended) {//wasUncontended只有counterCells初始化之后,並且當前線程競爭修改失敗,才會是false。其余情況都是true。
        int h;//h表示線程的hash值。
    //獲取當前線程的hash值
if ((h = ThreadLocalRandom.getProbe()) == 0) {//條件成立:說明當前線程還未分配hash值。 ThreadLocalRandom.localInit();//給當前線程分配hash值 // force initialization h = ThreadLocalRandom.getProbe();//取出當前線程的hash值,賦值給h wasUncontended = true;//為啥這里強制設為true,當前線程肯定是寫入到了countCells[0]位置,不把它當作一次真正的競爭。 } boolean collide = false; //標識是否有沖突,如果最后一個桶不是null,那么為true //表示擴容意向 false一定不會擴容,true可能會擴容  // True if last slot nonempty for (;;) {//自旋 CounterCell[] as; CounterCell a; int n; long v;//as表示counterCells引用,a表示當前線程命中的CounterCell,n表示counterCells數組長度,v表示期望值
       //數組不為空,優先對數組中CouterCell的value累加
        //CASE1:表示countCells已經初始化了,當前線程應該將數據寫入到對應的CounterCell中。
if ((as = counterCells) != null && (n = as.length) > 0) {
      //2.如果a==null,也就是條件為true,說明當前線程對應下標的CountCell為空,那么就需要創建
            //3.true:表示cas失敗,意味着當前線程對應的CountCell有競爭,那么就可能是重試或者擴容。這兩種情況會進入到這個if判斷中
            //線程對應的桶位為null
         //CASE1.1:a = as[(n - 1) & h]) == null true表示當前線程對應的下標位置的CounterCell為null,需要創建new CounterCell
if ((a = as[(n - 1) & h]) == null) { if (cellsBusy == 0) { //true:表示當前鎖未被占用,false表示鎖被占用 // Try to attach new Cell CounterCell r = new CounterCell(x); // Optimistic create//創建CounterCell對象
                //利用CAS修改cellBusy狀態為1,成功則將剛才創建的CounterCell對象放入數組中
                    //條件一:true:表示當前鎖未被占用,false表示鎖被占用
                    //條件二:true:表示當前線程獲取鎖成功,false表示當前線程獲取鎖失敗。
if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean created = false;//是否創建成功的標記 try { // Recheck under lock CounterCell[] rs; int m, j;//rs表示當前countCells引用,m表示rs的長度,j表示當前線程命中的下標。
                  //桶位為空, 將CounterCell對象放入數組
                            //條件一條件二恆成立
                            // rs[j = (m - 1) & h] == null為了防止其它線程初始化過該位置,然后當前線程再次初始化該位置,導致數據丟失。
if ((rs = counterCells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null) { rs[j] = r; created = true;//表示放入成功 } } finally { cellsBusy = 0; } if (created)//成功退出循環 break; continue; //桶位已經被別的線程放置了已給CounterCell對象,繼續循環 // Slot is now non-empty } } collide = false;//將擴容意向改為false,原因是因為再CASE1.1中當前CounterCell都是為null,不可能不讓寫。因此不需要擴容。 }
          //桶位不為空,重新計算線程hash值,然后繼續循環
            //CASE1.2:只有一種情況會來到這里,wasUncontended只有counterCells初始化之后,並且當前線程競爭修改失敗,才會是false
else if (!wasUncontended) // CAS already known to fail wasUncontended = true; // Continue after rehash
        //重新計算了hash值后,對應的桶位依然不為空,對value累加
            //成功則結束循環
            //失敗則繼續下面判斷
            //CASE1.3:當前線程rehash過hash值,然后新命中的CounterCell不為空。則來到這里。
            //true:寫成功,退出循環,
            //false:表示rehash之后命中的新的cell也有競爭,重試1次  再重試一次
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x)) break;
         //數組被別的線程改變了,或者數組長度超過了可用cpu大小,重新計算線程hash值,否則繼續下一個判斷
            //CASE1.4:條件一:n >= NCPU true->表示擴容意向改為false,表示不擴容了。false說明counterCells數組還可以擴容
            //條件二:counterCells != as true表示其它線程已經擴容過了,當前線程rehash之后重試即可。
else if (counterCells != as || n >= NCPU) collide = false; // At max size or stale
        //當沒有沖突,修改為有沖突,並重新計算線程hash,繼續循環
            //CASE1.5  !collide=true,設置擴容意向為true,但是不一定真的發生擴容。因為一旦進入這里,那么又會rehash一下,又會重來。
else if (!collide) collide = true;
         //如果CounterCell的數組長度沒有超過cpu核數,對數組進行兩倍擴容
            //並繼續循環
            //CASE1.6:真正擴容的邏輯
            //條件一:cellsBusy == 0 true表示當前無鎖狀態,當前線程可以去競爭這把鎖。
            //條件二:U.compareAndSwapInt(this, CELLSBUSY, 0, 1)  true表示當前線程獲取鎖成功,可以執行擴容邏輯。
                                                            //false表示當前時刻有其它線程正在做擴容相關的操作。
else if (cellsBusy == 0 && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { try { if (counterCells == as) {// Expand table unless stale //這里又有雙重檢測,就是為了防止擴容過了又再次擴容。 CounterCell[] rs = new CounterCell[n << 1];//擴容為兩倍擴容 for (int i = 0; i < n; ++i) rs[i] = as[i];//將舊數組的值放到新數組。 counterCells = rs; } } finally { cellsBusy = 0; //釋放鎖 } collide = false; continue; // Retry with expanded table } h = ThreadLocalRandom.advanceProbe(h);//重置當前hash值 rehash }
     //CounterCells數組為空,並且沒有線程在創建數組,修改標記,並創建數組,因為前面CASE1是數組不為空,所以這里是數組為空
        //CASE2:當前條件countCells還未初始化,as為null
        //條件一:cellsBusy == 0  true表示當前未加鎖
        //條件二:counterCells == as,為什么又重新比較一遍,明明前面CASE1已經賦值未null了。因為其它線程可能會在你給as賦值之后修改了counterCells。
        //條件三:如果未true,表示獲取鎖成功,會把cellsBusy改成1。false表示其它線程正在持有這把鎖,那么當前線程就進不了這里面了。
else if (cellsBusy == 0 && counterCells == as && U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) { boolean init = false; try { // Initialize table if (counterCells == as) {//為什么這里又判斷了一下counterCells == as,防止其它線程已經初始化了,當前線程再次初始化,就會覆蓋掉其它線程初始化的CountCell,導致丟失數據。 CounterCell[] rs = new CounterCell[2];//初始容量為2 rs[h & 1] = new CounterCell(x); counterCells = rs; init = true; } } finally { cellsBusy = 0; } if (init) break; }
    //CASE3:
        //1.當前cellsBusy加鎖狀態,表示其他線程正在初始化countCells,所以當前線程將值累加到baseCount。
        //2.countCells被其它線程初始化后,當前線程需要將數據累加到base。
        //數組為空,並且有別的線程在創建數組,那么嘗試對baseCount做累加,成功就退出循環,失敗就繼續循環
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x)) break; // Fall back on using base } }

總結:addCount方法通過屬性baseCount 和 counterCell數組中所有的元素的和來記錄size
在無競爭的情況下,通過cas當前map對象的baseCount為baseCount + 1,
有競爭情況下,上訴cas失敗,會初始化一個長度為2的CounterCell數組,數組會擴容,每次擴容成兩倍,每個線程有在counterCell數組中對應的位置(多個線程可能會對應同一個位置), 如果位置上的CounterCell元素為空,就生成一個value為1的元素,如果不為空,則cas當前CounterCell元素的value為value + 1;如果都失敗嘗試cas當前map對象的baseCount為baseCount + 1。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM