Java並發包——線程安全的Map相關類


Java並發包——線程安全的Map相關類

摘要:本文主要學習了Java並發包下線程安全的Map相關的類。

部分內容來自以下博客:

https://blog.csdn.net/bill_xiang_/article/details/81122044

https://www.cnblogs.com/zhaojj/p/8942647.html

分類

參照之前在學習集合時候的分類,可以將JUC下有關Map相關的類進行分類。

ConcurrentHashMap:繼承於AbstractMap類,相當於線程安全的HashMap,是線程安全的哈希表。JDK1.7之前使用分段鎖機制實現,JDK1.8則使用數組+鏈表+紅黑樹數據結構和CAS原子操作實現。

ConcurrentSkipListMap:繼承於AbstractMap類,相當於線程安全的TreeMap,是線程安全的有序的哈希表。通過“跳表”來實現的。

ConcurrentHashMap

JDK1.7的分段鎖機制

Hashtable之所以效率低下主要是因為其實現使用了synchronized關鍵字對put等操作進行加鎖,而synchronized關鍵字加鎖是對整個對象進行加鎖。也就是說在進行put等修改Hash表的操作時,鎖住了整個Hash表,從而使得其表現的效率低下。

因此,在JDK1.5到1.7版本,Java使用了分段鎖機制實現ConcurrentHashMap。

簡而言之,ConcurrentHashMap在對象中保存了一個Segment數組,即將整個Hash表划分為多個分段。而每個Segment元素,即每個分段則類似於一個Hashtable。在執行put操作時首先根據hash算法定位到元素屬於哪個Segment,然后使用ReentrantLock對該Segment加鎖即可。因此,ConcurrentHashMap在多線程並發編程中可是實現多線程put操作。

Segment類是ConcurrentHashMap中的內部類,繼承於ReentrantLock類。ConcurrentHashMap與Segment是組合關系,一個ConcurrentHashMap對象包含若干個Segment對象,ConcurrentHashMap類中存在“Segment數組”成員。

HashEntry也是ConcurrentHashMap的內部類,是單向鏈表節點,存儲着key-value鍵值對。Segment與HashEntry是組合關系,Segment類中存在“HashEntry數組”成員,“HashEntry數組”中的每個HashEntry就是一個單向鏈表。

JDK1.8的改進

在JDK1.7的版本,ConcurrentHashMap是通過分段鎖機制來實現的,所以其最大並發度受Segment的個數限制。因此,在JDK1.8中,ConcurrentHashMap的實現原理摒棄了這種設計,而是選擇了與HashMap類似的數組+鏈表+紅黑樹的方式實現,而加鎖則采用CAS原子更新、volatile關鍵字、synchronized可重入鎖實現。

JDK1.8的實現降低鎖的粒度,JDK1.7版本鎖的粒度是基於Segment的,包含多個HashEntry,而JDK1.8鎖的粒度就是HashEntry(首節點)。

JDK1.8版本的數據結構變得更加簡單,使得操作也更加清晰流暢,因為已經使用synchronized來進行同步,所以不需要分段鎖的概念,也就不需要Segment這種數據結構了,由於粒度的降低,實現的復雜度也增加了。

JDK1.8版本的擴容操作支持多線程並發。在之前的版本中如果Segment正在進行擴容操作,其他寫線程都會被阻塞,JDK1.8改為一個寫線程觸發了擴容操作,其他寫線程進行寫入操作時,可以幫助它來完成擴容這個耗時的操作。

JDK1.8使用紅黑樹來優化鏈表,基於長度很長的鏈表的遍歷是一個很漫長的過程,而紅黑樹的遍歷效率是很快的,代替一定閾值的鏈表。

重要屬性

sizeCtl:標志控制符。這個參數非常重要,出現在ConcurrentHashMap的各個階段,不同的值也表示不同情況和不同功能:

負數代表正在進行初始化或擴容操作。-1表示正在進行初始化操作。-N表示有N-1個線程正在進行擴容操作。

其為0時,表示hash表還未初始化。

正數表示下一次進行擴容的大小,類似於擴容閾值。它的值始終是當前容量的0.75倍,如果hash表的實際大小>=sizeCtl,則進行擴容。

構造方法

需要說明的是,在構造方法里並沒有對集合進行初始化操作,而是等到了添加元素的時候才進行初始化,屬於懶漢式的加載方式。

而且loadFactor參數在JDK1.8中也不再有加載因子的意義,僅為了兼容以前的版本,加載因子由sizeCtl來替代。

