並發編程——ConcurrentHashMap#transfer() 擴容逐行分析
前言
ConcurrentHashMap 是並發中的重中之重,也是最常用的數據結構,之前的文章中,我們介紹了 putVal 方法。並發編程之 ConcurrentHashMap(JDK 1.8) putVal 源碼分析。其中分析了 initTable 方法和 putVal 方法,但也留下了一句話:
這篇文章僅僅是 ConcurrentHashMap 的開頭,關於 ConcurrentHashMap 里面的精華太多,值得我們好好學習。
說道精華,他的擴容方法絕對是精華,要知道,ConcurrentHashMap 擴容是高度並發的。
今天來逐行分析源碼。
先說結論
首先說結論。源碼加注釋我會放在后面。該方法的執行邏輯如下:
-
通過計算 CPU 核心數和 Map 數組的長度得到每個線程(CPU)要幫助處理多少個桶,並且這里每個線程處理都是平均的。默認每個線程處理 16 個桶。因此,如果長度是 16 的時候,擴容的時候只會有一個線程擴容。
-
初始化臨時變量 nextTable。將其在原有基礎上擴容兩倍。
-
死循環開始轉移。多線程並發轉移就是在這個死循環中,根據一個 finishing 變量來判斷,該變量為 true 表示擴容結束,否則繼續擴容。
3.1 進入一個 while 循環,分配數組中一個桶的區間給線程,默認是 16. 從大到小進行分配。當拿到分配值后,進行 i-- 遞減。這個 i 就是數組下標。(
其中有一個 bound 參數,這個參數指的是該線程此次可以處理的區間的最小下標,超過這個下標,就需要重新領取區間或者結束擴容,還有一個 advance 參數,該參數指的是是否繼續遞減轉移下一個桶,如果為 true,表示可以繼續向后推進,反之,說明還沒有處理好當前桶,不能推進
)
3.2 出 while 循環,進 if 判斷,判斷擴容是否結束,如果擴容結束,清空臨時變量,更新 table 變量,更新庫容閾值。如果沒完成,但已經無法領取區間(沒了),該線程退出該方法,並將 sizeCtl 減一,表示擴容的線程少一個了。如果減完這個數以后,sizeCtl 回歸了初始狀態,表示沒有線程再擴容了,該方法所有的線程擴容結束了。(這里主要是判斷擴容任務是否結束,如果結束了就讓線程退出該方法,並更新相關變量
)。然后檢查所有的桶,防止遺漏。
3.3 如果沒有完成任務,且 i 對應的槽位是空,嘗試 CAS 插入占位符,讓 putVal 方法的線程感知。
3.4 如果 i 對應的槽位不是空,且有了占位符,那么該線程跳過這個槽位,處理下一個槽位。
3.5 如果以上都是不是,說明這個槽位有一個實際的值。開始同步處理這個桶。
3.6 到這里,都還沒有對桶內數據進行轉移,只是計算了下標和處理區間,然后一些完成狀態判斷。同時,如果對應下標內沒有數據或已經被占位了,就跳過了。 -
處理每個桶的行為都是同步的。防止 putVal 的時候向鏈表插入數據。
4.1 如果這個桶是鏈表,那么就將這個鏈表根據 length 取於拆成兩份,取於結果是 0 的放在新表的低位,取於結果是 1 放在新表的高位。
4.2 如果這個桶是紅黑數,那么也拆成 2 份,方式和鏈表的方式一樣,然后,判斷拆分過的樹的節點數量,如果數量小於等於 6,改造成鏈表。反之,繼續使用紅黑樹結構。
4.3 到這里,就完成了一個桶從舊表轉移到新表的過程。
好,以上,就是 transfer 方法的總體邏輯。還是挺復雜的。再進行精簡,分成 3 步驟:
- 計算每個線程可以處理的桶區間。默認 16.
- 初始化臨時變量 nextTable,擴容 2 倍。
- 死循環,計算下標。完成總體判斷。
- 1 如果桶內有數據,同步轉移數據。通常會像鏈表拆成 2 份。
大體就是上面的 3 個步驟。
再來看看源碼和注釋。
再看源碼分析
源碼加注釋:
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. * * transferIndex 表示轉移時的下標,初始為擴容前的 length。 * * 我們假設長度是 32 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; // 將 length / 8 然后除以 CPU核心數。如果得到的結果小於 16,那么就使用 16。 // 這里的目的是讓每個 CPU 處理的桶一樣多,避免出現轉移任務不均勻的現象,如果桶較少的話,默認一個 CPU(一個線程)處理 16 個桶 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range 細分范圍 stridea:TODO // 新的 table 尚未初始化 if (nextTab == null) { // initiating try { // 擴容 2 倍 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; // 更新 nextTab = nt; } catch (Throwable ex) { // try to cope with OOME // 擴容失敗, sizeCtl 使用 int 最大值。 sizeCtl = Integer.MAX_VALUE; return;// 結束 } // 更新成員變量 nextTable = nextTab; // 更新轉移下標,就是 老的 tab 的 length transferIndex = n; } // 新 tab 的 length int nextn = nextTab.length; // 創建一個 fwd 節點,用於占位。當別的線程發現這個槽位中是 fwd 類型的節點,則跳過這個節點。 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); // 首次推進為 true,如果等於 true,說明需要再次推進一個下標(i--),反之,如果是 false,那么就不能推進下標,需要將當前的下標處理完畢才能繼續推進 boolean advance = true; // 完成狀態,如果是 true,就結束此方法。 boolean finishing = false; // to ensure sweep before committing nextTab // 死循環,i 表示下標,bound 表示當前線程可以處理的當前桶區間最小下標 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; // 如果當前線程可以向后推進;這個循環就是控制 i 遞減。同時,每個線程都會進入這里取得自己需要轉移的桶的區間 while (advance) { int nextIndex, nextBound; // 對 i 減一,判斷是否大於等於 bound (正常情況下,如果大於 bound 不成立,說明該線程上次領取的任務已經完成了。那么,需要在下面繼續領取任務) // 如果對 i 減一大於等於 bound(還需要繼續做任務),或者完成了,修改推進狀態為 false,不能推進了。任務成功后修改推進狀態為 true。 // 通常,第一次進入循環,i-- 這個判斷會無法通過,從而走下面的 nextIndex 賦值操作(獲取最新的轉移下標)。其余情況都是:如果可以推進,將 i 減一,然后修改成不可推進。如果 i 對應的桶處理成功了,改成可以推進。 if (--i >= bound || finishing) advance = false;// 這里設置 false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進 // 這里的目的是:1. 當一個線程進入時,會選取最新的轉移下標。2. 當一個線程處理完自己的區間時,如果還有剩余區間的沒有別的線程處理。再次獲取區間。 else if ((nextIndex = transferIndex) <= 0) { // 如果小於等於0,說明沒有區間了 ,i 改成 -1,推進狀態變成 false,不再推進,表示,擴容結束了,當前線程可以退出了 // 這個 -1 會在下面的 if 塊里判斷,從而進入完成狀態判斷 i = -1; advance = false;// 這里設置 false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進 }// CAS 修改 transferIndex,即 length - 區間值,留下剩余的區間值供后面的線程使用 else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound;// 這個值就是當前線程可以處理的最小當前區間最小下標 i = nextIndex - 1; // 初次對i 賦值,這個就是當前線程可以處理的當前區間的最大下標 advance = false; // 這里設置 false,是為了防止在沒有成功處理一個桶的情況下卻進行了推進,這樣對導致漏掉某個桶。下面的 if (tabAt(tab, i) == f) 判斷會出現這樣的情況。 } }// 如果 i 小於0 (不在 tab 下標內,按照上面的判斷,領取最后一段區間的線程擴容結束) // 如果 i >= tab.length(不知道為什么這么判斷) // 如果 i + tab.length >= nextTable.length (不知道為什么這么判斷) if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { // 如果完成了擴容 nextTable = null;// 刪除成員變量 table = nextTab;// 更新 table sizeCtl = (n << 1) - (n >>> 1); // 更新閾值 return;// 結束方法。 }// 如果沒完成 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {// 嘗試將 sc -1. 表示這個線程結束幫助擴容了,將 sc 的低 16 位減一。 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)// 如果 sc - 2 不等於標識符左移 16 位。如果他們相等了,說明沒有線程在幫助他們擴容了。也就是說,擴容結束了。 return;// 不相等,說明沒結束,當前線程結束方法。 finishing = advance = true;// 如果相等,擴容結束了,更新 finising 變量 i = n; // 再次循環檢查一下整張表 } } else if ((f = tabAt(tab, i)) == null) // 獲取老 tab i 下標位置的變量,如果是 null,就使用 fwd 占位。 advance = casTabAt(tab, i, null, fwd);// 如果成功寫入 fwd 占位,再次推進一個下標 else if ((fh = f.hash) == MOVED)// 如果不是 null 且 hash 值是 MOVED。 advance = true; // already processed // 說明別的線程已經處理過了,再次推進一個下標 else {// 到這里,說明這個位置有實際值了,且不是占位符。對這個節點上鎖。為什么上鎖,防止 putVal 的時候向鏈表插入數據 synchronized (f) { //擴容時,只在這個環節加鎖 // 判斷 i 下標處的桶節點是否和 f 相同 if (tabAt(tab, i) == f) { Node<K,V> ln, hn;// low, height 高位桶,低位桶 // 如果 f 的 hash 值大於 0 。TreeBin 的 hash 是 -2 if (fh >= 0) { // 對老長度進行與運算(第一個操作數的的第n位於第二個操作數的第n位如果都是1,那么結果的第n為也為1,否則為0) // 由於 Map 的長度都是 2 的次方(000001000 這類的數字),那么取於 length 只有 2 種結果,一種是 0,一種是1 // 如果是結果是0 ,Doug Lea 將其放在低位,反之放在高位,目的是將鏈表重新 hash,放到對應的位置上,讓新的取於算法能夠擊中他。 int runBit = fh & n; Node<K,V> lastRun = f; // 尾節點,且和頭節點的 hash 值取於不相等 // 遍歷這個桶 for (Node<K,V> p = f.next; p != null; p = p.next) { // 取於桶中每個節點的 hash 值 int b = p.hash & n; // 如果節點的 hash 值和首節點的 hash 值取於結果不同 if (b != runBit) { runBit = b; // 更新 runBit,用於下面判斷 lastRun 該賦值給 ln 還是 hn。 lastRun = p; // 這個 lastRun 保證后面的節點與自己的取於值相同,避免后面沒有必要的循環 } } if (runBit == 0) {// 如果最后更新的 runBit 是 0 ,設置低位節點 ln = lastRun; hn = null; } else { hn = lastRun; // 如果最后更新的 runBit 是 1, 設置高位節點 ln = null; }// 再次循環,生成兩個鏈表,lastRun 作為停止條件,這樣就是避免無謂的循環(lastRun 后面都是相同的取於結果) for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; // 如果與運算結果是 0,那么就還在低位 if ((ph & n) == 0) // 如果是0 ,那么創建低位節點 ln = new Node<K,V>(ph, pk, pv, ln); else // 1 則創建高位 hn = new Node<K,V>(ph, pk, pv, hn); } // 其實這里類似 hashMap // 設置低位鏈表放在新鏈表的 i setTabAt(nextTab, i, ln); // 設置高位鏈表,在原有長度上加 n setTabAt(nextTab, i + n, hn); // 將舊的鏈表設置成占位符 setTabAt(tab, i, fwd); // 繼續向后推進 advance = true; }// 如果是紅黑樹 else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; // 遍歷 for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); // 和鏈表相同的判斷,與運算 == 0 的放在低位 if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } // 不是 0 的放在高位 else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } // 如果樹的節點數小於等於 6,那么轉成鏈表,反之,創建一個新的樹 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; // 低位樹 setTabAt(nextTab, i, ln); // 高位數 setTabAt(nextTab, i + n, hn); // 舊的設置成占位符 setTabAt(tab, i, fwd); // 繼續向后推進 advance = true; } } } } } }
代碼加注釋比較長,有興趣可以逐行對照,有 2 個判斷樓主看不懂為什么這么判斷,知道的同學可以提醒一下。
然后,說說精華的部分。
- Cmap 支持並發擴容,實現方式是,將表拆分,讓每個線程處理自己的區間。如下圖:

