Java並發包——線程安全的Map相關類
摘要:本文主要學習了Java並發包下線程安全的Map相關的類。
部分內容來自以下博客:
https://blog.csdn.net/bill_xiang_/article/details/81122044
https://www.cnblogs.com/zhaojj/p/8942647.html
分類
參照之前在學習集合時候的分類,可以將JUC下有關Map相關的類進行分類。
ConcurrentHashMap:繼承於AbstractMap類,相當於線程安全的HashMap,是線程安全的哈希表。JDK1.7之前使用分段鎖機制實現,JDK1.8則使用數組+鏈表+紅黑樹數據結構和CAS原子操作實現。
ConcurrentSkipListMap:繼承於AbstractMap類,相當於線程安全的TreeMap,是線程安全的有序的哈希表。通過“跳表”來實現的。
ConcurrentHashMap
JDK1.7的分段鎖機制
Hashtable之所以效率低下主要是因為其實現使用了synchronized關鍵字對put等操作進行加鎖,而synchronized關鍵字加鎖是對整個對象進行加鎖。也就是說在進行put等修改Hash表的操作時,鎖住了整個Hash表,從而使得其表現的效率低下。
因此,在JDK1.5到1.7版本,Java使用了分段鎖機制實現ConcurrentHashMap。
簡而言之,ConcurrentHashMap在對象中保存了一個Segment數組,即將整個Hash表划分為多個分段。而每個Segment元素,即每個分段則類似於一個Hashtable。在執行put操作時首先根據hash算法定位到元素屬於哪個Segment,然后使用ReentrantLock對該Segment加鎖即可。因此,ConcurrentHashMap在多線程並發編程中可是實現多線程put操作。
Segment類是ConcurrentHashMap中的內部類,繼承於ReentrantLock類。ConcurrentHashMap與Segment是組合關系,一個ConcurrentHashMap對象包含若干個Segment對象,ConcurrentHashMap類中存在“Segment數組”成員。
HashEntry也是ConcurrentHashMap的內部類,是單向鏈表節點,存儲着key-value鍵值對。Segment與HashEntry是組合關系,Segment類中存在“HashEntry數組”成員,“HashEntry數組”中的每個HashEntry就是一個單向鏈表。
JDK1.8的改進
在JDK1.7的版本,ConcurrentHashMap是通過分段鎖機制來實現的,所以其最大並發度受Segment的個數限制。因此,在JDK1.8中,ConcurrentHashMap的實現原理摒棄了這種設計,而是選擇了與HashMap類似的數組+鏈表+紅黑樹的方式實現,而加鎖則采用CAS原子更新、volatile關鍵字、synchronized可重入鎖實現。
JDK1.8的實現降低鎖的粒度,JDK1.7版本鎖的粒度是基於Segment的,包含多個HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節點)。
JDK1.8版本的數據結構變得更加簡單,使得操作也更加清晰流暢,因為已經使用synchronized來進行同步,所以不需要分段鎖的概念,也就不需要Segment這種數據結構了,由於粒度的降低,實現的復雜度也增加了。
JDK1.8版本的擴容操作支持多線程並發。在之前的版本中如果Segment正在進行擴容操作,其他寫線程都會被阻塞,JDK1.8改為一個寫線程觸發了擴容操作,其他寫線程進行寫入操作時,可以幫助它來完成擴容這個耗時的操作。
JDK1.8使用紅黑樹來優化鏈表,基於長度很長的鏈表的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,代替一定閾值的鏈表。
重要屬性
sizeCtl:標志控制符。這個參數非常重要,出現在ConcurrentHashMap的各個階段,不同的值也表示不同情況和不同功能:
負數代表正在進行初始化或擴容操作。-1表示正在進行初始化操作。-N表示有N-1個線程正在進行擴容操作。
其為0時,表示hash表還未初始化。
正數表示下一次進行擴容的大小,類似於擴容閾值。它的值始終是當前容量的0.75倍,如果hash表的實際大小>=sizeCtl,則進行擴容。
構造方法
需要說明的是,在構造方法里並沒有對集合進行初始化操作,而是等到了添加元素的時候才進行初始化,屬於懶漢式的加載方式。
而且loadFactor參數在JDK1.8中也不再有加載因子的意義,僅為了兼容以前的版本,加載因子由sizeCtl來替代。
同樣,concurrencyLevel參數在JDK1.8中也不再有多線程運行的並發度的意義,僅為了兼容以前的版本。
1 // 空參構造器。 2 public ConcurrentHashMap() { 3 } 4 5 // 指定初始容量的構造器。 6 public ConcurrentHashMap(int initialCapacity) { 7 // 參數有效性判斷。 8 if (initialCapacity < 0) 9 throw new IllegalArgumentException(); 10 int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? 11 MAXIMUM_CAPACITY : 12 tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); 13 // 設置標志控制符。 14 this.sizeCtl = cap; 15 } 16 17 // 指定初始容量,加載因子的構造器。 18 public ConcurrentHashMap(int initialCapacity, float loadFactor) { 19 this(initialCapacity, loadFactor, 1); 20 } 21 22 // 指定初始容量,加載因子,並發度的構造器。 23 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { 24 // 參數有效性判斷。 25 if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) 26 throw new IllegalArgumentException(); 27 // 比較初始容量和並發度的大小,取最大值作為初始容量。 28 if (initialCapacity < concurrencyLevel) 29 initialCapacity = concurrencyLevel; 30 // 計算最大容量。 31 long size = (long)(1.0 + (long)initialCapacity / loadFactor); 32 int cap = (size >= (long)MAXIMUM_CAPACITY) ? 33 MAXIMUM_CAPACITY : tableSizeFor((int)size); 34 // 設置標志控制符。 35 this.sizeCtl = cap; 36 } 37 38 // 包含指定Map集合的構造器。 39 public ConcurrentHashMap(Map<? extends K, ? extends V> m) { 40 // 設置標志控制符。 41 this.sizeCtl = DEFAULT_CAPACITY; 42 // 放置指定的集合。 43 putAll(m); 44 }
初始化方法
集合並不會在構造方法里進行初始化,而是在用到集合的時候才進行初始化,在初始化的同時會設置集合的閾值。
初始化方法主要應用了關鍵屬性sizeCtl,如果sizeCtl小於0,表示其他線程正在進行初始化,就放棄這個操作,在這也可以看出初始化只能由一個線程完成。如果獲得了初始化權限,就用CAS方法將sizeCtl置為-1,防止其他線程進入。初始化完成后,將sizeCtl的值改為0.75倍的集合容量,作為閾值。
在初始化的過程中為了保證線程安全,總共使用了兩步操作:
1)通過CAS原子更新方法將sizeCtl設置為-1,保證只有一個線程進入。
2)線程獲取初始化權限后內部通過 if ((tab = table) == null || tab.length == 0) 二次判斷,保證只有在未初始化的情況下才能執行初始化。
1 // 初始化集合,使用CAS原子更新保證線程安全,使用volatile保證順序和可見性。 2 private final Node<K,V>[] initTable() { 3 Node<K,V>[] tab; int sc; 4 // 死循環以完成初始化。 5 while ((tab = table) == null || tab.length == 0) { 6 // 如果sizeCtl小於0則表示正在初始化,當前線程讓步。 7 if ((sc = sizeCtl) < 0) 8 Thread.yield(); 9 // 如果需要初始化,並且使用CAS原子更新。判斷SIZECTL保存的sizeCtl值是否和sc一致,一致則將sizeCtl更新為-1。 10 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { 11 try { 12 // 第一個線程初始化之后,第二個線程還會進來所以需要重新判斷。類似於線程同步的二次判斷。 13 if ((tab = table) == null || tab.length == 0) { 14 // 如果沒有指定容量則使用默認容量16。 15 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; 16 // 初始化一個指定容量的節點數組。 17 @SuppressWarnings("unchecked") 18 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; 19 // 將節點數組指向集合。 20 table = tab = nt; 21 // 擴容閥值,獲取容量的0.75倍的值,寫法略叼更高端比直接乘高效。 22 sc = n - (n >>> 2); 23 } 24 } finally { 25 // 將sizeCtl的值設為閾值。 26 sizeCtl = sc; 27 } 28 break; 29 } 30 } 31 return tab; 32 }
添加方法
1)校驗數據。判斷傳入一個key和value是否為空,如果為空就直接報錯。ConcurrentHashMap是不可為空的(HashMap是可以為空的)。
2)是否要初始化。判斷table是否為空,如果為空就進入初始化階段。
3)如果數組中key指定的桶是空的,那就使用CAS原子操作把鍵值對插入到這個桶中作為頭節點。
4)協助擴容。如果這個要插入的桶中的hash值為-1,也就是MOVED狀態(也就是這個節點是ForwordingNode),那就是說明有線程正在進行擴容操作,那么當前線程就進入協助擴容階段。
5)插入數據到鏈表或者紅黑樹。如果這個節點是鏈表節點,那么就遍歷這個鏈表,如果有相同的key值就更新value值,如果沒有發現相同的key值,就在鏈表的尾部插入該數據。如果這個節點是紅黑樹節點,那就需要按照樹的插入規則進行插入。
6)轉化成紅黑樹。插入結束之后判斷如果是鏈表節點,並且個數大於8,就需要把鏈表轉化為紅黑樹存儲。
7)添加結束之后,需要給增加已存儲的數量,並判斷是否需要擴容。
1 // 添加元素。 2 public V put(K key, V value) { 3 return putVal(key, value, false); 4 } 5 6 // 添加元素。 7 final V putVal(K key, V value, boolean onlyIfAbsent) { 8 // 排除null的數據。 9 if (key == null || value == null) throw new NullPointerException(); 10 // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。 11 int hash = spread(key.hashCode()); 12 // 節點個數。0表示未加入新結點,2表示TreeBin或鏈表結點數,其它值表示鏈表結點數。主要用於每次加入結點后查看是否要由鏈表轉為紅黑樹。 13 int binCount = 0; 14 // CAS經典寫法,不成功無限重試。 15 for (Node<K,V>[] tab = table;;) { 16 // 聲明節點、集合長度、對應的數組下標、節點的hash值。 17 Node<K,V> f; int n, i, fh; 18 // 如果沒有初始化則進行初始化。除非構造時指定集合,否則默認構造不初始化,添加時檢查是否初始化,屬於懶漢模式初始化。 19 if (tab == null || (n = tab.length) == 0) 20 // 初始化集合。 21 tab = initTable(); 22 // 如果已經初始化了,並且使用CAS根據hash獲取到的節點為null。 23 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 24 // 使用CAS比較該索引處是否為null防止其它線程已改變該值,null則插入。 25 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) 26 // 添加成功,跳出循環。 27 break; 28 } 29 // 如果獲取到節點不為null,並且節點的hash為-1,則表示節點在擴容。 30 else if ((fh = f.hash) == MOVED) 31 // 幫助擴容。 32 tab = helpTransfer(tab, f); 33 // 產生hash碰撞,並且沒有擴容操作。 34 else { 35 V oldVal = null; 36 // 鎖住節點。 37 synchronized (f) { 38 // 這里volatile獲取首節點與節點對比判斷節點還是不是首節點。 39 if (tabAt(tab, i) == f) { 40 // 判斷是否是鏈表節點。 41 if (fh >= 0) { 42 // 記錄節點個數。 43 binCount = 1; 44 // 循環完成添加節點到鏈表。 45 for (Node<K,V> e = f;; ++binCount) { 46 K ek; 47 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { 48 oldVal = e.val; 49 if (!onlyIfAbsent) 50 e.val = value; 51 break; 52 } 53 Node<K,V> pred = e; 54 if ((e = e.next) == null) { 55 pred.next = new Node<K,V>(hash, key, value, null); 56 break; 57 } 58 } 59 } 60 // 如果是紅黑樹節點。 61 else if (f instanceof TreeBin) { 62 Node<K,V> p; 63 binCount = 2; 64 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { 65 oldVal = p.val; 66 if (!onlyIfAbsent) 67 p.val = value; 68 } 69 } 70 } 71 } 72 // 如果添加到了鏈表節點,需要進一步判斷是否需要轉為紅黑樹。 73 if (binCount != 0) { 74 // 如果鏈表上的節點數大於等於8。 75 if (binCount >= TREEIFY_THRESHOLD) 76 // 嘗試轉為紅黑樹。 77 treeifyBin(tab, i); 78 if (oldVal != null) 79 // 返回原值。 80 return oldVal; 81 break; 82 } 83 } 84 } 85 // 集合容量加一並判斷是否要擴容。 86 addCount(1L, binCount); 87 return null; 88 }
修改容量並判斷是否需要擴容
1)嘗試對baseCount和CounterCell進行增加的操作,這些操作基於CAS原子操作,同時使用volatile保證順序和可見性。備用方法fullAddCount()則會死循環插入。
2)判斷是否要擴容操作,並且支持多個線程協助進行擴容。
1 // 修改容量並判斷是否要擴容。 2 private final void addCount(long x, int check) { 3 CounterCell[] as; long b, s; 4 // counterCells不為null,或者使用CAS對baseCount增加失敗了,說明產生了並發,需要進一步處理。 5 // counterCells初始為null,如果不為null,說明產生了並發。 6 // 如果counterCells仍然為null,但是在使用CAS對baseCount增加的時候失敗,表示產生了並發。 7 if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 8 CounterCell a; long v; int m; 9 boolean uncontended = true; 10 // 如果counterCells是null的,或者counterCells的個數小於0。 11 // 或者counterCells的每一個元素都是null。 12 // 或者用counterCells數組中隨機位置的值進行累加也失敗了。 13 if (as == null || (m = as.length - 1) < 0 || 14 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 15 !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 16 // 繼續更新counterCells和baseCount。 17 fullAddCount(x, uncontended); 18 return; 19 } 20 // 刪除或清理節點時是-1,插入索引首節點0,第二個節點是1。 21 if (check <= 1) 22 return; 23 // 計算map元素個數。 24 s = sumCount(); 25 } 26 // 如果check的值大於等於0,需要檢查是否要擴容。刪除或清理節點時是-1,此時不檢查。 27 if (check >= 0) { 28 Node<K,V>[] tab, nt; int n, sc; 29 // 當元素個數大於閾值,並且集合不為空,並且元素個數小於最大值。循環判斷,防止多線程同時擴容跳過if判斷。 30 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { 31 // 生成與n有關的標記,且n不變的情況下生成的一定是一樣的。 32 int rs = resizeStamp(n); 33 // sc在單線程時是大於等於0的,如果小於0說明有其他線程正在擴容。 34 // 如果小於0說明有線程執行了else里面的判斷,導致rs左移16位並在低位+2賦值給sc。 35 if (sc < 0) { 36 // 在第一次左移16位的sc,經過第二次右移16位之后,還和rs相同,說明已經擴容完成。 37 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加后的rs相等,說明已經擴容完成。 38 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加最大值后的rs相等,說明已經擴容完成。 39 // 如果下個節點是null,說明已經擴容完成。 40 // 如果transferIndex小於等於0,說明集合已完成擴容,無法再分配任務。 41 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 42 sc == rs + 1 ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + 1 43 sc == rs + MAX_RESIZERS ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 44 (nt = nextTable) == null || 45 transferIndex <= 0) 46 // 跳出循環。 47 break; 48 // 使用CAS原子累加sc的值。 49 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 50 // 擴容。 51 transfer(tab, nt); 52 } 53 // 如果sizeCtl大於或等於0,說明第一次擴容,並且使用CAS設置sizeCtl為rs左移后的負數,並且低位+2表示有2-1個線程正在擴容。 54 else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) 55 // 進行擴容操作。 56 transfer(tab, null); 57 // 計算map元素個數,baseCount和counterCells數組存的總和。 58 s = sumCount(); 59 } 60 } 61 }
幫助擴容方法
1)判斷集合已經完成初始化,並且節點是ForwordingNode類型(表示正在擴容),並且當前節點的子節點不為null,如果不成立則不需要擴容。
2)循環判斷是否擴容成功,如果沒有就使用CAS原子操作累加擴容的線程數,並且進行協助擴容。
1 // 幫助擴容。 2 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) { 3 Node<K,V>[] nextTab; int sc; 4 // 如果表不為null,並且是fwd類型的節點,並且節點的子節點也不為null。 5 if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) { 6 // 得到標識符。 7 int rs = resizeStamp(tab.length); 8 // 如果nextTab沒有被並發修改,並且tab也沒有被並發修改,並且sizeCtl小於0說明還在擴容。 9 while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) { 10 // 在第一次左移16位的sc,經過第二次右移16位之后,還和rs相同,說明已經擴容完成。 11 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加后的rs相等,說明已經擴容完成。 12 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加最大值后的rs相等,說明已經擴容完成。 13 // 如果transferIndex小於等於0,說明集合已完成擴容,無法再分配任務。 14 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || 15 sc == rs + 1 ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + 1 16 sc == rs + MAX_RESIZERS ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS 17 transferIndex <= 0) 18 // 跳出循環。 19 break; 20 // 使用CAS原子累加sc的值。 21 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) { 22 // 擴容。 23 transfer(tab, nextTab); 24 break; 25 } 26 } 27 return nextTab; 28 } 29 return table; 30 }
擴容方法
1)根據CPU核心數平均分配給每個CPU相同大小的區間,如果不夠16個,默認就是16個。
2)有且只能由一個線程構建一個nextTable,這個nextTable是擴容后的數組(容量已經擴大)。
3)外層使用for循環處理每個區間里的根節點,內層使用while循環讓線程領取未擴容的區間。
4)處理每個區間的頭節點,如果頭結點為空,則直接放置一個ForwordingNode,通知其他線程幫助擴容。
5)處理每個區間的頭節點,如果頭結點不為空,並且hash不為-1,那么就同步頭節點,開始擴容。判斷頭結點是鏈表還是紅黑樹:如果是鏈表,則拆分為高低兩個鏈表。如果是紅黑樹,拆分為高低兩個紅黑樹,並判斷是否需要轉為鏈表。
1 // 進行擴容操作。 2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { 3 int n = tab.length, stride; 4 // 根據cpu個數找出擴容時的最小分組,最小是16。 5 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) 6 stride = MIN_TRANSFER_STRIDE; 7 // 表示第一次擴容,因為在addCount()方法中,第一次擴容的時候傳入的nextTab的值是null。 8 if (nextTab == null) { 9 try { 10 // 創建新的擴容后的節點數組。 11 @SuppressWarnings("unchecked") 12 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; 13 // 將新的數組賦值給nextTab。 14 nextTab = nt; 15 } catch (Throwable ex) { 16 // 擴容失敗,設置sizeCtl為最大值。 17 sizeCtl = Integer.MAX_VALUE; 18 return; 19 } 20 // 將新的數組賦值給nextTable。 21 nextTable = nextTab; 22 // 記錄要擴容的區間最大值,說明是逆序遷移,從高位向低位遷移。 23 transferIndex = n; 24 } 25 // 設置擴容后的容量。 26 int nextn = nextTab.length; 27 // 創建一個fwd節點,用於占位,fwd節點的hash默認為-1。當別的線程發現這個槽位中是fwd類型的節點,則跳過這個節點。 28 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); 29 // 如果是false,需要處理區間上的當前位置,如果是true,說明需要處理區間上的下一個位置。 30 boolean advance = true; 31 // 完成狀態,如果是true,就結束此方法。 32 boolean finishing = false; 33 // 死循環,i表示最大下標,bound表示最小下標。 34 for (int i = 0, bound = 0;;) { 35 Node<K,V> f; int fh; 36 // 循環判斷是否要處理區間上的下一個位置,每個線程都會在這個循環里獲取區間。 37 while (advance) { 38 int nextIndex, nextBound; 39 // i自減一並判斷是否大於等於bound,以及是否已經完成了擴容。 40 // 如果i自減后大於等於bound並且未完成擴容,說明需要處理當前i位置上的節點,跳出while循環。 41 // 如果i自減后小於bound並且未完成擴容,說明區間上沒有節點需要處理,在while循環里繼續判讀。 42 // 如果已經完成擴容,跳出while循環。 43 if (--i >= bound || finishing) 44 // 跳出while循環。 45 advance = false; 46 // 如果要擴容的區間最大值小於等於0,說明沒有區間需要擴容了。 47 else if ((nextIndex = transferIndex) <= 0) { 48 // i會在下面的if塊里判斷,從而進入完成狀態判斷。 49 i = -1; 50 // 跳出while循環。 51 advance = false; 52 } 53 // 首次while循環進入,CAS判斷transferIndex和nextIndex是否一致,將transferIndex修改為最大值。 54 else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex, 55 nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { 56 // 當前線程處理區間的最小下標。 57 bound = nextBound; 58 // 初次對i賦值,當前線程處理區間的最大下標。 59 i = nextIndex - 1; 60 // 跳出while循環。 61 advance = false; 62 } 63 } 64 // 判讀是否完成擴容。 65 // 如果i小於0,表示已經處理了最后一段空間。 66 // 如果i大於等於原容量,表示超過下標最大值。 67 // 如果i加上原容量大於等於新容量,表示超過下標最大值。 68 if (i < 0 || i >= n || i + n >= nextn) { 69 int sc; 70 // 如果完成擴容,finishing為true,表示最后一個線程完成了擴容。 71 if (finishing) { 72 // 刪除成員變量。 73 nextTable = null; 74 // 更新集合。 75 table = nextTab; 76 // 更新閾值。 77 sizeCtl = (n << 1) - (n >>> 1); 78 return; 79 } 80 // 如果沒完成擴容,當前線程完成這段區間的擴容,將sc的低16位減1。 81 if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { 82 // 如果判斷是否是最后一個擴容線程,如果不等於,說明還有其他線程在擴容,當前線程返回。 83 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) 84 return; 85 // 如果相等,說明當前最后一個線程完成擴容,擴容結束,並再次進入while循環檢查一次。 86 finishing = advance = true; 87 // 再次循環檢查一下整張表。 88 i = n; 89 } 90 } 91 // 正常處理區間,如果原數組i位置是null,就使用fwd占位。 92 else if ((f = tabAt(tab, i)) == null) 93 // 如果成功寫入fwd占位,進入while循環,繼續處理區間的下一個節點。 94 advance = casTabAt(tab, i, null, fwd); 95 // 正常處理區間,如果原數組i位置不是null,並且hash值是-1,說明別的線程已經處理過了。 96 else if ((fh = f.hash) == MOVED) 97 // 進入while循環,繼續處理區間的下一個節點。 98 advance = true; 99 // 到這里,說明這個位置有實際值了,且不是占位符。 100 else { 101 // 對這個節點上鎖,防止添加元素的時候向鏈表插入數據。 102 synchronized (f) { 103 // 判斷i下標處的桶節點是否和f相同,二次校驗。 104 if (tabAt(tab, i) == f) { 105 // 聲明高位桶和低位桶。 106 Node<K,V> ln, hn; 107 // 如果f的hash值大於0,表示是鏈表結構。紅黑樹的hash默認是-2。 108 if (fh >= 0) { 109 // 獲取原容量最高位同節點hash值的與運算結果,用來判斷將該節點放到高位還是低位。 110 int runBit = fh & n; 111 // 定義尾節點,暫時取f節點,后面會更新。 112 Node<K,V> lastRun = f; 113 // 遍歷這個節點。 114 for (Node<K,V> p = f.next; p != null; p = p.next) { 115 // 獲取原容量最高位同節點hash值的與運算結果,用來判斷將該節點放到高位還是低位。 116 int b = p.hash & n; 117 // 如果節點的hash值和首節點的hash值,同原容量最高位與運算的結果不同。 118 if (b != runBit) { 119 // 更新runBit,用於下面判斷lastRun該賦值給ln還是hn。 120 runBit = b; 121 // 更新lastRun,保證后面的節點與自己的取於值相同,避免后面沒有必要的循環。 122 lastRun = p; 123 } 124 } 125 // 如果最后更新的runBit是0,設置低位節點。 126 if (runBit == 0) { 127 ln = lastRun; 128 hn = null; 129 } 130 // 如果最后更新的runBit是1,設置高位節點。 131 else { 132 hn = lastRun; 133 ln = null; 134 } 135 // 再次循環,生成兩個鏈表,lastRun作為停止條件,這樣就是避免無謂的循環。 136 for (Node<K,V> p = f; p != lastRun; p = p.next) { 137 int ph = p.hash; K pk = p.key; V pv = p.val; 138 // 如果與運算結果是0,那么創建低位節點。 139 if ((ph & n) == 0) 140 ln = new Node<K,V>(ph, pk, pv, ln); 141 // 如果與運算結果是1,那么創建高位節點。 142 else 143 hn = new Node<K,V>(ph, pk, pv, hn); 144 } 145 // 設置低位鏈表,放在新數組的i位置。 146 setTabAt(nextTab, i, ln); 147 // 設置高位鏈表,放在新數組的i+n位置。 148 setTabAt(nextTab, i + n, hn); 149 // 將舊的鏈表設置成fwd占位符。 150 setTabAt(tab, i, fwd); 151 // 繼續處理區間的下一個節點。 152 advance = true; 153 } 154 // 如果是紅黑樹結構。 155 else if (f instanceof TreeBin) { 156 TreeBin<K,V> t = (TreeBin<K,V>)f; 157 TreeNode<K,V> lo = null, loTail = null; 158 TreeNode<K,V> hi = null, hiTail = null; 159 int lc = 0, hc = 0; 160 // 遍歷。 161 for (Node<K,V> e = t.first; e != null; e = e.next) { 162 int h = e.hash; 163 TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null); 164 // 與運算結果為0的放在低位。 165 if ((h & n) == 0) { 166 if ((p.prev = loTail) == null) 167 lo = p; 168 else 169 loTail.next = p; 170 loTail = p; 171 ++lc; 172 } 173 // 與運算結果為1的放在高位。 174 else { 175 if ((p.prev = hiTail) == null) 176 hi = p; 177 else 178 hiTail.next = p; 179 hiTail = p; 180 ++hc; 181 } 182 } 183 // 如果樹的節點數小於等於6,那么轉成鏈表,反之,創建一個新的樹。 184 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; 185 // 如果樹的節點數小於等於6,那么轉成鏈表,反之,創建一個新的樹。 186 hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; 187 // 設置低位樹,放在新數組的i位置。 188 setTabAt(nextTab, i, ln); 189 // 設置高位數,放在新數組的i+n位置。 190 setTabAt(nextTab, i + n, hn); 191 // 將舊的樹設置成fwd占位符。 192 setTabAt(tab, i, fwd); 193 // 繼續處理區間的下一個節點。 194 advance = true; 195 } 196 } 197 } 198 } 199 } 200 }
獲取方法
根據指定的鍵,返回對應的鍵值對,由於是讀操作,所以不涉及到並發問題,步驟如下:
1)判斷查詢的key對應數組的首節點是否null。
2)先判斷數組的首節點是否為尋找的對象。
3)如果首節點不是,並且是紅黑樹結構,另做處理。
4)如果是鏈表結構,遍歷整個鏈表查詢。
5)如果都不是,那就返回null值。
1 // 獲取元素。 2 public V get(Object key) { 3 Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; 4 // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。 5 int h = spread(key.hashCode()); 6 // 如果集合不為null,並且集合長度大於0,並且指定位置上的元素不為null。 7 if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { 8 // 如果hash相等。 9 if ((eh = e.hash) == h) { 10 // 如果首節點是要找的元素。 11 if ((ek = e.key) == key || (ek != null && key.equals(ek))) 12 return e.val; 13 } 14 // 如果正在擴容或者是樹節點。 15 else if (eh < 0) 16 // 嘗試查找元素,找到返回元素,找不到返回null。 17 return (p = e.find(h, key)) != null ? p.val : null; 18 // 如果不是首節點,則遍歷集合查找。 19 while ((e = e.next) != null) { 20 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) 21 return e.val; 22 } 23 } 24 return null; 25 }
刪除方法
刪除操作,可以看成是用null替代原來的節點,因此合並在這個方法中,由這個方法一起實現刪除操作和替換操作。
replaceNode()方法中的三個參數,key表示想要刪除的鍵,value表示想要替換的元素,cv表示想要刪除的key對應的值。
1 // 刪除元素。 2 public V remove(Object key) { 3 return replaceNode(key, null, null); 4 } 5 6 // 刪除元素 7 final V replaceNode(Object key, V value, Object cv) { 8 // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。 9 int hash = spread(key.hashCode()); 10 // CAS經典寫法,不成功無限重試。 11 for (Node<K,V>[] tab = table;;) { 12 Node<K,V> f; int n, i, fh; 13 // 如果集合是null,或者集合長度是0,或者指定位置上的元素是null。 14 if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null) 15 // 跳出循環。 16 break; 17 // 如果獲取到節點不為null,並且節點的hash為-1,則表示節點在擴容。 18 else if ((fh = f.hash) == MOVED) 19 // 幫助擴容。 20 tab = helpTransfer(tab, f); 21 // 產生hash碰撞,並且沒有擴容操作。 22 else { 23 V oldVal = null; 24 // 是否進入了同步代碼。 25 boolean validated = false; 26 // 鎖住節點。 27 synchronized (f) { 28 // 這里volatile獲取首節點與節點對比判斷節點還是不是首節點。 29 if (tabAt(tab, i) == f) { 30 // 判斷是否是鏈表節點。 31 if (fh >= 0) { 32 validated = true; 33 // 循環查找指定元素。 34 for (Node<K,V> e = f, pred = null;;) { 35 K ek; 36 // 找到元素了。 37 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { 38 V ev = e.val; 39 // 如果cv為null,或者cv不為null時cv和指定元素上的值相同,才更新或者刪除節點。 40 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) { 41 oldVal = ev; 42 // 如果新值不為null,替換。 43 if (value != null) 44 e.val = value; 45 // 如果新值是null,並且當前節點非首結點,刪除。 46 else if (pred != null) 47 pred.next = e.next; 48 // 如果新值是null,並且當前節點是首結點,刪除。 49 else 50 setTabAt(tab, i, e.next); 51 } 52 break; 53 } 54 pred = e; 55 // 如果遍歷集合也沒有找到。 56 if ((e = e.next) == null) 57 // 跳出循環。 58 break; 59 } 60 } 61 // 如果是紅黑樹節點。 62 else if (f instanceof TreeBin) { 63 validated = true; 64 TreeBin<K,V> t = (TreeBin<K,V>)f; 65 TreeNode<K,V> r, p; 66 // 找到元素了。 67 if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) { 68 V pv = p.val; 69 // 如果cv為null,或者cv不為null時cv和指定元素上的值相同,才更新或者刪除節點。 70 if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { 71 oldVal = pv; 72 if (value != null) 73 p.val = value; 74 else if (t.removeTreeNode(p)) 75 setTabAt(tab, i, untreeify(t.first)); 76 } 77 } 78 } 79 } 80 } 81 // 如果進入了同步代碼。 82 if (validated) { 83 // 如果更新或者刪除了節點。 84 if (oldVal != null) { 85 // 如果value為null,說明是刪除操作。 86 if (value == null) 87 // 將數組長度減一。 88 addCount(-1L, -1); 89 return oldVal; 90 } 91 break; 92 } 93 } 94 } 95 return null; 96 }
計算集合容量
ConcurrentHashMap中baseCount用於保存tab中元素總數,但是並不准確,因為多線程同時增刪改,會導致baseCount修改失敗,此時會將元素變動存儲於counterCells數組內。
當需要統計當前的size的時候,除了要統計baseCount之外,還需要加上counterCells中的每個桶的值。
值得一提的是即使如此,統計出來的依舊不是當前tab中元素的准確值,在多線程環境下統計前后並不能暫停線程操作,因此無法保證准確性。
1 // 計算集合容量。 2 public int size() { 3 long n = sumCount(); 4 return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n); 5 } 6 7 // 計算集合容量,baseCount和counterCells數組存的總和。 8 final long sumCount() { 9 CounterCell[] as = counterCells; CounterCell a; 10 long sum = baseCount; 11 if (as != null) { 12 for (int i = 0; i < as.length; ++i) { 13 if ((a = as[i]) != null) 14 sum += a.value; 15 } 16 } 17 return sum; 18 }
Hashtable、Collections.synchronizedMap()、ConcurrentHashMap之間的區別
Hashtable是線程安全的哈希表,它是通過synchronized來保證線程安全的;即,多線程通過同一個“對象的同步鎖”來實現並發控制。Hashtable在線程競爭激烈時,效率比較低(此時建議使用ConcurrentHashMap)。當一個線程訪問Hashtable的同步方法時,其它線程如果也在訪問Hashtable的同步方法時,可能會進入阻塞狀態。
Collections.synchronizedMap()使用了synchronized同步關鍵字來保證對Map的操作是線程安全的。
ConcurrentHashMap是線程安全的哈希表。在JDK1.7中它是通過“鎖分段”來保證線程安全的,本質上也是一個“可重入的互斥鎖”(ReentrantLock)。多線程對同一個片段的訪問,是互斥的;但是,對於不同片段的訪問,卻是可以同步進行的。在JDK1.8中是通過使用CAS原子更新、volatile關鍵字、synchronized可重入鎖實現的。