同樣,concurrencyLevel參數在JDK1.8中也不再有多線程運行的並發度的意義,僅為了兼容以前的版本。

 1 // 空參構造器。
 2 public ConcurrentHashMap() {
 3 }
 4 
 5 // 指定初始容量的構造器。
 6 public ConcurrentHashMap(int initialCapacity) {
 7     // 參數有效性判斷。
 8     if (initialCapacity < 0)
 9         throw new IllegalArgumentException();
10     int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
11             MAXIMUM_CAPACITY :
12             tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
13     // 設置標志控制符。
14     this.sizeCtl = cap;
15 }
16 
17 // 指定初始容量,加載因子的構造器。
18 public ConcurrentHashMap(int initialCapacity, float loadFactor) {
19     this(initialCapacity, loadFactor, 1);
20 }
21 
22 // 指定初始容量,加載因子,並發度的構造器。
23 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
24     // 參數有效性判斷。
25     if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0)
26         throw new IllegalArgumentException();
27     // 比較初始容量和並發度的大小,取最大值作為初始容量。
28     if (initialCapacity < concurrencyLevel)
29         initialCapacity = concurrencyLevel;
30     // 計算最大容量。
31     long size = (long)(1.0 + (long)initialCapacity / loadFactor);
32     int cap = (size >= (long)MAXIMUM_CAPACITY) ?
33         MAXIMUM_CAPACITY : tableSizeFor((int)size);
34     // 設置標志控制符。
35     this.sizeCtl = cap;
36 }
37 
38 // 包含指定Map集合的構造器。
39 public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
40     // 設置標志控制符。
41     this.sizeCtl = DEFAULT_CAPACITY;
42     // 放置指定的集合。
43     putAll(m);
44 }

初始化方法

集合並不會在構造方法里進行初始化,而是在用到集合的時候才進行初始化,在初始化的同時會設置集合的閾值。

初始化方法主要應用了關鍵屬性sizeCtl,如果sizeCtl小於0,表示其他線程正在進行初始化,就放棄這個操作,在這也可以看出初始化只能由一個線程完成。如果獲得了初始化權限,就用CAS方法將sizeCtl置為-1,防止其他線程進入。初始化完成后,將sizeCtl的值改為0.75倍的集合容量,作為閾值。

在初始化的過程中為了保證線程安全,總共使用了兩步操作:

1)通過CAS原子更新方法將sizeCtl設置為-1,保證只有一個線程進入。

2)線程獲取初始化權限后內部通過 if ((tab = table) == null || tab.length == 0) 二次判斷,保證只有在未初始化的情況下才能執行初始化。

 1 // 初始化集合,使用CAS原子更新保證線程安全,使用volatile保證順序和可見性。
 2 private final Node<K,V>[] initTable() {
 3     Node<K,V>[] tab; int sc;
 4     // 死循環以完成初始化。
 5     while ((tab = table) == null || tab.length == 0) {
 6         // 如果sizeCtl小於0則表示正在初始化,當前線程讓步。
 7         if ((sc = sizeCtl) < 0)
 8             Thread.yield();
 9         // 如果需要初始化,並且使用CAS原子更新。判斷SIZECTL保存的sizeCtl值是否和sc一致,一致則將sizeCtl更新為-1。
10         else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
11             try {
12                 // 第一個線程初始化之后,第二個線程還會進來所以需要重新判斷。類似於線程同步的二次判斷。
13                 if ((tab = table) == null || tab.length == 0) {
14                     // 如果沒有指定容量則使用默認容量16。
15                     int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
16                     // 初始化一個指定容量的節點數組。
17                     @SuppressWarnings("unchecked")
18                     Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
19                     // 將節點數組指向集合。
20                     table = tab = nt;
21                     // 擴容閥值,獲取容量的0.75倍的值,寫法略叼更高端比直接乘高效。
22                     sc = n - (n >>> 2);
23                 }
24             } finally {
25                 // 將sizeCtl的值設為閾值。
26                 sizeCtl = sc;
27             }
28             break;
29         }
30     }
31     return tab;
32 }

添加方法

1)校驗數據。判斷傳入一個key和value是否為空,如果為空就直接報錯。ConcurrentHashMap是不可為空的(HashMap是可以為空的)。

2)是否要初始化。判斷table是否為空,如果為空就進入初始化階段。

3)如果數組中key指定的桶是空的,那就使用CAS原子操作把鍵值對插入到這個桶中作為頭節點。

