ConcurrentHashMap源碼分析


前言:ConcurrentHashMap是HashMap的線程安全版本,內部使用了數組+鏈表+紅黑樹的結構來存儲數據,相對於同樣線程安全的Hashtable來說,它在效率方面有很大的提升,因此多線程環境下更多的是使用ConcurrentHashMap,因此有必要對其原理進行分析。

注:本文jdk源碼版本為jdk1.8.0_172


1.ConcurrentHashMap介紹

ConcurrentHashMap是HashMap的線程安全版本,底層數據結構為數組+鏈表+紅黑樹,默認容量16,線程同步,不允許[key,value]為null。

1 public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
2     implements ConcurrentMap<K,V>, Serializable

構造函數:

 1 public ConcurrentHashMap() {
 2 }
 3 
 4  public ConcurrentHashMap(int initialCapacity) {
 5     if (initialCapacity < 0)
 6         throw new IllegalArgumentException();
 7     int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
 8                MAXIMUM_CAPACITY :
 9                tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
10     this.sizeCtl = cap;
11 }
12 
13   public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
14     this.sizeCtl = DEFAULT_CAPACITY;
15     putAll(m);
16 }
17 
18  public ConcurrentHashMap(int initialCapacity, float loadFactor) {
19     this(initialCapacity, loadFactor, 1);
20 }
21 
22     public ConcurrentHashMap(int initialCapacity,
23                          float loadFactor, int concurrencyLevel) {
24     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
25         throw new IllegalArgumentException();
26     if (initialCapacity < concurrencyLevel)   // Use at least as many bins
27         initialCapacity = concurrencyLevel;   // as estimated threads
28     long size = (long)(1.0 + (long)initialCapacity / loadFactor);
29     int cap = (size >= (long)MAXIMUM_CAPACITY) ?
30         MAXIMUM_CAPACITY : tableSizeFor((int)size);
31     this.sizeCtl = cap;
32 }

分析:

通過構造函數可以發現sizeCtl變量經常出現,該變量通過查看jdk源碼注釋可知該變量主要控制初始化或擴容:

#1.-1,表示線程正在進行初始化操作。

#2.-(1+nThreads),表示n個線程正在進行擴容。

#3.0,默認值,后續在真正初始化的時候使用默認容量。

#4.>0,初始化或擴容完成后下一次的擴容門檻。

2.具體源碼分析

put操作:

 1 final V putVal(K key, V value, boolean onlyIfAbsent) {
 2         if (key == null || value == null) throw new NullPointerException();
 3         // 計算key的hash值
 4         int hash = spread(key.hashCode());
 5         // 用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹
 6         int binCount = 0;
 7         // 進行自旋
 8         for (Node<K,V>[] tab = table;;) {
 9             Node<K,V> f; int n, i, fh;
10             if (tab == null || (n = tab.length) == 0)
11                 // table未初始化,則初始化
12                 tab = initTable();
13             // 如果該位置上的f為null,則說明第一次插入元素,則直接插入新的Node節點
14             else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
15                 if (casTabAt(tab, i, null,
16                              new Node<K,V>(hash, key, value, null)))
17                     break;                   // no lock when adding to empty bin
18             }
19             // 如果檢測到當前某個節點的hash值為MOVED,則表示正在進行數組擴張的數據復制階段
20             // 則當前線程與會參與復制,通過允許多線程復制的功能,減少數組的復制來帶來的性能損失
21             else if ((fh = f.hash) == MOVED)
22                 tab = helpTransfer(tab, f);
23             else {
24                 V oldVal = null;
25                 /**
26                  * 到該分支表明該位置上有元素,采用synchronized方式加鎖
27                  * 如果是鏈表的話,則對鏈表進行遍歷,找到key和key的hash值都一樣的節點,進行替換
28                  * 如果沒有找到,則添加在鏈表最后面
29                  * 如果是樹的話,則添加到樹中去
30                  */
31                 synchronized (f) {
32                     // 再次取出要存儲的位置元素,跟之前的數據進行比較,看是否進行了更改
33                     if (tabAt(tab, i) == f) {
34                         // 鏈表
35                         if (fh >= 0) {
36                             binCount = 1;
37                             // 遍歷鏈表
38                             for (Node<K,V> e = f;; ++binCount) {
39                                 K ek;
40                                 // 元素的hash、key都相同,則進行替換和hashMap相同
41                                 if (e.hash == hash &&
42                                     ((ek = e.key) == key ||
43                                      (ek != null && key.equals(ek)))) {
44                                     oldVal = e.val;
45                                     // 當使用putIfAbsent的時候,只有在這個key沒有設置值時的候才設置
46                                     if (!onlyIfAbsent)
47                                         e.val = value;
48                                     break;
49                                 }
50                                 Node<K,V> pred = e;
51                                 // 不同key,hash值相同時,直接添加到鏈表尾即可
52                                 if ((e = e.next) == null) {
53                                     pred.next = new Node<K,V>(hash, key,
54                                                               value, null);
55                                     break;
56                                 }
57                             }
58                         }
59                         // 當前結點為紅黑樹
60                         else if (f instanceof TreeBin) {
61                             Node<K,V> p;
62                             binCount = 2;
63                             // 添加元素到樹中去,表明樹的當前結點存在值,則進行替換
64                             if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
65                                                            value)) != null) {
66                                 oldVal = p.val;
67                                 if (!onlyIfAbsent)
68                                     p.val = value;
69                             }
70                         }
71                     }
72                 }
73                 if (binCount != 0) {
74                     // 當在同一個節點的數目大於等於8時,則進行擴容或者將數據轉換成紅黑樹
75                     // 注意,這里並不一定是直接轉換成紅黑樹,有可能先進行擴容
76                     if (binCount >= TREEIFY_THRESHOLD)
77                         treeifyBin(tab, i);
78                     if (oldVal != null)
79                         return oldVal;
80                     break;
81                 }
82             }
83         }
84         // 計數 binCount大於1(鏈表的長度)表示鏈表,binCount=2表示紅黑樹
85         addCount(1L, binCount);
86         return null;
87     }

