ConcurrentHashMap的size方法是線程安全的嗎?


前言

之前在面試的過程中有被問到,ConcurrentHashMap的size方法是線程安全的嗎?
這個問題,確實沒有答好。這次來根據源碼來了解一下,具體是怎么一個實現過程。

ConcurrentHashMap的原理與結構

我們都知道Hash表的結構是數組加鏈表,就是一個數組中,每一個元素都是一個鏈表,有時候也會形象的把數組中的每個元素稱為一個“”。在插入元素的時候,首先通過對傳入的鍵(key),進行一個哈希函數的處理,來確定元素應該存放於數組中哪個一個元素的鏈表中。
這種數據結構在很多計算機語言中都能找到其身影,在Java中如HashMap,ConcurrentHashMap等都是這種數據結構。

但是這中數據結構在實現HashMap的時候並不是線程安全的,因為在HashMap擴容的時候,是會將原先的鏈表遷移至新的鏈表數組中,在遷移過程中多線程情況下會有造成鏈表的死循環情況(JDK1.7之前的頭插法);還有就是在多線程插入的時候也會造成鏈表中數據的覆蓋導致數據丟失。

所以就出現了線程安全的HashMap類似的hash表集合,典型的就是HashTable和ConcurrentHashMap。
Hashtable實現線程安全的代價比較大,那就是在所有可能產生競爭方法里都加上了synchronized,這樣就會導致,當出現競爭的時候只有一個線程能對整個Hashtable進行操作,其他所有線程都需要阻塞等待當前獲取到鎖的線程執行完成。
這樣效率是非常低的。

而ConcurrentHashMap解決線程安全的方式就不一樣了,它避免了對整個Map進行加鎖,從而提高了並發的效率。
下面將具體介紹一下JDK1.7和1.8的實現。

JDK1.7中的ConcurrentHashMap

JDK1.7中的ConcurrentHashMap采用了分段鎖的形式,每一段為一個Segment類,它內部類似HashMap的結構,內部有一個Entry數組,數組的每個元素是一個鏈表。同時Segment類繼承自ReentrantLock
結構如下:
在這里插入圖片描述
在HashEntry中采用了volatile來修飾了HashEntry的當前值和next元素的值。所以get方法在獲取數據的時候是不需要加鎖的,這樣就大大的提升了執行效率。
在執行put()方法的時候會先嘗試獲取鎖(tryLock()),如果獲取鎖失敗,說明存在競爭,那么將通過scanAndLockForPut()方法執行自旋,當自旋次數達到MAX_SCAN_RETRIES時會執行阻塞鎖,直到獲取鎖成功。
源碼如下:

static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
	// 首先嘗試獲取鎖,獲取失敗則執行自旋,自旋次數超過最大長度,后改為阻塞鎖,直到獲取鎖成功。
     HashEntry<K,V> node = tryLock() ? null :
         scanAndLockForPut(key, hash, value);
     V oldValue;
     try {
         HashEntry<K,V>[] tab = table;
         int index = (tab.length - 1) & hash;
         HashEntry<K,V> first = entryAt(tab, index);
         for (HashEntry<K,V> e = first;;) {
             if (e != null) {
                 K k;
                 if ((k = e.key) == key ||
                     (e.hash == hash && key.equals(k))) {
                     oldValue = e.value;
                     if (!onlyIfAbsent) {
                         e.value = value;
                         ++modCount;
                     }
                     break;
                 }
                 e = e.next;
             }
             else {
                 if (node != null)
                     node.setNext(first);
                 else
                     node = new HashEntry<K,V>(hash, key, value, first);
                 int c = count + 1;
                 if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                     rehash(node);
                 else
                     setEntryAt(tab, index, node);
                 ++modCount;
                 count = c;
                 oldValue = null;
                 break;
             }
         }
     } finally {
         unlock();
     }
     return oldValue;
 }

JDK1.8后的ConcurrentHashMap

在JDK1.8中,放棄了Segment這種分段鎖的形式,而是采用了CAS+Synchronized的方式來保證並發操作的,采用了和HashMap一樣的結構,直接用數組加鏈表,在鏈表長度大於8的時候為了提高查詢效率會將鏈表轉為紅黑樹(鏈表定位數據的時間復雜度為O(N),紅黑樹定位數據的時間復雜度為O(logN))。
在代碼上也和JDK1.8的HashMap很像,也是將原先的HashEntry改為了Node類,但還是使用了volatile修飾了當前值和next的值。從而保證了在獲取數據時候的高效。
JDK1.8中的ConcurrentHashMap在執行put()方法的時候還是有些復雜的,主要是為了保證線程安全才做了一系列的措施。
源碼如下:
在這里插入圖片描述

  • 第一步通過key進行hash。
  • 第二步判斷是否需要初始化數據結構。
  • 第三步根據key定位到當前Node,如果當前位置為空,則可以寫入數據,利用CAS機制嘗試寫入數據,如果寫入失敗,說明存在競爭,將會通過自旋來保證成功。
  • 第四步如果當前的hashcode值等於MOVED則需要進行擴容(擴容時也使用了CAS來保證了線程安全)。
  • 第五步如果上面四步都不滿足,那么則通過synchronized阻塞鎖將數據寫入。
  • 第六步如果數據量大於TREEIFY_THRESHOLD時需要轉換成紅黑樹(默認為8)。

JDK1.8的ConcurrentHashMap的get()方法就還是比較簡單:

  • 根據keyhashcode尋址到具體的桶上。
  • 如果是紅黑樹則按照紅黑樹的方式去查找數據。
  • 如果是鏈表就按照遍歷鏈表的方式去查找數據。
public V get(Object key) {
     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
     int h = spread(key.hashCode());
     if ((tab = table) != null && (n = tab.length) > 0 &&
         (e = tabAt(tab, (n - 1) & h)) != null) {
         if ((eh = e.hash) == h) {
             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                 return e.val;
         }
         else if (eh < 0)
             return (p = e.find(h, key)) != null ? p.val : null;
         while ((e = e.next) != null) {
             if (e.hash == h &&
                 ((ek = e.key) == key || (ek != null && key.equals(ek))))
                 return e.val;
         }
     }
     return null;
 }

ConcurrentHashMap的size方法

JDK1.7中的ConcurrentHashMap的size方法,計算size的時候會先不加鎖獲取一次數據長度,然后再獲取一次,最多三次。比較前后兩次的值,如果相同的話說明不存在競爭的編輯操作,就直接把值返回就可以了。
但是如果前后獲取的值不一樣,那么會將每個Segment都加上鎖,然后計算ConcurrentHashMap的size值。
在這里插入圖片描述
JDK1.8中的ConcurrentHashMap的size()方法的源碼如下:

/**
 * {@inheritDoc}
 */
public int size() {
    long n = sumCount();
    return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
}

這個方法最大會返回int的最大值,但是ConcurrentHashMap的長度有可能超過int的最大值。
在JDK1.8中增加了mappingCount()方法,這個方法的返回值是long類型的,所以JDK1.8以后更推薦用這個方法獲取Map中數據的數量。

/**
 * @return the number of mappings
 * @since 1.8
 */
 public long mappingCount() {
     long n = sumCount();
     return (n < 0L) ? 0L : n; // ignore transient negative values
 }

無論是size()方法還是mappingCount()方法,核心方法都是sumCount()方法。
源碼如下:

final long sumCount() {
     CounterCell[] as = counterCells; CounterCell a;
     long sum = baseCount;
     if (as != null) {
         for (int i = 0; i < as.length; ++i) {
             if ((a = as[i]) != null)
                 sum += a.value;
         }
     }
     return sum;
 }

在上面sumCount()方法中我們看到,當counterCells為空時直接返回baseCount,當counterCells不為空時遍歷它並壘加到baseCount中。
先看baseCount

/**
 * Base counter value, used mainly when there is no contention,
 * but also as a fallback during table initialization
 * races. Updated via CAS.
 */
private transient volatile long baseCount;

baseCount是一個volatile變量,那么我們來看在put()方法執行時是如何使用baseCount的,在put方法的最后一段代碼中會調用addCount()方法,而addCount()方法的源碼如下:
在這里插入圖片描述
首先對baseCount做CAS自增操作。
如果並發導致了baseCount的CAS失敗了,則使用counterCells進行CAS。
如果counterCells的CAS也失敗了,那么則進入fullAddCount()方法,fullAddCount()方法中會進入死循環,直到成功為止。
在這里插入圖片描述
那么CountCell到底是個什么呢?
源碼如下:

/**
 * A padded cell for distributing counts.  Adapted from LongAdder
 * and Striped64.  See their internal docs for explanation.
 */
@sun.misc.Contended static final class CounterCell {
    volatile long value;
    CounterCell(long x) { value = x; }
}

一個使用了 @sun.misc.Contended 標記的類,內部一個 volatile 變量。
@sun.misc.Contended 這個注解是為了防止“偽共享”。
那么什么是偽共享呢?

緩存系統中是以緩存行(cache line)為單位存儲的。緩存行是2的整數冪個連續字節,一般為32-256個字節。最常見的緩存行大小是64個字節。當多線程修改互相獨立的變量時,如果這些變量共享同一個緩存行,就會無意中影響彼此的性能,這就是偽共享。

所以偽共享對性能危害極大。
JDK 8 版本之前沒有這個注解,JDK1.8之后使用拼接來解決這個問題,把緩存行加滿,讓緩存之間的修改互不影響。

總結

無論是JDK1.7還是JDK1.8中,ConcurrentHashMap的size()方法都是線程安全的,都是准確的計算出實際的數量,但是這個數據在並發場景下是隨時都在變的。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM