ConcurrentHashMap采用了非常精妙的"分段鎖"策略,ConcurrentHashMap的主干是個Segment數組。Segment繼承了ReentrantLock,所以它就是一種可重入鎖(ReentrantLock)。在ConcurrentHashMap,一個Segment就是一個子哈希表,Segment里維護了一個HashEntry數組,並發環境下,對於不同Segment的數據進行操作是不用考慮鎖競爭的。
當問到我們有關於ConcurrentHashMap的工作原理以及實現時,可以從以下幾個方面說:
-
ConcurrentHashMap的優點,即HashMap和HashTable的缺點。
- ConcurrentHashMap兩個靜態內部類:HsahEntry和segment
- ConcurrentHashMap含有16個segment
- ConcurrentHashMap的put方法:根據hash值找到對應的segment,segment是分段鎖。
- ConcurrentHashMap的get方法:count>0,hash找到HashEntry,hash相等並且key相同,若取value為null,加鎖重新獲取。
- ConcurrentHashMap的remove方法:加鎖,每刪除一個元素就將那之前的元素克隆一邊。因為設置為第一次next之后不能再改變。
- ConcurrentHashMap的size()方法:2次不鎖住segment方式統計各個segment的大小,若count發生變化,采用加鎖方式統計。modCount變量,在put,remove和clean方法里操作元素,modcount加1.
ConcurrentHashMap是Java1.5中引用的一個線程安全的支持高並發的HashMap集合類。
1、線程不安全的HashMap
因為多線程環境下,使用Hashmap進行put操作會引起死循環,導致CPU利用率接近100%,所以在並發情況下不能使用HashMap。
2、效率低下的HashTable
HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。
因為當一個線程訪問HashTable的同步方法時,其他線程訪問HashTable的同步方法時,可能會進入阻塞或輪詢狀態。
如線程1使用put進行添加元素,線程2不但不能使用put方法添加元素,並且也不能使用get方法來獲取元素,所以競爭越激烈效率越低。
3、鎖分段技術
HashTable容器在競爭激烈的並發環境下表現出效率低下的原因,是因為所有訪問HashTable的線程都必須競爭同一把鎖,
那假如容器里有多把鎖,每一把鎖用於鎖容器其中一部分數據,那么當多線程訪問容器里不同數據段的數據時,線程間就不會存在鎖競爭,
從而可以有效的提高並發訪問效率,這就是ConcurrentHashMap所使用的鎖分段技術。
首先將數據分成一段一段的存儲,然后給每一段數據配一把鎖,當一個線程占用鎖訪問其中一個段數據的時候,其他段的數據也能被其他線程訪問。
有些方法需要跨段,比如size()和containsValue(),它們可能需要鎖定整個表而而不僅僅是某個段,這需要按順序鎖定所有段,操作完畢后,又按順序釋放所有段的鎖。
這里“按順序”是很重要的,否則極有可能出現死鎖,在ConcurrentHashMap內部,段數組是final的,並且其成員變量實際上也是final的,
但是,僅僅是將數組聲明為final的並不保證數組成員也是final的,這需要實現上的保證。這可以確保不會出現死鎖,因為獲得鎖的順序是固定的。
oncurrentHashMap 類中包含兩個靜態內部類 HashEntry 和 Segment。
HashEntry 用來封裝映射表的鍵 / 值對;Segment 用來充當鎖的角色,每個 Segment 對象守護整個散列映射表的若干個桶。
每個桶是由若干個 HashEntry 對象鏈接起來的鏈表。一個 ConcurrentHashMap 實例中包含由若干個 Segment 對象組成的數組。
每個Segment守護者一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得它對應的Segment鎖。
5、HashEntry類
static final class HashEntry<K,V> { final K key; // 聲明 key 為 final 型 final int hash; // 聲明 hash 值為 final 型 volatile V value; // 聲明 value 為 volatile 型 final HashEntry<K,V> next; // 聲明 next 為 final 型 HashEntry(K key, int hash, HashEntry<K,V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } }
每個HashEntry代表Hash表中的一個節點,在其定義的結構中可以看到,除了value值沒有定義final,其余的都定義為final類型,我們知道Java中關鍵詞final修飾的域成為最終域。
用關鍵詞final修飾的變量一旦賦值,就不能改變,也稱為修飾的標識為常量。這就意味着我們刪除或者增加一個節點的時候,就必須從頭開始重新建立Hash鏈,因為next引用值需要改變。
由於 HashEntry 的 next 域為 final 型,所以新節點只能在鏈表的表頭處插入。 例如將A,B,C插入空桶中,插入后的結構為:
6、segment類
static final class Segment<K,V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; /** * 在本 segment 范圍內,包含的 HashEntry 元素的個數 * 該變量被聲明為 volatile 型,保證每次讀取到最新的數據 */ transient volatile int count; /** *table 被更新的次數 */ transient int modCount; /** * 當 table 中包含的 HashEntry 元素的個數超過本變量值時,觸發 table 的再散列 */ transient int threshold; /** * table 是由 HashEntry 對象組成的數組 * 如果散列時發生碰撞,碰撞的 HashEntry 對象就以鏈表的形式鏈接成一個鏈表 * table 數組的數組成員代表散列映射表的一個桶 * 每個 table 守護整個 ConcurrentHashMap 包含桶總數的一部分 * 如果並發級別為 16,table 則守護 ConcurrentHashMap 包含的桶總數的 1/16 */ transient volatile HashEntry<K,V>[] table; /** * 裝載因子 */ final float loadFactor; }
Segment 類繼承於 ReentrantLock 類,從而使得 Segment 對象能充當鎖的角色。每個 Segment 對象用來守護其(成員對象 table 中)包含的若干個桶。
table 是一個由 HashEntry 對象組成的數組。table 數組的每一個數組成員就是散列映射表的一個桶。
每一個 Segment 對象都有一個 count 對象來表示本 Segment 中包含的 HashEntry 對象的總數。
之所以在每個 Segment 對象中包含一個計數器,而不是在 ConcurrentHashMap 中使用全局的計數器,是為了避免出現“熱點域”而影響 ConcurrentHashMap 的並發性。
下圖是依次插入 ABC 三個 HashEntry 節點后,Segment 的結構示意圖。
7、ConcurrentHashMap 類
默認的情況下,每個ConcurrentHashMap 類會創建16個並發的segment,每個segment里面包含多個Hash表,每個Hash鏈都是有HashEntry節點組成的。
如果鍵能均勻散列,每個 Segment 大約守護整個散列表中桶總數的 1/16。
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable { /** * 散列映射表的默認初始容量為 16,即初始默認為 16 個桶 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final int DEFAULT_INITIAL_CAPACITY= 16; /** * 散列映射表的默認裝載因子為 0.75,該值是 table 中包含的 HashEntry 元素的個數與 * table 數組長度的比值 * 當 table 中包含的 HashEntry 元素的個數超過了 table 數組的長度與裝載因子的乘積時, * 將觸發 再散列 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final float DEFAULT_LOAD_FACTOR= 0.75f; /** * 散列表的默認並發級別為 16。該值表示當前更新線程的估計數 * 在構造函數中沒有指定這個參數時,使用本參數 */ static final int DEFAULT_CONCURRENCY_LEVEL= 16; /** * segments 的掩碼值 * key 的散列碼的高位用來選擇具體的 segment */ final int segmentMask; /** * 偏移量 */ final int segmentShift; /** * 由 Segment 對象組成的數組 */ final Segment<K,V>[] segments; /** * 創建一個帶有指定初始容量、加載因子和並發級別的新的空映射。 */ public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if(!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if(concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // 尋找最佳匹配參數(不小於給定參數的最接近的 2 次冪) int sshift = 0; int ssize = 1; while(ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } segmentShift = 32 - sshift; // 偏移量值 segmentMask = ssize - 1; // 掩碼值 this.segments = Segment.newArray(ssize); // 創建數組 if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; int c = initialCapacity / ssize; if(c * ssize < initialCapacity) ++c; int cap = 1; while(cap < c) cap <<= 1; // 依次遍歷每個數組元素 for(int i = 0; i < this.segments.length; ++i) // 初始化每個數組元素引用的 Segment 對象 this.segments[i] = new Segment<K,V>(cap, loadFactor); } /** * 創建一個帶有默認初始容量 (16)、默認加載因子 (0.75) 和 默認並發級別 (16) * 的空散列映射表。 */ public ConcurrentHashMap() { // 使用三個默認參數,調用上面重載的構造函數來創建空散列映射表 this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL); }
8、 用分離鎖實現多個線程間的並發寫操作
插入數據后的ConcurrentHashMap的存儲形式
(1)Put方法的實現
首先,根據 key 計算出對應的 hash 值:
public V put(K key, V value) { if (value == null) //ConcurrentHashMap 中不允許用 null 作為映射值 throw new NullPointerException(); int hash = hash(key.hashCode()); // 計算鍵對應的散列碼 // 根據散列碼找到對應的 Segment return segmentFor(hash).put(key, hash, value, false); } 根據 hash 值找到對應的 Segment: /** * 使用 key 的散列碼來得到 segments 數組中對應的 Segment */ final Segment<K,V> segmentFor(int hash) { // 將散列值右移 segmentShift 個位,並在高位填充 0 // 然后把得到的值與 segmentMask 相“與” // 從而得到 hash 值對應的 segments 數組的下標值 // 最后根據下標值返回散列碼對應的 Segment 對象 return segments[(hash >>> segmentShift) & segmentMask]; }
在這個 Segment 中執行具體的 put 操作:
V put(K key, int hash, V value, boolean onlyIfAbsent) { lock(); // 加鎖,這里是鎖定某個 Segment 對象而非整個 ConcurrentHashMap try { int c = count; if (c++ > threshold) // 如果超過再散列的閾值 rehash(); // 執行再散列,table 數組的長度將擴充一倍 HashEntry<K,V>[] tab = table; // 把散列碼值與 table 數組的長度減 1 的值相“與” // 得到該散列碼對應的 table 數組的下標值 int index = hash & (tab.length - 1); // 找到散列碼對應的具體的那個桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue; if (e != null) { // 如果鍵 / 值對以經存在 oldValue = e.value; if (!onlyIfAbsent) e.value = value; // 設置 value 值 } else { // 鍵 / 值對不存在 oldValue = null; ++modCount; // 要添加新節點到鏈表中,所以 modCont 要加 1 // 創建新節點,並添加到鏈表的頭部 tab[index] = new HashEntry<K,V>(key, hash, first, value); count = c; // 寫 count 變量 } return oldValue; } finally { unlock(); // 解鎖 } `}
這里的加鎖操作是針對(鍵的 hash 值對應的)某個具體的 Segment,鎖定的是該 Segment 而不是整個 ConcurrentHashMap。
因為插入鍵 / 值對操作只是在這個 Segment 包含的某個桶中完成,不需要鎖定整個ConcurrentHashMap。
此時,其他寫線程對另外 15 個Segment 的加鎖並不會因為當前線程對這個 Segment 的加鎖而阻塞。
同時,所有讀線程幾乎不會因本線程的加鎖而阻塞(除非讀線程剛好讀到這個 Segment 中某個 HashEntry 的 value 域的值為 null,此時需要加鎖后重新讀取該值)。
(2)Get方法的實現
V get(Object key, int hash) { if(count != 0) { // 首先讀 count 變量 HashEntry<K,V> e = getFirst(hash); while(e != null) { if(e.hash == hash && key.equals(e.key)) { V v = e.value; if(v != null) return v; // 如果讀到 value 域為 null,說明發生了重排序,加鎖后重新讀取 return readValueUnderLock(e); } e = e.next; } } return null; } V readValueUnderLock(HashEntry<K,V> e) { lock(); try { return e.value; } finally { unlock(); } }
ConcurrentHashMap中的讀方法不需要加鎖,所有的修改操作在進行結構修改時都會在最后一步寫count 變量,通過這種機制保證get操作能夠得到幾乎最新的結構更新。
(3)Remove方法的實現
V remove(Object key, int hash, Object value) { lock(); //加鎖 try{ int c = count - 1; HashEntry<K,V>[] tab = table; //根據散列碼找到 table 的下標值 int index = hash & (tab.length - 1); //找到散列碼對應的那個桶 HashEntry<K,V> first = tab[index]; HashEntry<K,V> e = first; while(e != null&& (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if(e != null) { V v = e.value; if(value == null|| value.equals(v)) { //找到要刪除的節點 oldValue = v; ++modCount; //所有處於待刪除節點之后的節點原樣保留在鏈表中 //所有處於待刪除節點之前的節點被克隆到新鏈表中 HashEntry<K,V> newFirst = e.next;// 待刪節點的后繼結點 for(HashEntry<K,V> p = first; p != e; p = p.next) newFirst = new HashEntry<K,V>(p.key, p.hash, newFirst, p.value); //把桶鏈接到新的頭結點 //新的頭結點是原鏈表中,刪除節點之前的那個節點 tab[index] = newFirst; count = c; //寫 count 變量 } } return oldValue; } finally{ unlock(); //解鎖 } }
整個操作是在持有段鎖的情況下執行的,空白行之前的行主要是定位到要刪除的節點e。
如果不存在這個節點就直接返回null,否則就要將e前面的結點復制一遍,尾結點指向e的下一個結點。
e后面的結點不需要復制,它們可以重用。
中間那個for循環是做什么用的呢?從代碼來看,就是將定位之后的所有entry克隆並拼回前面去,但有必要嗎?
每次刪除一個元素就要將那之前的元素克隆一遍?這點其實是由entry的不變性來決定的,仔細觀察entry定義,發現除了value,其他所有屬性都是用final來修飾的,
這意味着在第一次設置了next域之后便不能再改變它,取而代之的是將它之前的節點全都克隆一次。至於entry為什么要設置為不變性,這跟不變性的訪問不需要同步從而節省時間有關。
(4)containsKey方法的實現,它不需要讀取值。
boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K,V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
(5)size()
我們要統計整個ConcurrentHashMap里元素的大小,就必須統計所有Segment里元素的大小后求和。
Segment里的全局變量count是一個volatile變量,那么在多線程場景下,我們是不是直接把所有Segment的count相加就可以得到整個ConcurrentHashMap大小了呢?
不是的,雖然相加時可以獲取每個Segment的count的最新值,但是拿到之后可能累加前使用的count發生了變化,那么統計結果就不准了。
所以最安全的做法,是在統計size的時候把所有Segment的put,remove和clean方法全部鎖住,但是這種做法顯然非常低效。
因為在累加count操作過程中,之前累加過的count發生變化的幾率非常小,所以ConcurrentHashMap的做法是先嘗試2次通過不鎖住Segment的方式來統計各個Segment大小,如果統計的過程中,容器的count發生了變化,則再采用加鎖的方式來統計所有Segment的大小。
那么ConcurrentHashMap是如何判斷在統計的時候容器是否發生了變化呢?使用modCount變量,在put , remove和clean方法里操作元素前都會將變量modCount進行加1,那么在統計size前后比較modCount是否發生變化,從而得知容器的大小是否發生變化。
9、總結
1.在使用鎖來協調多線程間並發訪問的模式下,減小對鎖的競爭可以有效提高並發性。
有兩種方式可以減小對鎖的競爭:
減小請求同一個鎖的頻率。
減少持有鎖的時間。
2.ConcurrentHashMap 的高並發性主要來自於三個方面:
用分離鎖實現多個線程間的更深層次的共享訪問。
用 HashEntery 對象的不變性來降低執行讀操作的線程在遍歷鏈表期間對加鎖的需求。
通過對同一個 Volatile 變量的寫 / 讀訪問,協調不同線程間讀 / 寫操作的內存可見性。
使用分離鎖,減小了請求同一個鎖的頻率。