分析:

通過查看put操作的核心源碼,整體邏輯還是比較清晰,有幾個點需要注意:

#1.在插入元素時,采用了自旋。

#2.在插入元素的時候才會進行初始化。

#3.在插入元素時,底層數據結構可能會轉向紅黑樹。

initTable:初始化函數

 1  private final Node<K,V>[] initTable() {
 2         Node<K,V>[] tab; int sc;
 3         while ((tab = table) == null || tab.length == 0) {
 4             // sizeCtl初始值為0,當小於0時,表示在別的線程初始化表或擴展表,當前線程只需要讓出cpu時間片即可
 5             if ((sc = sizeCtl) < 0)
 6                 Thread.yield(); // lost initialization race; just spin
 7             // 將sc更新為-1,表示線程正在進行初始化操作
 8             else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
 9                 try {
10                     if ((tab = table) == null || tab.length == 0) {
11                         // 指定了大小就創建指定大小的Node數組,否則創建默認大小的Node數組
12                         int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
13                         @SuppressWarnings("unchecked")
14                         Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
15                         table = tab = nt;
16                         sc = n - (n >>> 2);
17                     }
18                 } finally {
19                     // 和上面邏輯對別可知sizeCtl的大小為數組長度的3/4
20                     sizeCtl = sc;
21                 }
22                 break;
23             }
24         }
25         return tab;
26     }

分析:

在put操作時才進行初始化操作其實是懶加載的一種表現形式,並且初始化時,已考慮多線程的情況,默認容量為16

當掛在鏈表上的元素大於等於8時,會通過treeifyBin方法來判斷是否擴容或轉換為一棵樹。

treeifyBin:

 1 private final void treeifyBin(Node<K,V>[] tab, int index) {
 2         Node<K,V> b; int n, sc;
 3         if (tab != null) {
 4             // 如果數組長度小於64則進行擴容
 5             if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
 6                 tryPresize(n << 1); 
 7             else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
 8                 // 將鏈表轉換成樹
 9                 synchronized (b) {
10                     // 再次比較當前位置結點是否改變
11                     if (tabAt(tab, index) == b) {
12                         TreeNode<K,V> hd = null, tl = null; // hd:樹的頭(head)
13                         for (Node<K,V> e = b; e != null; e = e.next) {
14                             TreeNode<K,V> p =
15                                 new TreeNode<K,V>(e.hash, e.key, e.val,
16                                                   null, null);
17                             // 鏈表轉換成樹后,頭節點依然在相同位置
18                             if ((p.prev = tl) == null)
19                                 hd = p;
20                             else
21                                 tl.next = p;
22                             tl = p;
23                         }
24                         setTabAt(tab, index, new TreeBin<K,V>(hd));
25                     }
26                 }
27             }
28         }
29     }