4)協助擴容。如果這個要插入的桶中的hash值為-1,也就是MOVED狀態(也就是這個節點是ForwordingNode),那就是說明有線程正在進行擴容操作,那么當前線程就進入協助擴容階段。

5)插入數據到鏈表或者紅黑樹。如果這個節點是鏈表節點,那么就遍歷這個鏈表,如果有相同的key值就更新value值,如果沒有發現相同的key值,就在鏈表的尾部插入該數據。如果這個節點是紅黑樹節點,那就需要按照樹的插入規則進行插入。

6)轉化成紅黑樹。插入結束之后判斷如果是鏈表節點,並且個數大於8,就需要把鏈表轉化為紅黑樹存儲。

7)添加結束之后,需要給增加已存儲的數量,並判斷是否需要擴容。

 1 // 添加元素。
 2 public V put(K key, V value) {
 3     return putVal(key, value, false);
 4 }
 5 
 6 // 添加元素。
 7 final V putVal(K key, V value, boolean onlyIfAbsent) {
 8     // 排除null的數據。
 9     if (key == null || value == null) throw new NullPointerException();
10     // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。
11     int hash = spread(key.hashCode());
12     // 節點個數。0表示未加入新結點,2表示TreeBin或鏈表結點數,其它值表示鏈表結點數。主要用於每次加入結點后查看是否要由鏈表轉為紅黑樹。
13     int binCount = 0;
14     // CAS經典寫法,不成功無限重試。
15     for (Node<K,V>[] tab = table;;) {
16         // 聲明節點、集合長度、對應的數組下標、節點的hash值。
17         Node<K,V> f; int n, i, fh;
18         // 如果沒有初始化則進行初始化。除非構造時指定集合,否則默認構造不初始化,添加時檢查是否初始化,屬於懶漢模式初始化。
19         if (tab == null || (n = tab.length) == 0)
20             // 初始化集合。
21             tab = initTable();
22         // 如果已經初始化了,並且使用CAS根據hash獲取到的節點為null。
23         else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
24             // 使用CAS比較該索引處是否為null防止其它線程已改變該值,null則插入。
25             if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
26                 // 添加成功,跳出循環。
27                 break;
28         }
29         // 如果獲取到節點不為null,並且節點的hash為-1,則表示節點在擴容。
30         else if ((fh = f.hash) == MOVED)
31             // 幫助擴容。
32             tab = helpTransfer(tab, f);
33         // 產生hash碰撞,並且沒有擴容操作。
34         else {
35             V oldVal = null;
36             // 鎖住節點。
37             synchronized (f) {
38                 // 這里volatile獲取首節點與節點對比判斷節點還是不是首節點。
39                 if (tabAt(tab, i) == f) {
40                     // 判斷是否是鏈表節點。
41                     if (fh >= 0) {
42                         // 記錄節點個數。
43                         binCount = 1;
44                         // 循環完成添加節點到鏈表。
45                         for (Node<K,V> e = f;; ++binCount) {
46                             K ek;
47                             if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
48                                 oldVal = e.val;
49                                 if (!onlyIfAbsent)
50                                     e.val = value;
51                                 break;
52                             }
53                             Node<K,V> pred = e;
54                             if ((e = e.next) == null) {
55                                 pred.next = new Node<K,V>(hash, key, value, null);
56                                 break;
57                             }
58                         }
59                     }
60                     // 如果是紅黑樹節點。
61                     else if (f instanceof TreeBin) {
62                         Node<K,V> p;
63                         binCount = 2;
64                         if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) {
65                             oldVal = p.val;
66                             if (!onlyIfAbsent)
67                                 p.val = value;
68                         }
69                     }
70                 }
71             }
72             // 如果添加到了鏈表節點,需要進一步判斷是否需要轉為紅黑樹。
73             if (binCount != 0) {
74                 // 如果鏈表上的節點數大於等於8。
75                 if (binCount >= TREEIFY_THRESHOLD)
76                     // 嘗試轉為紅黑樹。
77                     treeifyBin(tab, i);
78                 if (oldVal != null)
79                     // 返回原值。
80                     return oldVal;
81                 break;
82             }
83         }
84     }
85     // 集合容量加一並判斷是否要擴容。
86     addCount(1L, binCount);
87     return null;
88 }

修改容量並判斷是否需要擴容

1)嘗試對baseCount和CounterCell進行增加的操作,這些操作基於CAS原子操作,同時使用volatile保證順序和可見性。備用方法fullAddCount()則會死循環插入。

