注:在看這篇文章之前,如果對HashMap的層不清楚的話,建議先去看看HashMap源碼解析。
http://www.cnblogs.com/java-zhao/p/5106189.html
1、對於ConcurrentHashMap需要掌握以下幾點
- Map的創建:ConcurrentHashMap()
- 往Map中添加鍵值對:即put(Object key, Object value)方法
- 獲取Map中的單個對象:即get(Object key)方法
- 刪除Map中的對象:即remove(Object key)方法
- 判斷對象是否存在於Map中:containsKey(Object key)
- 遍歷Map中的對象:即keySet().iterator(),在實際中更常用的是增強型的for循環去做遍歷
2、ConcurrentHashMap的創建
注:在往下看之前,心里先有這樣一個映像:ConcurrentHashMap的數據結構:一個指定個數的Segment數組,數組中的每一個元素Segment相當於一個HashTable。
2.1、使用方法:
Map<String, Object> map = new ConcurrentHashMap<String, Object>();
2.2、源代碼:
ConcurrentHashMap相關屬性:

/** * 用於分段 */ // 根據這個數來計算segment的個數,segment的個數是僅小於這個數且是2的幾次方的一個數(ssize) static final int DEFAULT_CONCURRENCY_LEVEL = 16; // 最大的分段(segment)數(2的16次方) static final int MAX_SEGMENTS = 1 << 16; /** * 用於HashEntry */ // 默認的用於計算Segment數組中的每一個segment的HashEntry[]的容量,但是並不是每一個segment的HashEntry[]的容量 static final int DEFAULT_INITIAL_CAPACITY = 16; // 默認的加載因子(用於resize) static final float DEFAULT_LOAD_FACTOR = 0.75f; // 用於計算Segment數組中的每一個segment的HashEntry[]的最大容量(2的30次方) static final int MAXIMUM_CAPACITY = 1 << 30; /** * segments數組 * 每一個segment元素都看做是一個HashTable */ final Segment<K, V>[] segments; /** * 用於擴容 */ final int segmentMask;// 用於根據給定的key的hash值定位到一個Segment final int segmentShift;// 用於根據給定的key的hash值定位到一個Segment
Segment類(ConcurrentHashMap的內部類)