分析:

從上述源碼上看,當節點鏈表上的元素大於等於8時,並不是一定要將數據結構轉換成樹。而是要先判斷數組的容量,如果數組長度小於64,會進行擴容(擴容為原來數組長度的一倍),否則才會轉換成樹。

tryPresize:擴容函數,注意通過treeifyBin調用tryPresize時,入參已經擴大2倍

 1    /**
 2      * 擴容時大小總是2的N次方
 3      * 擴容這里可能有一點繞,用一個例子來走下流程
 4      * 假設原來數組長度為16(默認值),在調用tryPresize的時候size的值已經變成了32(16<<1),此時sizeCtl為12
 5      * 計算出c的值為64,注意擴容會在transfer中進行(前提數組已經初始化),每次擴大2倍,由於數組長度基數為2的N次方,所以最終的數組長度也是2的N次方。
 6      * 注意c的值是用來控制循環退出的,條件c<=sc(sizeCtl)。
 7      *           數組長度   sizeCtl
 8      *第一次擴容:  32        28
 9      *第二次擴容:  64        48
10      *第三次擴容:  128       96   此時c(64)<sc(96) 此時退出擴容
11      */
12 private final void tryPresize(int size) {
13         // 通過tableSizeFor計算擴容退出控制量標志,容量大小總是2的N次方
14         int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
15             tableSizeFor(size + (size >>> 1) + 1);
16         int sc;
17         while ((sc = sizeCtl) >= 0) {
18             Node<K,V>[] tab = table; int n;
19             // 初始化
20             // 如果tab未初始化,則初始化一個大小為sizeCtl和c中較大的數組
21             // 初始化是將sizeCtl設置為-1,完成之后將其設置為數組長度的3/4
22             // 在此進行初始化,主要是因為如果直接調用putAll方法進行元素添加時,table還未初始化,所以這里需要判斷table是否進行了初始化
23             if (tab == null || (n = tab.length) == 0) {
24                 n = (sc > c) ? sc : c;
25                 // 初始化tab的時候,把sizeCtl設置為-1,通過CAS
26                 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
27                     try {
28                         if (table == tab) {
29                             @SuppressWarnings("unchecked")
30                             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
31                             table = nt;
32                             sc = n - (n >>> 2);
33                         }
34                     } finally {
35                         sizeCtl = sc;
36                     }
37                 }
38             }
39             // 一直擴容到c小於等於sizeCtl或者數組長度大於最大長度的時候,退出擴容
40             else if (c <= sc || n >= MAXIMUM_CAPACITY)
41                 break;
42             else if (tab == table) {
43                 int rs = resizeStamp(n);
44                 // 如果正在擴容,則幫助擴容
45                 // 否則的話,開始新的擴容
46                 // 在transfer操作,將第一個參數的table元素,移到第二個元素的table去,
47                 // 雖然此時第二個參數設置的是null,但是在transfer方法中,第二個參數為null的時候,會創建一個兩倍大小的table
48                 // sc小於0表示有線程在進行操作
49                 if (sc < 0) {
50                     Node<K,V>[] nt;
51                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
52                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
53                         transferIndex <= 0)
54                         break;
55                     // 將線程數加一,該線程將進行transfer,在transfer的時候,sc表示transfer工作線程數
56                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
57                         transfer(tab, nt);
58                 }
59                 // 沒有初始化或擴容,直接進行擴容
60                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
61                                              (rs << RESIZE_STAMP_SHIFT) + 2))
62                     transfer(tab, null);
63             }
64         }
65     }

分析:

擴容時稍微有一點繞,但上面注釋給出了一個例子,理解該例子應該就可以理解擴容,特別要注意源碼中的c值,可以看做是擴容控制值,通過該值來終止擴容函數。