2)判斷是否要擴容操作,並且支持多個線程協助進行擴容。

 1 // 修改容量並判斷是否要擴容。
 2 private final void addCount(long x, int check) {
 3     CounterCell[] as; long b, s;
 4     // counterCells不為null,或者使用CAS對baseCount增加失敗了,說明產生了並發,需要進一步處理。
 5     // counterCells初始為null,如果不為null,說明產生了並發。
 6     // 如果counterCells仍然為null,但是在使用CAS對baseCount增加的時候失敗,表示產生了並發。
 7     if ((as = counterCells) != null || !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
 8         CounterCell a; long v; int m;
 9         boolean uncontended = true;
10         // 如果counterCells是null的,或者counterCells的個數小於0。
11         // 或者counterCells的每一個元素都是null。
12         // 或者用counterCells數組中隨機位置的值進行累加也失敗了。
13         if (as == null || (m = as.length - 1) < 0 ||
14             (a = as[ThreadLocalRandom.getProbe() & m]) == null ||
15             !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
16             // 繼續更新counterCells和baseCount。
17             fullAddCount(x, uncontended);
18             return;
19         }
20         // 刪除或清理節點時是-1,插入索引首節點0,第二個節點是1。
21         if (check <= 1)
22             return;
23         // 計算map元素個數。
24         s = sumCount();
25     }
26     // 如果check的值大於等於0,需要檢查是否要擴容。刪除或清理節點時是-1,此時不檢查。
27     if (check >= 0) {
28         Node<K,V>[] tab, nt; int n, sc;
29         // 當元素個數大於閾值,並且集合不為空,並且元素個數小於最大值。循環判斷,防止多線程同時擴容跳過if判斷。
30         while (s >= (long)(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) {
31             // 生成與n有關的標記,且n不變的情況下生成的一定是一樣的。
32             int rs = resizeStamp(n);
33             // sc在單線程時是大於等於0的,如果小於0說明有其他線程正在擴容。
34             // 如果小於0說明有線程執行了else里面的判斷,導致rs左移16位並在低位+2賦值給sc。
35             if (sc < 0) {
36                 // 在第一次左移16位的sc,經過第二次右移16位之后,還和rs相同,說明已經擴容完成。
37                 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加后的rs相等,說明已經擴容完成。
38                 // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加最大值后的rs相等,說明已經擴容完成。
39                 // 如果下個節點是null,說明已經擴容完成。
40                 // 如果transferIndex小於等於0,說明集合已完成擴容,無法再分配任務。
41                 if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
42                     sc == rs + 1 ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + 1
43                     sc == rs + MAX_RESIZERS ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
44                     (nt = nextTable) == null ||
45                     transferIndex <= 0)
46                     // 跳出循環。
47                     break;
48                 // 使用CAS原子累加sc的值。
49                 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
50                     // 擴容。
51                     transfer(tab, nt);
52             }
53             // 如果sizeCtl大於或等於0,說明第一次擴容,並且使用CAS設置sizeCtl為rs左移后的負數,並且低位+2表示有2-1個線程正在擴容。
54             else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2))
55                 // 進行擴容操作。
56                 transfer(tab, null);
57             // 計算map元素個數,baseCount和counterCells數組存的總和。
58             s = sumCount();
59         }
60     }
61 }

幫助擴容方法

1)判斷集合已經完成初始化,並且節點是ForwordingNode類型(表示正在擴容),並且當前節點的子節點不為null,如果不成立則不需要擴容。

2)循環判斷是否擴容成功,如果沒有就使用CAS原子操作累加擴容的線程數,並且進行協助擴容。

 1 // 幫助擴容。
 2 final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
 3     Node<K,V>[] nextTab; int sc;
 4     // 如果表不為null,並且是fwd類型的節點,並且節點的子節點也不為null。
 5     if (tab != null && (f instanceof ForwardingNode) && (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
 6         // 得到標識符。
 7         int rs = resizeStamp(tab.length);
 8         // 如果nextTab沒有被並發修改,並且tab也沒有被並發修改,並且sizeCtl小於0說明還在擴容。
 9         while (nextTab == nextTable && table == tab && (sc = sizeCtl) < 0) {
10             // 在第一次左移16位的sc,經過第二次右移16位之后,還和rs相同,說明已經擴容完成。
11             // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加后的rs相等,說明已經擴容完成。
12             // 線程執行擴容,會使用CAS讓sc自增,如果sc和右移並累加最大值后的rs相等,說明已經擴容完成。
13             // 如果transferIndex小於等於0,說明集合已完成擴容,無法再分配任務。
14             if ((sc >>> RESIZE_STAMP_SHIFT) != rs ||
15                 sc == rs + 1 ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + 1
16                 sc == rs + MAX_RESIZERS ||// 此處應為 sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS
17                 transferIndex <= 0)
18                 // 跳出循環。
19                 break;
20             // 使用CAS原子累加sc的值。
21             if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
22                 // 擴容。
23                 transfer(tab, nextTab);
24                 break;
25             }
26         }
27         return nextTab;
28     }
29     return table;
30 }

