JDK5中添加了新的concurrent包,相對同步容器而言,並發容器通過一些機制改進了並發性能。因為同步容器將所有對容器狀態的訪問都串行化了,這樣保證了線程的安全性,所以這種方法的代價就是嚴重降低了並發性,當多個線程競爭容器(bins)時,吞吐量嚴重降低。因此Java5.0開始針對多線程並發訪問設計,提供了並發性能較好的並發容器,引入了Java.util.concurrent包,在線程安全的基礎上提供了更好的寫並發能力,但同時降低了對讀一致性的要求(這點挺符合CAP理論的)。與Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的並發容器主要解決了兩個問題:
1)根據具體場景進行設計,盡量避免synchronized,提供並發性。
2)定義了一些並發安全的復合操作,並且保證並發環境下的迭代操作不會出錯。
util.concurrent中容器在迭代時,可以不封裝在synchronized中,可以保證不拋異常,但是未必每次看到的都是"最新的、當前的"數據。
並發編程實踐中,ConcurrentHashMap是一個經常被使用的數據結構,它的設計與實現非常精巧,大量的利用了volatile,final,CAS等lock-free技術來減少鎖競爭對於性能的影響,無論對於Java並發編程的學習還是Java內存模型的理解,ConcurrentHashMap的設計以及源碼都值得非常仔細的閱讀與揣摩。
HashTable是一個線程安全的類,它使用synchronized來鎖住整張Hash表來實現線程安全,即每次鎖住整張表讓線程獨占,相當於所有線程進行讀寫時都去競爭一把鎖,導致效率非常低下。ConcurrentHashMap可以做到讀取數據不加鎖,並且其內部的結構可以讓其在進行寫操作的時候能夠將鎖的粒度保持地盡量地小,允許多個修改操作並發進行,其關鍵在於使用了鎖分離技術。它使用了多個鎖來控制對hash表的不同部分進行的修改。ConcurrentHashMap內部使用段(Segment)來表示這些不同的部分,每個段其實就是一個小的Hashtable,它們有自己的鎖。只要多個修改操作發生在不同的段上,它們就可以並發進行。
有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,並且其成員變量實際上也是final的,但是,僅僅是將數組聲明為final的並不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。
ConcurrentHashMap為了提高本身的並發能力,在內部采用了一個叫做Segment的結構,一個Segment其實就是一個類HashTable的結構,Segment內部維護了一個鏈表數組,我們用下面這一幅圖來看下ConcurrentHashMap的內部結構詳情圖:
從宏觀上來看,大體結構圖可以簡單描繪為如下:
不難看出,ConcurrentHashMap采用了二次hash的方式,第一次hash將key映射到對應的segment,而第二次hash則是映射到segment的不同桶(bucket)中。
為什么要用二次hash,主要原因是為了構造分離鎖,使得對於map的修改不會鎖住整個容器,提高並發能力。當然,沒有一種東西是絕對完美的,二次hash帶來的問題是整個hash的過程比hashmap單次hash要長,所以,如果不是並發情形,不要使用concurrentHashmap。
4. 源代碼解析
Segment
static final class Segment<K,V> extends ReentrantLock implements Serializable { transient volatile int count; //Segment中元素的數量 transient int modCount; //對table的大小造成影響的操作的數量(比如put或者remove操作) transient int threshold; //閾值,Segment里面元素的數量超過這個值那么就會對Segment進行擴容 final float loadFactor; //負載因子,用於確定threshold transient volatile HashEntry<K,V>[] table; //鏈表數組,數組中的每一個元素代表了一個鏈表的頭部 }
HashEntry
static final class HashEntry<K,V> { final K key; final int hash; volatile V value; final HashEntry<K,V> next; }
可以看到HashEntry的一個特點,除了value以外,其他的幾個變量都是final的,這樣做是為了防止鏈表結構被破壞,出現ConcurrentModification的情況。
ConcurrentHashMap構造函數
1 public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { 2 if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) 3 throw new IllegalArgumentException(); 4 5 if (concurrencyLevel > MAX_SEGMENTS) 6 concurrencyLevel = MAX_SEGMENTS; 7 8 // Find power-of-two sizes best matching arguments 9 int sshift = 0; 10 int ssize = 1; 11 while (ssize < concurrencyLevel) { 12 ++sshift; 13 ssize <<= 1; 14 } 15 segmentShift = 32 - sshift; 16 segmentMask = ssize - 1; 17 this.segments = Segment.newArray(ssize); 18 19 if (initialCapacity > MAXIMUM_CAPACITY) 20 initialCapacity = MAXIMUM_CAPACITY; 21 int c = initialCapacity / ssize; 22 if (c * ssize < initialCapacity) 23 ++c; 24 int cap = 1; 25 while (cap < c) 26 cap <<= 1; 27 28 for (int i = 0; i < this.segments.length; ++i) 29 this.segments[i] = new Segment<K,V>(cap, loadFactor); 30 }
CurrentHashMap的初始化一共有三個參數
initialCapacity:表示初始的容量
loadFactor:表示負載參數,最后一個是
concurrencyLevel:代表ConcurrentHashMap內部的Segment的數量
其中,concurrencyLevel 一經指定,不可改變,后續如果ConcurrentHashMap的元素數量增加導致ConrruentHashMap需要擴容,ConcurrentHashMap不會增加Segment的數量,而只會增加Segment中鏈表數組的容量大小,這樣的好處是擴容過程不需要對整個ConcurrentHashMap做rehash,而只需要對Segment里面的元素做一次rehash就可以了。
整個ConcurrentHashMap的初始化方法還是非常簡單的,先是根據concurrencyLevel來new出Segment,這里Segment的數量是不大於concurrencyLevel的最大的2的指數,就是說Segment的數量永遠是2的指數個,這樣的好處是方便采用移位操作來進行hash,加快hash的過程。接下來就是根據intialCapacity確定Segment的容量的大小,每一個Segment的容量大小也是2的指數,同樣使為了加快hash的過程。
這邊需要特別注意一下兩個變量,分別是segmentShift和segmentMask,這兩個變量在后面將會起到很大的作用,假設構造函數確定了Segment的數量是2的n次方,那么segmentShift就等於32減去n,而segmentMask就等於2的n次方減一。
ConcurrentHashMap之get操作
JDK1.6的代碼如下:
V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; return readValueUnderLock(e); // recheck } e = e.next; } } return null; }
1.6的jdk采用了樂觀鎖的方式處理了get方法,取出key對應的value的值,如果拿出的value的值是null,則可能這個key--value對正在put的過程中,如果出現這種情況,那么就加鎖來保證取出的value是完整的,如果不是null,則直接返回value。
JDK1.7代碼如下:
public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile(tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
1.7並沒有判斷value=null的情況,不知為何,仔細思考了一下,個人認為無論是1.6還是1.7的實現,實際上都是一種樂觀的方式,而樂觀的方式帶來的是性能上的提升,但同時也帶來數據的弱一致性,如果你的業務是強一致性的業務,可能就要考慮另外的解決辦法(用Collections包裝或者像jdk6中一樣二次加鎖獲取)
具體ConcurrentHashMap為什么會帶來數據的若一致性詳情可查看博客:
http://ifeve.com/concurrenthashmap-weakly-consistent/
ConcurrentHashMap之put操作
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
對於put,ConcurrentHashMap采用自旋鎖的方式,不同於1.6的直接獲取鎖
注:個人理解,這里采用自旋鎖可能作者是覺得在分段鎖的狀態下,並發的可能本來就比較小,並且鎖占用時間又並不是特別長,因此自旋鎖可以減小線程喚醒和切換的開銷。針對自旋鎖相關的詳情,請參看博文(原理詳情):http://blog.csdn.net/sunp823/article/details/49886051,比較測試:http://www.cnblogs.com/softidea/p/5530761.html
ConcurrentHashMap之remove操作
Remove操作的前面一部分和前面的get和put操作一樣,都是定位Segment的過程,然后再調用Segment的remove方法:
1 final V remove(Object key, int hash, Object value) { 2 if (!tryLock()) 3 scanAndLock(key, hash); 4 V oldValue = null; 5 try { 6 HashEntry<K,V>[] tab = table; 7 int index = (tab.length - 1) & hash; 8 HashEntry<K,V> e = entryAt(tab, index); 9 HashEntry<K,V> pred = null; 10 while (e != null) { 11 K k; 12 HashEntry<K,V> next = e.next; 13 if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { 14 V v = e.value; 15 if (value == null || value == v || value.equals(v)) { 16 if (pred == null) 17 setEntryAt(tab, index, next); 18 else 19 pred.setNext(next); 20 ++modCount; 21 --count; 22 oldValue = v; 23 } 24 break; 25 } 26 pred = e; 27 e = next; 28 } 29 } finally { 30 unlock(); 31 } 32 return oldValue; 33 }
首先remove操作也是確定需要刪除的元素的位置,不過這里刪除元素的方法不是簡單地把待刪除元素的前面的一個元素的next指向后面一個就完事了,前面已經說過HashEntry中的next是final的,一經賦值以后就不可修改,在定位到待刪除元素的位置以后,程序就將待刪除元素前面的那一些元素全部復制一遍,然后再一個一個重新接到鏈表上去,看一下下面這一幅圖來了解這個過程:
假設鏈表中原來的元素如上圖所示,現在要刪除元素3,那么刪除元素3以后的鏈表就如下圖所示:
注意:下面的圖1和2的元素順序相反了,為什么這樣,不防再仔細看看源碼或者再讀一遍上面remove的分析過程,元素復制是從待刪除元素位置起將前面的元素逐一復制的,然后再將后面的鏈接起來。
ConcurrentHashMap之size操作
如果我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不准了。所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?
前面我們提到了一個Segment中的有一個modCount變量,代表的是對Segment中元素的數量造成影響的操作的次數,這個值只增不減,size操作就是遍歷了兩次Segment,每次記錄Segment的modCount值,然后將兩次的modCount進行比較,如果相同,則表示期間沒有發生過寫入操作,就將原先遍歷的結果返回,如果不相同,則把這個過程再重復做一次,如果再不相同,則就需要將所有的Segment都鎖住,然后一個一個遍歷了,具體的實現大家可以看ConcurrentHashMap的源碼,這里就不貼了。
總結
concurrentHashmap主要是為並發設計,與Collections的包裝不同,他不是采用全同步的方式,而是采用非鎖get方式,通過數據的弱一致性帶來性能上的大幅提升,同時采用分段鎖的策略,提高並發能力。