transfer:數組擴容函數

  1  private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2         int n = tab.length, stride;
  3         // 確定線程負責數組大小的范圍
  4         if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5             stride = MIN_TRANSFER_STRIDE; // subdivide range
  6         // 擴容后數組長度為原來的兩倍
  7         if (nextTab == null) {            // initiating
  8             try {
  9                 @SuppressWarnings("unchecked")
 10                 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 11                 nextTab = nt;
 12             } catch (Throwable ex) {      // try to cope with OOME
 13                 sizeCtl = Integer.MAX_VALUE;
 14                 return;
 15             }
 16             nextTable = nextTab;
 17             transferIndex = n;
 18         }
 19         int nextn = nextTab.length;
 20         /**
 21          * 創建一個fwd結點,用來控制並發,當一個結點為空或者已經被轉移之后,就設置為fwd結點
 22          * 這是一個空的標志節點
 23          */
 24         ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 25         // 是否繼續向前查找的標志位
 26         boolean advance = true;
 27         boolean finishing = false; // to ensure sweep before committing nextTab
 28         for (int i = 0, bound = 0;;) {
 29             Node<K,V> f; int fh;
 30             while (advance) {
 31                 int nextIndex, nextBound;
 32                 if (--i >= bound || finishing)
 33                     advance = false;
 34                 else if ((nextIndex = transferIndex) <= 0) {
 35                     i = -1;
 36                     advance = false;
 37                 }
 38                 else if (U.compareAndSwapInt
 39                          (this, TRANSFERINDEX, nextIndex,
 40                           nextBound = (nextIndex > stride ?
 41                                        nextIndex - stride : 0))) {
 42                     bound = nextBound;
 43                     i = nextIndex - 1;
 44                     advance = false;
 45                 }
 46             }
 47             if (i < 0 || i >= n || i + n >= nextn) {
 48                 int sc;
 49                 // 數據遷移完成,替換舊桶數據
 50                 if (finishing) {
 51                     nextTable = null;
 52                     table = nextTab;
 53                     // 設置sizeCtl為擴容后的0.75
 54                     sizeCtl = (n << 1) - (n >>> 1);
 55                     return;
 56                 }
 57                 // 擴容完成,將擴容線程數-1
 58                 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 59                     if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 60                         return;
 61                     // finishing和advance設置為true,重新走到上面if條件,再次檢查是否遷移完
 62                     // 通過fh=f.hash==MOVED進行判斷
 63                     finishing = advance = true;
 64                     i = n; // recheck before commit
 65                 }
 66             }
 67             // 如果桶中無數據,則放入fwd標記,表示該位置已遷移
 68             else if ((f = tabAt(tab, i)) == null)
 69                 advance = casTabAt(tab, i, null, fwd);
 70             // 如果桶中第一個元素的hash值為MOVED,說明該節點為fwd節點,詳情看fwd節點的構造函數
 71             // 說明該位置已經被遷移
 72             else if ((fh = f.hash) == MOVED)
 73                 advance = true; // already processed
 74             else {
 75                 // 加鎖遷移元素
 76                 synchronized (f) {
 77                     // 再次判斷桶中第一個元素是否有過修改
 78                     if (tabAt(tab, i) == f) {
 79                         /**
 80                          * 把一個鏈表划分成兩個鏈表
 81                          * 規則是桶中各元素的hash值與桶大小n進行與操作
 82                          * 等於0的放到低位鏈表(low)中,等於1的放到高位鏈表(high)中
 83                          * 其中低位鏈表遷移到新桶的位置是相對舊桶不變的
 84                          * 高位鏈表遷移到新桶的位置正好是其在舊桶位置上加n,這里在HashMap(jdk1.8中)分析過。
 85                          * 這就是為什么擴容時,容量變成原來兩倍的原因
 86                          */
 87                         Node<K,V> ln, hn; // ln:low節點 hn:height節點
 88                         // 鏈表的節點hash值大於0,TreeBin的hash值為-2
 89                         if (fh >= 0) {
 90                             // 首先計算出當前結點的位置
 91                             int runBit = fh & n;
 92                             Node<K,V> lastRun = f;
 93                             for (Node<K,V> p = f.next; p != null; p = p.next) {
 94                                 int b = p.hash & n;
 95                                 // 同一節點下hashCode可能是不同的,這樣才會有hash分布
 96                                 // 更新runBit的值,找出與f不同的節點
 97                                 // 這里一直要找到鏈表尾,但是lastRun不一定是尾節點,也就是找到最后一段相同的
 98                                 // 因為是鏈表,當位置相同,直接就帶過去了,避免沒必要的循環
 99                                 if (b != runBit) {
100                                     runBit = b;
101                                     lastRun = p;
102                                 }
103                             }
104                             // 設置低位節點
105                             if (runBit == 0) {
106                                 ln = lastRun;
107                                 hn = null;
108                             }
109                             // 設置高位節點
110                             else {
111                                 hn = lastRun;
112                                 ln = null;
113                             }
114                             // 生成兩條鏈表,直接拼接
115                             // 找到不等於lastRun的節點,進行拼接,不是倒序,這里就是進行一個拼接,因為把hash值相同的鏈從lastRun帶過來了
116                             for (Node<K,V> p = f; p != lastRun; p = p.next) {
117                                 int ph = p.hash; K pk = p.key; V pv = p.val;
118                                 if ((ph & n) == 0)
119                                     ln = new Node<K,V>(ph, pk, pv, ln);
120                                 else
121                                     hn = new Node<K,V>(ph, pk, pv, hn);
122                             }
123                             // 這里設置和hashMap類似,在相應點上設置節點即可
124                             setTabAt(nextTab, i, ln);
125                             setTabAt(nextTab, i + n, hn);
126                             // 在舊的鏈表位置上設置占位符,標記已遷移完成
127                             setTabAt(tab, i, fwd);
128                             advance = true;
129                         }
130                         /**
131                          * 結點是樹的情況
132                          * 和鏈表相同,分成兩顆樹,根據hash&n為0的放在低位樹,為1的放在高位樹
133                          */
134                         else if (f instanceof TreeBin) {
135                             TreeBin<K,V> t = (TreeBin<K,V>)f;
136                             TreeNode<K,V> lo = null, loTail = null;
137                             TreeNode<K,V> hi = null, hiTail = null;
138                             int lc = 0, hc = 0;
139                             // 遍歷整棵樹,根據hash&n是否為0進行划分
140                             for (Node<K,V> e = t.first; e != null; e = e.next) {
141                                 int h = e.hash;
142                                 TreeNode<K,V> p = new TreeNode<K,V>
143                                     (h, e.key, e.val, null, null);
144                                 if ((h & n) == 0) {
145                                     if ((p.prev = loTail) == null)
146                                         lo = p;
147                                     else
148                                         loTail.next = p;
149                                     loTail = p;
150                                     ++lc;
151                                 }
152                                 else {
153                                     if ((p.prev = hiTail) == null)
154                                         hi = p;
155                                     else
156                                         hiTail.next = p;
157                                     hiTail = p;
158                                     ++hc;
159                                 }
160                             }
161                             // 復制完樹結點之后,如果樹的節點小於等於6時,就轉回鏈表
162                             ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
163                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
164                             hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
165                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
166                             // 低位樹的位置不變
167                             setTabAt(nextTab, i, ln);
168                             // 高位樹的位置在原來位置上加n
169                             setTabAt(nextTab, i + n, hn);
170                             // 標記該位置已經進遷移
171                             setTabAt(tab, i, fwd);
172                             // 繼續循環,執行--i操作
173                             advance = true;
174                         }
175                     }
176                 }
177             }
178         }
179     }