擴容方法

1)根據CPU核心數平均分配給每個CPU相同大小的區間,如果不夠16個,默認就是16個。

2)有且只能由一個線程構建一個nextTable,這個nextTable是擴容后的數組(容量已經擴大)。

3)外層使用for循環處理每個區間里的根節點,內層使用while循環讓線程領取未擴容的區間。

4)處理每個區間的頭節點,如果頭結點為空,則直接放置一個ForwordingNode,通知其他線程幫助擴容。

5)處理每個區間的頭節點,如果頭結點不為空,並且hash不為-1,那么就同步頭節點,開始擴容。判斷頭結點是鏈表還是紅黑樹:如果是鏈表,則拆分為高低兩個鏈表。如果是紅黑樹,拆分為高低兩個紅黑樹,並判斷是否需要轉為鏈表。

  1 // 進行擴容操作。
  2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3     int n = tab.length, stride;
  4     // 根據cpu個數找出擴容時的最小分組,最小是16。
  5     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  6         stride = MIN_TRANSFER_STRIDE;
  7     // 表示第一次擴容,因為在addCount()方法中,第一次擴容的時候傳入的nextTab的值是null。
  8     if (nextTab == null) {
  9         try {
 10             // 創建新的擴容后的節點數組。
 11             @SuppressWarnings("unchecked")
 12             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 13             // 將新的數組賦值給nextTab。
 14             nextTab = nt;
 15         } catch (Throwable ex) {
 16             // 擴容失敗,設置sizeCtl為最大值。
 17             sizeCtl = Integer.MAX_VALUE;
 18             return;
 19         }
 20         // 將新的數組賦值給nextTable。
 21         nextTable = nextTab;
 22         // 記錄要擴容的區間最大值,說明是逆序遷移,從高位向低位遷移。
 23         transferIndex = n;
 24     }
 25     // 設置擴容后的容量。
 26     int nextn = nextTab.length;
 27     // 創建一個fwd節點,用於占位,fwd節點的hash默認為-1。當別的線程發現這個槽位中是fwd類型的節點,則跳過這個節點。
 28     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 29     // 如果是false,需要處理區間上的當前位置,如果是true,說明需要處理區間上的下一個位置。
 30     boolean advance = true;
 31     // 完成狀態,如果是true,就結束此方法。
 32     boolean finishing = false;
 33     // 死循環,i表示最大下標,bound表示最小下標。
 34     for (int i = 0, bound = 0;;) {
 35         Node<K,V> f; int fh;
 36         // 循環判斷是否要處理區間上的下一個位置,每個線程都會在這個循環里獲取區間。
 37         while (advance) {
 38             int nextIndex, nextBound;
 39             // i自減一並判斷是否大於等於bound,以及是否已經完成了擴容。
 40             // 如果i自減后大於等於bound並且未完成擴容,說明需要處理當前i位置上的節點,跳出while循環。
 41             // 如果i自減后小於bound並且未完成擴容,說明區間上沒有節點需要處理,在while循環里繼續判讀。
 42             // 如果已經完成擴容,跳出while循環。
 43             if (--i >= bound || finishing)
 44                 // 跳出while循環。
 45                 advance = false;
 46             // 如果要擴容的區間最大值小於等於0,說明沒有區間需要擴容了。
 47             else if ((nextIndex = transferIndex) <= 0) {
 48                 // i會在下面的if塊里判斷,從而進入完成狀態判斷。
 49                 i = -1;
 50                 // 跳出while循環。
 51                 advance = false;
 52             }
 53             // 首次while循環進入,CAS判斷transferIndex和nextIndex是否一致,將transferIndex修改為最大值。
 54             else if (U.compareAndSwapInt(this, TRANSFERINDEX, nextIndex,
 55                      nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {
 56                 // 當前線程處理區間的最小下標。
 57                 bound = nextBound;
 58                 // 初次對i賦值,當前線程處理區間的最大下標。
 59                 i = nextIndex - 1;
 60                 // 跳出while循環。
 61                 advance = false;
 62             }
 63         }
 64         // 判讀是否完成擴容。
 65         // 如果i小於0,表示已經處理了最后一段空間。
 66         // 如果i大於等於原容量,表示超過下標最大值。
 67         // 如果i加上原容量大於等於新容量,表示超過下標最大值。
 68         if (i < 0 || i >= n || i + n >= nextn) {
 69             int sc;
 70             // 如果完成擴容,finishing為true,表示最后一個線程完成了擴容。
 71             if (finishing) {
 72                 // 刪除成員變量。
 73                 nextTable = null;
 74                 // 更新集合。
 75                 table = nextTab;
 76                 // 更新閾值。
 77                 sizeCtl = (n << 1) - (n >>> 1);
 78                 return;
 79             }
 80             // 如果沒完成擴容,當前線程完成這段區間的擴容,將sc的低16位減1。
 81             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 82                 // 如果判斷是否是最后一個擴容線程,如果不等於,說明還有其他線程在擴容,當前線程返回。
 83                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 84                     return;
 85                 // 如果相等,說明當前最后一個線程完成擴容,擴容結束,並再次進入while循環檢查一次。
 86                 finishing = advance = true;
 87                 // 再次循環檢查一下整張表。
 88                 i = n;
 89             }
 90         }
 91         // 正常處理區間,如果原數組i位置是null,就使用fwd占位。
 92         else if ((f = tabAt(tab, i)) == null)
 93             // 如果成功寫入fwd占位,進入while循環,繼續處理區間的下一個節點。
 94             advance = casTabAt(tab, i, null, fwd);
 95         // 正常處理區間,如果原數組i位置不是null,並且hash值是-1,說明別的線程已經處理過了。
 96         else if ((fh = f.hash) == MOVED)
 97             // 進入while循環,繼續處理區間的下一個節點。
 98             advance = true;
 99         // 到這里,說明這個位置有實際值了,且不是占位符。
100         else {
101             // 對這個節點上鎖,防止添加元素的時候向鏈表插入數據。
102             synchronized (f) {
103                 // 判斷i下標處的桶節點是否和f相同,二次校驗。
104                 if (tabAt(tab, i) == f) {
105                     // 聲明高位桶和低位桶。
106                     Node<K,V> ln, hn;
107                     // 如果f的hash值大於0,表示是鏈表結構。紅黑樹的hash默認是-2。
108                     if (fh >= 0) {
109                         // 獲取原容量最高位同節點hash值的與運算結果,用來判斷將該節點放到高位還是低位。
110                         int runBit = fh & n;
111                         // 定義尾節點,暫時取f節點,后面會更新。
112                         Node<K,V> lastRun = f;
113                         // 遍歷這個節點。
114                         for (Node<K,V> p = f.next; p != null; p = p.next) {
115                             // 獲取原容量最高位同節點hash值的與運算結果,用來判斷將該節點放到高位還是低位。
116                             int b = p.hash & n;
117                             // 如果節點的hash值和首節點的hash值,同原容量最高位與運算的結果不同。
118                             if (b != runBit) {
119                                 // 更新runBit,用於下面判斷lastRun該賦值給ln還是hn。
120                                 runBit = b;
121                                 // 更新lastRun,保證后面的節點與自己的取於值相同,避免后面沒有必要的循環。
122                                 lastRun = p;
123                             }
124                         }
125                         // 如果最后更新的runBit是0,設置低位節點。
126                         if (runBit == 0) {
127                             ln = lastRun;
128                             hn = null;
129                         }
130                         // 如果最后更新的runBit是1,設置高位節點。
131                         else {
132                             hn = lastRun;
133                             ln = null;
134                         }
135                         // 再次循環,生成兩個鏈表,lastRun作為停止條件,這樣就是避免無謂的循環。
136                         for (Node<K,V> p = f; p != lastRun; p = p.next) {
137                             int ph = p.hash; K pk = p.key; V pv = p.val;
138                             // 如果與運算結果是0,那么創建低位節點。
139                             if ((ph & n) == 0)
140                                 ln = new Node<K,V>(ph, pk, pv, ln);
141                             // 如果與運算結果是1,那么創建高位節點。
142                             else
143                                 hn = new Node<K,V>(ph, pk, pv, hn);
144                         }
145                         // 設置低位鏈表,放在新數組的i位置。
146                         setTabAt(nextTab, i, ln);
147                         // 設置高位鏈表,放在新數組的i+n位置。
148                         setTabAt(nextTab, i + n, hn);
149                         // 將舊的鏈表設置成fwd占位符。
150                         setTabAt(tab, i, fwd);
151                         // 繼續處理區間的下一個節點。
152                         advance = true;
153                     }
154                     // 如果是紅黑樹結構。
155                     else if (f instanceof TreeBin) {
156                         TreeBin<K,V> t = (TreeBin<K,V>)f;
157                         TreeNode<K,V> lo = null, loTail = null;
158                         TreeNode<K,V> hi = null, hiTail = null;
159                         int lc = 0, hc = 0;
160                         // 遍歷。
161                         for (Node<K,V> e = t.first; e != null; e = e.next) {
162                             int h = e.hash;
163                             TreeNode<K,V> p = new TreeNode<K,V>(h, e.key, e.val, null, null);
164                             // 與運算結果為0的放在低位。
165                             if ((h & n) == 0) {
166                                 if ((p.prev = loTail) == null)
167                                     lo = p;
168                                 else
169                                     loTail.next = p;
170                                 loTail = p;
171                                 ++lc;
172                             }
173                             // 與運算結果為1的放在高位。
174                             else {
175                                 if ((p.prev = hiTail) == null)
176                                     hi = p;
177                                 else
178                                     hiTail.next = p;
179                                 hiTail = p;
180                                 ++hc;
181                             }
182                         }
183                         // 如果樹的節點數小於等於6,那么轉成鏈表,反之,創建一個新的樹。
184                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t;
185                         // 如果樹的節點數小於等於6,那么轉成鏈表,反之,創建一個新的樹。
186                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t;
187                         // 設置低位樹,放在新數組的i位置。
188                         setTabAt(nextTab, i, ln);
189                         // 設置高位數,放在新數組的i+n位置。
190                         setTabAt(nextTab, i + n, hn);
191                         // 將舊的樹設置成fwd占位符。
192                         setTabAt(tab, i, fwd);
193                         // 繼續處理區間的下一個節點。
194                         advance = true;
195                     }
196                 }
197             }
198         }
199     }
200 }