/** * 一個特殊的HashTable */ static final class Segment<K, V> extends ReentrantLock implements Serializable { private static final long serialVersionUID = 2249069246763182397L; transient volatile int count;// 該Segment中的包含的所有HashEntry中的key-value的個數 transient int modCount;// 並發標記 /* * 元素個數超出了這個值就擴容 threshold==(int)(capacity * loadFactor) * 值得注意的是,只是當前的Segment擴容,所以這是Segment自己的一個變量,而不是ConcurrentHashMap的 */ transient int threshold; transient volatile HashEntry<K, V>[] table;// 鏈表數組 final float loadFactor; /** * 這里要注意一個很不好的編程習慣,就是小寫l,容易與數字1混淆,所以最好不要用小寫l,可以改為大寫L */ Segment(int initialCapacity, float lf) { loadFactor = lf;//每個Segment的加載因子 setTable(HashEntry.<K, V> newArray(initialCapacity)); } /** * 創建一個Segment數組,容量為i */ @SuppressWarnings("unchecked") static final <K, V> Segment<K, V>[] newArray(int i) { return new Segment[i]; } /** * Sets table to new HashEntry array. Call only while holding lock or in * constructor. */ void setTable(HashEntry<K, V>[] newTable) { threshold = (int) (newTable.length * loadFactor);// 設置擴容值 table = newTable;// 設置鏈表數組 }
說明:只列出了Segement的全部屬性和創建ConcurrentHashMap時所用到的方法。
HashEntry類(ConcurrentHashMap的內部類)

/** * Segment中的HashEntry節點 類比HashMap中的Entry節點 */ static final class HashEntry<K, V> { final K key;// 鍵 final int hash;//hash值 volatile V value;// 實現線程可見性 final HashEntry<K, V> next;// 下一個HashEntry HashEntry(K key, int hash, HashEntry<K, V> next, V value) { this.key = key; this.hash = hash; this.next = next; this.value = value; } /* * 創建HashEntry數組,容量為傳入的i */ @SuppressWarnings("unchecked") static final <K, V> HashEntry<K, V>[] newArray(int i) { return new HashEntry[i]; } }
ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel)

1 /** 2 * 創建ConcurrentHashMap 3 * @param initialCapacity 用於計算Segment數組中的每一個segment的HashEntry[]的容量, 但是並不是每一個segment的HashEntry[]的容量 4 * @param loadFactor 5 * @param concurrencyLevel 用於計算Segment數組的大小(可以傳入不是2的幾次方的數,但是根據下邊的計算,最終segment數組的大小ssize將是2的幾次方的數) 6 * 7 * 步驟: 8 * 這里以默認的無參構造器參數為例,initialCapacity==16,loadFactor==0.75f,concurrencyLevel==16 9 * 1)檢查各參數是否符合要求 10 * 2)根據concurrencyLevel(16),計算Segment[]的容量ssize(16)與擴容移位條件sshift(4) 11 * 3)根據sshift與ssize計算將來用於定位到相應Segment的參數segmentShift與segmentMask 12 * 4)根據ssize創建Segment[]數組,容量為ssize(16) 13 * 5)根據initialCapacity(16)與ssize計算用於計算HashEntry[]容量的參數c(1) 14 * 6)根據c計算HashEntry[]的容量cap(1) 15 * 7)根據cap與loadFactor(0.75)為每一個Segment[i]都實例化一個Segment 16 * 8)每一個Segment的實例化都做下面這些事兒: 17 * 8.1)為當前的Segment初始化其loadFactor為傳入的loadFactor(0.75) 18 * 8.2)創建一個HashEntry[],容量為傳入的cap(1) 19 * 8.3)根據創建出來的HashEntry的容量(1)和初始化的loadFactor(0.75),計算擴容因子threshold(0) 20 * 8.4)初始化Segment的table為剛剛創建出來的HashEntry 21 */ 22 public ConcurrentHashMap(int initialCapacity,float loadFactor,int concurrencyLevel) { 23 // 檢查參數情況 24 if (loadFactor <= 0f || initialCapacity < 0 || concurrencyLevel <= 0) 25 throw new IllegalArgumentException(); 26 27 if (concurrencyLevel > MAX_SEGMENTS) 28 concurrencyLevel = MAX_SEGMENTS; 29 30 /** 31 * 找一個能夠正好小於concurrencyLevel的數(這個數必須是2的幾次方的數) 32 * eg.concurrencyLevel==16==>sshift==4,ssize==16 33 * 當然,如果concurrencyLevel==15也是上邊這個結果 34 */ 35 int sshift = 0; 36 int ssize = 1;// segment數組的長度 37 while (ssize < concurrencyLevel) { 38 ++sshift; 39 ssize <<= 1;// ssize=ssize*2 40 } 41 42 segmentShift = 32 - sshift;// eg.segmentShift==32-4=28 用於根據給定的key的hash值定位到一個Segment 43 segmentMask = ssize - 1;// eg.segmentMask==16-1==15 用於根據給定的key的hash值定位到一個Segment 44 this.segments = Segment.newArray(ssize);// 構造出了Segment[ssize]數組 eg.Segment[16] 45 46 /* 47 * 下面將為segment數組中添加Segment元素 48 */ 49 if (initialCapacity > MAXIMUM_CAPACITY) 50 initialCapacity = MAXIMUM_CAPACITY; 51 int c = initialCapacity / ssize;// eg.initialCapacity==16,c==16/16==1 52 if (c * ssize < initialCapacity)// eg.initialCapacity==17,c==17/16=1,這時1*16<17,所以c=c+1==2 53 ++c;// 為了少執行這一句,最好將initialCapacity設置為2的幾次方 54 int cap = 1;// 每一個Segment中的HashEntry[]的初始化容量 55 while (cap < c) 56 cap <<= 1;// 創建容量 57 58 for (int i = 0; i < this.segments.length; ++i) 59 // 這一塊this.segments.length就是ssize,為了不去計算這個值,可以直接改成i<ssize 60 this.segments[i] = new Segment<K, V>(cap, loadFactor); 61 }
注意:這個方法里邊我在頭部所寫的注釋非常重要,在這塊注釋寫明了:
- 每一個參數的作用
- 整個ConcurrentHashMap的一個創建步驟(以默認的參數值為例)
ConcurrentHashMap()

/** * 創建ConcurrentHashMap */ public ConcurrentHashMap() { this(DEFAULT_INITIAL_CAPACITY, // 16 DEFAULT_LOAD_FACTOR, // 0.75f DEFAULT_CONCURRENCY_LEVEL);// 16 }
該方法調用了上邊的三參構造器。
五點注意:
- 傳入的concurrencyLevel只是用於計算Segment數組的大小(可以傳入不是2的幾次方的數,但是根據下邊的計算,最終segment數組的大小ssize將是2的幾次方的數),並非真正的Segment數組的大小
- 傳入的initialCapacity只是用於計算Segment數組中的每一個segment的HashEntry[]的容量, 但是並不是每一個segment的HashEntry[]的容量,而每一個HashEntry[]的容量不是2的幾次方
- 非常值得注意的是,在默認情況下,創建出的HashEntry[]數組的容量為1,並不是傳入的initialCapacity(16),證實了上一點;而每一個Segment的擴容因子threshold,一開始算出來是0,即開始put第一個元素就要擴容,不太理解JDK為什么這樣做。
- 想要在初始化時擴大HashEntry[]的容量,可以指定initialCapacity參數,且指定時最好指定為2的幾次方的一個數,這樣的話,在代碼執行中可能會少執行一句"c++",具體參看三參構造器的注釋
- 對於Concurrenthashmap的擴容而言,只會擴當前的Segment,而不是整個Concurrenthashmap中的所有Segment都擴
兩點改進:
在Concurrenthashmap的構造過程中,相對於JDK的代碼,有兩點改進:
- 在遍歷Segment數組為每一個數組元素實例化的時候,可以直接寫作i<ssize,而不必在每次循環時都去計算this.segments.length,JDK代碼如下,可以按照代碼中的注釋做優化
for (int i = 0; i < this.segments.length; ++i) // 這一塊this.segments.length就是ssize,為了不去計算這個值,可以直接改成i<ssize this.segments[i] = new Segment<K, V>(cap, loadFactor);
- 另外一個,就是在程序中,盡量少用小寫"l",容易與數字"1"混淆,要么不用"l",若要用的話,就用大寫"L",JDK代碼如下,可參照注釋進行優化:
/** * 這里要注意一個很不好的編程習慣,就是小寫l,容易與數字1混淆,所以最好不要用小寫l,可以改為大寫L */ Segment(int initialCapacity, float lf) { loadFactor = lf;//每個Segment的加載因子 setTable(HashEntry.<K, V> newArray(initialCapacity)); }
一個疑問:
- 在默認情況下,創建出的HashEntry[]數組的容量為1,而每一個Segment的擴容因子threshold,一開始算出來是0,即開始put第一個元素就要擴容,不太理解JDK為什么這樣做。在我們實際開發中,其實空間有的是,所以我們一般會采用"以適當的空間換取時間"的方式,所以我們會適當的擴大HashEntry[],以確保在put數據的時候盡量減少擴容才對,但是JDK這樣做到底是為了什么?是為了減少空間嗎?還是我本身的理解就有問題?求大神指點!!!
- 注意我上邊說的適當容量,是因為如果容量設的太大,可能會導致某個HashEntry[i]中的HashEntry鏈表過長,進而影響查詢的效率,容量設的太小的話,有需要不斷擴容,影響插入效率。
3、put(Object key, Object value)
上述方法,若添加已有key的key-value對,則新值覆蓋舊值。
putIfAbsent(K key, V value):若添加已有key的key-value對,直接返回舊值,則新值相當於沒有添加。
使用方法:
map.put("hello", "world");
源代碼:
ConcurrentHashMap的put(Object key, Object value)方法

/** * 將key-value放入map * 注意:key和value都不可以為空 * 步驟: * 1)計算key.hashCode()的hash值 * 2)根據hash值定位到某個Segment * 3)調用Segment的put()方法 * Segment的put()方法: * 1)上鎖 * 2)從主內存中讀取key-value對個數count * 3)count+1如果大於threshold,執行rehash() * 4)計算將要插入的HashEntry[]的下標index * 5)獲取HashEntry的頭節點HashEntry[index]-->first * 6)從頭結點開始遍歷整個HashEntry鏈表, * 6.1)若找到與key和hash相同的節點,則判斷onlyIfAbsent如果為false,新值覆蓋舊值,返回舊值;如果為true,則直接返回舊值(相當於不添加重復key的元素) * 6.2)若沒有找到與key和hash相同的節點,則創建新節點HashEntry,並將之前的有節點作為新節點的next,即將新節點放入鏈頭,然后將新節點賦值給HashEntry[index],將count強制寫入主內存,最后返回null */ public V put(K key, V value) { if (key == null || value == null) throw new NullPointerException(); int hash = hash(key.hashCode());//計算key.hashCode()的hash值 /** * 根據hash值定位到某個Segment,調用Segment的put()方法 */ return segmentFor(hash).put(key, hash, value, false); }
注意:
- key和value都不可為null,這一點與HashMap不同,但是從代碼來看,並沒有判斷key為空的情況,這一段代碼在哪里呢?為了可讀性,建議將判斷的地方改為如下代碼
if (key == null || value == null) throw new NullPointerException();
- 注釋部分寫明了整個插入流程,詳細的流程步驟見代碼,這里列出大致流程
- 根據key獲取key.hashCode的hash值
- 根據hash值算出將要插入的Segment
- 根據hash值與Segment中的HashEntry的容量-1按位與獲取將要插入的HashEntry的index
- 若HashEntry[index]中的HashEntry鏈表有與插入元素相同的key和hash值,根據onlyIfAbsent決定是否替換舊值
- 若沒有相同的key和hash,直接返回將新節點插入鏈頭,原來的頭節點設為新節點的next(采用的方式與HashMap一致,都是HashEntry替換的方法)
Segment的put(K key, int hash, V value, boolean onlyIfAbsent)

/** * 往當前segment中添加key-value * 注意: * 1)onlyIfAbsent-->false如果有舊值存在,新值覆蓋舊值,返回舊值;true如果有舊值存在,則直接返回舊值,相當於不添加元素(不可添加重復key的元素) * 2)ReentrantLock的用法 * 3)volatile只能配合鎖去使用才能實現原子性 */ V put(K key, int hash, V value, boolean onlyIfAbsent) { lock();//加鎖:ReentrantLock try { int c = count;//當前Segment中的key-value對(注意:由於count是volatile型的,所以讀的時候工作內存會從主內存重新加載count值) if (c++ > threshold) // 需要擴容 rehash();//擴容 HashEntry<K, V>[] tab = table; int index = hash & (tab.length - 1);//按位與獲取數組下標:與HashMap相同 HashEntry<K, V> first = tab[index];//獲取相應的HashEntry[i]中的頭節點 HashEntry<K, V> e = first; //一直遍歷到與插入節點的hash和key相同的節點e;若沒有,最后e==null while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue;//舊值 if (e != null) {//table中已經有與將要插入節點相同hash和key的節點 oldValue = e.value;//獲取舊值 if (!onlyIfAbsent) e.value = value;//false 覆蓋舊值 true的話,就不添加元素了 } else {//table中沒有與將要插入節點相同hash或key的節點 oldValue = null; ++modCount; tab[index] = new HashEntry<K, V>(key, hash, first, value);//將頭節點作為新節點的next,所以新加入的元素也是添加在鏈頭 count = c; //設置key-value對(注意:由於count是volatile型的,所以寫的時候工作內存會立即向主內存重新寫入count值) } return oldValue; } finally { unlock();//手工釋放鎖 } }
注意:在注釋中已經寫明了,這里還是要寫一下
- onlyIfAbsent-->false如果有舊值存在,新值覆蓋舊值,返回舊值;true如果有舊值存在,則直接返回舊值,相當於不添加元素
- ReentrantLock的用法:必須手工釋放鎖。可實現Synchronized的效果,原子性。
- volatile需要配合鎖去使用才能實現原子性,否則在多線程操作的情況下依然不夠用,在程序中,count變量(當前Segment中的key-value對個數)通過volatile修飾,實現內存可見性(關於內存可見性以后會仔細去記錄,這里列出大概的一個流程)在有鎖保證了原子性的情況下
- 當我們讀取count變量的時候,會強制從主內存中讀取count的最新值
- 當我們對count變量進行賦值之后,會強制將最新的count值刷到主內存中去
- 通過以上兩點,我們可以保證在高並發的情況下,執行這段流程的線程可以讀取到最新值
- 在這里的ReentrantLock與volatile結合的用法值得我們學習
補:volatile的介紹見《附2 volatile》,鏈接如下:
http://www.cnblogs.com/java-zhao/p/5125698.html
hash(int h)

/** * 對key.hashCode()進行hash計算 * @param h key.hashCode() */ private static int hash(int h) { // Spread bits to regularize both segment and index locations, // using variant of single-word Wang/Jenkins hash. h += (h << 15) ^ 0xffffcd7d; h ^= (h >>> 10); h += (h << 3); h ^= (h >>> 6); h += (h << 2) + (h << 14); return h ^ (h >>> 16); }
segmentFor(int hash)

/** * 根據給定的key的hash值定位到一個Segment * @param hash */ final Segment<K, V> segmentFor(int hash) { return segments[(hash >>> segmentShift) & segmentMask]; }
注意:hash(int h)與segmentFor(int hash)這兩個方法應該會盡量將key的hash值打散,從而保證盡可能多的同時在多個Segment上進行put操作,而不是在同一個Segment上執行多個put操作,這樣之后,在同一個Segment中,要盡可能的保證向HashEntry[]的不同元素上進行put,而不是向同一個元素上一直put,以上兩個函數究竟是怎樣保證實現這樣的將hash打散的效果呢?求大神指點啊!!!
rehash()
JDK的實現代碼:

/** * 步驟: * 需要注意的是:同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是hash&(table.length-1)的結果相同 * 1)創建一個新的HashEntry數組,容量為舊數組的二倍 * 2)計算新的threshold * 3)遍歷舊數組的每一個元素,對於每一個元素 * 3.1)根據頭節點e重新計算將要存入的新數組的索引idx * 3.2)若整個鏈表只有一個節點e,則是直接將e賦給newTable[idx]即可 * 3.3)若整個鏈表還有其他節點,先算出最后一個節點lastRun的位置lastIdx,並將最后一個節點賦值給newTable[lastIdx] * 3.4)最后將從頭節點開始到最后一個節點之前的所有節點計算其將要存儲的索引k,然后創建新節點,將新節點賦給newTable[k],並將之前newTable[k]上存在的節點作為新節點的下一節點 */ void rehash() { HashEntry<K, V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);//擴容為原來二倍 threshold = (int) (newTable.length * loadFactor);//計算新的擴容臨界值 int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity; i++) { // We need to guarantee that any existing reads of old Map can // proceed. So we cannot yet null out each bin. HashEntry<K, V> e = oldTable[i];//頭節點 if (e != null) { HashEntry<K, V> next = e.next; int idx = e.hash & sizeMask;//重新按位與計算將要存放的新數組中的索引 if (next == null)//如果是只有一個頭節點,只需將頭節點設置到newTable[idx]即可 newTable[idx] = e; else { // Reuse trailing consecutive sequence at same slot HashEntry<K, V> lastRun = e; int lastIdx = idx;//存放最后一個元素將要存儲的數組索引 //查找到最后一個元素,並設置相關信息 for (HashEntry<K, V> last = next; last != null; last = last.next) { int k = last.hash & sizeMask; if (k != lastIdx) { lastIdx = k; lastRun = last; } } newTable[lastIdx] = lastRun;//存放最后一個元素 // Clone all remaining nodes for (HashEntry<K, V> p = e; p != lastRun; p = p.next) { int k = p.hash & sizeMask; HashEntry<K, V> n = newTable[k];//獲取newTable[k]已經存在的HashEntry,並將此HashEntry賦給n //創建新節點,並將之前的n作為新節點的下一節點 newTable[k] = new HashEntry<K, V>(p.key, p.hash, n,p.value); } } } } table = newTable; }
個人感覺JDK的實現方式比較拖沓,改造后的代碼如下,如有問題,請指出!!!
我對其進行改造后的實現代碼:

/** * 步驟: * 需要注意的是:同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是hash&(table.length-1)的結果相同 * 1)創建一個新的HashEntry數組,容量為舊數組的二倍 * 2)計算新的threshold * 3)遍歷舊數組的每一個元素,對於每一個元素(即一個鏈表) * 3.1)獲取頭節點e * 3.2)從頭節點開始到最后一個節點(null之前的那個節點)的所有節點計算其將要存儲的索引k,然后創建新節點,將新節點賦給newTable[k],並將之前newTable[k]上存在的節點作為新節點的下一節點 */ void rehash() { HashEntry<K, V>[] oldTable = table; int oldCapacity = oldTable.length; if (oldCapacity >= MAXIMUM_CAPACITY) return; HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);//擴容為原來二倍 threshold = (int) (newTable.length * loadFactor);//計算新的擴容臨界值 int sizeMask = newTable.length - 1; for (int i = 0; i < oldCapacity; i++) {//遍歷每一個數組元素 // We need to guarantee that any existing reads of old Map can // proceed. So we cannot yet null out each bin. HashEntry<K, V> e = oldTable[i];//頭節點 if (e != null) { for (HashEntry<K, V> p = e; p != null; p = p.next) {//遍歷數組元素中的鏈表 int k = p.hash & sizeMask; HashEntry<K, V> n = newTable[k];//獲取newTable[k]已經存在的HashEntry,並將此HashEntry賦給n //創建新節點,並將之前的n作為新節點的下一節點 newTable[k] = new HashEntry<K, V>(p.key, p.hash, n,p.value); } } } table = newTable; }
注意點:
- 同一個桶下邊的HashEntry鏈表中的每一個元素的hash值不一定相同,只是index = hash&(table.length-1)的結果相同,當table.length發生變化時,同一個桶下各個HashEntry算出來的index會不同。
總結:ConcurrentHashMap基於concurrencyLevel划分出多個Segment來存儲key-value,這樣的話put的時候只鎖住當前的Segment,可以避免put的時候鎖住整個map,從而減少了並發時的阻塞現象。
4、get(Object key)
使用方法:
map.get("hello");
源代碼:
ConcurrentHashMap的get(Object key)

/** * 根據key獲取value * 步驟: * 1)根據key獲取hash值 * 2)根據hash值找到相應的Segment * 調用Segment的get(Object key, int hash) * 3)根據hash值找出HashEntry數組中的索引index,並返回HashEntry[index] * 4)遍歷整個HashEntry[index]鏈表,找出hash和key與給定參數相等的HashEntry,例如e, * 4.1)如沒找到e,返回null * 4.2)如找到e,獲取e.value * 4.2.1)如果e.value!=null,直接返回 * 4.2.2)如果e.value==null,則先加鎖,等並發的put操作將value設置成功后,再返回value值 */ public V get(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).get(key, hash); }
Segment的get(Object key, int hash)

/** * 根據key和hash值獲取value */ V get(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K, V> e = getFirst(hash);//找到HashEntry[index] while (e != null) {//遍歷整個鏈表 if (e.hash == hash && key.equals(e.key)) { V v = e.value; if (v != null) return v; /* * 如果V等於null,有可能是當下的這個HashEntry剛剛被創建,value屬性還沒有設置成功, * 這時候我們讀到是該HashEntry的value的默認值null,所以這里加鎖,等待put結束后,返回value值 */ return readValueUnderLock(e); } e = e.next; } } return null; }
Segment的getFirst(int hash)

/** * 根據hash值找出HashEntry數組中的索引index,並返回HashEntry[index] */ HashEntry<K, V> getFirst(int hash) { HashEntry<K, V>[] tab = table; return tab[hash & (tab.length - 1)]; }
Segment的readValueUnderLock(HashEntry<K, V> e)

V readValueUnderLock(HashEntry<K, V> e) { lock(); try { return e.value; } finally { unlock(); } }
注意點:
- 注釋很重要,一定要看
- 注釋已經寫明了詳細流程,這里說一下大致流程:
- 根據key獲取hash值
- 根據hash值找到相應的Segment
- 根據hash值找出Segment中的哪一個HashEntry[index]
- 遍歷整個HashEntry[index]鏈表,找出hash和key與給定參數相等的HashEntry,例如e
- 如沒找到e,返回null
- 如找到e,獲取e.value
- 如果e.value!=null,直接返回
- 如果e.value==null,則先加鎖,等並發的put操作將value設置成功后,再返回value值
- 如果e.value!=null,直接返回
- 如沒找到e,返回null
- 根據key獲取hash值
- 對於get操作而言,基本沒有鎖,只有當找到了e且e.value等於null,有可能是當下的這個HashEntry剛剛被創建,value屬性還沒有設置成功,這時候我們讀到是該HashEntry的value的默認值null,所以這里加鎖,等待put結束后,返回value值
- 據說,上邊這一點還沒有發生過
5、remove(Object key)
使用方法:
map.remove("hello");
源代碼:
ConcurrentHashMap的remove(Object key)

/** * 刪除指定key的元素 * 步驟: * 1)根據key獲取hash值 * 2)根據hash值獲取Segment * 調用Segment的remove(Object key, int hash, Object value) * 1)count-1 * 2)獲取將要刪除的元素所在的HashEntry[index] * 3)遍歷鏈表, * 3.1)若沒有hash和key都與指定參數相同的節點e,返回null * 3.2)若有e,刪除指定節點e,並將e之前的節點重新排序后,將排序后的最后一個節點的下一個節點指定為e的下一個節點 * (很繞,不知道JDK為什么這樣實現) */ public V remove(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).remove(key, hash, null); }
Segment的remove(Object key, int hash, Object value)

V remove(Object key, int hash, Object value) { lock(); try { int c = count - 1;//key-value對個數-1 HashEntry<K, V>[] tab = table; int index = hash & (tab.length - 1); HashEntry<K, V> first = tab[index];//獲取將要刪除的元素所在的HashEntry[index] HashEntry<K, V> e = first; //從頭節點遍歷到最后,若未找到相關的HashEntry,e==null,否則,有 while (e != null && (e.hash != hash || !key.equals(e.key))) e = e.next; V oldValue = null; if (e != null) {//將要刪除的節點e V v = e.value; if (value == null || value.equals(v)) { oldValue = v; // All entries following removed node can stay // in list, but all preceding ones need to be // cloned. ++modCount; HashEntry<K, V> newFirst = e.next; /* * 從頭結點遍歷到e節點,這里將e節點刪除了,但是刪除節點e的前邊的節點會倒序 * eg.原本的順序:E3-->E2-->E1-->E0,刪除E1節點后的順序為:E2-->E3-->E0 * E1前的節點倒序排列了 */ for (HashEntry<K, V> p = first; p != e; p = p.next) newFirst = new HashEntry<K, V>(p.key, p.hash, newFirst, p.value); tab[index] = newFirst; count = c; // write-volatile } } return oldValue; } finally { unlock(); } }
注意:具體的實現方式看注釋,個人感覺比較繞,所以有興趣的朋友可以按照如下步驟實現了一遍:(實現的過程可以參照HashMap的remove(Object key))
- 根據key獲取hash值
- 根據hash值獲取Segment
- 獲取將要刪除的元素所在的HashEntry[index]
- 遍歷鏈表
- 若沒有hash和key都與指定參數相同的節點e,返回null
- 若有e,刪除指定節點e,並將e的前一個節點的next指向e的下一個節點,之后count-1
6、containsKey(Object key)
使用方法:

map.containsKey("hello")
源代碼:
ConcurrentHashMap的containsKey(Object key)

/** * 是否包含指定key的數據 * 步驟: * 1)根據key計算hash值 * 2)根據hash獲取相應的Segment * 調用Segment的containsKey(Object key, int hash) * 3)根據hash值找出HashEntry數組中的索引index,並返回HashEntry[index] * 4)遍歷整個HashEntry[index]鏈表,找出hash和key與給定參數相等的HashEntry,例如e, * 4.1)如找到e,返回true * 4.2)如沒找到e,返回false */ public boolean containsKey(Object key) { int hash = hash(key.hashCode()); return segmentFor(hash).containsKey(key, hash); }
Segment的containsKey(Object key, int hash)

boolean containsKey(Object key, int hash) { if (count != 0) { // read-volatile HashEntry<K, V> e = getFirst(hash); while (e != null) { if (e.hash == hash && key.equals(e.key)) return true; e = e.next; } } return false; }
說明:代碼清晰簡單,流程步驟查看注釋即可
7、keySet().iterator()
使用方法:

Map<String, Object> map = new ConcurrentHashMap<String, Object>(); map.put("hello3", "world2"); map.put("hello2", "world"); for(String key : map.keySet()){ System.out.println(map.get(key)); }
源代碼不寫了。
流程:
遍歷每個Segment中的HashEntry[],完成所有對象的讀取,不加鎖。
8、size()
源代碼:

/** * 計算map中的key-value對總數 * 步驟: * 1)遍歷所有段,計算總的count值sum,計算總的modCount值 * 2)如果有數據的話(modCount!=0),再遍歷所有段一遍,計算總的count值check,在這期間只要有一個段的modCount發生了變化,就再重復如上動作兩次 * 3)若三次后,還未成功,遍歷所有Segment,分別加鎖(即建立全局鎖),然后計算,最后釋放所有鎖 */ public int size() { final Segment<K, V>[] segments = this.segments; long sum = 0;//總量 long check = 0;//標志位 int[] mc = new int[segments.length];//存放每個段的modCount for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) { check = 0; sum = 0;//總的count值 int mcsum = 0;//總的modCount值 for (int i = 0; i < segments.length; ++i) {//遍歷所有段 sum += segments[i].count;//計算總的count值 mcsum += mc[i] = segments[i].modCount;//計算總的modCount值 } if (mcsum != 0) {//有數據的話,再檢查一遍 for (int i = 0; i < segments.length; ++i) { check += segments[i].count;//計算總的count if (mc[i] != segments[i].modCount) {//只要有一個段發生了變化(在遍歷期間發生了增刪變化) check = -1; break;//跳出所有循環 } } } if (check == sum)//成功 break; } if (check != sum) { //以上三次都為成功的話 sum = 0; //每一個段全部加鎖(相當於加了一個全局鎖) for (int i = 0; i < segments.length; ++i) segments[i].lock(); //進行統計 for (int i = 0; i < segments.length; ++i) sum += segments[i].count; //全部解鎖 for (int i = 0; i < segments.length; ++i) segments[i].unlock(); } if (sum > Integer.MAX_VALUE) return Integer.MAX_VALUE; else return (int) sum; }
在不加鎖的情況下遍歷所有Segment,讀取每個Segment的count和modCount,並進行統計;
完畢后,再遍歷一遍所有Segment,比較modCount,是否發生了變化,若發生了變化,則再重復如上動作兩次;
若三次后,還未成功,遍歷所有Segment,分別加鎖(即建立全局鎖),然后計算,最后釋放所有鎖。
注:以如上的方式,大部分情況下,不需要加鎖就可以獲取size()
總結:
- 數據結構:一個指定個數的Segment數組,數組中的每一個元素Segment相當於一個HashTable
- 加鎖情況(分段鎖):
- put
- get中找到了hash與key都與指定參數相同的HashEntry,但是value==null的情況
- remove
- size():三次嘗試后,還未成功,遍歷所有Segment,分別加鎖(即建立全局鎖)