分析:

擴容函數中對於中間有段求i的值不是特別明白,其他流程還是比較清楚的,和HashMap的擴容有點類似,鏈表分成兩段進行處理,通過hash&n是否等於0進行划分,遷移是從靠后的桶開始的(具體就在中間那段求i的值處),在遷移過程中鎖住了當前桶,還是采用了分段鎖的思想。需注意:#1.針對樹節點,如果擴容后樹節點上的元素總數小於等於6,則會退化成鏈表;#2.在鏈表拆分后進行組合時並不一定是倒序

在put操作中還有一個幫助擴容的函數:helpTransfer

 1  // 線程添加元素時發現正在擴容且當前元素所在的桶已經遷移完成,則協助遷移其他桶的元素
 2     final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 3         Node<K,V>[] nextTab; int sc;
 4         // 如果桶數組不為空,並且當前桶第一個元素為fwd類型,且nexttable不為空
 5         // 說明當前桶已經遷移完畢,可以去幫助遷移其他的桶的元素了
 6         if (tab != null && (f instanceof ForwardingNode) &&
 7             (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
 8             int rs = resizeStamp(tab.length);
 9            // sizeCtl<0,說明正在擴容
10             while (nextTab == nextTable && table == tab &&
11                    (sc = sizeCtl) < 0) {
12                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
13                     sc == rs + MAX_RESIZERS || transferIndex <= 0)
14                     break;
15                 // 擴容線程數加1
16                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
17                     // 當前線程幫忙遷移元素
18                     transfer(tab, nextTab);
19                     break;
20                 }
21             }
22             return nextTab;
23         }
24         return table;
25     }

