一、HashTable
hashTable是一個線程安全的容器,是線程安全版本的HashMap。但它的底層是和HashMap一樣的,只是在方法上都加上了synchronized關鍵字。
這樣子有什么后果呢:
- 效率及低,意味着每個線程在執行HashTable的方法的時候,或者說操縱HashTable的時候,都要鎖住整個對象。也就是讓並行並發的訪問,變成了串行。
- 復合操作會有線程安全問題。因為它是每個方法都加鎖了,這意味着在執行單個方法像put,contains方法的時候,是可以保證原子性的,但如果是執行一個復合操作的時候,就不保證了。
if(!table.contains("key")) { map.put("key", object); }
類似於這樣的方法,當線程1在執行if里面的判斷的時候,線程1會獲得table實例的所,其他線程無法訪問table的其他同步方法。但當線程1判斷完if后,鎖會放掉,這個時候如果線程2進來,獲得table實例的鎖,然后put了一個”key“進來,然后再放鎖;那么線程1再執行put方法就不對了。(它本來是以為沒有這個key再put的)
二、concurrentHashMap1.7
並發思路
concurrenthashMap是采用一個叫做分段所的機制。
它可以看作是一個二重hashMap,首先concurrentHashMap是一個segment數組,每個segment都是一個繼承了ReentrantLock的類,這樣就可以方便地在各個segment里面加鎖所以每次需要加鎖的操作鎖住的是一個 segment,這樣只要保證每個 Segment 是線程安全的,也就實現了全局的線程安全。
哦哦還要注意,這個最外面的Segment[]數組,是不可以擴容的!
然后進到Segment內部,會發現,每個Segment可以看作一個HashMap。也就是在一個Segment里面,有個HashEntry[]數組,然后這個數組是一個個桶,桶里面是單向鏈表。
(圖片來自:http://www.importnew.com/28263.html)
構造函數
然后我們通過構造函數進入,順便了解ConcurrentHashMap中重要的field吧。
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) { if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) throw new IllegalArgumentException(); if (concurrencyLevel > MAX_SEGMENTS) concurrencyLevel = MAX_SEGMENTS; // Find power-of-two sizes best matching arguments int sshift = 0; int ssize = 1; // 計算並行級別 ssize,因為要保持並行級別是 2 的 n 次方 while (ssize < concurrencyLevel) { ++sshift; ssize <<= 1; } // 我們這里先不要那么燒腦,用默認值,concurrencyLevel 為 16,sshift 為 4 // 那么計算出 segmentShift 為 28,segmentMask 為 15,后面會用到這兩個值 this.segmentShift = 32 - sshift; this.segmentMask = ssize - 1; if (initialCapacity > MAXIMUM_CAPACITY) initialCapacity = MAXIMUM_CAPACITY; // initialCapacity 是設置整個 map 初始的大小, // 這里根據 initialCapacity 計算 Segment 數組中每個位置可以分到的大小 // 如 initialCapacity 為 64,那么每個 Segment 或稱之為"槽"可以分到 4 個 int c = initialCapacity / ssize; if (c * ssize < initialCapacity) ++c; // 默認 MIN_SEGMENT_TABLE_CAPACITY 是 2,這個值也是有講究的,因為這樣的話,對於具體的槽上, // 插入一個元素不至於擴容,插入第二個的時候才會擴容 int cap = MIN_SEGMENT_TABLE_CAPACITY; while (cap < c) cap <<= 1; // 創建 Segment 數組, // 並創建數組的第一個元素 segment[0] Segment<K,V> s0 = new Segment<K,V>(loadFactor, (int)(cap * loadFactor), (HashEntry<K,V>[])new HashEntry[cap]); Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize]; // 往數組寫入 segment[0] UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0] this.segments = ss; }
initialCapacity和以前一樣,指的是這個ConcurrenthashMap的初始容量,或者說是理解成初始桶的數量。但我們這個hashmap是有兩重表的嘛,所以在實際操作的時候會把這個值分配給各個Segment,也就相當於間接指定了每個Segment中應該有幾個桶。
loadFactor和一般的hashTable一樣,負載因子,size/capacity。但上面說了Segment數組是不可以擴容的,所以這個也是給Segment里面的數組用的。
concurrencyLevel:concurrencyLevel:並行級別、並發數、Segment 數,怎么翻譯不重要,理解它。默認是 16,也就是說 ConcurrentHashMap 有 16 個 Segments,所以理論上,這個時候,最多可以同時支持 16 個線程並發寫,只要它們的操作分別分布在不同的 Segment 上。這個值可以在初始化的時候設置為其他值,但是一旦初始化以后,它是不可以擴容的。
segmentShift:
這個值=32 - shift,shift是你>=你傳進來的concurrentLevel的一個2次冪數的左移位數。而二次冪的數字,都是10000這樣的嘛,所以shift就是10000中0的個數。
所以field segmentShift我覺得可以理解成000000100000(32位數字),然后是前面的0加上1的位數就是segmentShift吧。
SegmetnMask:
掩碼嘛,就二次冪處理后的concurrentLevel的長度 - 1,得到的就類似0111111這樣咯,所以等等用來做與操作用的。
然后最后那個Unsafe的putOrderObject一個不安全的直接操縱內存的方法,應該是因為這樣會快點吧。這個order應該是防止指令重排序的意思。
要了解Unsafe可以看這篇文章:https://www.cnblogs.com/throwable/p/9139947.html
如果我們就當是用 new ConcurrentHashMap() 無參構造函數進行初始化的,那么初始化完成后:
Segment 數組長度為 16,不可以擴容
Segment[i] 的默認大小為 2,負載因子是 0.75,得出初始閾值為 1.5,也就是以后插入第一個元素不會觸發擴容,插入第二個會進行第一次擴容
這里初始化了 segment[0],其他位置還是 null,至於為什么要初始化 segment[0],后面的代碼會介紹
當前 segmentShift 的值為 32 – 4 = 28,segmentMask 為 16 – 1 = 15,姑且把它們簡單翻譯為移位數和掩碼,這兩個值馬上就會用到
put方法
然后來看重要的put方法。
先看put的主流程:
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); // 1. 計算 key 的 hash 值 int hash = hash(key); // 2. 根據 hash 值找到 Segment 數組中的位置 j // hash 是 32 位,無符號右移 segmentShift(28) 位,剩下低 4 位, // 然后和 segmentMask(15) 做一次與操作,也就是說 j 是 hash 值的最后 4 位,也就是槽的數組下標 int j = (hash >>> segmentShift) & segmentMask; // 剛剛說了,初始化的時候初始化了 segment[0],但是其他位置還是 null, // ensureSegment(j) 對 segment[j] 進行初始化 if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); // 3. 插入新值到 槽 s 中 return s.put(key, hash, value, false); }
這里的主流程是在第一層表格的操作。就根據key的hash值,找到Segment[]數組的桶序號,然后先初始化這個segment[j](構造器中只初始化了segment[0]),然后進入這個segmetn[j],交給這個segment[j](局部HashMap)繼續執行put操作。
求j的時候,hash值移了segmentShift后,剛好只剩后面四位(默認情況的話),剛好等於segmentMask15(4位)的位數,然后再相與就得到一個序號咯。
然后就通過s.put(key, hash, value, false);進入Segment內部的那個局部Hashmap的put方法。、
先看看這個初始化segment[j]的方法。
ensureSegment(j):
private Segment<K,V> ensureSegment(int k) { final Segment<K,V>[] ss = this.segments; long u = (k << SSHIFT) + SBASE; // raw offset Segment<K,V> seg; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 這里看到為什么之前要初始化 segment[0] 了, // 使用當前 segment[0] 處的數組長度和負載因子來初始化 segment[k] // 為什么要用“當前”,因為 segment[0] 可能早就擴容過了 Segment<K,V> proto = ss[0]; int cap = proto.table.length; float lf = proto.loadFactor; int threshold = (int)(cap * lf); // 初始化 segment[k] 內部的數組 HashEntry<K,V>[] tab = (HashEntry<K,V>[])new HashEntry[cap]; if ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { // 再次檢查一遍該槽是否被其他線程初始化了。 Segment<K,V> s = new Segment<K,V>(lf, threshold, tab); // 使用 while 循環,內部用 CAS,當前線程成功設值或其他線程成功設值后,退出 while ((seg = (Segment<K,V>)UNSAFE.getObjectVolatile(ss, u)) == null) { if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) break; } } } return seg; }
這里需要考慮並發,因為很可能會有多個線程同時進來初始化同一個槽 segment[k],不過只要有一個成功了就可以。
這里就用構造其中已經初始化好了的segment[0](也可能已經有元素了)的數據來構造segment[j]咯,然后再用自旋的CAS操作來更新segment數組中的j桶,更新成功或者是有別的線程更新成功都會跳出循環。
再來看segment里面的局部HashMap的put方法。
Segment里面的hashMap的put方法:

final V put(K key, int hash, V value, boolean onlyIfAbsent) { // 在往該 segment 寫入前,需要先獲取該 segment 的獨占鎖 // 先看主流程,后面還會具體介紹這部分內容 HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { // 這個是 segment 內部的數組 HashEntry<K,V>[] tab = table; // 再利用 hash 值,求應該放置的數組下標 int index = (tab.length - 1) & hash; // first 是數組該位置處的鏈表的表頭 HashEntry<K,V> first = entryAt(tab, index); // 下面這串 for 循環雖然很長,不過也很好理解,想想該位置沒有任何元素和已經存在一個鏈表這兩種情況 for (HashEntry<K,V> e = first;;) { if (e != null) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { // 覆蓋舊值 e.value = value; ++modCount; } break; } // 繼續順着鏈表走 e = e.next; } else { // node 到底是不是 null,這個要看獲取鎖的過程,不過和這里都沒有關系。 // 如果不為 null,那就直接將它設置為鏈表表頭;如果是null,初始化並設置為鏈表表頭。 if (node != null) node.setNext(first); else node = new HashEntry<K,V>(hash, key, value, first); int c = count + 1; // 如果超過了該 segment 的閾值,這個 segment 需要擴容 if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); // 擴容后面也會具體分析 else // 沒有達到閾值,將 node 放到數組 tab 的 index 位置, // 其實就是將新的節點設置成原鏈表的表頭 setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null; break; } } } finally { // 解鎖 unlock(); } return oldValue; }
我們可以看到,代碼一開始就先去獲得所在Segment的鎖:
HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
首先,這個tryLock()是個快速獲得鎖的方法,獲得的話就返回ture,那么node就賦值為null。
如果沒獲得鎖的話,說明存在競爭,那么就進入scanAndLockForPut方法。這個方法的話其實也就是不斷去嘗試獲得這個Segment的鎖,里面還有可能順便初始化下這個node元素。(就可能順便構造下你要插入的那個鍵值對的node)
這個scanAndLockForPut的方法等等下面才去分析,這兩行代碼的結果就是——獲得了segment的鎖,然后可能初始化了node也可能沒有。(看下面代碼會知道node有沒有初始化沒所謂的)
然后就正常的put操作了。這里是帶鎖了的,所以不用怕其他的寫操作會影響。
可以看到,node為空它就new一個相關Entry,不為空就直接頭插入,所以是不是null不影響代碼邏輯。
關於這個setEntryAt方法,可以簡單看看它的代碼:
static final <K,V> void setEntryAt(HashEntry<K,V>[] tab, int i, HashEntry<K,V> e) { UNSAFE.putOrderedObject(tab, ((long)i << TSHIFT) + TBASE, e); }
也是用了個防止重排序的方法,再加上本來Segment里面的table還有Entry里面重要的相關指針都是volatile的,所以可以讓讀操作也安全。
然后就來看這個scanAndLockForPut了:

private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) { HashEntry<K,V> first = entryForHash(this, hash); HashEntry<K,V> e = first; HashEntry<K,V> node = null; int retries = -1; // negative while locating node // 循環獲取鎖 while (!tryLock()) { HashEntry<K,V> f; // to recheck first below if (retries < 0) { if (e == null) { if (node == null) // speculatively create node // 進到這里說明數組該位置的鏈表是空的,沒有任何元素 // 當然,進到這里的另一個原因是 tryLock() 失敗,所以該槽存在並發,不一定是該位置 node = new HashEntry<K,V>(hash, key, value, null); retries = 0; } else if (key.equals(e.key)) retries = 0; else // 順着鏈表往下走 e = e.next; } // 重試次數如果超過 MAX_SCAN_RETRIES(單核1多核64),那么不搶了,進入到阻塞隊列等待鎖 // lock() 是阻塞方法,直到獲取鎖后返回 else if (++retries > MAX_SCAN_RETRIES) { lock(); break; } else if ((retries & 1) == 0 && // 這個時候是有大問題了,那就是有新的元素進到了鏈表,成為了新的表頭 // 所以這邊的策略是,相當於重新走一遍這個 scanAndLockForPut 方法 (f = entryForHash(this, hash)) != first) { e = first = f; // re-traverse if entry changed retries = -1; } } return node;
代碼全在一個不斷嘗試拿鎖的while循環里進行,代碼邏輯大概是用這個retries來控制流程。
當這個retries<0的時候,也就是初始情況,這里做的是遍歷這個桶的鏈表,看看有沒這個要put的key的entry,如果有的話就停下來,retries置為0,沒有的話順便new一個node,然后retries置為0。
如果retries嘗試的次數太大了,就會lock(),這個方法是堵塞鎖,類似synchronized(解鎖在put方法中),直到拿到鎖才break。
最后一個情況大概是發生了沖突了,就重新走一次這個方法。
這個方法有兩個出口,一個是 tryLock() 成功了,循環終止,另一個就是重試次數超過了 MAX_SCAN_RETRIES,進到 lock() 方法,此方法會阻塞等待,直到成功拿到獨占鎖。
這個方法就是看似復雜,但是其實就是做了一件事,那就是獲取該 segment 的獨占鎖,如果需要的話順便實例化了一下 node。
擴容方法rehash
重復一下,segment 數組不能擴容,擴容是 segment 數組某個位置內部的數組 HashEntry\[] 進行擴容,擴容后,容量為原來的 2 倍。
首先,我們要回顧一下觸發擴容的地方,put 的時候,如果判斷該值的插入會導致該 segment 的元素個數超過閾值,那么先進行擴容,再插值,讀者這個時候可以回去 put 方法看一眼。
該方法不需要考慮並發,因為到這里的時候,是持有該 segment 的獨占鎖的。
看代碼:

// 方法參數上的 node 是這次擴容后,需要添加到新的數組中的數據。 private void rehash(HashEntry<K,V> node) { HashEntry<K,V>[] oldTable = table; int oldCapacity = oldTable.length; // 2 倍 int newCapacity = oldCapacity << 1; threshold = (int)(newCapacity * loadFactor); // 創建新數組 HashEntry<K,V>[] newTable = (HashEntry<K,V>[]) new HashEntry[newCapacity]; // 新的掩碼,如從 16 擴容到 32,那么 sizeMask 為 31,對應二進制 ‘000...00011111’ int sizeMask = newCapacity - 1; // 遍歷原數組,老套路,將原數組位置 i 處的鏈表拆分到 新數組位置 i 和 i+oldCap 兩個位置 for (int i = 0; i < oldCapacity ; i++) { // e 是鏈表的第一個元素 HashEntry<K,V> e = oldTable[i]; if (e != null) { HashEntry<K,V> next = e.next; // 計算應該放置在新數組中的位置, // 假設原數組長度為 16,e 在 oldTable[3] 處,那么 idx 只可能是 3 或者是 3 + 16 = 19 int idx = e.hash & sizeMask; if (next == null) // 該位置處只有一個元素,那比較好辦 newTable[idx] = e; else { // Reuse consecutive sequence at same slot // e 是鏈表表頭 HashEntry<K,V> lastRun = e; // idx 是當前鏈表的頭結點 e 的新位置 int lastIdx = idx; // 下面這個 for 循環會找到一個 lastRun 節點,這個節點之后的所有元素是將要放到一起的 for (HashEntry<K,V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } // 將 lastRun 及其之后的所有節點組成的這個鏈表放到 lastIdx 這個位置 newTable[lastIdx] = lastRun; // 下面的操作是處理 lastRun 之前的節點, // 這些節點可能分配在另一個鏈表中,也可能分配到上面的那個鏈表中 for (HashEntry<K,V> p = e; p != lastRun; p = p.next) { V v = p.value; int h = p.hash; int k = h & sizeMask; HashEntry<K,V> n = newTable[k]; newTable[k] = new HashEntry<K,V>(h, p.key, v, n); } } } } // 將新來的 node 放到新數組中剛剛的 兩個鏈表之一 的 頭部 int nodeIndex = node.hash & sizeMask; // add the new node node.setNext(newTable[nodeIndex]); newTable[nodeIndex] = node; table = newTable; }
找桶用的是直接重新和新的capacity - 1的值與的方法。
轉移操作用了個改良的算法吧,沒怎么認真看,直接上參考文章的分析:
這里的擴容比之前的 HashMap 要復雜一些,代碼難懂一點。上面有兩個挨着的 for 循環,第一個 for 有什么用呢?
仔細一看發現,如果沒有第一個 for 循環,也是可以工作的,但是,這個 for 循環下來,如果 lastRun 的后面還有比較多的節點,那么這次就是值得的。因為我們只需要克隆 lastRun 前面的節點,后面的一串節點跟着 lastRun 走就是了,不需要做任何操作。
我覺得 Doug Lea 的這個想法也是挺有意思的,不過比較壞的情況就是每次 lastRun 都是鏈表的最后一個元素或者很靠后的元素,那么這次遍歷就有點浪費了。不過 Doug Lea 也說了,根據統計,如果使用默認的閾值,大約只有 1/6 的節點需要克隆。
get過程
相對於 put 來說,get 真的不要太簡單。
計算 hash 值,找到 segment 數組中的具體位置,或我們前面用的“槽”
槽中也是一個數組,根據 hash 找到數組中具體的位置
到這里是鏈表了,順着鏈表進行查找即可

public V get(Object key) { Segment<K,V> s; // manually integrate access methods to reduce overhead HashEntry<K,V>[] tab; // 1. hash 值 int h = hash(key); long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE; // 2. 根據 hash 找到對應的 segment if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) { // 3. 找到segment 內部數組相應位置的鏈表,遍歷 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE); e != null; e = e.next) { K k; if ((k = e.key) == key || (e.hash == h && key.equals(k))) return e.value; } } return null; }
這里用了個getObjectVolatile來保證讀的可見性。
並發問題分析
現在我們已經說完了 put 過程和 get 過程,我們可以看到 get 過程中是沒有加鎖的,那自然我們就需要去考慮並發問題。
添加節點的操作 put 和刪除節點的操作 remove 都是要加 segment 上的獨占鎖的,所以它們之間自然不會有問題,我們需要考慮的問題就是 get 的時候在同一個 segment 中發生了 put 或 remove 操作。
1. put 操作的線程安全性。
- 初始化槽,這個我們之前就說過了,使用了 CAS 來初始化 Segment 中的數組。
- 添加節點到鏈表的操作是插入到表頭的,所以,如果這個時候 get 操作在鏈表遍歷的過程已經到了中間,是不會影響的。當然,另一個並發問題就是 get 操作在 put 之后,需要保證剛剛插入表頭的節點被讀取,這個依賴於 setEntryAt 方法中使用的 UNSAFE.putOrderedObject還有相關的volatile的field。
- 擴容。擴容是新創建了數組,然后進行遷移數據,最后面將 newTable 設置給屬性 table。所以,如果 get 操作此時也在進行,那么也沒關系,如果 get 先行,那么就是在舊的 table 上做查詢操作;而 put 先行,那么 put 操作的可見性保證就是 table 使用了 volatile 關鍵字。
2. remove 操作的線程安全性。
remove 操作我們沒有分析源碼,所以這里說的讀者感興趣的話還是需要到源碼中去求實一下的。
get 操作需要遍歷鏈表,但是 remove 操作會”破壞”鏈表。
如果 remove 破壞的節點 get 操作已經過去了,那么這里不存在任何問題。
如果 remove 先破壞了一個節點,分兩種情況考慮。 1、如果此節點是頭結點,那么需要將頭結點的 next 設置為數組該位置的元素,table 雖然使用了 volatile 修飾,但是 volatile 並不能提供數組內部操作的可見性保證,所以源碼中使用了 UNSAFE 來操作數組,請看方法 setEntryAt。2、如果要刪除的節點不是頭結點,它會將要刪除節點的后繼節點接到前驅節點中,這里的並發保證就是 next 屬性是 volatile 的。
總之,首先segment分段所保證了單一的寫、刪是無並發危險的。
然后讀和這些更改性的操作呢,首先是通過table的volatile,然后涉及table中桶元素的替換(要訪問桶的第一個元素的時候),就利用setEntryAt方法中的unsafe的保證有序性的方法。
涉及讀非頭節點的,除了以上措施還有就是Entry中的next屬性也是volatile的。
后期補的:
因為這個table雖然說是volatile的,但它里面的元素是不能有volatile的效果的:Java數組在元素層面的元數據設計上的缺失,無法表達元素是final、volatile等語義,數組元素就跟沒有標volatile的成員字段一樣,無法保證線程之間可見性。
所以,我們看到,這里關於在數組里面取元素的操作,都用的是getObjectVolatile之類的,通過這個來彌補數組的這個不足;
然后我看到好多關於set的方法,用的是putOrderedObject,大概是因為這個只是保證禁止指令重排序,開銷比putVolatile版本的小吧,而且應該是這里只需要禁止重排序就可以保證並發安全了吧。畢竟有segment鎖,而且get方法又有getObjectVolatile
三、ConcurrentHashMap——1.8
1.8的concurrentHashMap真心難懂,特別是擴容還有轉移方法……
這個版本的hashMap摒棄了Segment的概念,主要是采用CAS算法,底層用的是和1.8的HashMap一樣的數組+鏈表+紅黑樹的實現。
emmm關於紅黑樹就不在這里講了
先講幾個重要的屬性和需要了解的東西
sizeCtl
這個是在ConcurrenthashMap中很重要的一個field,它在流程控制和邏輯代碼上起着重要的作用。
-
不同狀態,sizeCtl所代表的含義也有所不同。
- 未初始化:
- sizeCtl=0:表示沒有指定初始容量。
- sizeCtl>0:表示初始容量。
-
初始化中:
- sizeCtl=-1,標記作用,告知其他線程,正在初始化
-
正常狀態:
- sizeCtl=0.75n ,擴容閾值
-
擴容中:
- sizeCtl < 0 : 表示有其他線程正在執行擴容
- sizeCtl = (resizeStamp(n) << RESIZE_STAMP_SHIFT) + 2 :表示此時只有一個線程在執行擴容
- 未初始化:
ForwardingNode
一個用於連接兩個table的節點類。它包含一個nextTable指針,用於指向下一張表。而且這個節點的key value next指針全部為null,它的hash值為-1. 這里面定義的find的方法是從nextTable里進行查詢節點,而不是以自身為頭節點進行查找。
大量的Unsafe和CAS
在ConcurrentHashMap中,隨處可以看到U, 大量使用了U.compareAndSwapXXX的方法,這個方法是利用一個CAS算法實現無鎖化的修改值的操作(利用Unsafe來獲得底層的相關支持CAS的方法),他可以大大降低鎖代理的性能消耗。這個算法的基本思想就是不斷地去比較當前內存中的變量值與你指定的一個變量值是否相等,如果相等,則接受你指定的修改的值,否則拒絕你的操作。因為當前線程中的值已經不是最新的值,你的修改很可能會覆蓋掉其他線程修改的結果。這一點與樂觀鎖,SVN的思想是比較類似的。
unsafe靜態塊
unsafe代碼塊控制了一些屬性的修改工作,比如最常用的SIZECTL 。在這一版本的concurrentHashMap中,大量應用來的CAS方法進行變量、屬性的修改工作。利用CAS進行無鎖操作,可以大大提高性能。

private static final sun.misc.Unsafe U; private static final long SIZECTL; private static final long TRANSFERINDEX; private static final long BASECOUNT; private static final long CELLSBUSY; private static final long CELLVALUE; private static final long ABASE; private static final int ASHIFT; static { try { U = sun.misc.Unsafe.getUnsafe(); Class<?> k = ConcurrentHashMap.class; SIZECTL = U.objectFieldOffset (k.getDeclaredField("sizeCtl")); TRANSFERINDEX = U.objectFieldOffset (k.getDeclaredField("transferIndex")); BASECOUNT = U.objectFieldOffset (k.getDeclaredField("baseCount")); CELLSBUSY = U.objectFieldOffset (k.getDeclaredField("cellsBusy")); Class<?> ck = CounterCell.class; CELLVALUE = U.objectFieldOffset (ck.getDeclaredField("value")); Class<?> ak = Node[].class; ABASE = U.arrayBaseOffset(ak); int scale = U.arrayIndexScale(ak); if ((scale & (scale - 1)) != 0) throw new Error("data type scale not a power of two"); ASHIFT = 31 - Integer.numberOfLeadingZeros(scale); } catch (Exception e) { throw new Error(e); } }
三個關於table中桶的相關操作的核心方法
ConcurrentHashMap定義了三個原子操作,用於對指定位置的節點進行操作。正是這些原子操作保證了ConcurrentHashMap的線程安全。

//獲得在i位置上的Node節點 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } //利用CAS算法設置i位置上的Node節點。之所以能實現並發是因為他指定了原來這個節點的值是多少 //在CAS算法中,會比較內存中的值與你指定的這個值是否相等,如果相等才接受你的修改,否則拒絕你的修改 //因此當前線程中的值並不是最新的值,這種修改可能會覆蓋掉其他線程的修改結果 有點類似於SVN static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } //利用volatile方法設置節點位置的值 static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
從構造函數的初始化開始看吧
// 這構造函數里,什么都不干 public ConcurrentHashMap() { } public ConcurrentHashMap(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; }
這個初始化方法有點意思,通過提供初始容量,計算了 sizeCtl,sizeCtl = 【 (1.5 * initialCapacity + 1),然后向上取最近的 2 的 n 次方】。如 initialCapacity 為 10,那么得到 sizeCtl 為 16,如果 initialCapacity 為 11,得到 sizeCtl 為 32。
emm現在這個sizeCtl好像沖當一個capacity的角色,在put方法中,initial表格的時候好像又會把這個sizeCtl變成類似一個threshold的角色,給擴容的時候做判斷。
然后直接通過put方法來更深入吧
put:

public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException(); // 得到 hash 值 int hash = spread(key.hashCode()); // 用於記錄相應鏈表的長度 int binCount = 0; for (Node<K,V>[] tab = table;;) { Node<K,V> f; int n, i, fh; // 如果數組"空",進行數組初始化 if (tab == null || (n = tab.length) == 0) // 初始化數組,后面會詳細介紹 tab = initTable(); // 找該 hash 值對應的數組下標,得到第一個節點 f else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果數組該位置為空, // 用一次 CAS 操作將這個新值放入其中即可,這個 put 操作差不多就結束了,可以拉到最后面了 // 如果 CAS 失敗,那就是有並發操作,進到下一個循環就好了 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin } // hash 居然可以等於 MOVED,這個需要到后面才能看明白,不過從名字上也能猜到,肯定是因為在擴容 else if ((fh = f.hash) == MOVED) // 幫助數據遷移,這個等到看完數據遷移部分的介紹后,再理解這個就很簡單了 tab = helpTransfer(tab, f); else { // 到這里就是說,f 是該位置的頭結點,而且不為空 V oldVal = null; // 獲取數組該位置的頭結點的監視器鎖 synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0) { // 頭結點的 hash 值大於 0,說明是鏈表 // 用於累加,記錄鏈表的長度 binCount = 1; // 遍歷鏈表 for (Node<K,V> e = f;; ++binCount) { K ek; // 如果發現了"相等"的 key,判斷是否要進行值覆蓋,然后也就可以 break 了 if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break; } // 到了鏈表的最末端,將這個新值放到鏈表的最后面 Node<K,V> pred = e; if ((e = e.next) == null) { pred.next = new Node<K,V>(hash, key, value, null); break; } } } else if (f instanceof TreeBin) { // 紅黑樹 Node<K,V> p; binCount = 2; // 調用紅黑樹的插值方法插入新節點 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } // binCount != 0 說明上面在做鏈表操作 if (binCount != 0) { // 判斷是否要將鏈表轉換為紅黑樹,臨界值和 HashMap 一樣,也是 8 if (binCount >= TREEIFY_THRESHOLD) // 這個方法和 HashMap 中稍微有一點點不同,那就是它不是一定會進行紅黑樹轉換, // 如果當前數組的長度小於 64,那么會選擇進行數組擴容,而不是轉換為紅黑樹 // 具體源碼我們就不看了,擴容部分后面說 treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } // addCount(1L, binCount); return null; }
put是在一個for的無限循環中進行的,大概分為這幾個情況:
1. 表格為空,初始化先。
2. 桶中沒有東西,那就直接用CAS來為桶中的頭指針設值。
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { // 如果數組該位置為空, // 用一次 CAS 操作將這個新值放入其中即可,這個 put 操作差不多就結束了,可以拉到最后面了 // 如果 CAS 失敗,那就是有並發操作,進到下一個循環就好了 if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null))) break; // no lock when adding to empty bin }
3. else if ((fh = f.hash) == MOVED),就幫助數據遷移。
4. 桶中不為空,且不是處於遷移狀態,那么就用synchronized來進行put操作。這里面包括找到key的替換操作;沒找到key的鏈表尾插入操作;數量達到閾值的樹化操作;本來就是樹結點交給樹來做put操作
put完后,出去循環,然后addCount方法,這個方法也是有點煩的,大概做了兩件事:
-
對 table 的長度加一。無論是通過修改 baseCount,還是通過使用 CounterCell。當 CounterCell 被初始化了,就優先使用他,不再使用 baseCount。
-
檢查是否需要擴容,或者是否正在擴容。如果需要擴容,就調用擴容方法,如果正在擴容,就幫助其擴容。
下面來慢慢看這些方法。

private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { // 初始化的"功勞"被其他線程"搶去"了 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin // CAS 一下,將 sizeCtl 設置為 -1,代表搶到了鎖 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { try { if ((tab = table) == null || tab.length == 0) { // DEFAULT_CAPACITY 默認初始容量是 16 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; // 初始化數組,長度為 16 或初始化時提供的長度 Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; // 將這個數組賦值給 table,table 是 volatile 的 table = tab = nt; // 如果 n 為 16 的話,那么這里 sc = 12 // 其實就是 0.75 * n sc = n - (n >>> 2); } } finally { // 設置 sizeCtl 為 sc,我們就當是 12 吧 sizeCtl = sc; } break; } } return tab; }
這個比較簡單,主要就是初始化一個合適大小的數組,然后會設置 sizeCtl,就是讓sizeCtl變成一個類似threshold的角色,給擴容的時候做判斷用的。
初始化方法中的並發問題是通過對 sizeCtl 進行一個 CAS 操作來控制的。
然后就是關於擴容、transfer的操作了,這個是真滴難懂,代碼還長,我暫時先記一下它的思路吧。
關於擴容的思路(來自:https://blog.csdn.net/varyall/article/details/81283231)
jdk8中,采用多線程擴容。整個擴容過程,通過CAS設置sizeCtl,transferIndex等變量協調多個線程進行並發擴容。
先介紹幾個相關的field:
nextTable:擴容時,把table中的元素遷移至nextTable,擴容時非空。
sizeCtl:上面講了,這里放個圖:
transferIndex:擴容索引,表示已經分配給擴容線程的table數組索引位置。主要用來協調多個線程,並發安全地獲取遷移任務(hash桶)。
其實說白了,就是多個線程來一起擴容,每個線程要做的,就是把當前transferIndex到transferIndex - stride個桶的數據遷移到新table中去。(transferIndex一開始在數組尾巴那,往前挪)
看兩個圖理解下:
1 在擴容之前,transferIndex 在數組的最右邊 。此時有一個線程發現已經到達擴容閾值,准備開始擴容。
2 擴容線程,在遷移數據之前,首先要將transferIndex左移(以cas的方式修改transferIndex=transferIndex-stride(要遷移hash桶的個數)),獲取遷移任務。每個擴容線程都會通過for循環+CAS的方式設置transferIndex,因此可以確保多線程擴容的並發安全。
forwardingNode:擴容索引,表示已經分配給擴容線程的table數組索引位置。主要用來協調多個線程,並發安全地獲取遷移任務(hash桶)。簡單地說,遷移完的桶就會被設為這個結點,這個結點的hash值是-1,也就是常量MOVED的值。
看看擴容過程:
- 線程執行Put,發現容量要擴容了,這個時候的transferIndex = table.length = 32。
- 擴容線程A 以cas的方式修改transferindex=31-16=16 ,然后按照降序遷移table[31]--table[16]這個區間的hash桶。
- 遷移hash桶時,會將桶內的鏈表或者紅黑樹,按照一定算法,拆分成2份,將其插入nextTable[i]和nextTable[i+n](n是table數組的長度)。 遷移完畢的hash桶,會被設置成ForwardingNode節點,以此告知訪問此桶的其他線程,此節點已經遷移完畢。
- 此時,線程2訪問到了ForwardingNode節點,如果線程2執行的put或remove等寫操作,那么就會先幫其擴容。如果線程2執行的是get等讀方法,則會調用ForwardingNode的find方法,去nextTable里面查找相關元素。
5. 如果准備加入擴容的線程,發現以下情況,放棄擴容,直接返回。
-
- 發現transferIndex=0,即所有node均已分配
- 發現擴容線程已經達到最大擴容線程數
總之擴容就是:多線程無鎖擴容的關鍵就是通過CAS設置sizeCtl與transferIndex變量,協調多個線程對table數組中的node進行遷移。
get的分析

public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { // 判斷頭結點是否就是我們需要的節點 if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } // 如果頭結點的 hash 小於 0,說明 正在擴容,或者該位置是紅黑樹 else if (eh < 0) // 參考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k) return (p = e.find(h, key)) != null ? p.val : null; // 遍歷鏈表 while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
get 方法從來都是最簡單的,這里也不例外:
- 計算 hash 值
- 根據 hash 值找到數組對應位置: (n – 1) & h
- 根據該位置處結點性質進行相應查找
- 如果該位置為 null,那么直接返回 null 就可以了
- 如果該位置處的節點剛好就是我們需要的,返回該節點的值即可
- 如果該位置節點的 hash 值小於 0,說明正在擴容(-1),或者是紅黑樹(-2),就用find來解決get獲取。(擴容的話如果為-1,說明結點轉移了,就去nextTable里面去get)
- 如果以上 3 條都不滿足,那就是鏈表,進行遍歷比對即可
我自己對1.8的concurrentHashMap的並發分析:
1. 涉及寫的方面
- 如果寫的時候發現,table都還是null,就要初始化,這個初始化顯然要考慮並發,這里的並發主要通過對sizeCtl的CAS操作還有table的volatile來實現。擴容是個While循環,第一時間會判斷sizeCtl是不是小於0,是的話說明有線程在初始化這個table了,就yield;否則就可以進行初始化操作,第一步就CAS改sizeCtl為-1,如果失敗重新進入循環,成功的話就搞一個table賦值給類變量table咯,這個類變量table是volatile所以可以保證其他線程的可見性。
- 如果table中某個桶為空,就直接用cas把新結點設為桶的那個首節點,失敗的話會結合外層循環形成自旋CAS。我們知道table雖然是volatile,但table中的元素的寫操作並不具有volatile特性,所以這里是通過CAS操作來解決這個問題的。
- 如果table中某個桶不為空,也就是下面是個鏈表或者是個紅黑樹,這個時候就要synchronized(桶的頭節點)了,這是put操作唯一一個用到鎖的地方,可見對比1.7好了很多,因為鎖的顆粒度從segment編導是一個bucket的位置,也就是說桶下面的東西的寫操作直接是肯定不會有沖突的了。
- 如果需要擴容,則通過CAS設置sizeCtl與transferIndex變量,協調多個線程對table數組中的node進行遷移。
2. 涉及讀方面的操作
讀我們可以看到是完全沒有鎖的,get可以大致分為三種情況,1. 桶的頭節點直接就是那個想要的key,直接就返回這個node的val;2. 如果桶元素的hash值小於0,可能是紅黑樹可能是forwardingNode,交給這個node的find方法,可能去新的table中get,也可能交給紅黑樹get;3. 都不是,就在這個桶下面的鏈表中遍歷。
- 第一種情況,直接讀桶,也就是讀Node[] table的某個元素,這個為什么可以不加鎖??如果讀的時候有人在put怎么辦?這里我覺得主要是通過這個tabAt()方法來解決的,我們可以看到tabAt方法是用getObjectVolatile來實現的,所以可以保證讀對寫的可見性吧。
- 第二種情況,另一個table中find一樣的吧,紅黑樹不做分析……
- 第三種情況,遍歷的時候,可能涉及到鏈表的增長,刪除什么的;node結點元素的變化。鏈表變化,其實就next指針的變化,但next指針是volatile的,所以對寫操作是具有可見性的;然后node元素變化?也不怕,因為node的val也是volatile的,在多線程環境下線程A修改結點的val或者新增節點的時候是對線程B可見的。
四、參考文章:
http://www.importnew.com/28263.html——《Java7/8 中的 HashMap 和 ConcurrentHashMap 全解析》本文主要是根據這篇的主線來轉載的。
https://www.cnblogs.com/throwable/p/9139947.html——《JAVA中神奇的雙刃劍--Unsafe》
https://www.cnblogs.com/seyer/p/5819904.html——網上很多講解都這個版本
https://blog.csdn.net/varyall/article/details/81283231——《ConcurrentHashMap源碼分析(JDK8) 擴容實現機制》這篇講擴容機制的,思路講得很清晰了,也有關鍵源碼的分析。