獲取方法

根據指定的鍵,返回對應的鍵值對,由於是讀操作,所以不涉及到並發問題,步驟如下:

1)判斷查詢的key對應數組的首節點是否null。

2)先判斷數組的首節點是否為尋找的對象。

3)如果首節點不是,並且是紅黑樹結構,另做處理。

4)如果是鏈表結構,遍歷整個鏈表查詢。

5)如果都不是,那就返回null值。

 1 // 獲取元素。
 2 public V get(Object key) {
 3     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 4     // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。
 5     int h = spread(key.hashCode());
 6     // 如果集合不為null,並且集合長度大於0,並且指定位置上的元素不為null。
 7     if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {
 8         // 如果hash相等。
 9         if ((eh = e.hash) == h) {
10             // 如果首節點是要找的元素。
11             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
12                 return e.val;
13         }
14         // 如果正在擴容或者是樹節點。
15         else if (eh < 0)
16             // 嘗試查找元素,找到返回元素,找不到返回null。
17             return (p = e.find(h, key)) != null ? p.val : null;
18         // 如果不是首節點,則遍歷集合查找。
19         while ((e = e.next) != null) {
20             if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
21                 return e.val;
22         }
23     }
24     return null;
25 }

刪除方法

刪除操作,可以看成是用null替代原來的節點,因此合並在這個方法中,由這個方法一起實現刪除操作和替換操作。