分析:

只有當前桶元素遷移完成了才能去協助遷移其他桶的元素。

接下來看addCount函數,該函數在put操作后會判斷是否需要擴容,如果達到擴容門檻,則進行擴容或協助擴容。

 1  private final void addCount(long x, int check) {
 2         CounterCell[] as; long b, s;
 3         // 如果計數盒子不為空,或者修改baseCount失敗
 4         if ((as = counterCells) != null ||
 5             !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 6             CounterCell a; long v; int m;
 7             boolean uncontended = true;
 8             // 如果as為空,或者長度為0,或者當前線程所在的段為null,或者在當前線程的段上加數量失敗
 9             if (as == null || (m = as.length - 1) < 0 ||
10                 (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
11                 !(uncontended =
12                   U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
13                 // 這里對counterCells擴容,減少多線程hash到同一個段的頻率
14                 fullAddCount(x, uncontended);
15                 return;
16             }
17             if (check <= 1)
18                 return;
19             // 計算元素個數
20             s = sumCount();
21         }
22         if (check >= 0) {
23             Node<K,V>[] tab, nt; int n, sc;
24             // 如果元素個數達到了擴容門檻,則進行擴容
25             // sizeCtl即為擴容門檻,它為容量的0.75倍
26             while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
27                    (n = tab.length) < MAXIMUM_CAPACITY) {
28                 // rs是擴容的一個郵戳標識
29                 int rs = resizeStamp(n);
30                 // sc小於0,表明正在擴容
31                 if (sc < 0) {
32                     if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
33                         sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
34                         transferIndex <= 0)
35                         // 擴容完成,退出循環
36                         break;
37                     // 擴容未完成,將當前線程加入遷移元素中,並把擴容線程數加1
38                     if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
39                         transfer(tab, nt);
40                 }
41                 else if (U.compareAndSwapInt(this, SIZECTL, sc,
42                                              (rs << RESIZE_STAMP_SHIFT) + 2))
43                     // 進行元素遷移
44                     transfer(tab, null);
45                 // 重新計算元素個數
46                 s = sumCount();
47             }
48         }
49     }

分析:

該函數的主要作用就是將元素個數加1,並且判斷是否需要進行擴容。目前對該函數的詳細邏輯不是特別清楚,后續再來進行分析。

