目錄
二.源碼分析-類定義
2.2 Segment內部類
2.3 HashEntry內部類
三.常用接口源碼分析
3.2 map.put操作
3.3 創建新segment
3.4 segment.put操作
3.5 segment.rehash擴容
3.6 map.get操作
3.7 map.remove操作
3.8 map.size操作
原文地址:https://www.cnblogs.com/-beyond/p/13157083.html
一.ConcurrentHashMap的模型圖
本文所有的介紹都是針對Java7而言!!!!!
下面是ConcurrentHashMap的結構圖,ConcurrentHashMap有一個segments數組,每個segment中又有一個table數組,該數組的每個元素時HashEntry類型。
可以簡單的理解為ConcurrentHashMap是多個HashMap組成,每一個HashMap是一個segment,就如傳說中一樣,ConcurrentHashMap使用分段鎖的方式,這個“段”就是segment。
ConcurrentHashMap之所以能夠保證並發安全,是因為支持對不同segment的並發修改操作,比如兩個線程同時修改ConcurrentHashMap,一個線程修改第一個segment的數據,另一個線程修改第二個segment的數據,兩個線程可以並發修改,不會出現並發問題;但是多個線程同一個segment進行並發修改,則需要先獲取該segment的鎖后再修改,修改完后釋放鎖,然后其他要修改的線程再進行修改。
那么,ConcurrentHashMap可以支持多少並發量(寫)呢?這個也就是問,ConcurrentHashMap最多能支持多少線程並發修改,其實也就是segment的數量,只要修改segment的這些線程不是修改同一個segment,那么這些線程就可以並行執行,這也就是ConcurrentHashMap的並發量(segment的數量)。
注意,ConcurrentHashMap創建完成后,也就是segment的數量、並發級別已經確定,則segment的數量和並發級別都不能再改變了,即使發生擴容,也是segment中的table進行擴容,segment的數量保持不變。
二.源碼分析-類定義
2.1 極簡ConcurrentHashMap定義
下面是省略了大部分代碼的ConcurrentHashMap定義:
package java.util.concurrent;
import java.util.AbstractMap;
import java.util.concurrent.locks.ReentrantLock;
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
final Segment<K, V>[] segments;
/**
* segment分段的定義
*/
static final class Segment<K, V> extends ReentrantLock implements Serializable {
transient volatile HashEntry<K, V>[] table;
}
/**
* 存放的元素節點
*/
static final class HashEntry<K, V> {
}
}
2.2 Segment內部類
ConcurrentHashMap內部有一個segments屬性,是Segment類型的數組,Segment類和HashMap很相似(Java7的HashMap),維持一個數組,數組的每個元素是HashEntry類型(可以理解為HashMap的節點),當發生hash沖突后,則使用拉鏈法(頭插法)來解決沖突。
而Segment數組的定義如下,省略了方法:
static final class Segment<K, V> extends ReentrantLock implements Serializable {
static final int MAX_SCAN_RETRIES = Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;
private static final long serialVersionUID = 2249069246763182397L;
// segment的負載因子(segments數組中的所有segment負載因子都相同)
final float loadFactor;
// segment中保存元素的數組
transient volatile HashEntry<K, V>[] table;
// 該segment中的元素個數
transient int count;
// modify count,該segment被修改的次數
transient int modCount;
// segment的擴容閾值
transient int threshold;
}
注意每個Segment都有負載因子、以及擴容閾值,但是后面可以看到,其實segments數組中的每一個segment,負載因子和擴容閾值都相同,因為創建的時候,都是使用0號segment的負載因子和擴容閾值。
2.3 HashEntry內部類
Segment類中有一個table數組,table數組的每個元素都是HashEntry類型,和HashMap的Entry類似,源碼如下:(省略了方法)
/**
* map中每個元素的類型
*/
static final class HashEntry<K, V> {
final int hash;
final K key;
volatile V value;
volatile HashEntry<K, V> next;
}
2.4 ConcurrentHashMap的一些常量
ConcurrentHashMap中有很多常量,
// 默認初始容量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默認的負載因子 static final float DEFAULT_LOAD_FACTOR = 0.75f; // 默認的並發級別(同時支持多少線程並發修改) // 因為JDK7中ConcurrentHashMap中是用分段鎖實現並發,不同分段的數據可以進行並發操作,同一個段的數據不能同時修改 static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大容量 static final int MAXIMUM_CAPACITY = 1 << 30; // 每一個分段的數組容量初始值 static final int MIN_SEGMENT_TABLE_CAPACITY = 2; // 最多能有多少個segment static final int MAX_SEGMENTS = 1 << 16; // slightly conservative // 嘗試對整個map進行操作(比如說統計map的元素數量),可能需要鎖定全部segment; // 這個常量表示鎖定所有segment前,嘗試的次數 static final int RETRIES_BEFORE_LOCK = 2;
三.常用接口源碼分析
3.1 ConcurrentHashMap構造方法
ConcurrentHashMap有多個構造方法,但是內部其實都是調用同一個進行創建:
public ConcurrentHashMap() {
this(DEFAULT_INITIAL_CAPACITY, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
/**
* 統一調用的構造方法
*
* @param initialCapacity 初始容量
* @param loadFactor 負載因子
* @param concurrencyLevel 並發級別
*/
@SuppressWarnings("unchecked")
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// 校驗參數合法性
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0) {
throw new IllegalArgumentException();
}
// 對並發級別進行調整,不允許超過segment的數量(超過segment其實是沒有意義的)
if (concurrencyLevel > MAX_SEGMENTS) {
concurrencyLevel = MAX_SEGMENTS;
}
// 根據concurrencyLevel確定sshift和ssize的值
int ssize = 1; // ssize是表示segment的數量,ssize是不小於concurrencyLevel的數,並且是2的n次方
int sshift = 0;// sshift是ssize轉換為2進制后的位數,比如ssize為16(1000),則sshift為4
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 比如concurrencyLevel默認為16,走完循環,sshift為4,ssize為16
// 如果concurrentLevel為8,則sshift為3,ssize為8
// segment的shift偏移量
this.segmentShift = 32 - sshift;
// segment掩碼
this.segmentMask = ssize - 1;
// 對傳入的初始容量進行調整(不允許大於最大容量)
if (initialCapacity > MAXIMUM_CAPACITY) {
initialCapacity = MAXIMUM_CAPACITY;
}
// 假設傳入的容量為128,並發級別為16,則initialCapacity為128,ssize為16
int c = initialCapacity / ssize;
// c可以理解為根據傳入的初始容量,計算出每個segment中的數組容量
if (c * ssize < initialCapacity) {
++c;
}
// cap表示的是每個segment中的數組容量
int cap = MIN_SEGMENT_TABLE_CAPACITY;
// 如果默認的每個segment中的數組長度小於上面計算出來的每個segment的數組長度,則將容量翻倍
while (cap < c) {
cap <<= 1;
}
// 創建一個segment,作為segments數組的第一個segment
Segment<K, V> s0 = new Segment<K, V>(loadFactor, (int) (cap * loadFactor), new HashEntry[cap]);
// 創建segments數組
Segment<K, V>[] ss = (Segment<K, V>[]) new Segment[ssize];
// 將s0賦值給segments數組的第一個元素(偏移量為0)
UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]
// 復制segment數組
this.segments = ss;
}
/**
* 傳入map,將map中的元素加入到ConcurrentHashMap中
* 注意使用默認的負載因子(0.75)和默認的並發級別(16)
* 初始容量取map容量和默認容量的較大值
*/
public ConcurrentHashMap(Map<? extends K, ? extends V> m) {
this(Math.max((int) (m.size() / DEFAULT_LOAD_FACTOR) + 1, DEFAULT_INITIAL_CAPACITY),
DEFAULT_LOAD_FACTOR,
DEFAULT_CONCURRENCY_LEVEL);
putAll(m);
}
3.2 map.put操作
map.put,map就是指ConcurrentHashMap,其實就是確定HashEntry應該放入哪個segment中的哪個位置,所以可分為3步:
1.首先需要確定放入哪個segment;
2.確定segment后,再確定HashEntry應該放入segment的哪個位置;
3.進行覆蓋覆蓋或者插入。
/**
* put操作,注意key或者value為null時,會拋出NPE
*/
@SuppressWarnings("unchecked")
public V put(K key, V value) {
Segment<K, V> s;
if (value == null) {
throw new NullPointerException();
}
// 計算key的hash
int hash = hash(key);
// hash值右移shift位后 與 mask掩碼進行取與,確定數據應該放到哪個segments數組的哪一個segment中
int j = (hash >>> segmentShift) & segmentMask;
// 判斷計算出的segment數組位置上的segment是否為null,如果為null,則進行創建segment
if ((s = (Segment<K, V>) UNSAFE.getObject(segments, (j << SSHIFT) + SBASE)) == null) {
s = ensureSegment(j);
}
// 創建segment后,將數據put到segment中,調用的segment.put()
return s.put(key, hash, value, false);
}
3.3 創建新segment
上面put的時候,如果發現segment為null,則會進行調用ensureSegment進行創建segment,代碼如下:
/**
* 擴容(創建)segment
*
* @param k 需要擴容或者需要創建的segment位置
* @return 返回擴容后的segment
*/
@SuppressWarnings("unchecked")
private Segment<K, V> ensureSegment(int k) {
final Segment<K, V>[] ss = this.segments;
long u = (k << SSHIFT) + SBASE; // 傳入的index,對應的偏移量
Segment<K, V> seg;
// 判斷是否需要擴容或者創建segment,如果獲取到segment不為null,則返回segment
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
Segment<K, V> proto = ss[0]; // “原型設計模式”,使用segments數組的0號位置segment
int cap = proto.table.length;// 0號segment的table長度
float lf = proto.loadFactor; // 0號segment的負載因子
int threshold = (int) (cap * lf); // 0號segment的擴容閾值
// 創建一個HashEntry的數組,數組容量和0號segment相同
HashEntry<K, V>[] tab = (HashEntry<K, V>[]) new HashEntry[cap];
// 再次檢測,指定的segment是不是為null,如果為null才進行下一步操作
if ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) { // recheck
// 創建segment
Segment<K, V> s = new Segment<K, V>(lf, threshold, tab);
// 使用CAS將新創建的segment填入指定的位置
while ((seg = (Segment<K, V>) UNSAFE.getObjectVolatile(ss, u)) == null) {
if (UNSAFE.compareAndSwapObject(ss, u, null, seg = s)) {
break;
}
}
}
}
// 返回新增的segment
return seg;
}
上面需要注意的是:
1.創建segment中的table數組時,是使用0號segment的table屬性(長度、負載因子、閾值);
2.創建segment前會進行再check,check發現segment的確為null時,再進行創建segment;
3.利用CAS來將創建的segment填入segments數組中;
3.4 segment.put操作
當確定HashEntry應該放到哪個segment后,就開始調用segment的put方法,如下:
/**
* 在確定應該存放到那個segment后,就segment.put()將k-v存入segment中
*
* @param key put的key
* @param hash hash(key)的值
* @param value put的value
* @param onlyIfAbsent true:key對應的Entry不進行覆蓋,false:key對應的entry存在與否,都進行put覆蓋
* @return
*/
final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 先獲取鎖(ReentrantLock,內部使用非公平鎖)
HashEntry<K, V> node = tryLock() ? null : scanAndLockForPut(key, hash, value);
V oldValue;
try {
HashEntry<K, V>[] tab = table;
// 根據hash值計算出在segment的table中的位置
int index = (tab.length - 1) & hash;
// 定位到segment的table的index位置(第一個節點)
HashEntry<K, V> first = entryAt(tab, index);
// 從第一個節點開始遍歷
for (HashEntry<K, V> e = first; ; ) {
// 節點不為空,則判斷是否key是否相同(相同HashEntry)
if (e != null) {
K k;
// 比較是否key是否相等(判斷put的key是否已經存在)
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
// 如果key已經存在,則進行覆蓋,如果onlyIsAbsent為false(不關心key對應的Entry是否存在)
oldValue = e.value;
if (!onlyIfAbsent) {
// 覆蓋舊值,修改計數加1
e.value = value;
++modCount;
}
break;
}
e = e.next;
} else {
// 頭插法,將put的k-v創建新HashEntry,放到first的前面
if (node != null) {
node.setNext(first);
} else {
node = new HashEntry<K, V>(hash, key, value, first);
}
// segment中table元素數量加1
int c = count + 1;
// 加1后的size大於擴容閾值,且數組的長度小於最大容量,則進行rehash
if (c > threshold && tab.length < MAXIMUM_CAPACITY) {
// 擴容,並進行rehash
rehash(node);
} else {
// 將節點放入數組中的指定位置
setEntryAt(tab, index, node);
}
// 修改次數加一,修改segment的table元素個數
++modCount;
count = c;
// 舊值為null
oldValue = null;
break;
}
}
} finally {
// 釋放鎖
unlock();
}
return oldValue;
}
梳理一下,大致在做下面幾件事:
1.先獲取鎖(ReetrantLock,內部使用非公平鎖NonFairSync);
2.獲取到鎖后根據hash計算出在table的位置;
3.遍歷table的HashEntry的鏈表,如果有相同key的,則進行覆蓋,如果沒有,在進行頭插法;
4.插入鏈表后,確定是否需要擴容,需要則執行rehash;
5.修改計數(count、modCount),並且釋放鎖。
3.5 segment.rehash擴容
segment擴容時,會將該segment的容量擴容為之前的2倍,並且將各位置的鏈表節點元素進行rehash。
/**
* 將segment的table容量擴容一倍(newCap=oldCap*2),注意只會擴容該node所在的segment
*
* @param node segment[i]->鏈表的頭結點
*/
@SuppressWarnings("unchecked")
private void rehash(HashEntry<K, V> node) {
HashEntry<K, V>[] oldTable = table;
int oldCapacity = oldTable.length;
// 新容量為舊容量的2倍
int newCapacity = oldCapacity << 1;
// 設置新的擴容閾值
threshold = (int) (newCapacity * loadFactor);
// 申請新數組,數組長度為新容量值
HashEntry<K, V>[] newTable = (HashEntry<K, V>[]) new HashEntry[newCapacity];
// 循環遍歷segment的數組(舊數組)
int sizeMask = newCapacity - 1; // 新的掩碼
for (int i = 0; i < oldCapacity; i++) {
// 獲取第i個位置的HashEntry節點,如果該節點為null,則該位置為空,不作處理
HashEntry<K, V> e = oldTable[i];
if (e != null) {
HashEntry<K, V> next = e.next;
// 計算新位置
int idx = e.hash & sizeMask;
// next為null,表示該位置只有一個節點,直接將節點復制到新位置
if (next == null) { // Single node on list
newTable[idx] = e;
} else { // Reuse consecutive sequence at same slot
HashEntry<K, V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K, V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// 從頭結點開始,開始將節點拷貝到新數組中
for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
V v = p.value;
int h = p.hash;
int k = h & sizeMask;
HashEntry<K, V> n = newTable[k];
newTable[k] = new HashEntry<K, V>(h, p.key, v, n);
}
}
}
}
// 將頭結點添加到segment的table中
int nodeIndex = node.hash & sizeMask; // add the new node
node.setNext(newTable[nodeIndex]);
newTable[nodeIndex] = node;
table = newTable;
}
為啥擴容的時候沒有加鎖呀?
其實已經加鎖了,只不過不是在rehash中加鎖!!!因為rehash是在map.put中調用,put的時候已經加鎖了,所以rehash中不用加鎖。
3.6 map.get操作
get操作,先定位到segment,然后定位到segment的具體位置,進行獲取
/**
* 從ConcurrentHashMap中獲取key對應的value,不需要加鎖
*/
public V get(Object key) {
Segment<K, V> s;
HashEntry<K, V>[] tab;
// 計算key的hash
int h = hash(key);
// 計算key處於哪一個segment中(index)
long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
// 獲取數組中該位置的segment,如果該segment的table不為空,那么就繼續在segment中查找,否則返回null,因為未找到
if ((s = (Segment<K, V>) UNSAFE.getObjectVolatile(segments, u)) != null && (tab = s.table) != null) {
// tab指向segment的table數組,通過hash計算定位到table數組的位置(然后開始遍歷鏈表)
HashEntry<K, V> e;
for (e = (HashEntry<K, V>) UNSAFE.getObjectVolatile(tab, ((long) (((tab.length - 1) & h)) << TSHIFT) + TBASE);
e != null; e = e.next) {
K k;
// 判斷key和hash是否匹配,匹配則證明找到要查找的數據,將數據返回
if ((k = e.key) == key || (e.hash == h && key.equals(k)))
return e.value;
}
}
return null;
}
3.7 map.remove操作
刪除節點,和get的流程差不多,先定位到segment,然后確定segment的哪個位置(哪條鏈表),遍歷鏈表,找到后進行刪除。
/**
* 刪除map中key對應的元素
*/
public V remove(Object key) {
// 計算key的hash
int hash = hash(key);
// 根據hash確定segment
Segment<K, V> s = segmentForHash(hash);
// 調用segment.remove進行刪除
return s == null ? null : s.remove(key, hash, null);
}
/**
* 刪除segment中key對應的hashEntry
* 如果傳入的value不為空,則會在value匹配的時候進行刪除,否則不操作
*/
final V segmeng.remove(Object key, int hash, Object value) {
// 獲取鎖失敗,則不斷自旋嘗試獲取鎖
if (!tryLock()) {
scanAndLock(key, hash);
}
V oldValue = null;
try {
HashEntry<K, V>[] tab = table;
// 定位到segment中table的哪個位置
int index = (tab.length - 1) & hash;
HashEntry<K, V> e = entryAt(tab, index);
HashEntry<K, V> pred = null;
// 遍歷鏈表
while (e != null) {
K k;
HashEntry<K, V> next = e.next;
// 如果key和hash都匹配
if ((k = e.key) == key || (e.hash == hash && key.equals(k))) {
V v = e.value;
// 如果沒有傳入value,則直接刪除該節點
// 如果傳入了value,比如調用的map.remove(key,value),則要value匹配才會刪除,否則不操作
if (value == null || value == v || value.equals(v)) {
if (pred == null) { // 頭結點就是要找刪除的元素,next為null,則將null賦值數組的該位置
setEntryAt(tab, index, next);
} else { //
pred.setNext(next);
}
// 修改次數加一,map數量減一
++modCount;
--count;
oldValue = v;
}
break;
}
// 不匹配時,pred保存當前一次檢測的節點,e指向下一個節點
pred = e;
e = next;
}
} finally {
unlock();// 釋放鎖
}
return oldValue;
}
3.8 map.size操作
ConcurrentHashMap的size(),需要統計每一個segment中的元素數量(也就是count值),因為不同的segment允許並發修改,統計過程中可能會出現修改操作,導致統計不准確,所以要想准確統計整個map的元素數量,可以這樣做:通過加鎖的方式來解決(將所有的segment都加鎖),這樣就能保證元素不會變化了,這是我們的想法。
而ConcurrentHashMap是這樣解決的,先嘗試不加鎖進行統計兩次,這兩次統計,不止會統計每個segment的元素數量,還會統計每個segment的modCount(修改次數),進行匯總;如果兩次統計的modCount總量相同,也就說明兩次統計期間沒有修改操作,證明統計的元素總量正確;如果兩次modCount總量不相同,表示有修改操作,則進行重試,如果重試后,發現還是不准確(modCount不匹配),那么就嘗試為所有的segment加鎖,再進行統計。
源碼如下:
/**
* 獲取ConcurrentHashMap中的元素個數,如果元素個數超過Integer.MAX_VALUE,則返回Integer.MAX_VALUE
*/
public int size() {
final Segment<K, V>[] segments = this.segments;
int size; // 返回元素數量(統計結果->元素的總量)
boolean overflow; // 標志是否溢出(是否超過Integer.MAX_VALUE)
long sum; // 所有segment的modCount總量次數(整個map的修改次數)
long last = 0L; // previous sum,上一輪統計的modCount總量
int retries = -1; // 記錄重試的次數
try {
// 此處for循環主要用於控制重試
for (; ; ) {
// 重試的次數如果達到RETRIES_BEFORE_LOCK,則強制獲取所有segment的鎖
if (retries++ == RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j) {
ensureSegment(j).lock();
// 強制獲取segment的table第i個位置,並加鎖
}
}
sum = 0L;
size = 0;
overflow = false;
// 開始對segments中的每一個segment中進行統計
for (int j = 0; j < segments.length; ++j) {
// 獲取第j個segment
Segment<K, V> seg = segmentAt(segments, j);
// 如果segment不為空,則進行統計
if (seg != null) {
sum += seg.modCount;
int c = seg.count;
// size累加
if (c < 0 || (size += c) < 0)
overflow = true;
}
}
// 如果本次統計的modCount總量和上次一樣,則表示在這兩次統計之間沒有進行過修改,則不再重試
if (sum == last) {
break;
}
// 記錄本次統計的modCount總量
last = sum;
}
} finally {
// 判斷是否加了鎖(如果retries大於RETRIES_BEFORE_LOCK),則證明加了鎖,於是進行釋放鎖
if (retries > RETRIES_BEFORE_LOCK) {
for (int j = 0; j < segments.length; ++j)
segmentAt(segments, j).unlock();
}
}
return overflow ? Integer.MAX_VALUE : size;
}