假設總長度是 64 ,每個線程可以分到 16 個桶,各自處理,不會互相影響。
- 而每個線程在處理自己桶中的數據的時候,是下圖這樣的:

擴容前的狀態。
當對 4 號桶或者 10 號桶進行轉移的時候,會將鏈表拆成兩份,規則是根據節點的 hash 值取於 length,如果結果是 0,放在低位,否則放在高位。
因此,10 號桶的數據,黑色節點會放在新表的 10 號位置,白色節點會放在新桶的 26 號位置。
下圖是循環處理桶中數據的邏輯:
處理完之后,新桶的數據是這樣的:

總結
transfer 方法可以說很牛逼,很精華,內部多線程擴容性能很高,
通過給每個線程分配桶區間,避免線程間的爭用,通過為每個桶節點加鎖,避免 putVal 方法導致數據不一致。同時,在擴容的時候,也會將鏈表拆成兩份,這點和 HashMap 的 resize 方法類似。
而如果有新的線程想 put 數據時,也會幫助其擴容。鬼斧神工,令人贊嘆。
作者:莫那一魯道
鏈接:https://www.jianshu.com/p/2829fe36a8dd
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯系作者獲得授權並注明出處。
理解Java7和8里面HashMap+ConcurrentHashMap的擴容策略
https://mp.weixin.qq.com/s?__biz=MzAxMzE4MDI0NQ==&mid=2650336167&idx=1&sn=56f2583778afe80ce7a3476ad311e550&chksm=83aac79db4dd4e8b96aa46df3387157d1b8f7d279dfdb770a45a3fe9e17359a4e6e6ae499bb3&token=1182516540&lang=zh_CN#rd
HashMap? ConcurrentHashMap? 相信看完這篇沒人能難住你!
https://mp.weixin.qq.com/s?__biz=MzAxMzE4MDI0NQ==&mid=2650336167&idx=1&sn=56f2583778afe80ce7a3476ad311e550&chksm=83aac79db4dd4e8b96aa46df3387157d1b8f7d279dfdb770a45a3fe9e17359a4e6e6ae499bb3&token=1182516540&lang=zh_CN#rd
深入理解ConcurrentHashMap原理分析以及線程安全性問題
在之前的文章提到ConcurrentHashMap是一個線程安全的,那么我么看一下ConcurrentHashMap如何進行操作的。
ConcurrentHashMap與HashTable區別?
HashTable
put()源代碼
從代碼可以看出來在所有put 的操作的時候 都需要用 synchronized 關鍵字進行同步。並且key 不能為空。
這樣相當於每次進行put 的時候都會進行同步 當10個線程同步進行操作的時候,就會發現當第一個線程進去 其他線程必須等待第一個線程執行完成,才可以進行下去。性能特別差。
CurrentHashMap
分段鎖技術:ConcurrentHashMap相比 HashTable而言解決的問題就是 的 它不是鎖全部數據,而是鎖一部分數據,這樣多個線程訪問的時候就不會出現競爭關系。不需要排隊等待了。
從圖中可以看出來ConcurrentHashMap的主干是個Segment數組。
這就是為什么ConcurrentHashMap支持允許多個修改同時並發進行,原因就是采用的Segment分段鎖功能,每一個Segment 都想的於小的hash table並且都有自己鎖,只要修改不再同一個段上就不會引起並發問題。
final Segment<K,V>[] segments;
使用ConConcurrentHashMap時候 有時候會遇到跨段的問題,跨段的時候【size()、 containsValue()】,可能需要鎖定部分段或者全段,當操作結束之后,又回按照 順序 進行 釋放 每一段的鎖。注意是按照順序解鎖的。,每個Segment又包含了多個HashEntry.
transient volatile HashEntry<K,V>[] table; static final class HashEntry<K,V> { final int hash; final K key; volatile V value; volatile HashEntry<K,V> next; //其他省略 }
需要注意的是 Segment 是一種可重入鎖(繼承ReentrantLock)
那么我簡單說一下ReentrantLock 與synchronized有什么區別?
synchronized 是一個同步鎖 synchronized (this)
同步鎖 當一個線程A 訪問 【資源】的代碼同步塊的時候,A線程就會持續持有當前鎖的狀態,如果其他線程B-E 也要訪問【資源】的代碼同步塊的時候將會收到阻塞,因此需要排隊等待A線程釋放鎖的狀態。(如圖情況1)但是注意的是,當一個線程B-E 只是不能方法 A線程 【資源】的代碼同步塊,仍然可以訪問其他的非資源同步塊。
ReentrantLock 可重入鎖 通常兩類:公平性、非公平性
公平性:根據線程請求鎖的順序依次獲取鎖,當一個線程A 訪問 【資源】的期間,線程A 獲取鎖資源,此時內部存在一個計數器num+1,在訪問期間,線程B、C請求 資源時,發現A 線程在持有當前資源,因此在后面生成節點排隊(B 處於待喚醒狀態),假如此時a線程再次請求資源時,不需要再次排隊,可以直接再次獲取當前資源 (內部計數器+1 num=2) ,當A線程釋放所有鎖的時候(num=0),此時會喚醒B線程進行獲取鎖的操作,其他C-E線程就同理。(情況2)
非公平性:當A線程已經釋放所之后,准備喚醒線程B獲取資源的時候,此時線程M 獲取請求,此時會出現競爭,線程B 沒有競爭過M線程,測試M獲取的線程因此,M會有限獲得資源,B繼續睡眠。(情況2)
synchronized 是一個非公平性鎖。 非公平性 會比公平性鎖的效率要搞很多原因,不需要通知等待。
ReentrantLock 提供了 new Condition可以獲得多個Condition對象,可以簡單的實現比較復雜的線程同步的功能.通過await(),signal()以實現。
ReentrantLock 提供可以中斷鎖的一個方法lock.lockInterruptibly()方法。
Jdk 1.8 synchronized和 ReentrantLock 比較的話,官方比較建議用synchronized。
在了解Segment 機制之后我們繼續看一下ConcurrentHashMap核心構造方法代碼。
// 跟HashMap結構有點類似 Segment(float lf, int threshold, HashEntry<K,V>[] tab) { this.loadFactor = lf;//負載因子 this.threshold = threshold;//閾值 this.table = tab;//主干數組即HashEntry數組 }
構造方法
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); //MAX_SEGMENTS 為1<<16=65536,也就是最大並發數為65536 if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; int sshift = 0; //ssize 為segments數組長度,concurrentLevel計算得出 int ssize = 1; while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } //segmentShift和segmentMask這兩個變量在定位segment時會用到 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; //計算cap的大小,即Segment中HashEntry的數組長度,cap也一定為2的n次方. int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; //創建segments數組並初始化第一個Segment,其余的Segment延遲初始化 Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; UNSAFE.putOrderedObject(ss, SBASE, s0); this.segments = ss; }
從以上代碼可以看出ConcurrentHashMap有比較重要的三個參數:
loadFactor 負載因子 0.75
threshold 初始 容量 16
concurrencyLevel 實際上是Segment的實際數量。
ConcurrentHashMap如何發生ReHash?
ConcurrentLevel 一旦設定的話,就不會改變。ConcurrentHashMap當元素個數大於臨界值的時候,就會發生擴容。但是ConcurrentHashMap與其他的HashMap不同的是,它不會對Segmengt 數量增大,只會增加Segmengt 后面的鏈表容量的大小。即對每個Segmengt 的元素進行的ReHash操作。
我們再看一下核心的ConcurrentHashMapput ()方法:
public V put(K key, V value) { Segment<K,V> s; //concurrentHashMap不允許key/value為空 if (value == null) throw new NullPointerException(); //hash函數對key的hashCode重新散列,避免差勁的不合理的hashcode,保證散列均勻 int hash = hash(key); //返回的hash值無符號右移segmentShift位與段掩碼進行位運算,定位segment int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
主要注意的是 當前put 方法 當前key 為空的時候 ,代碼報錯。
這個代碼主要是把Key 通過Hash函數計算出hash值 現計算出當前key屬於那個Segment 調用Segment.put 分段方法Segment.put()
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
HashEntry<K,V> node = tryLock() ? null :scanAndLockForPut(key, hash, value);
//tryLock()是ReentrantLock獲取鎖一個方法。如果當前線程獲取鎖成功 返回true,如果別線程獲取了鎖返回false不成功時會遍歷定位到的HashEnry位置的鏈表(遍歷主要是為了使CPU緩存鏈表),若找不到,則創建HashEntry。tryLock一定次數后(MAX_SCAN_RETRIES變量決定),則lock。若遍歷過程中,由於其他線程的操作導致鏈表頭結點變化,則需要重新遍歷。
V oldValue; try { HashEntry<K,V>[] tab = table; int index = (tab.length - 1) & hash;//定位HashEntry,可以看到,這個hash值在定位Segment時和在Segment中定位HashEntry都會用到,只不過定位Segment時只用到高幾位。 HashEntry<K,V> first = entryAt(tab, index); for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break; } e = e.next; } else { if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; //若c超出閾值threshold,需要擴容並rehash。擴容后的容量是當前容量的2倍。這樣可以最大程度避免之前散列好的entry重新散列。擴容並rehash的這個過程是比較消耗資源的。 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { unlock(); } return oldValue; }
Put 時候 ,通過Hash函數將即將要put 的元素均勻的放到所需要的Segment 段中,調用Segment的put 方法進行數據。
Segment的put 是加鎖中完成的。如果當前元素數大於最大臨界值的的話將會產生rehash. 先通過 getFirst 找到鏈表的表頭部分,然后遍歷鏈表,調用equals 比配是否存在相同的key ,如果找到的話,則將最新的Key 對應value值。如果沒有找到,新增一個HashEntry 它加到整個Segment的頭部。
我們先看一下Get 方法的源碼:
//計算Segment中元素的數量
transient volatile int count; *********************************************************** public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); } *********************************************************** final Segment<K,V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; } ******************************************************** V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
1.讀取的時候 傳遞Key值,通過Hash函數計算出 對應Segment 的位置。
2.調用segmentFor(int hash) 方法,用於確定操作應該在哪一個segment中進行 ,通過 右無符號位運算 右移segmentShift位在與運算 segmentMask【偏移碼】 獲得需要操作的Segment
確定了需要操作的Segment 再調用 get 方法獲取對應的值。通過count 值先判斷當前值是否為空。在調用getFirst()獲取頭節點,然后遍歷列表通過equals對比的方式進行比對返回值。
ConcurrentHashMap為什么讀的時候不加鎖?
ConcurrentHashMap是分段並發分段進行讀取數據的。
Segment 里面有一個Count 字段,用來表示當前Segment中元素的個數 它的類型是volatile變量。所有的操作到最后都會 在最后一部更新count 這個變量,由於volatile變量 happer-before的特性。導致get 方法能夠幾乎准確的獲取最新的結構更新。
再看一下ConcurrentHashMapRemove()方法:
V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1; HashEntry<K,V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) { V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K,V> newFirst = e.next; for (HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
調用Segment 的remove 方法,先定位當前要刪除的元素C,此時需要把A、B元素全部復制一遍,一個一個接入到D上。
remove 也是在加鎖的情況下進行的。
volatile 變量
我們發現 對於CurrentHashMap而言的話,源碼里面又很多地方都用到了這個變量。比如HashEntry 、value 、Segment元素個數Count。
volatile 屬於JMM 模型中的一個詞語。首先先簡單說一下 Java內存模型中的 幾個概念:
原子性:保證 Java內存模型中原子變量內存操作的。通常有 read、write、load、use、assign、store、lock、unlock等這些。
可見性:就是當一個線程對一個變量進行了修改,其他線程即可立即得到這個變量最新的修改數據。
有序性:如果在本線程內觀察,所有操作都是有序的;如果在一個線程中觀察另一個線程,所有操作都是無序的。
先行發生:happen-before 先行發生原則是指Java內存模型中定義的兩項操作之間的依序關系,如果說操作A先行發生於操作B,其實就是說發生操作B之前.
傳遞性
volatile 變量 與普通變量的不同之處?
volatile 是有可見性,一定程度的有序性。
volatile 賦值的時候新值能夠立即刷新到主內存中去,每次使用的時候能夠立刻從內存中刷新。
做一個簡單例子看一下 這個功能
public class VolatileTest{ int a=1; int b=2; //賦值操作 public void change(){ a=3; b=a; } //打印操作 public void print(){ System.out.println("b:"+b+",a:"+a); } @Test public void testNorMal(){ VolatileTest vt=new VolatileTest(); for (int i = 0; i < 100000; i++) { new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(100); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } vt.change(); } }).start(); new Thread(new Runnable() { @Override public void run() { try { Thread.sleep(10); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } vt.print(); } }).start(); } } }
跑了 n 次會出現一條 b=3,a=1 的錯誤打印記錄。這就是因為普通變量相比volatile 不存在 可見性。
---------------------
作者:技術小排骨
來源:CSDN
原文:https://blog.csdn.net/jjc120074203/article/details/78625433
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!