replaceNode()方法中的三個參數,key表示想要刪除的鍵,value表示想要替換的元素,cv表示想要刪除的key對應的值。

 1 // 刪除元素。
 2 public V remove(Object key) {
 3     return replaceNode(key, null, null);
 4 }
 5 
 6 // 刪除元素
 7 final V replaceNode(Object key, V value, Object cv) {
 8     // 計算hash,並保證hash一定大於零,負數表示在擴容或者是樹節點。
 9     int hash = spread(key.hashCode());
10     // CAS經典寫法,不成功無限重試。
11     for (Node<K,V>[] tab = table;;) {
12         Node<K,V> f; int n, i, fh;
13         // 如果集合是null,或者集合長度是0,或者指定位置上的元素是null。
14         if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)
15             // 跳出循環。
16             break;
17         // 如果獲取到節點不為null,並且節點的hash為-1,則表示節點在擴容。
18         else if ((fh = f.hash) == MOVED)
19             // 幫助擴容。
20             tab = helpTransfer(tab, f);
21         // 產生hash碰撞,並且沒有擴容操作。
22         else {
23             V oldVal = null;
24             // 是否進入了同步代碼。
25             boolean validated = false;
26             // 鎖住節點。
27             synchronized (f) {
28                 // 這里volatile獲取首節點與節點對比判斷節點還是不是首節點。
29                 if (tabAt(tab, i) == f) {
30                     // 判斷是否是鏈表節點。
31                     if (fh >= 0) {
32                         validated = true;
33                         // 循環查找指定元素。
34                         for (Node<K,V> e = f, pred = null;;) {
35                             K ek;
36                             // 找到元素了。
37                             if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
38                                 V ev = e.val;
39                                 // 如果cv為null,或者cv不為null時cv和指定元素上的值相同,才更新或者刪除節點。
40                                 if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {
41                                     oldVal = ev;
42                                     // 如果新值不為null,替換。
43                                     if (value != null)
44                                         e.val = value;
45                                     // 如果新值是null,並且當前節點非首結點,刪除。
46                                     else if (pred != null)
47                                         pred.next = e.next;
48                                     // 如果新值是null,並且當前節點是首結點,刪除。
49                                     else
50                                         setTabAt(tab, i, e.next);
51                                 }
52                                 break;
53                             }
54                             pred = e;
55                             // 如果遍歷集合也沒有找到。
56                             if ((e = e.next) == null)
57                                 // 跳出循環。
58                                 break;
59                         }
60                     }
61                     // 如果是紅黑樹節點。
62                     else if (f instanceof TreeBin) {
63                         validated = true;
64                         TreeBin<K,V> t = (TreeBin<K,V>)f;
65                         TreeNode<K,V> r, p;
66                         // 找到元素了。
67                         if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {
68                             V pv = p.val;
69                             // 如果cv為null,或者cv不為null時cv和指定元素上的值相同,才更新或者刪除節點。
70                             if (cv == null || cv == pv || (pv != null && cv.equals(pv))) {
71                                 oldVal = pv;
72                                 if (value != null)
73                                     p.val = value;
74                                 else if (t.removeTreeNode(p))
75                                     setTabAt(tab, i, untreeify(t.first));
76                             }
77                         }
78                     }
79                 }
80             }
81             // 如果進入了同步代碼。
82             if (validated) {
83                 // 如果更新或者刪除了節點。
84                 if (oldVal != null) {
85                     // 如果value為null,說明是刪除操作。
86                     if (value == null)
87                         // 將數組長度減一。
88                         addCount(-1L, -1);
89                     return oldVal;
90                 }
91                 break;
92             }
93         }
94     }
95     return null;
96 }

計算集合容量

ConcurrentHashMap中baseCount用於保存tab中元素總數,但是並不准確,因為多線程同時增刪改,會導致baseCount修改失敗,此時會將元素變動存儲於counterCells數組內。

當需要統計當前的size的時候,除了要統計baseCount之外,還需要加上counterCells中的每個桶的值。

值得一提的是即使如此,統計出來的依舊不是當前tab中元素的准確值,在多線程環境下統計前后並不能暫停線程操作,因此無法保證准確性。

 1 // 計算集合容量。
 2 public int size() {
 3     long n = sumCount();
 4     return ((n < 0L) ? 0 : (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE : (int)n);
 5 }
 6 
 7 // 計算集合容量,baseCount和counterCells數組存的總和。
 8 final long sumCount() {
 9     CounterCell[] as = counterCells; CounterCell a;
10     long sum = baseCount;
11     if (as != null) {
12         for (int i = 0; i < as.length; ++i) {
13             if ((a = as[i]) != null)
14                 sum += a.value;
15         }
16     }
17     return sum;
18 }

Hashtable、Collections.synchronizedMap()、ConcurrentHashMap之間的區別

Hashtable是線程安全的哈希表,它是通過synchronized來保證線程安全的;即,多線程通過同一個“對象的同步鎖”來實現並發控制。Hashtable在線程競爭激烈時,效率比較低(此時建議使用ConcurrentHashMap)。當一個線程訪問Hashtable的同步方法時,其它線程如果也在訪問Hashtable的同步方法時,可能會進入阻塞狀態。

Collections.synchronizedMap()使用了synchronized同步關鍵字來保證對Map的操作是線程安全的。

ConcurrentHashMap是線程安全的哈希表。在JDK1.7中它是通過“鎖分段”來保證線程安全的,本質上也是一個“可重入的互斥鎖”(ReentrantLock)。多線程對同一個片段的訪問,是互斥的;但是,對於不同片段的訪問,卻是可以同步進行的。在JDK1.8中是通過使用CAS原子更新、volatile關鍵字、synchronized可重入鎖實現的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM