基於 jdk1.8
Java並發包中提供的一個線程安全且高效的HashMap實現,可以完全替代HashTable,在並發編程的場景中使用頻率非常之高。
可能大多人只是知道它使用了多個鎖代替HashTable中的單個鎖,也就是鎖分離技術(Lock Stripping)
實現原理
1.8之前ConcurrentHashMap是使用Segment段來進行,一個段就相當於一個HashMap的數據結構,每個段使用一個鎖
1.8之后Segment雖保留,但已經簡化屬性,僅僅是為了兼容舊版本,使用和HashMap一樣的數據結構每個數組位置使用一個鎖
再學習中始終要考慮多線程的情況
一.類加載
主要知道以下的一些初始化值
private static final int MAXIMUM_CAPACITY = 1 << 30; //數組最大大小 同HashMap
private static final int DEFAULT_CAPACITY = 16;//數組默認大小
static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
private static final int DEFAULT_CONCURRENCY_LEVEL = 16; //兼容舊版保留的值,默認線程並發度,類似信號量
private static final float LOAD_FACTOR = 0.75f;//默認map擴容比例,實際用(n << 1) - (n >>> 1)代替了更高效
static final int TREEIFY_THRESHOLD = 8; // 鏈表轉樹閥值,大於8時
static final int UNTREEIFY_THRESHOLD = 6; //樹轉鏈表閥值,小於等於6(tranfer時,lc、hc=0兩個計數器分別++記錄原bin、新binTreeNode數量,<=UNTREEIFY_THRESHOLD 則untreeify(lo))。【僅在擴容tranfer時才可能樹轉鏈表】
static final int MIN_TREEIFY_CAPACITY = 64;
private static final int MIN_TRANSFER_STRIDE = 16;//擴容轉移時的最小數組分組大小
private static int RESIZE_STAMP_BITS = 16;//本類中沒提供修改的方法 用來根據n生成位置一個類似時間搓的功能
private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1; // 2^15-1,help resize的最大線程數
private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS; // 32-16=16,sizeCtl中記錄size大小的偏移量
static final int MOVED = -1; // hash for forwarding nodes(forwarding nodes的hash值)、標示位
static final int TREEBIN = -2; // hash for roots of trees(樹根節點的hash值)
static final int RESERVED = -3; // 保留
static final int HASH_BITS = 0x7fffffff; // 用在計算hash時進行安位與計算消除負hash
static final int NCPU = Runtime.getRuntime().availableProcessors(); // 可用處理器數量
二、put數據
這里先看使用空構造方法產生實例的使用,這個沒問題了,在學習其它有參的構造方法也沒什么大問題了
完全理解了put基本ConcurrentHashMap就理解一大半了主要思想也理解
public V put(K key, V value) { return putVal(key, value, false); } final V putVal(K key, V value, boolean onlyIfAbsent) { 1 if (key == null || value == null) throw new NullPointerException(); 2 int hash = spread(key.hashCode()); 3 int binCount = 0; 4 for (Node<K,V>[] tab = table;;) { 5 Node<K,V> f; int n, i, fh; 6 if (tab == null || (n = tab.length) == 0) 7 tab = initTable(); 8 else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 9 if (casTabAt(tab, i, null, 10 new Node<K,V>(hash, key, value, null))) 11 break; // no lock when adding to empty bin 12 } 13 else if ((fh = f.hash) == MOVED) 14 tab = helpTransfer(tab, f); 15 else { 16 V oldVal = null; 17 synchronized (f) { 18 if (tabAt(tab, i) == f) { 19 if (fh >= 0) { 20 binCount = 1; 21 for (Node<K,V> e = f;; ++binCount) { 22 K ek; 23 if (e.hash == hash && 24 ((ek = e.key) == key || 25 (ek != null && key.equals(ek)))) { 26 oldVal = e.val; 27 if (!onlyIfAbsent) 28 e.val = value; 29 break; 30 } 31 Node<K,V> pred = e; 32 if ((e = e.next) == null) { 33 pred.next = new Node<K,V>(hash, key, 34 value, null); 35 break; 36 } 37 } 38 } 39 else if (f instanceof TreeBin) { 40 Node<K,V> p; 41 binCount = 2; 42 if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, 43 value)) != null) { 44 oldVal = p.val; 45 if (!onlyIfAbsent) 46 p.val = value; 47 } 48 } 49 } 50 } 51 if (binCount != 0) { 52 if (binCount >= TREEIFY_THRESHOLD) 53 treeifyBin(tab, i); 54 if (oldVal != null) 55 return oldVal; 56 break; 57 } 58 } 59 } 60 addCount(1L, binCount); 61 return null; } 一句一句學習 1. if (key == null || value == null) throw new NullPointerException();//key value不能null 2. int hash = spread(key.hashCode()); static final int spread(int h) { return (h ^ (h >>> 16)) & HASH_BITS; } 與hashmap計算hash基本一樣,但多了一步& HASH_BITS,HASH_BITS是0x7fffffff,該步是為了消除最高位上的負符號 hash的負在ConcurrentHashMap中有特殊意義表示在擴容或者是樹節點 4. for (Node<K,V>[] tab = table;;) { 死循環 table即map的基本Node數組 Node: final int hash; final K key; volatile V val; volatile Node<K,V> next; 基本結構和hashmap的node沒區別,區別在val和next是volatile的即保證線程可見性,為后面的多線程服務 7. tab = initTable();//table為null或空map時進行初始化 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) {//死循環以完成初始化 if ((sc = sizeCtl) < 0) Thread.yield(); // lost initialization race; just spin -1代表正在初始化或者擴容,則本線程退讓,依賴上面的死循環繼續初始化 else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {//CAS判斷SIZECTL和sc相同SIZECTL賦值-1表示正在初始化,只有一個線程進行初始化其它線程在上個if卡住 try { if ((tab = table) == null || tab.length == 0) { //第一個線程初始化之后,第二個線程還會進來所以需要再次判斷一次 int n = (sc > 0) ? sc : DEFAULT_CAPACITY; @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt;//創建基本Node數組結構 sc = n - (n >>> 2);//擴容閥值 實際就是0.75*n 寫法略叼更高端比直接乘高效 } } finally { sizeCtl = sc; } break; } } return tab; } 8. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { 判斷當前索引位置的數組上是否有值 無值那就是首元素插入 i = (n - 1) & hash計算索引位置 tabAt(tab, i = (n - 1) & hash) //獲取數組i索引的Node CAS操作 保持其它線程對table的改變在這里可見 static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } 9. if (casTabAt(tab, i, null,new Node<K,V>(hash, key, value, null))) 根據8該索引為null則插入值,casTabAt主要用於CAS比較該索引處是否為null防止其它線程已改變該值,null則插入,break結束死循環 13. else if ((fh = f.hash) == MOVED) 首地址處不null 並且Node的hash是-1 表示是ForwardingNode節點正在rehash擴容 14. tab = helpTransfer(tab, f); 幫助擴容 這個和擴容單學習不在這分析了 17. synchronized (f) { //到這也就說明了有hash沖突也不需要幫助擴容 最直接的操作synchronized鎖f,即本索引處的首節點,可能是鏈表或紅黑樹 有了synchronized那么后續的沖突插入就簡單了也不需要考慮多線程問題了 18. if (tabAt(tab, i) == f) {//這里volatile獲取首節點與8處獲取的首節點對比判斷f還是不是首節點 注意因為是多線程的關系雖然remove等操作也會鎖首節點但在從第8處執行到鎖這里的時候這里的首節點是完全存在被其它線程干掉或者空旋的情況的 19. if (fh >= 0) { //fh即節點的的二次hash值,判斷是為了等待擴容完成 39. else if (f instanceof TreeBin) { //fh<0 -2表示紅黑樹節點 是樹直接添加樹節點 之后的的數據插入過程有了鎖就和hashmap一樣了,最后判斷是否超過8鏈表變紅黑樹 60. addCount(1L, binCount); //這就是最后的插入數據后根據一定條件來進行擴容的方法了 binCount 鏈表時 記錄鏈表長度
三、addCount計數並判斷是否擴容
private final void addCount(long x, int check) { 1 CounterCell[] as; long b, s; 2 if ((as = counterCells) != null || 3 !U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { 4 CounterCell a; long v; int m; 5 boolean uncontended = true; 6 if (as == null || (m = as.length - 1) < 0 || 7 (a = as[ThreadLocalRandom.getProbe() & m]) == null || 8 !(uncontended = 9 U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { 10 fullAddCount(x, uncontended); 11 return; 12 } 13 if (check <= 1) 14 return; 15 s = sumCount(); 16 } 17 if (check >= 0) { 18 Node<K,V>[] tab, nt; int n, sc; 19 while (s >= (long)(sc = sizeCtl) && (tab = table) != null && 20 (n = tab.length) < MAXIMUM_CAPACITY) { 21 int rs = resizeStamp(n); 22 if (sc < 0) { 23 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || 24 sc == rs + MAX_RESIZERS || (nt = nextTable) == null || 25 transferIndex <= 0) 26 break; 27 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 28 transfer(tab, nt); 29 } 30 else if (U.compareAndSwapInt(this, SIZECTL, sc, 31 (rs << RESIZE_STAMP_SHIFT) + 2)) 32 transfer(tab, null); 33 s = sumCount(); } } } 1 CounterCell類只有一個volatile的long類型變量 2 if ((as = counterCells) != null ||!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) { private transient volatile long baseCount; 表示map中的元素個數但不一定對,需要加counterCells數組里存的數量 private transient volatile CounterCell[] counterCells; 用於輔組baseCount存儲元素個數 這個if主要干的事 2.1 counterCells初始是null,則運行后面的CAS對baseCount增加,但存在多線程可能會導致CAS增加失敗,則運行fullAddCount把數量值存到counterCells數組中 2.2 counterCells不null之前已經有過baseCount CAS失敗,這種能失敗大多代表並發不低,則在counterCells數組中使用隨機數隨便取一個索引位置之前記錄的數據進行數量累加, 如果在counterCells數組中CAS累加因多線程還是失敗這繼續fullAddCount fullAddCount中會觸發擴容等操作,因此直接return 13 if (check <= 1)//刪除或清理節點時是-1 插入索引首節點0 第二個節點是1 到這個if說明之前競爭大,現在競爭小counterCells更新成功了,那么在上述時不進行擴容的檢查 因此ConcurrentHashMap是有可能自己超過0.75的容量閥值而不擴容的 sumCount()正真的計算map元素數量的方法,baseCount和counterCells數組存的總和 17 if (check >= 0) { //刪除時不檢查 19 while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&(n = tab.length) < MAXIMUM_CAPACITY) { 檢查s即元素總數是否超過閥值 死循環防止多線程同時擴容在CAS操作sizeCtl時即else if中競爭失敗而跳過擴容檢查 21 int rs = resizeStamp(n); static final int resizeStamp(int n) { return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1)); } RESIZE_STAMP_BITS是靜態非常量16 按注釋是可以改變該值的,但在本類中是沒有對該值進行變更的 這個方法可以看出是生成與n有關的標記,且n不變的情況下生成的一定是一樣的 注意這里的或后面的計算,最終結果第15位肯定是1 22-30 else if sc=sizeCtl初始是閥值,sc肯定大於0 rs << (RESIZE_STAMP_SHIFT) + 2 //RESIZE_STAMP_SHIFT=16 由於rs第15位是1因此左移16,那么第32位肯定是1 即結果一定是一個比較大的負數 +2 也是有用的 左移之后 低16位肯定是0 +2只影響低16位的值 這里就把SIZECTL變成了一個比較大的負數,也就去前面提到的是負數的時候代表在擴容 SIZECTL和注釋有點沖突,注釋不是很准確,注釋的-(1+正在擴容線程數)是不對的 if(sc < 0) //在有線程正在擴容時sc就是負數了 if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||transferIndex <= 0) 比如就兩個線程同時運行這個方法,第二個線程競爭修改SIZECTL失敗,那么第二個線程因外層的死循環會運行到這 注意線程1時 SIZECTL=rs << (RESIZE_STAMP_SHIFT) + 2 線程2 條件成立直接break這就意味着不檢查了 (sc >>> RESIZE_STAMP_SHIFT) != rs || (nt = nextTable) == null ||transferIndex <= 0 這3個條件成立一個都代表擴容已經完成不需要再擴容,這里是防止太頻繁擴容消耗性能,並發非常高的時候存在超過閥值而沒擴容的可能 sc == rs + 1 ||sc == rs + MAX_RESIZERS 這兩條件是與RESIZE_STAMP_BITS這個靜態變量有關,類中沒提供修改的地方,但是注釋說可以是6-32,那么比如極端的等於32,那sc等於不移動然后+2,與rs+1相等的判斷是判斷擴容線程是否減少了一個,即沒有擴容的線程再運行說明擴容完成了 sc == rs + MAX_RESIZERS需要猜測變量等於多少的時候有用的判斷 現階段一般也不會修改這個變量 因此不太需要管這兩個判斷條件 if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) 在同一個n下其它線程還沒擴容完,這里幫助擴容 同時SIZECTL+1 表示增加一個擴容線程
四、擴容
包含了二個主要方法:transfer擴容、helpTransfer幫助擴容
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; //根據cpu個數找出擴容時的數組跨度大小即最小分組 16 32 64增長 if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; // subdivide range //普通擴容nextTab為空,競爭幫助擴容時有值,n<<1說明擴容2倍 if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; //當前轉移的位置,說明是逆序遷移 transferIndex = n; } int nextn = nextTab.length; //創建擴容的連接節點,節點hash是-1 ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; boolean finishing = false; // to ensure sweep before committing nextTab for (int i = 0, bound = 0;;) {//死循環檢查 Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing)//當前分組未轉移完||擴容全部完成 --i完成數組逆序遷移 advance = false; else if ((nextIndex = transferIndex) <= 0) {//TRANSFERINDEX為0表示無下一個分組了 i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) {//CAS TRANSFERINDEX 多線程時,advance死循環會找到不同的分組,以一個分組一個線程負責來進行擴容 bound = nextBound;//遷移時本分組的下界 i = nextIndex - 1;//上界 advance = false; } } if (i < 0 || i >= n || i + n >= nextn) {//全部遷移完或無分組 int sc; if (finishing) {//擴容完成 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1);//0.75 return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {//減少一個擴容線程 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)//根據前面addCount的+2這里就有-2 判斷是否是最后一個正在擴容的線程 return; finishing = advance = true;//准備結束 i = n; // recheck before commit 賦值n讓其進入本if進行是否結束的檢查 } } else if ((f = tabAt(tab, i)) == null)//原數組i位置無節點 advance = casTabAt(tab, i, null, fwd);//cas插入擴容節點 多線程插入失敗就循環重新檢查 else if ((fh = f.hash) == MOVED)//實際是檢查上一步為null時CAS是否成功 advance = true; // already processed 之后在上面的while中變更i后繼續 else { synchronized (f) {//首節點上鎖 if (tabAt(tab, i) == f) {//節點此時沒本remove等干掉 Node<K,V> ln, hn; if (fh >= 0) {//不是樹節點 //下面這段是在拆分本位置的鏈表 一拆為二(一鏈表正向一鏈表反向,0或非0誰在最后連續那它就是正向,另一個反向) map大小n是2的倍數 與計算只會有0和n本身 好想法 int runBit = fh & n; Node<K,V> lastRun = f; for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) ln = new Node<K,V>(ph, pk, pv, ln); else hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln);//拆后的鏈表1放在新數組i位置 setTabAt(nextTab, i + n, hn);//鏈表2放i+n位置 setTabAt(tab, i, fwd);//原數組i位置放擴容節點 advance = true;//i位置索引遷移完成 } else if (f instanceof TreeBin) { TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } //擴容后數量太少降為鏈表 不用樹 ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } } helpTransfer幫助擴容就不詳細說了 看懂transfer這個就基本不會有問題的
五、get獲取
public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) {//確定hash位置的索引處有值 if ((eh = e.hash) == h) {//hash相等 if ((ek = e.key) == key || (ek != null && key.equals(ek)))//首節點 return e.val; } else if (eh < 0)//非正常節點 真在擴容或者是樹節點 return (p = e.find(h, key)) != null ? p.val : null;//使用ForwardingNode或TreeNode的find方法查找元素 while ((e = e.next) != null) {//遍歷鏈表 if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
六、remove刪除
實際是replaceNode方法
final V replaceNode(Object key, V value, Object cv) { int hash = spread(key.hashCode()); for (Node<K,V>[] tab = table;;) {//死循環刪除 防止擴容等的影響 Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 || (f = tabAt(tab, i = (n - 1) & hash)) == null)//table不存在或者 索引位置無值 break; else if ((fh = f.hash) == MOVED)//正在擴容 tab = helpTransfer(tab, f);//幫助擴容 else { V oldVal = null; boolean validated = false; synchronized (f) {//鎖首節點 if (tabAt(tab, i) == f) { if (fh >= 0) {//鏈表 validated = true; for (Node<K,V> e = f, pred = null;;) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {//找到需要替換的節點 V ev = e.val; if (cv == null || cv == ev || (ev != null && cv.equals(ev))) {//傳遞替換的新值是null 或者新值和原值相等 oldVal = ev; if (value != null)//有原值 替換值 e.val = value; else if (pred != null)//說明不是鏈表首節點 刪除 即改變前一節點的next pred.next = e.next; else//鏈表首節點 刪除 setTabAt(tab, i, e.next); } break; } //兩句循環鏈表 pred = e; if ((e = e.next) == null) break; } } else if (f instanceof TreeBin) {//首節點是紅黑樹 validated = true; TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> r, p; if ((r = t.root) != null && (p = r.findTreeNode(hash, key, null)) != null) {//查找節點 V pv = p.val; if (cv == null || cv == pv || (pv != null && cv.equals(pv))) { oldVal = pv; if (value != null) p.val = value;//替換 else if (t.removeTreeNode(p))//刪除 setTabAt(tab, i, untreeify(t.first)); } } } } } if (validated) {//有刪除節點 if (oldVal != null) {//有原值 if (value == null)//無新值 表示是刪除 addCount(-1L, -1);//減少元素計數-1 return oldVal; } break; } } } return null; } public void clear() { long delta = 0L; // negative number of deletions int i = 0; Node<K,V>[] tab = table; while (tab != null && i < tab.length) { int fh; Node<K,V> f = tabAt(tab, i); if (f == null)//索引無值 ++i; else if ((fh = f.hash) == MOVED) {//正在擴容 幫助擴容后重置索引刪除 tab = helpTransfer(tab, f); i = 0; // restart } else { synchronized (f) {//上鎖 if (tabAt(tab, i) == f) { Node<K,V> p = (fh >= 0 ? f : (f instanceof TreeBin) ? ((TreeBin<K,V>)f).first : null); while (p != null) {//計數刪除元素個數 --delta; p = p.next; } setTabAt(tab, i++, null);//直接在table中拋棄整個鏈表或樹 } } } } if (delta != 0L) addCount(delta, -1);//減少計數 }
七、其它
1.推薦最好是多學習和理解紅黑樹 包括HashMap不帶鎖和ConcurrentHashMap帶鎖的紅黑樹的不同
2.通過本類等看出有不少移位、安位與或等的使用 而普通開發人員可能用的比較少 想進階可以有所嘗試