一個put操作涉及的內容太多了,還需深入理解,下面來看get操作:

 1 public V get(Object key) {
 2         Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 3         // 計算hash
 4         int h = spread(key.hashCode());
 5         // 如果對應位置上有元素
 6         if ((tab = table) != null && (n = tab.length) > 0 &&
 7             (e = tabAt(tab, (n - 1) & h)) != null) {
 8             // 如果第一個元素就是要找的元素,則直接返回
 9             if ((eh = e.hash) == h) {
10                 if ((ek = e.key) == key || (ek != null && key.equals(ek)))
11                     return e.val;
12             }
13             // 如果hash小於0,則說明是樹或正在擴容,則使用find尋找元素,find根據Node的不同子類實現方式不同
14             else if (eh < 0)
15                 return (p = e.find(h, key)) != null ? p.val : null;
16             // 遍歷整個鏈表尋找元素
17             while ((e = e.next) != null) {
18                 if (e.hash == h &&
19                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
20                     return e.val;
21             }
22         }
23         return null;
24     }

分析:

get操作整體來說邏輯清楚明了,與HashMap類似,但是要注意hash值小於0的時候,其尋找元素的方式有所不同,並且整個獲取元素的過程是沒有加鎖的。

接下來看remove操作:

 1 final V replaceNode(Object key, V value, Object cv) {
 2         // 計算hash值
 3         int hash = spread(key.hashCode());
 4         // 進行自旋操作
 5         for (Node<K,V>[] tab = table;;) {
 6             Node<K,V> f; int n, i, fh;
 7             // 如果tab為空,或者key所在的位置上沒有元素,則直接終止自旋
 8             if (tab == null || (n = tab.length) == 0 ||
 9                 (f = tabAt(tab, i = (n - 1) & hash)) == null)
10                 break;
11             // 正在擴容,則協助其擴容
12             else if ((fh = f.hash) == MOVED)
13                 tab = helpTransfer(tab, f);
14             else {
15                 V oldVal = null;
16                 // 標記是否處理過
17                 boolean validated = false;
18                 // 加鎖
19                 synchronized (f) {
20                     // 再次驗證當前位置上的元素是否被修改過
21                     if (tabAt(tab, i) == f) {
22                         // 鏈表
23                         if (fh >= 0) {
24                             validated = true;
25                             // 遍歷鏈表,尋找節點
26                             for (Node<K,V> e = f, pred = null;;) {
27                                 K ek;
28                                 if (e.hash == hash &&
29                                     ((ek = e.key) == key ||
30                                      (ek != null && key.equals(ek)))) {
31                                     // 找到目標元素
32                                     V ev = e.val;
33                                     if (cv == null || cv == ev ||
34                                         (ev != null && cv.equals(ev))) {
35                                         oldVal = ev;
36                                         // 如果value不為空,則替換舊值
37                                         if (value != null)
38                                             e.val = value;
39                                         else if (pred != null)
40                                             // 前置節點不為空,刪除當前節點
41                                             pred.next = e.next;
42                                         else
43                                             // 如果前置節點為空,則說明是桶中第一個元素,則刪除即可
44                                             setTabAt(tab, i, e.next);
45                                     }
46                                     break;
47                                 }
48                                 // 更新前置節點
49                                 pred = e;
50                                 // 遍歷到鏈表尾還未找打元素,則跳出循環
51                                 if ((e = e.next) == null)
52                                     break;
53                             }
54                         }
55                         // 節點是樹
56                         else if (f instanceof TreeBin) {
57                             validated = true;
58                             TreeBin<K,V> t = (TreeBin<K,V>)f;
59                             TreeNode<K,V> r, p;
60                             // 遍歷樹找到目標節點
61                             if ((r = t.root) != null &&
62                                 (p = r.findTreeNode(hash, key, null)) != null) {
63                                 V pv = p.val;
64                                 if (cv == null || cv == pv ||
65                                     (pv != null && cv.equals(pv))) {
66                                     oldVal = pv;
67                                     if (value != null)
68                                         // 替換舊值
69                                         p.val = value;
70                                     else if (t.removeTreeNode(p))
71                                         // 當removeTreeNode返回true表示樹的元素個數較少,則退化成鏈表
72                                         setTabAt(tab, i, untreeify(t.first));
73                                 }
74                             }
75                         }
76                     }
77                 }
78                 // 如果處理過
79                 if (validated) {
80                     // 找到了元素,返回其舊值
81                     if (oldVal != null) {
82                         // 如果要替換的值為空,則將元素個數減1
83                         if (value == null)
84                             addCount(-1L, -1);
85                         return oldVal;
86                     }
87                     break;
88                 }
89             }
90         }
91         return null;
92     }

分析:

利用自旋刪除元素,整體流程清晰,根據鏈表或樹進行相應操作,注意如果刪除過程中正在進行擴容,需要協助其擴容后再進行刪除。

size函數:獲取元素個數

 1 public int size() {
 2     // 調用sumCount計算元素個數
 3     long n = sumCount();
 4     return ((n < 0L) ? 0 :
 5             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
 6             (int)n);
 7 }
 8 
 9    final long sumCount() {
10     // 計算CounterCell所有段以及baseCount的數量之和
11     CounterCell[] as = counterCells; CounterCell a;
12     long sum = baseCount;
13     if (as != null) {
14         for (int i = 0; i < as.length; ++i) {
15             if ((a = as[i]) != null)
16                 sum += a.value;
17         }
18     }
19     return sum;
20 }

分析:

元素的個數會計算CounterCell所有段和baseCount之和,並且該函數是沒有加鎖的。

3.總結

ConcurrentHashMap的源碼分析真不容易,代碼量非常的大,其實有的地方目前還沒弄懂,需后續反復閱讀。

#1.ConcurrentHashMap是HashMap的線程安全版本。

#2.ConcurrentHashMap底層數據結構為數組+鏈表+紅黑樹,默認容量為16,不允許[key,value]為null。

#3.ConcurrentHashMap內部采用的鎖有synchronized、CAS、自旋鎖、分段鎖、volatile。

#4.通過sizeCtl變量來控制擴容、初始化等操作。

#5.查詢操作不加鎖,因此ConcurrentHashMap不是強一致性

ConcurrentHashMap未完待續!!!


by Shawn Chen,2019.09.18日,下午。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM