8.並發容器ConcurrentHashMap#put方法解析


jdk1.7.0_79

  HashMap可以說是每個Java程序員用的最多的數據結構之一了,無處不見它的身影。關於HashMap,通常也能說出它不是線程安全的。這篇文章要提到的是在多線程並發環境下的HashMap——ConcurrentHashMap,顯然它必然是線程安全的,同樣我們不可避免的要討論散列表,以及它是如何實現線程安全的,它的效率又是怎樣的,因為對於映射容器還有一個Hashtable也是線程安全的但它似乎只出現在筆試、面試題里,在現實編碼中它已經基本被遺棄。

  關於HashMap的線程不安全,在多線程並發環境下它所帶來的影響絕不僅僅是出現臟數據等數據不一致的情況,嚴重的是它有可能帶來程序死循環,這可能有點不可思議,但確實在不久前的項目里同事有遇到了CPU100%滿負荷運行,分析結果是在多線程環境下HashMap導致程序死循環。對於Hashtable,查看其源碼可知,Hashtable保證線程安全的方式就是利用synchronized關鍵字,這樣會導致效率低下,但對於ConcurrentHashMap則采用了不同的線程安全保證方式——分段鎖。它不像Hashtable那樣將整個table鎖住而是將數組元素分段加鎖,如果線程1訪問的元素在分段segment1,而線程2訪問的元素在分段segment2,則它們互不影響可以同時進行操作。如果合理的進行分段就是其關鍵問題。

  ConcurrentHashMap和HashMap的結果基本一致,同樣也是Entry作為存放數據的對象,另外一個就是上面提到的分段鎖——Segment。它繼承自ReentrantLock(關於ReentrantLock,可參考《5.Lock接口及其實現ReentrantLock》),故它具有ReentrantLock一切特性——可重入,獨占等。

  ConcurrentHashMap的結構圖如下所示:

  可以看到相比較於HashMapConcurrentHashMap在Entry數組之上是Segment,這個就是我們上面提到的分段鎖,合理的確定分段數就能更好的提高並發效率,我們來看ConcurrentHashMap是如何確定分段數的。 

  ConcurrentHashMap的初始化時通過其構造函數public ConcurrentHashMap(int initialCapacity, float loadFactorint concurrencyLevel)完成的,若在不指定各參數的情況下,初始容量initialCapacity=DAFAULT_INITIAL_CAPACITY=16,負載因子loadFactor=DEFAULT_LOAD_FACTOR=0.75f,並發等級concurrencyLevel=DEFAULT_CONCURRENCY_LEVEL=16,前兩者和HashMap相同。至於負載因子表示一個散列表的空間的使用程度initialCapacity(總容量) * loadFactor(負載因子) = 數據量,有此公式可知,若負載因子越大,則散列表的裝填程度越高,也就是能容納更多的元素,但這樣元素就多,鏈表就大,此時索引效率就會降低。若負載因子越小,則相反,索引效率就會高,換來的代價就是浪費的空間越多並發等級它表示估計最多有多少個線程來共同修改這個Map,稍后可以看到它和segment數組相關,segment數組的長度就是通過concurrencyLevel計算得出。

 1 //以默認參數為例initalCapacity=16,loadFactor=0.75,concurrencyLevel=16 
 2 public ConcurrentHashMap(int initalCapacity, float loadFactor, int concurrencyLevel) { 
 3   if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 
 4     throw new IllegalArgumentException(); 
 5   if (concurrencyLevel > MAX_SEGMENTS) 
 6     concurrencyLevel = MAX_SEGMENTS; 
 7   int sshift = 0; 
 8   int ssize = 1;//segment數組長度 
 9   while (ssize < concurrencyLevel) { 
10     ++sshift; 
11     ssize <= 1; 
12   }//經過ssize左移4位后,ssize=16,ssift=4 
13 /*segmentShift用於參與散列運算的位數,segmentMask是散列運算的掩碼,這里有關的散列函數運算和HashMap有類似之處*/ 
14   this.segmentShift = 32 – ssift;//段偏移量segmentShift=28 
15   this.segmentMask = ssize – 1;//段掩碼segmentMask=15(1111) 
16   if (initialCapacity > MAXIMUM_CAPACITY) 
17     initialCapacity = MAXIMUM_CAPACITY; 
18   int c = initialCapacity / ssize;//c = 1 
19   if (c * ssize < initialCapacity) 
20     ++c; 
21   int cap = MIN_SEGMENT_TABLE_CAPACITY;//MIN_SEGMENT_TABLE_CAPACITY=2 
22   while (cap < c)//cap = 2, c = 1,false 
23       cap <<= 1;//cap是segment里HashEntry數組的長度,最小為2 
24 /*創建segments數組和segment[0]*/ 
25   Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[]) new HashEntry[cap]);//參數意為:負載因子=1,數據容量=(int)(2 * 0.75)=1,總容量=2,故每個Segment的HashEntry總容量為2,實際數據容量為1 
26   Segment<K,V> ss = (Segment<K,V>[])new Segment[ssize];//segments數組大小為16 
27   UNSAFE.putOrderedObject(ss, SBASE, s0); 
28   this.segments = ss; 
29 } 

  以上就是整個初始化過程,主要是初始化segments的長度大小以及通過負載因子確定每個Segment的容量大小。確定好Segment過后,接下來的重點就是如何准確定位Segment。定位Segment的方法就是通過散列函數來定位,先通過hash方法對元素進行二次散列,這個算法較為復雜,其目的只有一個——減少散列沖突,使元素能均勻分布在不同的Segment上,提高容器的存取效率。 

  我們通過最直觀最常用的put方法來觀察ConcurrentHashMap是如何通過key值計算hash值在定位到Segment的 

 1 //ConcurrentHashMap#put 
 2 public V put(K key, V value) { 
 3   Segment<K,V> s; 
 4   if (value == null) 
 5     throw new NullPointerException(); 
 6   int hash = hash(key);//根據散列函數,計算出key值的散列值 
 7   int j = (hash >>> segmentShift) & segmentMask;//這個操作就是定位Segment的數組下標,jdk1.7之前是segmentFor返回Segment,1.7之后直接就取消了這個方法,直接計算數組下標,然后通過偏移量底層操作獲取Segment 
 8   if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck 
 9       (segments, (j << SSHIFT) + SBASE)) == null) //  in ensureSegment 
10     s = ensureSegment(j);//通過便宜量定位不到就調用ensureSegment方法定位Segment 
11   return s.put(key, hash, value, false); 
12 } 

  Segment.put方法就是將鍵、值構造為Entry節點加入到對應的Segment段里,如果段中已經有元素(即表示兩個key鍵值的hash值重復)則將最新加入的放到鏈表的頭),整個過程必然是加鎖安全的。 

 

  不妨繼續深入Segment.put方法 :

 1 //Segment#put 
 2 final V put(K key, int hash, V value, boolean onlyIfAbsent) { 
 3   HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);//非阻塞獲取鎖,獲取成功node=null,失敗 
 4   V oldValue; 
 5   try { 
 6     HashEntry<K,V>[] tab = table;//Segment對應的HashEntry數組長度 
 7     int index = (tab.length - 1) & hash; 
 8     HashEntry<K,V> first = entryAt(tab, index);//獲取HashEntry數組的第一個值 
 9     for (HashEntry<K,V> e = first;;) { 
10       if (e != null) {//HashEntry數組已經存在值 
11         K k; 
12         if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {//key值和hash值都相等,則直接替換舊值 
13           oldValue = e.value; 
14           if (!onlyIfAbsent) { 
15             e.value = value; 
16             ++modCount; 
17           } 
18           break; 
19         } 
20         e = e.next;//不是同一個值則繼續遍歷,直到找到相等的key值或者為null的HashEntry數組元素 
21       } 
22       else {//HashEntry數組中的某個位置元素為null 
23         if (node != null) 
24           node.setNext(first);//將新加入節點(key)的next引用指向HashEntry數組第一個元素 
25         else//已經獲取到了Segment鎖 
26           node = new HashEntry<K,V>(hash, key, value, first) 
27         int c = count + 1; 
28         if (c > threshold && tab.lenth < MAXIUM_CAPACITY)//插入前先判斷是否擴容,ConcurrentHashMap擴容與HashMap不同,ConcurrentHashMap只擴Segment的容量,HashMap則是整個擴容 
29           rehash(node); 
30         else 
31           setEntryAt(tab, index, node);//設置為頭節點 
32         ++modCount;//總容量 
33         count = c; 
34         oldValue = null; 
35         break; 
36       } 
37      } 
38   } finally { 
39     unlock(); 
40   } 
41   return oldValue; 
42 } 

  上面大致就是ConcurrentHashMap加入一個元素的過程,需要明白的就是ConcurrentHashMap分段鎖的概念。在JDK1.6中定位Segment較為簡單,直接計算出Segment數組下標后就返回具體的Segment,而JDK1.7則通過偏移量來計算,算出為空時,還有一次檢查獲取Segment,猜測是1.7使用底層native是為了提高效率,JDK1.8的ConcurrentHashMap又有不同,暫未深入研究,它的數據結果似乎變成了紅黑樹。 

  有關ConcurrentHashMap的get方法不再分析,過程總結為一句話:根據key值計算出hash值,根據hash值計算出對應的Segment,再在Segment下的HashEntry鏈表遍歷查找。 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM