前言:ConcurrentHashMap是HashMap的線程安全版本,內部使用了數組+鏈表+紅黑樹的結構來存儲數據,相對於同樣線程安全的Hashtable來說,它在效率方面有很大的提升,因此多線程環境下更多的是使用ConcurrentHashMap,因此有必要對其原理進行分析。
注:本文jdk源碼版本為jdk1.8.0_172
1.ConcurrentHashMap介紹
ConcurrentHashMap是HashMap的線程安全版本,底層數據結構為數組+鏈表+紅黑樹,默認容量16,線程同步,不允許[key,value]為null。
1 public class ConcurrentHashMap<K,V> extends AbstractMap<K,V> 2 implements ConcurrentMap<K,V>, Serializable
構造函數:
1 public ConcurrentHashMap() { 2 } 3 4 public ConcurrentHashMap(int initialCapacity) { 5 if (initialCapacity < 0) 6 throw new IllegalArgumentException(); 7 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? 8 MAXIMUM_CAPACITY : 9 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); 10 this.sizeCtl = cap; 11 } 12 13 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { 14 this.sizeCtl = DEFAULT_CAPACITY; 15 putAll(m); 16 } 17 18 public ConcurrentHashMap(int initialCapacity, float loadFactor) { 19 this(initialCapacity, loadFactor, 1); 20 } 21 22 public ConcurrentHashMap(int initialCapacity, 23 float loadFactor, int concurrencyLevel) { 24 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 25 throw new IllegalArgumentException(); 26 if (initialCapacity < concurrencyLevel) // Use at least as many bins 27 initialCapacity = concurrencyLevel; // as estimated threads 28 long size = (long)(1.0 + (long)initialCapacity / loadFactor); 29 int cap = (size >= (long)MAXIMUM_CAPACITY) ? 30 MAXIMUM_CAPACITY : tableSizeFor((int)size); 31 this.sizeCtl = cap; 32 }
分析:
通過構造函數可以發現sizeCtl變量經常出現,該變量通過查看jdk源碼注釋可知該變量主要控制初始化或擴容:
#1.-1,表示線程正在進行初始化操作。
#2.-(1+nThreads),表示n個線程正在進行擴容。
#3.0,默認值,后續在真正初始化的時候使用默認容量。
#4.>0,初始化或擴容完成后下一次的擴容門檻。
2.具體源碼分析
put操作:
1 final V putVal(K key, V value, boolean onlyIfAbsent) { 2 if (key == null || value == null) throw new NullPointerException(); 3 // 計算key的hash值 4 int hash = spread(key.hashCode()); 5 // 用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹 6 int binCount = 0; 7 // 進行自旋 8 for (Node<K,V>[] tab = table;;) { 9 Node<K,V> f; int n, i, fh; 10 if (tab == null || (n = tab.length) == 0) 11 // table未初始化,則初始化 12 tab = initTable(); 13 // 如果該位置上的f為null,則說明第一次插入元素,則直接插入新的Node節點 14 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 15 if (casTabAt(tab, i, null, 16 new Node<K,V>(hash, key, value, null))) 17 break; // no lock when adding to empty bin 18 } 19 // 如果檢測到當前某個節點的hash值為MOVED,則表示正在進行數組擴張的數據復制階段 20 // 則當前線程與會參與復制,通過允許多線程復制的功能,減少數組的復制來帶來的性能損失 21 else if ((fh = f.hash) == MOVED) 22 tab = helpTransfer(tab, f); 23 else { 24 V oldVal = null; 25 /** 26 * 到該分支表明該位置上有元素,采用synchronized方式加鎖 27 * 如果是鏈表的話,則對鏈表進行遍歷,找到key和key的hash值都一樣的節點,進行替換 28 * 如果沒有找到,則添加在鏈表最后面 29 * 如果是樹的話,則添加到樹中去 30 */ 31 synchronized (f) { 32 // 再次取出要存儲的位置元素,跟之前的數據進行比較,看是否進行了更改 33 if (tabAt(tab, i) == f) { 34 // 鏈表 35 if (fh >= 0) { 36 binCount = 1; 37 // 遍歷鏈表 38 for (Node<K,V> e = f;; ++binCount) { 39 K ek; 40 // 元素的hash、key都相同,則進行替換和hashMap相同 41 if (e.hash == hash && 42 ((ek = e.key) == key || 43 (ek != null && key.equals(ek)))) { 44 oldVal = e.val; 45 // 當使用putIfAbsent的時候,只有在這個key沒有設置值時的候才設置 46 if (!onlyIfAbsent) 47 e.val = value; 48 break; 49 } 50 Node<K,V> pred = e; 51 // 不同key,hash值相同時,直接添加到鏈表尾即可 52 if ((e = e.next) == null) { 53 pred.next = new Node<K,V>(hash, key, 54 value, null); 55 break; 56 } 57 } 58 } 59 // 當前結點為紅黑樹 60 else if (f instanceof TreeBin) { 61 Node<K,V> p; 62 binCount = 2; 63 // 添加元素到樹中去,表明樹的當前結點存在值,則進行替換 64 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 65 value)) != null) { 66 oldVal = p.val; 67 if (!onlyIfAbsent) 68 p.val = value; 69 } 70 } 71 } 72 } 73 if (binCount != 0) { 74 // 當在同一個節點的數目大於等於8時,則進行擴容或者將數據轉換成紅黑樹 75 // 注意,這里並不一定是直接轉換成紅黑樹,有可能先進行擴容 76 if (binCount >= TREEIFY_THRESHOLD) 77 treeifyBin(tab, i); 78 if (oldVal != null) 79 return oldVal; 80 break; 81 } 82 } 83 } 84 // 計數 binCount大於1(鏈表的長度)表示鏈表,binCount=2表示紅黑樹 85 addCount(1L, binCount); 86 return null; 87 }
分析:
通過查看put操作的核心源碼,整體邏輯還是比較清晰,有幾個點需要注意:
#1.在插入元素時,采用了自旋。
#2.在插入元素的時候才會進行初始化。
#3.在插入元素時,底層數據結構可能會轉向紅黑樹。
initTable:初始化函數
1 private final Node<K,V>[] initTable() { 2 Node<K,V>[] tab; int sc; 3 while ((tab = table) == null || tab.length == 0) { 4 // sizeCtl初始值為0,當小於0時,表示在別的線程初始化表或擴展表,當前線程只需要讓出cpu時間片即可 5 if ((sc = sizeCtl) < 0) 6 Thread.yield(); // lost initialization race; just spin 7 // 將sc更新為-1,表示線程正在進行初始化操作 8 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 9 try { 10 if ((tab = table) == null || tab.length == 0) { 11 // 指定了大小就創建指定大小的Node數組,否則創建默認大小的Node數組 12 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 13 @SuppressWarnings("unchecked") 14 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 15 table = tab = nt; 16 sc = n - (n >>> 2); 17 } 18 } finally { 19 // 和上面邏輯對別可知sizeCtl的大小為數組長度的3/4 20 sizeCtl = sc; 21 } 22 break; 23 } 24 } 25 return tab; 26 }
分析:
在put操作時才進行初始化操作其實是懶加載的一種表現形式,並且初始化時,已考慮多線程的情況,默認容量為16。
當掛在鏈表上的元素大於等於8時,會通過treeifyBin方法來判斷是否擴容或轉換為一棵樹。
treeifyBin:
1 private final void treeifyBin(Node<K,V>[] tab, int index) { 2 Node<K,V> b; int n, sc; 3 if (tab != null) { 4 // 如果數組長度小於64則進行擴容 5 if ((n = tab.length) < MIN_TREEIFY_CAPACITY) 6 tryPresize(n << 1); 7 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { 8 // 將鏈表轉換成樹 9 synchronized (b) { 10 // 再次比較當前位置結點是否改變 11 if (tabAt(tab, index) == b) { 12 TreeNode<K,V> hd = null, tl = null; // hd:樹的頭(head) 13 for (Node<K,V> e = b; e != null; e = e.next) { 14 TreeNode<K,V> p = 15 new TreeNode<K,V>(e.hash, e.key, e.val, 16 null, null); 17 // 鏈表轉換成樹后,頭節點依然在相同位置 18 if ((p.prev = tl) == null) 19 hd = p; 20 else 21 tl.next = p; 22 tl = p; 23 } 24 setTabAt(tab, index, new TreeBin<K,V>(hd)); 25 } 26 } 27 } 28 } 29 }
分析:
從上述源碼上看,當節點鏈表上的元素大於等於8時,並不是一定要將數據結構轉換成樹。而是要先判斷數組的容量,如果數組長度小於64,會進行擴容(擴容為原來數組長度的一倍),否則才會轉換成樹。
tryPresize:擴容函數,注意通過treeifyBin調用tryPresize時,入參已經擴大2倍。
1 /** 2 * 擴容時大小總是2的N次方 3 * 擴容這里可能有一點繞,用一個例子來走下流程 4 * 假設原來數組長度為16(默認值),在調用tryPresize的時候size的值已經變成了32(16<<1),此時sizeCtl為12 5 * 計算出c的值為64,注意擴容會在transfer中進行(前提數組已經初始化),每次擴大2倍,由於數組長度基數為2的N次方,所以最終的數組長度也是2的N次方。 6 * 注意c的值是用來控制循環退出的,條件c<=sc(sizeCtl)。 7 * 數組長度 sizeCtl 8 *第一次擴容: 32 28 9 *第二次擴容: 64 48 10 *第三次擴容: 128 96 此時c(64)<sc(96) 此時退出擴容 11 */ 12 private final void tryPresize(int size) { 13 // 通過tableSizeFor計算擴容退出控制量標志,容量大小總是2的N次方 14 int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : 15 tableSizeFor(size + (size >>> 1) + 1); 16 int sc; 17 while ((sc = sizeCtl) >= 0) { 18 Node<K,V>[] tab = table; int n; 19 // 初始化 20 // 如果tab未初始化,則初始化一個大小為sizeCtl和c中較大的數組 21 // 初始化是將sizeCtl設置為-1,完成之后將其設置為數組長度的3/4 22 // 在此進行初始化,主要是因為如果直接調用putAll方法進行元素添加時,table還未初始化,所以這里需要判斷table是否進行了初始化 23 if (tab == null || (n = tab.length) == 0) { 24 n = (sc > c) ? sc : c; 25 // 初始化tab的時候,把sizeCtl設置為-1,通過CAS 26 if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 27 try { 28 if (table == tab) { 29 @SuppressWarnings("unchecked") 30 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 31 table = nt; 32 sc = n - (n >>> 2); 33 } 34 } finally { 35 sizeCtl = sc; 36 } 37 } 38 } 39 // 一直擴容到c小於等於sizeCtl或者數組長度大於最大長度的時候,退出擴容 40 else if (c <= sc || n >= MAXIMUM_CAPACITY) 41 break; 42 else if (tab == table) { 43 int rs = resizeStamp(n); 44 // 如果正在擴容,則幫助擴容 45 // 否則的話,開始新的擴容 46 // 在transfer操作,將第一個參數的table元素,移到第二個元素的table去, 47 // 雖然此時第二個參數設置的是null,但是在transfer方法中,第二個參數為null的時候,會創建一個兩倍大小的table 48 // sc小於0表示有線程在進行操作 49 if (sc < 0) { 50 Node<K,V>[] nt; 51 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 52 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 53 transferIndex <= 0) 54 break; 55 // 將線程數加一,該線程將進行transfer,在transfer的時候,sc表示transfer工作線程數 56 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 57 transfer(tab, nt); 58 } 59 // 沒有初始化或擴容,直接進行擴容 60 else if (U.compareAndSwapInt(this, SIZECTL, sc, 61 (rs << RESIZE_STAMP_SHIFT) + 2)) 62 transfer(tab, null); 63 } 64 } 65 }
分析:
擴容時稍微有一點繞,但上面注釋給出了一個例子,理解該例子應該就可以理解擴容,特別要注意源碼中的c值,可以看做是擴容控制值,通過該值來終止擴容函數。
transfer:數組擴容函數
1 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 2 int n = tab.length, stride; 3 // 確定線程負責數組大小的范圍 4 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 5 stride = MIN_TRANSFER_STRIDE; // subdivide range 6 // 擴容后數組長度為原來的兩倍 7 if (nextTab == null) { // initiating 8 try { 9 @SuppressWarnings("unchecked") 10 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 11 nextTab = nt; 12 } catch (Throwable ex) { // try to cope with OOME 13 sizeCtl = Integer.MAX_VALUE; 14 return; 15 } 16 nextTable = nextTab; 17 transferIndex = n; 18 } 19 int nextn = nextTab.length; 20 /** 21 * 創建一個fwd結點,用來控制並發,當一個結點為空或者已經被轉移之后,就設置為fwd結點 22 * 這是一個空的標志節點 23 */ 24 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 25 // 是否繼續向前查找的標志位 26 boolean advance = true; 27 boolean finishing = false; // to ensure sweep before committing nextTab 28 for (int i = 0, bound = 0;;) { 29 Node<K,V> f; int fh; 30 while (advance) { 31 int nextIndex, nextBound; 32 if (--i >= bound || finishing) 33 advance = false; 34 else if ((nextIndex = transferIndex) <= 0) { 35 i = -1; 36 advance = false; 37 } 38 else if (U.compareAndSwapInt 39 (this, TRANSFERINDEX, nextIndex, 40 nextBound = (nextIndex > stride ? 41 nextIndex - stride : 0))) { 42 bound = nextBound; 43 i = nextIndex - 1; 44 advance = false; 45 } 46 } 47 if (i < 0 || i >= n || i + n >= nextn) { 48 int sc; 49 // 數據遷移完成,替換舊桶數據 50 if (finishing) { 51 nextTable = null; 52 table = nextTab; 53 // 設置sizeCtl為擴容后的0.75 54 sizeCtl = (n << 1) - (n >>> 1); 55 return; 56 } 57 // 擴容完成,將擴容線程數-1 58 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 59 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 60 return; 61 // finishing和advance設置為true,重新走到上面if條件,再次檢查是否遷移完 62 // 通過fh=f.hash==MOVED進行判斷 63 finishing = advance = true; 64 i = n; // recheck before commit 65 } 66 } 67 // 如果桶中無數據,則放入fwd標記,表示該位置已遷移 68 else if ((f = tabAt(tab, i)) == null) 69 advance = casTabAt(tab, i, null, fwd); 70 // 如果桶中第一個元素的hash值為MOVED,說明該節點為fwd節點,詳情看fwd節點的構造函數 71 // 說明該位置已經被遷移 72 else if ((fh = f.hash) == MOVED) 73 advance = true; // already processed 74 else { 75 // 加鎖遷移元素 76 synchronized (f) { 77 // 再次判斷桶中第一個元素是否有過修改 78 if (tabAt(tab, i) == f) { 79 /** 80 * 把一個鏈表划分成兩個鏈表 81 * 規則是桶中各元素的hash值與桶大小n進行與操作 82 * 等於0的放到低位鏈表(low)中,等於1的放到高位鏈表(high)中 83 * 其中低位鏈表遷移到新桶的位置是相對舊桶不變的 84 * 高位鏈表遷移到新桶的位置正好是其在舊桶位置上加n,這里在HashMap(jdk1.8中)分析過。 85 * 這就是為什么擴容時,容量變成原來兩倍的原因 86 */ 87 Node<K,V> ln, hn; // ln:low節點 hn:height節點 88 // 鏈表的節點hash值大於0,TreeBin的hash值為-2 89 if (fh >= 0) { 90 // 首先計算出當前結點的位置 91 int runBit = fh & n; 92 Node<K,V> lastRun = f; 93 for (Node<K,V> p = f.next; p != null; p = p.next) { 94 int b = p.hash & n; 95 // 同一節點下hashCode可能是不同的,這樣才會有hash分布 96 // 更新runBit的值,找出與f不同的節點 97 // 這里一直要找到鏈表尾,但是lastRun不一定是尾節點,也就是找到最后一段相同的 98 // 因為是鏈表,當位置相同,直接就帶過去了,避免沒必要的循環 99 if (b != runBit) { 100 runBit = b; 101 lastRun = p; 102 } 103 } 104 // 設置低位節點 105 if (runBit == 0) { 106 ln = lastRun; 107 hn = null; 108 } 109 // 設置高位節點 110 else { 111 hn = lastRun; 112 ln = null; 113 } 114 // 生成兩條鏈表,直接拼接 115 // 找到不等於lastRun的節點,進行拼接,不是倒序,這里就是進行一個拼接,因為把hash值相同的鏈從lastRun帶過來了 116 for (Node<K,V> p = f; p != lastRun; p = p.next) { 117 int ph = p.hash; K pk = p.key; V pv = p.val; 118 if ((ph & n) == 0) 119 ln = new Node<K,V>(ph, pk, pv, ln); 120 else 121 hn = new Node<K,V>(ph, pk, pv, hn); 122 } 123 // 這里設置和hashMap類似,在相應點上設置節點即可 124 setTabAt(nextTab, i, ln); 125 setTabAt(nextTab, i + n, hn); 126 // 在舊的鏈表位置上設置占位符,標記已遷移完成 127 setTabAt(tab, i, fwd); 128 advance = true; 129 } 130 /** 131 * 結點是樹的情況 132 * 和鏈表相同,分成兩顆樹,根據hash&n為0的放在低位樹,為1的放在高位樹 133 */ 134 else if (f instanceof TreeBin) { 135 TreeBin<K,V> t = (TreeBin<K,V>)f; 136 TreeNode<K,V> lo = null, loTail = null; 137 TreeNode<K,V> hi = null, hiTail = null; 138 int lc = 0, hc = 0; 139 // 遍歷整棵樹,根據hash&n是否為0進行划分 140 for (Node<K,V> e = t.first; e != null; e = e.next) { 141 int h = e.hash; 142 TreeNode<K,V> p = new TreeNode<K,V> 143 (h, e.key, e.val, null, null); 144 if ((h & n) == 0) { 145 if ((p.prev = loTail) == null) 146 lo = p; 147 else 148 loTail.next = p; 149 loTail = p; 150 ++lc; 151 } 152 else { 153 if ((p.prev = hiTail) == null) 154 hi = p; 155 else 156 hiTail.next = p; 157 hiTail = p; 158 ++hc; 159 } 160 } 161 // 復制完樹結點之后,如果樹的節點小於等於6時,就轉回鏈表 162 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : 163 (hc != 0) ? new TreeBin<K,V>(lo) : t; 164 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : 165 (lc != 0) ? new TreeBin<K,V>(hi) : t; 166 // 低位樹的位置不變 167 setTabAt(nextTab, i, ln); 168 // 高位樹的位置在原來位置上加n 169 setTabAt(nextTab, i + n, hn); 170 // 標記該位置已經進遷移 171 setTabAt(tab, i, fwd); 172 // 繼續循環,執行--i操作 173 advance = true; 174 } 175 } 176 } 177 } 178 } 179 }
分析:
擴容函數中對於中間有段求i的值不是特別明白,其他流程還是比較清楚的,和HashMap的擴容有點類似,鏈表分成兩段進行處理,通過hash&n是否等於0進行划分,遷移是從靠后的桶開始的(具體就在中間那段求i的值處),在遷移過程中鎖住了當前桶,還是采用了分段鎖的思想。需注意:#1.針對樹節點,如果擴容后樹節點上的元素總數小於等於6,則會退化成鏈表;#2.在鏈表拆分后進行組合時並不一定是倒序。
在put操作中還有一個幫助擴容的函數:helpTransfer
1 // 線程添加元素時發現正在擴容且當前元素所在的桶已經遷移完成,則協助遷移其他桶的元素 2 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 3 Node<K,V>[] nextTab; int sc; 4 // 如果桶數組不為空,並且當前桶第一個元素為fwd類型,且nexttable不為空 5 // 說明當前桶已經遷移完畢,可以去幫助遷移其他的桶的元素了 6 if (tab != null && (f instanceof ForwardingNode) && 7 (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { 8 int rs = resizeStamp(tab.length); 9 // sizeCtl<0,說明正在擴容 10 while (nextTab == nextTable && table == tab && 11 (sc = sizeCtl) < 0) { 12 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 13 sc == rs + MAX_RESIZERS || transferIndex <= 0) 14 break; 15 // 擴容線程數加1 16 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { 17 // 當前線程幫忙遷移元素 18 transfer(tab, nextTab); 19 break; 20 } 21 } 22 return nextTab; 23 } 24 return table; 25 }
分析:
只有當前桶元素遷移完成了才能去協助遷移其他桶的元素。
接下來看addCount函數,該函數在put操作后會判斷是否需要擴容,如果達到擴容門檻,則進行擴容或協助擴容。
1 private final void addCount(long x, int check) { 2 CounterCell[] as; long b, s; 3 // 如果計數盒子不為空,或者修改baseCount失敗 4 if ((as = counterCells) != null || 5 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 6 CounterCell a; long v; int m; 7 boolean uncontended = true; 8 // 如果as為空,或者長度為0,或者當前線程所在的段為null,或者在當前線程的段上加數量失敗 9 if (as == null || (m = as.length - 1) < 0 || 10 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 11 !(uncontended = 12 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 13 // 這里對counterCells擴容,減少多線程hash到同一個段的頻率 14 fullAddCount(x, uncontended); 15 return; 16 } 17 if (check <= 1) 18 return; 19 // 計算元素個數 20 s = sumCount(); 21 } 22 if (check >= 0) { 23 Node<K,V>[] tab, nt; int n, sc; 24 // 如果元素個數達到了擴容門檻,則進行擴容 25 // sizeCtl即為擴容門檻,它為容量的0.75倍 26 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 27 (n = tab.length) < MAXIMUM_CAPACITY) { 28 // rs是擴容的一個郵戳標識 29 int rs = resizeStamp(n); 30 // sc小於0,表明正在擴容 31 if (sc < 0) { 32 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 33 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 34 transferIndex <= 0) 35 // 擴容完成,退出循環 36 break; 37 // 擴容未完成,將當前線程加入遷移元素中,並把擴容線程數加1 38 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 39 transfer(tab, nt); 40 } 41 else if (U.compareAndSwapInt(this, SIZECTL, sc, 42 (rs << RESIZE_STAMP_SHIFT) + 2)) 43 // 進行元素遷移 44 transfer(tab, null); 45 // 重新計算元素個數 46 s = sumCount(); 47 } 48 } 49 }
分析:
該函數的主要作用就是將元素個數加1,並且判斷是否需要進行擴容。目前對該函數的詳細邏輯不是特別清楚,后續再來進行分析。
一個put操作涉及的內容太多了,還需深入理解,下面來看get操作:
1 public V get(Object key) { 2 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 3 // 計算hash 4 int h = spread(key.hashCode()); 5 // 如果對應位置上有元素 6 if ((tab = table) != null && (n = tab.length) > 0 && 7 (e = tabAt(tab, (n - 1) & h)) != null) { 8 // 如果第一個元素就是要找的元素,則直接返回 9 if ((eh = e.hash) == h) { 10 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 11 return e.val; 12 } 13 // 如果hash小於0,則說明是樹或正在擴容,則使用find尋找元素,find根據Node的不同子類實現方式不同 14 else if (eh < 0) 15 return (p = e.find(h, key)) != null ? p.val : null; 16 // 遍歷整個鏈表尋找元素 17 while ((e = e.next) != null) { 18 if (e.hash == h && 19 ((ek = e.key) == key || (ek != null && key.equals(ek)))) 20 return e.val; 21 } 22 } 23 return null; 24 }
分析:
get操作整體來說邏輯清楚明了,與HashMap類似,但是要注意hash值小於0的時候,其尋找元素的方式有所不同,並且整個獲取元素的過程是沒有加鎖的。
接下來看remove操作:
1 final V replaceNode(Object key, V value, Object cv) { 2 // 計算hash值 3 int hash = spread(key.hashCode()); 4 // 進行自旋操作 5 for (Node<K,V>[] tab = table;;) { 6 Node<K,V> f; int n, i, fh; 7 // 如果tab為空,或者key所在的位置上沒有元素,則直接終止自旋 8 if (tab == null || (n = tab.length) == 0 || 9 (f = tabAt(tab, i = (n - 1) & hash)) == null) 10 break; 11 // 正在擴容,則協助其擴容 12 else if ((fh = f.hash) == MOVED) 13 tab = helpTransfer(tab, f); 14 else { 15 V oldVal = null; 16 // 標記是否處理過 17 boolean validated = false; 18 // 加鎖 19 synchronized (f) { 20 // 再次驗證當前位置上的元素是否被修改過 21 if (tabAt(tab, i) == f) { 22 // 鏈表 23 if (fh >= 0) { 24 validated = true; 25 // 遍歷鏈表,尋找節點 26 for (Node<K,V> e = f, pred = null;;) { 27 K ek; 28 if (e.hash == hash && 29 ((ek = e.key) == key || 30 (ek != null && key.equals(ek)))) { 31 // 找到目標元素 32 V ev = e.val; 33 if (cv == null || cv == ev || 34 (ev != null && cv.equals(ev))) { 35 oldVal = ev; 36 // 如果value不為空,則替換舊值 37 if (value != null) 38 e.val = value; 39 else if (pred != null) 40 // 前置節點不為空,刪除當前節點 41 pred.next = e.next; 42 else 43 // 如果前置節點為空,則說明是桶中第一個元素,則刪除即可 44 setTabAt(tab, i, e.next); 45 } 46 break; 47 } 48 // 更新前置節點 49 pred = e; 50 // 遍歷到鏈表尾還未找打元素,則跳出循環 51 if ((e = e.next) == null) 52 break; 53 } 54 } 55 // 節點是樹 56 else if (f instanceof TreeBin) { 57 validated = true; 58 TreeBin<K,V> t = (TreeBin<K,V>)f; 59 TreeNode<K,V> r, p; 60 // 遍歷樹找到目標節點 61 if ((r = t.root) != null && 62 (p = r.findTreeNode(hash, key, null)) != null) { 63 V pv = p.val; 64 if (cv == null || cv == pv || 65 (pv != null && cv.equals(pv))) { 66 oldVal = pv; 67 if (value != null) 68 // 替換舊值 69 p.val = value; 70 else if (t.removeTreeNode(p)) 71 // 當removeTreeNode返回true表示樹的元素個數較少,則退化成鏈表 72 setTabAt(tab, i, untreeify(t.first)); 73 } 74 } 75 } 76 } 77 } 78 // 如果處理過 79 if (validated) { 80 // 找到了元素,返回其舊值 81 if (oldVal != null) { 82 // 如果要替換的值為空,則將元素個數減1 83 if (value == null) 84 addCount(-1L, -1); 85 return oldVal; 86 } 87 break; 88 } 89 } 90 } 91 return null; 92 }
分析:
利用自旋刪除元素,整體流程清晰,根據鏈表或樹進行相應操作,注意如果刪除過程中正在進行擴容,需要協助其擴容后再進行刪除。
size函數:獲取元素個數
1 public int size() { 2 // 調用sumCount計算元素個數 3 long n = sumCount(); 4 return ((n < 0L) ? 0 : 5 (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : 6 (int)n); 7 } 8 9 final long sumCount() { 10 // 計算CounterCell所有段以及baseCount的數量之和 11 CounterCell[] as = counterCells; CounterCell a; 12 long sum = baseCount; 13 if (as != null) { 14 for (int i = 0; i < as.length; ++i) { 15 if ((a = as[i]) != null) 16 sum += a.value; 17 } 18 } 19 return sum; 20 }
分析:
元素的個數會計算CounterCell所有段和baseCount之和,並且該函數是沒有加鎖的。
3.總結
ConcurrentHashMap的源碼分析真不容易,代碼量非常的大,其實有的地方目前還沒弄懂,需后續反復閱讀。
#1.ConcurrentHashMap是HashMap的線程安全版本。
#2.ConcurrentHashMap底層數據結構為數組+鏈表+紅黑樹,默認容量為16,不允許[key,value]為null。
#3.ConcurrentHashMap內部采用的鎖有synchronized、CAS、自旋鎖、分段鎖、volatile。
#4.通過sizeCtl變量來控制擴容、初始化等操作。
#5.查詢操作不加鎖,因此ConcurrentHashMap不是強一致性。
ConcurrentHashMap未完待續!!!
by Shawn Chen,2019.09.18日,下午。