HashMap的線程安全版本,可以用來替換HashTable。在hash碰撞過多的情況下會將鏈表轉化成紅黑樹。1.8版本的ConcurrentHashMap的實現與1.7版本有很大的差別,放棄了段鎖的概念,借鑒了HashMap的數據結構:數組+鏈表+紅黑樹。ConcurrentHashMap不接受nullkey和nullvalue。
數據結構:
數組+鏈表+紅黑樹
並發原理:
cas樂觀鎖+synchronized鎖
加鎖對象:
數組每個位置的頭節點
方法分析:
put方法:
先根據key的hash值定位桶位置,然后cas操作獲取該位置頭節點,接着使用synchronized鎖鎖住頭節點,遍歷該位置的鏈表或者紅黑樹進行插入操作。
稍微具體一點:
1.根據key的hash值定位到桶位置
2.判斷if(table==null),先初始化table。
3.判斷if(table[i]==null),cas添加元素。成功則跳出循環,失敗則進入下一輪for循環。
4.判斷是否有其他線程在擴容table,有則幫忙擴容,擴容完成再添加元素。進入真正的put步驟
5.真正的put步驟。桶的位置不為空,遍歷該桶的鏈表或者紅黑樹,若key已存在,則覆蓋;不存在則將key插入到鏈表或紅黑樹的尾部。
並發問題:假如put操作時正好有別的線程正在對table數組(map)擴容怎么辦?
答:暫停put操作,先幫助其他線程對map擴容。
源碼:
final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); //分散Hash int hash = spread(key.hashCode()); int binCount = 0; //這里是一個死循環,可能的出口如下 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) //上面已經分析了初始化過程,初始化完成后繼續執行死循環 tab = initTable(); //數組的第一個元素為空,則賦值 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //這里使用了CAS,避免使用鎖。如果CAS失敗,說明該節點已經發生改變, //可能被其他線程插入了,那么繼續執行死循環,在鏈尾插入。 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) //可能的出口一 break; // no lock when adding to empty bin } //如果tab正在resize,則幫忙一起執行resize //這里監測到的的條件是目標桶被設置成了FORWORD。如果桶沒有設置為 //FORWORD節點,即使正在擴容,該線程也無感知。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); //執行put操作 else { V oldVal = null; //這里請求了synchronized鎖。這里要注意,不會出現 //桶正在resize的過程中執行插入,因為桶resize的時候 //也請求了synchronized鎖。即如果該桶正在resize,這里會發生鎖等待 synchronized (f) { //如果是鏈表的首個節點 if (tabAt(tab, i) == f) { //並且是一個用戶節點,非Forwarding等節點 if (fh >= 0) { binCount = 1; for (Node<K,V> e = f;; ++binCount) { K ek; //找到相等的元素更新其value if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; //可能的出口二 break; } //否則添加到鏈表尾部 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); //可能的出口三 break; } } } else if (f instanceof TreeBin) { Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { //如果鏈表長度(碰撞次數)超過8,將鏈表轉化為紅黑樹 if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } //見下面的分析 addCount(1L, binCount); return null; }
get方法:
根據key的hash值定位,遍歷鏈表或者紅黑樹,獲取節點。
具體一點:
1.根據key的hash值定位到桶位置。
2.map是否初始化,沒有初始化則返回null。否則進入3
3.定位到的桶位置是否有頭結點,沒有返回nul,否則進入4
4.是否有其他線程在擴容,有的話調用find方法查找。所以這里可以看出,擴容操作和get操作不沖突,擴容map的同時可以get操作。
5.若沒有其他線程在擴容,則遍歷桶對應的鏈表或者紅黑樹,使用equals方法進行比較。key相同則返回value,不存在則返回null.
並發問題:假如此時正好有別的線程正在對數組擴容怎么辦?
答:沒關系,擴容的時候不會破壞原來的table,遍歷任然可以繼續,不需要加鎖。
源碼:
//不用擔心get的過程中發生resize,get可能遇到兩種情況
//1.桶未resize(無論是沒達到閾值還是resize已經開始但是還未處理該桶),遍歷鏈表
//2.在桶的鏈表遍歷的過程中resize,上面的resize分析可以看出並未破壞原tab的桶的節點關系,遍歷仍可以繼續
//不用擔心get的過程中發生resize,get可能遇到兩種情況 //1.桶未resize(無論是沒達到閾值還是resize已經開始但是還未處理該桶),遍歷鏈表 //2.在桶的鏈表遍歷的過程中resize,上面的resize分析可以看出並未破壞原tab的桶的節點關系,遍歷仍可以繼續 public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
擴容方法:
什么情況會導致擴容?
1.鏈表轉換為紅黑樹時(鏈表節點個數達到8個可能會轉換為紅黑樹)。如果轉換時map長度小於64則直接擴容一倍,不轉化為紅黑樹。如果此時map長度大於64,則不會擴容,直接進行鏈表轉紅黑樹的操作。
2.map中總節點數大於閾值(即大於map長度的0.75倍)時會進行擴容。
如何擴容?
1.創建一個新的map,是原先map的兩倍。注意此過程是單線程創建的
2.復制舊的map到新的map中。注意此過程是多線程並發完成。(將map按照線程數量平均划分成多個相等區域,每個線程負責一塊區域的復制任務)
擴容的具體過程:
答:
注:擴容操作是hashmap最復雜難懂的地方,博主也是看了很久才看懂個大概。一兩句話真的很難說清楚,建議有時間還是看源碼比較好。網上很少有人使用通俗易懂語言來描述擴容的機制。所以這里我嘗試用自己的語言做一個簡要的概括,描述一下大體的流程,供大家參考,如果覺得不錯,可以點個贊,表示對博主的支持,謝謝。
整體思路:擴容是並發擴容,也就是多個線程共同協作,把舊table中的鏈表一個個復制到新table中。
1.給多個線程划分各自負責的區域。分配時是從后向前分配。假設table原先長度是64,有四個線程,則第一個到達的線程負責48-63這塊內容的復制,第二個線程負責32-47,第三個負責16-31,第四個負責0-15。
2.每個線程負責各自區域,復制時是一個個從后向前復制的。如第一個線程先復制下標為63的桶的復制。63復制完了接下來復制62,一直向前,直到完成自己負責區域的所有復制。
3.完成自己區域的任務之后,還沒有結束,這時還會判斷一下其他線程負責區域有沒有完成所有復制任務,如果沒有完成,則可能還會去幫助其它線程復制。比如線程1先完成了,這時它看到線程2才做了一半,這時它會幫助線程2去做剩下一半任務。
4.那么復制到底是怎么完成的呢?線程之間相互幫忙會導致混亂嗎?
5.首先回答上面第一個問題,我們知道,每個數組的每個桶存放的是一個鏈表(紅黑樹也可能,這里只討論是鏈表情況)。復制的時候,先將鏈表拆分成兩個鏈表。拆分的依據是鏈表中的每個節點的hash值和未擴容前數組長度n進行與運算。運算結果可能為0和1,所以結果為0的組成一個新鏈表,結果為1的組成一個新鏈表。為0的鏈表放在新table的 i 位置,為1的鏈表放在 新table的 i+n處。擴容后新table是原先table的兩倍,即長度是2n。
6.接着回答上面第二個問題,線程之間相互幫忙不會造成混亂。因為線程已完成復制的位置會標記該位置已完成,其他線程看到標記則會直接跳過。而對於正在執行的復制任務的位置,則會直接鎖住該桶,表示這個桶我來負責,其他線程不要插手。這樣,就不會有並發問題了。
7.什么時候結束呢?每個線程參加復制前會將標記位sizeCtl加1,同樣退出時會將sizeCtl減1,這樣每個線程退出時,只要檢查一下sizeCtl是否等於進入前的狀態就知道是否全都退出了。最后一個退出的線程,則將就table的地址更新指向新table的地址,這樣后面的操作就是新table的操作了。
總結:上面的一字一句都是自己看完源碼手敲出來的,為了簡單易懂,可能會將一些細節忽略,但是其中最重要的思想都還包含在上面。如果有疑問或者有錯誤的地方,歡迎在評論區留言。
擴容源碼:
final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { Node<K,V>[] nextTab; int sc; //nextTab為空時,則說明擴容已經完成 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { int rs = resizeStamp(tab.length); while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || transferIndex <= 0) break; if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { transfer(tab, nextTab); break; } } return nextTab; } return table; } //復制元素到nextTab transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //NCPU為CPU核心數,每個核心均分復制任務,如果均分小於16個 //那么以16為步長分給處理器:例如0-15號給處理器1,16-32號分給處理器2。處理器3就不用接任務了。 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //如果nextTab為空則初始化為原tab的兩倍,這里只會時單線程進得來,因為這初始化了nextTab, //addcount里面判斷了nextTab為空則不執行擴容任務 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; //構造一個forword節點 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false; else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null; table = nextTab; // sizeCtl=nextTab.length*0.75=2*tab.length*0.75=tab.length*1.5!!! sizeCtl = (n << 1) - (n >>> 1); return; } //sc - 1表示當前線程完成了擴容任務,sizeCtl的線程數要-1 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { //還有線程在擴容,就不能設置finish為true if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return; finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { //這保證了不會出現該桶正在resize又執行put操作的情況 synchronized (f) { if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; //這里盡量少的復制鏈表節點,從lastrun到鏈尾的這段鏈表段,無需復制節點,直接復用 if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } //其他節點執行復制 for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
initTable方法:
private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; //如果table為null或者長度為0, //則一直循環試圖初始化table(如果某一時刻別的線程將table初始化好了,那table不為null, 該//線程就結束while循環)。 while ((tab = table) == null || tab.length == 0) { //如果sizeCtl小於0, //即有其他線程正在初始化或者擴容,執行Thread.yield()將當前線程掛起,讓出CPU時間, //該線程從運行態轉成就緒態。 //如果該線程從就緒態轉成運行態了,此時table可能已被別的線程初始化完成,table不為 //null,該線程結束while循環。 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin //如果此時sizeCtl不小於0,即沒有別的線程在做table初始化和擴容操作, //那么該線程就會調用Unsafe的CAS操作compareAndSwapInt嘗試將sizeCtl的值修改成 //-1(sizeCtl=-1表示table正在初始化,別的線程如果也進入了initTable方法則會執行 //Thread.yield()將它的線程掛起 讓出CPU時間), //如果compareAndSwapInt將sizeCtl=-1設置成功 則進入if里面,否則繼續while循環。 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { //再次確認當前table為null即還未初始化,這個判斷不能少。 if ((tab = table) == null || tab.length == 0) { //如果sc(sizeCtl)大於0,則n=sc,否則n=默認的容量大 小16, //這里的sc=sizeCtl=0,即如果在構造函數沒有指定容量 大小, //否則使用了有參數的構造函數,sc=sizeCtl=指定的容量大小。 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") //創建指定容量的Node數組(table)。 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; //計算閾值,n - (n >>> 2) = 0.75n當ConcurrentHashMap儲存的鍵值對數量 //大於這個閾值,就會發生擴容。 //這里的0.75相當於HashMap的默認負載因子,可以發現HashMap、Hashtable如果 //使用傳入了負載因子的構造函數初始化的話,那么每次擴容,新閾值都是=新容 //量 * 負載因子,而ConcurrentHashMap不管使用的哪一種構造函數初始化, //新閾值都是=新容量 * 0.75。 sc = n - (n >>> 2); } } finally { sizeCtl = sc; } break; } } return tab; }
簡單來說就是:
1.多線程使用cas樂觀鎖競爭tab數組初始化的權力。
2.線程競爭成功,則初始化tab數組。
3.競爭失敗的線程則讓出cpu(從運行態到就緒態)。等再次得到cpu時,發現tab!=null,即已經有線程初始化tab數組了,則退出即可。
remove方法:
public V remove(Object key) { return replaceNode(key, null, null); } final V replaceNode(Object key, V value, Object cv) { //計算需要移除的鍵key的哈希地址。 int hash = spread(key.hashCode()); //遍歷table。 for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; //table為空,或者鍵key所在的bucket為空,則跳出循環返回。 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) break; //如果當前table正在擴容,則調用helpTransfer方法,去協助擴容。 else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null; boolean validated = false; //將鍵key所在的bucket加鎖。 synchronized (f) { if (tabAt(tab, i) == f) { //bucket頭節點的哈希地址大於等於0,為鏈表。 if (fh >= 0) { validated = true; //遍歷鏈表。 for (Node<K,V> e = f, pred = null;;) { K ek; //找到哈希地址、鍵key相同的節點,進行移除。 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { oldVal = ev; if (value != null) e.val = value; else if (pred != null) pred.next = e.next; else setTabAt(tab, i, e.next); } break; } pred = e; if ((e = e.next) == null) break; } } //如果bucket的頭節點小於0,即為紅黑樹。 else if (f instanceof TreeBin) { validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; //找到節點,並且移除。 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value; else if (t.removeTreeNode(p)) setTabAt(tab, i, untreeify(t.first)); } } } } } //調用addCount方法,將當前ConcurrentHashMap存儲的鍵值對數量-1。 if (validated) { if (oldVal != null) { if (value == null) addCount(-1L, -1); return oldVal; } break; } } } return null; }
總結:
1.擴容完成后做了什么?
nextTable=null //新數組的引用置為null
tab=nextTab //舊數組的引用指向新數組
sizeCtl=0.75n //擴容閾值重新設置,數組元素個數超過這個閾值就會觸發擴容
2.concurrentHashMap中設置為volatile的變量有哪些?
Node,nextTable,baseCount,sizeCtl
3.單線程初始化,多線程擴容
4.什么時候觸發擴容?
1.鏈表轉換為紅黑樹時(鏈表節點個數達到8個可能會轉換為紅黑樹),table數組長度小於64。
2.數組中總節點數大於閾值(數組長度的0.75倍)
5.如何保證初始化nextTable時是單線程的?
所有調用transfer的方法(例如helperTransfer、addCount)幾乎都預先判斷了nextTab!=null,而nextTab只會在transfer方法中初始化,保證了第一個進來的線程初始化之后其他線程才能進入。
6.get操作時擴容怎么辦?
7.put操作擴容時怎么辦?
8.如何hash定位?
答:h^(h>>>16)&0x7fffffff,即先將hashCode的高16位和低16位異或運算,這個做目的是為了讓hash值更加隨機。和0x7fffffff相與運算是為了得到正數,因為負數的hash有特殊用途,如-1表示forwardingNode(上面說的表示該位置正在擴容),-2表示是一顆紅黑樹。
9.forwardingNode有什么內容?
nextTable //擴容時執向新table的引用
hash=moved //moved是常量-1,正在擴容的標記
10.擴容前鏈表和擴容后鏈表順序問題
語言描述很難解釋,直接看圖,hn指向最后同一類的第一個節點,hn->6->7,此時ln->null,接着從頭開始遍歷鏈表;
第一個節點:由於1的hash&n==1,所以應該放到hn指向的鏈表,采用頭插法。hn->1->6->7
第二個節點:同樣,hn->2->1->6->7
第三個節點:hash&n==0,所以應該插入到ln鏈表,采用頭插法,ln->3
.....
最后:
ln->5->3 //復制到新table的i位置處
hn->2->1->6->7 //復制到新table的i+n位置處
可以看到ln中所有元素都是后來一個個插入進來的,所以都是逆序
而hn中6->7是初始賦予的所以順序,而其1,2是后來插入的,所以逆序。
總結:有部分順序,有部分逆序。看情況
————————————————
版權聲明:本文為CSDN博主「卻顧所來徑」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/weixin_42130471/article/details/89813248