Java同步數據結構之ConcurrentHashMap


前言

這是Java並發包最后一個集合框架的數據結構,其復雜程度也較以往任何數據結構復雜的多,顧名思義ConcurrentHashMap是線程安全版本的HashMap,總所周知HashMap是非線程安全的,若直接用於多線程並發環境將會出現很多問題,比如數據丟失,甚至某些操作陷入死循環導致CPU利用率100%等情況。除了ConcurrentHashMap能夠保證線程安全之外,還可以通過兩種方法獲得線程安全的Map結構的實例,要么使用Collections.synchronizedMap(map)返回使用synchronized包裝過的線程安全的map,要么直接使用Hashtable代替。這兩種方法對於線程安全的支持本質上並沒有太大區別,前者僅僅是在每一個synchronized方法內部直接使用傳入的非線程安全的map實例本身,而后者hashtable也是在每一個有線程安全問題的方法上加synchronized鎖,並采用和hashMap相同的實現原理重新實現。

關於JDK8以前的HashMap與ConcurrentHashMap

在JDK8以前的HashMap是通過數組 + 鏈表的數據結構實現的,即它首先維護一個數組table,數組中的每一個元素都是一個包含鍵值對以及一個next指針的Entry對象,通過對Key對象的hashCode做一系列運算的結果再與數組的長度取模得出一個映射到數組table的下標(即索引,或者稱之為“槽”),從而將鍵值對映射到數組的不同槽位,然而通常具有相同hashCode值的不同Key很大程度上將會映射到數組的同一個槽位,這可以稱之為“碰撞”,這時候為了能夠存儲這種鍵值,HashMap采用了以鏈表的方式來組織具有相同hash值的不同Key(具體來說,當插入發生碰撞時,新節點會重新占據數組的對應槽位,該槽位原來的Entry節點被擠下去成為新節點的后繼節點,這就是所謂的“頭插法”,即在鏈表的頭部插入),這樣一來,具有相同hash值的不同Key雖然對應到了數組中的同一個槽位,但是卻位於鏈表上的不同節點上,下面是一個JDK8以前的HashMap的內部結構示意圖:

JDK8以前的HashMap的這種設計和實現都很簡單,但是當具體相同Hash值的Key較多的時候,鏈表的長度將會很長,導致查詢效率極其低下,畢竟鏈表的查詢只能從頭部一個一個的往后遍歷比較。 

HashMap內部有一個值為0.75的默認加載因子loadFactor,該加載因子也可通過構造函數傳入指定的值,其作用就是當數組table的使用率超過75%時,對數組長度進行擴容,還有一個變量threshold則表示下一次觸發擴容的臨界數組長度,即threshold = table.length * loadFactor,而。每一次擴容之后數組的長度是原來的2倍。數組是一個創建后就長度不可變更的數據結構,要對數組進行擴容,只有創建新的數組,然后將舊數組中的元素一個個拷貝過去,擴容是一個比較耗時的過程,也是當HashMap運用到多線程並發環境下線程不安全的主要誘因,擴容過程中依然采用頭插法組織鏈表元素,所以擴容之后鏈表中的節點順序將會發生反轉,並且由於將Key影射到不同數組槽位的時候需要與數組長度取模運算,當數組長度發生變化之后具有相同hash值的Key也有可能會被映射到不同的槽位,當多個線程同時進行擴容操作時,CPU時間片的分配與HashMap的擴容機制一結合,就產生了數據丟失甚至構成環形鏈表的可能。環形鏈就會造成某些操作陷入死循環導致CPU利用率100%等情況,關於HashMap線程不安全的詳細分析過程可用參考JDK1.7和JDK1.8中HashMap為什么是線程不安全的?

ConcurrentHashMap

JDK8之前的ConcurrentHashMap是采用分段鎖的方式實現了HashMap的線程安全版本,顧名思義數組table中的某一部分使用同一個鎖保證線程安全,它內部定義了一個Segment數組,Segment繼承了ReentrantLock,所以一個Segment就是一把鎖,每一個Segment內部又持有一個table數組,這樣就相當於將HashMap種的table數組拆分成若干個分段數組,每一個Segment管理table數組的一個區間,每一個table數組還是按照HashMap的實現方式實現即數組+鏈表,新節點的插入還是按頭插法進入。這種方式是一種粗粒度的並發控制方案,當兩個操作位於不同的兩個段時可以不受線程安全的影響,但是位於同一個段的不同數組槽位的更新操作依然會受到並發控制的互斥訪問限制,所以吞吐量並沒有提高太多,但是任何讀操作不存在競爭,即是讀寫分離的。下面的一個ConcurrentHashMap的內部結構示意圖:

Segment數組的每一個Segment元素都對應一個table數組,同時也共享同一把互斥鎖,table數組中的每一個元素都是一個HashEntry對象,HashEntry持有鍵值對,hash值以及指向鏈表結構的下一個節點next指針。

JDK8的HashMap

從JDK8開始,HashMap和ConcurrentHashMap的實現都做了大的調整,針對HashMap主要圍繞解決長鏈表下查詢緩慢的情況進行了改進,其主要變化就是將長鏈表換成了紅黑樹(一種平衡二叉樹),因此JDK8的HashMap采用了數組+短鏈表+紅黑樹的數據結構實現,在鏈表的長度超過8個節點的時候,將會將鏈表通過旋轉的方式直接轉換成紅黑樹(稱之為樹化),紅黑樹的引入在查詢效率上至少提升了2倍以上。以下是其內部結構示意圖:

Java8HashMap的table數組元素是由一個個Node或TreeNode節點組成,對於紅黑樹對應的數組槽位中始終存儲其根節點,對於鏈表結構,每一次新元素都在鏈表尾部插入,即“尾插法”;對於紅黑樹,每一次新插入節點可能都會引起紅黑樹的旋轉從而導致結構變化,但其根節點始終存儲在table數組的槽位中。

除了內部數據結構的變化,HashMap其它特性例如加載因子,擴容等都與JDK8以前的版本差不多,但JDK8的HashMap對以前版本擴容可能造成環形鏈的問題進行了修復,因此當再次用於多線程並發環境,JDK8的HashMap將不會導致CPU%100的情況,但依然可能存在數據覆蓋的問題出現,因此依然不是線程安全的。多線程環境下依然需要使用ConcurrentHashMap,它的JDK8版本下也做了大的調整。

JDK8的ConcurrentHashMap

分析它才是本文的最終目的,首先JDK8的ConcurrentHashMap內部數據結構基本與JDK8的HashMap一致,也是基於數組 + 短鏈表 + 紅黑樹的方式設計實現的。因為JDK8以前的分段鎖思想是一種粗粒度的線程安全實現,而JDK8的ConcurrentHashMap則將分段鎖的概念細划到單個的數組槽位上,即一個table數組槽位一個鎖,因此只有更新操作具有相同hash值得線程之間才會存在競爭,任何讀取操作依然不涉及競爭問題,仍然是讀寫分離的。JDK8拋棄分段鎖不但節省了不必要的空間消耗,而且用回了傳統的synchronized關鍵字的重量級鎖,畢竟現在的JDK對其優化已經比較好了。

ConcurrentHashMap中table數組在存儲紅黑樹的根節點時,使用了一個TreeBin的類封裝TreeNode,因此不再像JDK8那樣直接在數組的槽位中存放紅黑樹的根節點,而是一個攜帶根節點的TreeBin實例。另外,該類為了保證與以前的版本兼容,保留了擁有加載因子loadFactor和concurrencyLevel參數的構造函數,以及用於兼容序列化的Segment類,繼承AbstractMap抽象類也是對兼容性的支持,除此之外並無其它目的。在構造函數傳入的加載因子僅僅只是用於針對初始化給定容量的內部數組從而滿足可以在放入給定數量的元素之前不觸發擴容操作。其后,內部的加載因子還是默認的0.75,因為擴容操作是一個開銷很大的過程,因此若能夠在創建Map實例的時候確定大概需要的空間,將減少甚至消除擴容造成的開銷。

ConcurrentHashMap在JDK8中對擴容操作進行了精妙的設計實現,任何讀寫線程在發現需要擴容或正在擴容時都會奉獻雷鋒精神,加入到輔助擴容的行列中,畢竟人多力量大,從而縮短擴容過程的時間開銷,而且其內部的巧妙設計會在擴容過程中盡可能少的拷貝節點,根據Java Doc的描述,當table數組長度擴張一倍,只有大約六分之一的元素需要克隆。對於ConcurrentHashMap的理解我感覺比較有難度,特別是紅黑樹的轉換腦袋感覺都不夠用了,只能做粗略的分析。

 ConcurrentHashMap對size()方法也進行了精心的設計,它采用了類似高並發統計工具LongAdder的原理,使用baseCount + CounterCell數組的形式解決高並發更新同一個變量的線程爭用問題,對產生競爭的線程將計數分散到哈希數組中的不同單元格中,而不需要在調用size方法時才遍歷統計。關於LongAdder的原理可用查閱高並發原子累加器Striped64及其實現類LongAdder&LongAccumulator

ConcurrentHashMap中定義了一些特殊的節點,例如鏈表節點(hash值 > 0 ),紅黑樹節點(hash值為-1),轉移節點ForwardingNodes(hash值為-1)標記該數組槽位已經被遷移到擴容后的table數組中,舜態節點ReservationNodes(hash值為-3)標記是一種用於lumdba表達式計算的臨時節點,記住這些定義對於源碼理解事半功倍。

通過構造函數,可見不僅沒有持有加載引子,也沒有持有threshold擴容閾值了,多了一個sizeCtl成員變量,該變量不同的值代表不同的含義: 默認值0表示數組table還未初始化,-1表示正在初始化table數組,-(1+n)表示有n個線程正在輔助一起擴容,> 0表示初始化數組完成,並且表示下一次觸發擴容的數組占用閾值,例如現在數組長度是128,則sizeCtl 就是96,剛好是0.75的加載因子。sizeCtl還保存有擴容標記,確保調整大小不會重復執行。

ConcurrentHashMap擴容支持最多MAX_RESIZERS個線程並行進行以縮短時間,並且每一個擴容線程都按一個步長(默認是MIN_TRANSFER_STRIDE,即16)從數組末尾往頭部分配一段還沒被擴容的多個槽位,而不是一個線程一個槽位, transferIndex就用於指示現在擴容線程已經占據從數組末尾往頭部的第幾個槽位了,后來加入的擴容線程只能從該位置往前分配一段未完成擴容的槽位進行。

部分源碼分析

  1  ------------------------常量---------------------------
  2  
  3  //最大的表容量。
  4  private static final int MAXIMUM_CAPACITY = 1 << 30;
  5  
  6  //默認初始表容量。
  7  private static final int DEFAULT_CAPACITY = 16;
  8  
  9  //最大的數組大小(非2次冪)。被toArray和toArray(T[] a)方法需要。
 10  static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
 11  
 12  //默認並發級別。未使用,僅僅用於與該類的以前版本兼容而定義。
 13  private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
 14  
 15  //負載因子,貌似也沒使用,在構造函數中指定此值只影響初始表容量。
 16  private static final float LOAD_FACTOR = 0.75f;
 17  
 18  //鏈表轉換成紅黑樹(樹化)的size閾值
 19  static final int TREEIFY_THRESHOLD = 8;
 20  
 21  //紅黑色轉換成鏈表(反樹化)的size閾值,用在調整大小的時候。
 22  static final int UNTREEIFY_THRESHOLD = 6;
 23  
 24  //最小的表容量,該值至少是 4倍 TREEIFY_THRESHOLD以避免調整大小和樹化閾值的沖突
 25  static final int MIN_TREEIFY_CAPACITY = 64;
 26  
 27  //擴容線程的最小跨度,即每一個線程至少分配16個槽位進行擴容
 28  private static final int MIN_TRANSFER_STRIDE = 16;
 29  
 30  //用於在sizeCtl中生成擴容標記的比特位數
 31  private static int RESIZE_STAMP_BITS = 16;
 32 
 33  //用於輔助調整大小的最大線程數
 34  private static final int MAX_RESIZERS = (1 << (32 - RESIZE_STAMP_BITS)) - 1;
 35  
 36  //在sizeCtl中標記位的偏移
 37  private static final int RESIZE_STAMP_SHIFT = 32 - RESIZE_STAMP_BITS;
 38  
 39  /*
 40  * Encodings for Node hash fields.節點哈希字段的編碼
 41  */
 42  static final int MOVED     = -1; // 轉發節點的哈希值
 43  static final int TREEBIN   = -2; // 紅黑樹的根節點的哈希值
 44  static final int RESERVED  = -3; // 臨時保留節點的哈希
 45  static final int HASH_BITS = 0x7fffffff; // 普通鏈表節點哈希的可用位
 46  
 47  /* ---------------- 節點 -------------- */
 48 
 49  //鍵值對的條目,只用於遍歷時自讀操作,存在hash值為負值和鍵值為null的特殊節點。
 50  static class Node<K,V> implements Map.Entry<K,V> {
 51         final int hash;  //哈希值
 52         final K key;     //
 53         volatile V val;    //
 54         volatile Node<K,V> next; //下一個節點指針
 55  
 56  .....
 57  }
 58  
 59  //紅黑樹節點
 60  static final class TreeNode<K,V> extends Node<K,V> {
 61         TreeNode<K,V> parent;  // red-black tree links
 62         TreeNode<K,V> left;
 63         TreeNode<K,V> right;
 64         TreeNode<K,V> prev;    // needed to unlink next upon deletion
 65         boolean red;
 66  
 67   .....
 68  }
 69  
 70  //存放在tale數組槽位種的紅黑樹根節點包裝類
 71  static final class TreeBin<K,V> extends Node<K,V> {
 72         TreeNode<K,V> root;
 73         volatile TreeNode<K,V> first;
 74         volatile Thread waiter;
 75         volatile int lockState;
 76         // values for lockState
 77         static final int WRITER = 1; // set while holding write lock
 78         static final int WAITER = 2; // set when waiting for write lock
 79         static final int READER = 4; // increment value for setting read lock
 80  
 81    .....
 82  }
 83  
 84  //標記已經被擴容轉移的槽位,持有新數組的引用,可通過它的find方法讓get操作可用從該節點過渡到新數組中去搜索
 85  static final class ForwardingNode<K,V> extends Node<K,V> {
 86         final Node<K,V>[] nextTable;
 87         ForwardingNode(Node<K,V>[] tab) {
 88             super(MOVED, null, null, null);
 89             this.nextTable = tab;
 90         }
 91         
 92       .....
 93  }    
 94  /* ---------------- 靜態工具 -------------- */
 95  
 96  //通過哈希值h計算對應的數組索引
 97  static final int spread(int h) {
 98     return (h ^ (h >>> 16)) & HASH_BITS;
 99  }
100  //若x實現了Comparable接口則返回其類對象,否則返回null
101  static Class<?> comparableClassFor(Object x) 
102  
103  //若x與kc類型匹配,返回k.compareTo(x) ,否則返回0
104  static int compareComparables(Class<?> kc, Object k, Object x) {
105  
106  /* ---------------- table元素訪問 -------------- */
107 
108  //volatile讀取指定數組索引位置的元素
109  static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
110     return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
111  }
112  
113  //CAS更新指定數組索引位置的元素
114  static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
115                                     Node<K,V> c, Node<K,V> v) {
116     return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
117  }
118  
119  //volatile設置指定數組索引位置的元素
120  static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
121     U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
122  }
123  
124  
125  /* ---------------- 字段 -------------- */
126  
127  //table數組,在第一次插入元素時才初始化,大小總是2的冪。由迭代器直接訪問。
128  transient volatile Node<K,V>[] table;
129  
130  //調整大小時才不為空的臨時新數組。
131  private transient volatile Node<K,V>[] nextTable;
132  
133  //基礎計數器,主要用於沒有競爭時,也可用用作初始化表產生競爭的回退,通過CAS更新
134  private transient volatile long baseCount;
135  
136  //用於表初始化和調整大小的控制,當為負值表正在初始化或者調整大小:-1 表示正在初始化,-(1 + 輔助調整大小的線程數)表示正在調整大小。
137  //當table等於null時該值等於用於初始創建表時的初始大小,默認情況為0.初始化完成之后,保留下一個元素的count值,用於調整大小。
138  private transient volatile int sizeCtl;
139  
140  //指示擴容時已經處理到的槽位位置(從數組末尾往前處理,所以最開始transferIndex等於數組的長度)
141  private transient volatile int transferIndex;
142  
143  //自旋鎖,用於計數元素個數對CounterCells槽位的鎖定或擴容
144  private transient volatile int cellsBusy;
145  
146  //用於計數元素個數的單元格計數器,非空時大小為2的冪
147  private transient volatile CounterCell[] counterCells;
View Code

首先是一些常量,Node、TreeNode、TreeBin、ForwardingNode類定義,一些靜態工具和字段的定義,這部分只是為實現真正的邏輯做准備,看看即可。

部分構造方法

 1 //創建一個新的初始容量為16的空表
 2  public ConcurrentHashMap() {
 3  }
 4  
 5  //創建一個新的初始大小可容納指定數量元素而不需要動態調整大小的空表。
 6  public ConcurrentHashMap(int initialCapacity) {
 7     this.sizeCtl = ....
 8  }
 9  
10  //創建給定初始容量和負載因子的空表
11  public ConcurrentHashMap(int initialCapacity,float loadFactor) {
12     this.sizeCtl = ....
13  }
View Code

ConcurrentHashMap提供了5種構造方法,除了無參構造方法什么也沒做,其它方法都涉及到初始化數組容量和根據加載因子確定下一次的擴容閾值sizeCtl,loadFactor和concurrencyLevel僅僅用於初始化table數組的長度以及對老版本的兼容,ConcurrentHashMap內部並不會持有它們,其內部加載因子還是默認值0.75.

添加元素---put

 1 //外部接口方法 
 2 public V put(K key, V value) {
 3     return putVal(key, value, false);
 4 }
 5 //內部實現方法
 6 final V putVal(K key, V value, boolean onlyIfAbsent) {
 7 // key和value都不能為null
 8 if (key == null || value == null) throw new NullPointerException();
 9 // 計算hash值
10 int hash = spread(key.hashCode());
11 // 要插入的元素所在桶的元素個數
12 int binCount = 0;
13 
14 for (Node<K,V>[] tab = table;;) {// 自旋
15     Node<K,V> f; int n, i, fh;
16     if (tab == null || (n = tab.length) == 0)
17         // 如果table數組未初始化或者長度為0,則初始化
18         tab = initTable();
19     else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
20         // 如果要插入的元素所在的槽位還沒有被占據,則把這個元素直接放入到這個槽位中
21         if (casTabAt(tab, i, null,
22                 new Node<K,V>(hash, key, value, null)))
23             // 如果使用CAS插入元素時,發現已經有元素了,則進入下一次循環,重新操作
24             // 如果使用CAS插入元素成功,則break跳出循環,流程結束
25             break;                   // no lock when adding to empty bin
26     }
27     else if ((fh = f.hash) == MOVED)
28         // 如果要插入的元素所在的槽位的第一個元素的hash是MOVED,表示當前正在進行擴容遷移元素,則當前線程幫忙一起遷移元素
29         tab = helpTransfer(tab, f);
30     else {
31         // 如果這個槽位不為空且沒有進行擴容,則鎖住這個槽位(分段鎖概念)
32         // 並查找要插入的元素是否在這個槽位中
33         // 存在,則替換值(onlyIfAbsent=false)
34         // 不存在,則插入到鏈表結尾或紅黑樹中
35         V oldVal = null;
36         synchronized (f) {
37             // 再次確認第一個元素是否有變化,如果有變化則進入下一次循環,重試
38             if (tabAt(tab, i) == f) {
39                 
40                 if (fh >= 0) { //表示鏈表結構
41                     binCount = 1;
42                     // 遍歷鏈表
43                     for (Node<K,V> e = f;; ++binCount) {
44                         K ek;
45                         if (e.hash == hash &&
46                                 ((ek = e.key) == key ||
47                                         (ek != null && key.equals(ek)))) {
48                             // 如果找到了這個元素,則更新value(onlyIfAbsent=false)
49                             oldVal = e.val;
50                             if (!onlyIfAbsent)
51                                 e.val = value;
52                             break; //退出
53                         }
54                         Node<K,V> pred = e;
55                         if ((e = e.next) == null) {
56                             // 如果到鏈表尾部還沒有找到該Key對應的元素
57                             // 就把它插入到鏈表結尾並退出循環
58                             pred.next = new Node<K,V>(hash, key,
59                                     value, null);
60                             break;
61                         }
62                     }
63                 }
64                 else if (f instanceof TreeBin) {//表示是紅黑樹結構
65                     Node<K,V> p;
66                     binCount = 2;
67                     // 調用紅黑樹的插入方法插入元素
68                     // 如果成功插入則返回null
69                     // 否則返回尋找到的節點
70                     if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
71                             value)) != null) {
72                         // 如果找到了這個元素,則更新value(onlyIfAbsent=false)
73                         oldVal = p.val;
74                         if (!onlyIfAbsent)
75                             p.val = value;
76                     }
77                 }
78             }
79         }
80         // 如果binCount不為0,說明成功插入了元素或者尋找到了元素
81         if (binCount != 0) {
82             // 如果鏈表元素個數達到了8,則嘗試樹化
83             // 因為上面把元素插入到樹中時,binCount只賦值了2,並沒有計算整個樹中元素的個數
84             // 所以不會重復樹化
85             if (binCount >= TREEIFY_THRESHOLD)
86                 treeifyBin(tab, i);
87             // 如果要插入的元素已經存在,則返回舊值
88             if (oldVal != null)
89                 return oldVal;
90             // 退出外層大循環,流程結束
91             break;
92         }
93     }
94     }
95     // 成功插入元素,元素個數加1(是否要擴容在這個里面)
96     addCount(1L, binCount);
97     // 成功插入元素返回null
98     return null;
99 } 
View Code

單純就這一段邏輯如果不考慮紅黑樹的插入方法putTreeVal,以及樹化的方法treeifyBin的話,則該段邏輯其實很簡單,大致分四個部分:1,計算出數組槽位索引,確定該位置的情況,2,該位置為空直接占據即可,3,該位置是鏈表結構,使用尾插法鏈接到鏈表結尾,若該位置是紅黑樹結構,則調用紅黑樹的插入方法插入該元素,4,若是鏈表結構並且鏈表長度達到了閾值8,則將鏈表樹化,即轉換成紅黑樹結構。

考慮到紅黑樹的插入邏輯比較復雜,我們這里暫時不去探究其過程,僅做簡單的邏輯梳理。但要明白其中大致的關鍵,若是鏈表采用的尾插法,若是紅黑樹,插入節點之后紅黑樹為了維持平衡將采用旋轉和重新作色的方式調整紅黑樹的結構(根節點可能會變化),但無論怎么調整,最終該hash值對應的數組槽位中的TreeBin實例都將持有該紅黑樹的根節點。

從上面的邏輯我們還可用發現,ConcurrentHashMap不允許Key和Value為null。並且在實現的過程中,采用了synchronized鎖定了該hash值對應的table數組的單個槽位,這同樣也可以看作是一種分段鎖,只是比起JDK8以前的分段鎖一次鎖定多個數組槽位,JDK8這種實現分段鎖更精細化,僅僅只需要鎖定一個真正操作的槽位。

獲取元素---get

 1 //外部接口方法 
 2 public V get(Object key) {
 3     Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
 4     // 計算hash
 5     int h = spread(key.hashCode());
 6     // 如果元素所在的槽位存在且里面有元素
 7     if ((tab = table) != null && (n = tab.length) > 0 &&
 8             (e = tabAt(tab, (n - 1) & h)) != null) {
 9         // 如果第一個元素就是要找的元素,直接返回
10         if ((eh = e.hash) == h) {
11             if ((ek = e.key) == key || (ek != null && key.equals(ek)))
12                 return e.val;
13         }
14         else if (eh < 0)
15             // hash小於0,說明是紅黑樹樹或者正在擴容遷移
16             // 調用紅黑樹的查找邏輯
17             return (p = e.find(h, key)) != null ? p.val : null;
18 
19         //否則是鏈表結構,則遍歷整個鏈表尋找元素
20         while ((e = e.next) != null) {
21             if (e.hash == h &&
22                     ((ek = e.key) == key || (ek != null && key.equals(ek))))
23                 return e.val; //找到返回其value
24         }
25     }
26     return null;
27 }
View Code

同樣,若不考慮紅黑樹的查找邏輯,該段代碼的邏輯非常簡單,就是根據hash值計算數組的槽位索引,根據該索引對應的槽位的數據結構,按不同的查找實現,是鏈表則遍歷鏈表查找,是紅黑樹則調用紅黑樹的查找邏輯。唯一需要注意的是,它並沒有區分現在是否處於擴容遷移的狀態,這是因為ConcurrentHashMap的擴容在遷移元素的過程中依然會確保各個數據結構的查找邏輯能夠通過ForwardingNodes轉到新數組中去繼續進行查找。

刪除元素---remove

  1 //外部接口方法 
  2 public V remove(Object key) {
  3     // 調用替換節點方法
  4     return replaceNode(key, null, null);
  5 }
  6 
  7 //實現方法
  8 final V replaceNode(Object key, V value, Object cv) {
  9     // 計算hash
 10     int hash = spread(key.hashCode());
 11     // 自旋
 12     for (Node<K,V>[] tab = table;;) {
 13         Node<K,V> f; int n, i, fh;
 14         if (tab == null || (n = tab.length) == 0 ||
 15                 (f = tabAt(tab, i = (n - 1) & hash)) == null)
 16             // 如果目標key所在的槽位不存在,直接跳出循環返回null
 17             break;
 18         else if ((fh = f.hash) == MOVED)
 19             // 如果正在擴容中,協助擴容
 20             tab = helpTransfer(tab, f);
 21         else {
 22             V oldVal = null;
 23             // 標記是否處理過
 24             boolean validated = false;
 25             synchronized (f) {
 26                 // 再次驗證當前鎖定的節點是槽位中第一個元素
 27                 if (tabAt(tab, i) == f) {
 28                     if (fh >= 0) {
 29                         // 是鏈表結構
 30                         validated = true;
 31                         // 遍歷鏈表尋找目標節點
 32                         for (Node<K,V> e = f, pred = null;;) {
 33                             K ek;
 34                             if (e.hash == hash &&
 35                                     ((ek = e.key) == key ||
 36                                             (ek != null && key.equals(ek)))) {
 37                                 // 找到了目標節點
 38                                 V ev = e.val;
 39                                 // 檢查目標節點舊value是否等於cv
 40                                 if (cv == null || cv == ev ||
 41                                         (ev != null && cv.equals(ev))) {
 42                                     oldVal = ev;
 43                                     if (value != null)
 44                                         // 如果value不為空則替換舊值
 45                                         e.val = value;
 46                                     else if (pred != null)
 47                                         // 如果前置節點不為空
 48                                         // 修改后繼指針以刪除當前節點
 49                                         pred.next = e.next;
 50                                     else
 51                                         // 如果前置節點為空
 52                                         // 說明是槽位中第一個元素,直接將后繼節點提升上來占據該槽位
 53                                         setTabAt(tab, i, e.next);
 54                                 }
 55                                 break;
 56                             }
 57                             pred = e;
 58                             // 遍歷到鏈表尾部還沒找到元素,跳出循環
 59                             if ((e = e.next) == null)
 60                                 break;
 61                         }
 62                     }
 63                     else if (f instanceof TreeBin) {
 64                         // 是紅黑樹結構
 65                         validated = true;
 66                         TreeBin<K,V> t = (TreeBin<K,V>)f;
 67                         TreeNode<K,V> r, p;
 68                         // 遍歷樹找到了目標節點
 69                         if ((r = t.root) != null &&
 70                                 (p = r.findTreeNode(hash, key, null)) != null) {
 71                             V pv = p.val;
 72                             // 檢查目標節點舊value是否等於cv
 73                             if (cv == null || cv == pv ||
 74                                     (pv != null && cv.equals(pv))) {
 75                                 oldVal = pv;
 76                                 if (value != null)
 77                                     // 如果value不為空則替換舊值
 78                                     p.val = value;
 79                                 else if (t.removeTreeNode(p))
 80                                     // 如果value為空則刪除元素
 81                                     // 如果刪除后樹的元素個數較少則退化成鏈表
 82                                     // t.removeTreeNode(p)這個方法返回true表示刪除節點后樹的元素個數較少
 83                                     setTabAt(tab, i, untreeify(t.first));
 84                             }
 85                         }
 86                     }
 87                 }
 88             }
 89             // 如果處理過,不管有沒有找到元素都返回
 90             if (validated) {
 91                 // 如果找到了元素,返回其舊值
 92                 if (oldVal != null) {
 93                     // 如果要替換的值為空,元素個數減1
 94                     if (value == null)
 95                         addCount(-1L, -1);
 96                     return oldVal;
 97                 }
 98                 break;
 99             }
100         }
101     }
102     // 沒找到元素返回空
103     return null;
104 }
View Code

同樣,若不考慮紅黑樹的查找、刪除、反樹化邏輯,該段代碼的邏輯非常簡單,根據索引找到數組槽位的節點,若是鏈表就通過修改next指針刪除節點,如果刪除的是占據槽位的鏈表頭節點則將其后繼節點提升上來占據數組該槽位;若是紅黑樹就使用紅黑樹的方法刪除節點,刪除節點之后若樹的節點足夠少則需要反樹化,即重新轉換成鏈表。

從實現可用看出,刪除元素的實現是先將節點的value置為null,然后在進行節點移除。其實現方法同樣的替換節點值的實現。

擴容

首先,是否需要擴容一般是在累計元素個數的時候進行確定的,即addCount方法,我們這里就不具體分析該方法了,總之當table數組的使用率達到sizeCtl指示的擴容閾值時就會觸發擴容,擴容時sizeCtl高16位存儲擴容郵戳(resizeStamp),低位存儲擴容線程數加1(1+nThreads),其它線程在累計元素之后通過檢查sizeCtl的擴容標記發現正在進行擴容的話也會加入的擴容行列中來,當然擴容的線程個數也是有控制的。

在上面添加元素的實現中,若hash指示的數組槽位中的節點標志正在進行擴容,也會調用輔助擴容方法幫助一起擴容,即helpTransfer方法,在該線程加入到擴容行列中之前,會通過標記的ForwardingNode節點拿到新數組的引用,然后對sizeCtl中記錄的擴容線程數加+,最后再調用真正的擴容遷移元素的實現方法transfer(Node<K,V>[] tab, Node<K,V>[] nextTab):

  1  
  2 private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  3     int n = tab.length, stride;
  4     if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5         stride = MIN_TRANSFER_STRIDE; // subdivide range
  6     if (nextTab == null) {            // initiating
  7         // 如果nextTab為空,說明是第一個開始擴容遷移元素的線程
  8         // 就創建一個新數組
  9         try {
 10             // 新數組是原來的兩倍
 11             @SuppressWarnings("unchecked")
 12             Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
 13             nextTab = nt;
 14         } catch (Throwable ex) {      
 15             // 數組擴容的時候有可能出現OOME,這時需要將sizeCtl設置為Integer.MAX_VALUE,用以表示這個異常
 16             sizeCtl = Integer.MAX_VALUE;
 17             return;
 18         }
 19         nextTable = nextTab;
 20         //因為擴容時候的元素遷移是從數組最末端的元素開始的,所以遷移的時候下標是遞減的,從下面的`--i`就能看出來了
 21         transferIndex = n;
 22     }
 23     // 新數組大小
 24     int nextn = nextTab.length;
 25     // 新建一個ForwardingNode類型的節點,並把新數組的引用存儲在里面,用於標記該被遷移的槽位
 26     ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
 27     boolean advance = true; //指示該線程負責的槽位段中每一個槽位是否完成遷移,初始為true僅僅是為了能進入while
 28     boolean finishing = false; // 表示整個table里面的所有元素是否遷移完畢
 29     for (int i = 0, bound = 0;;) {//自旋
 30         Node<K,V> f; int fh;
 31         //確定當前線程負責的槽位段,並更新transferIndex指示已經遷移到了數組的哪個位置
 32         while (advance) {
 33             int nextIndex, nextBound;
 34             // 倒序遷移舊table數組元素的下標已達到槽的邊界,或者整個table已經遷移完畢,說明遷移完成了
 35             if (--i >= bound || finishing)
 36                 advance = false;
 37             //擴容的腳本已經從最末端走到起始位置了,說明遷移完成了
 38             else if ((nextIndex = transferIndex) <= 0) {
 39                 i = -1;
 40                 advance = false;
 41             }
 42             //根據步長設置本次線程遷移的槽位段的邊界transferIndex
 43             else if (U.compareAndSwapInt
 44                     (this, TRANSFERINDEX, nextIndex,
 45                             nextBound = (nextIndex > stride ?
 46                                     nextIndex - stride : 0))) {
 47                 bound = nextBound;
 48                 i = nextIndex - 1;
 49                 advance = false;
 50             }
 51         }
 52         
 53         //下面開始一個槽位一個槽位的遷移
 54         if (i < 0 || i >= n || i + n >= nextn) {
 55             int sc;
 56             if (finishing) { // 整個map所有槽位的元素都遷移完成了,由最后一個完成的線程執行
 57                
 58                 // 更新table數組指向新數組,設置下一次擴容的閾值
 59                 nextTable = null;
 60                 table = nextTab;
 61                 sizeCtl = (n << 1) - (n >>> 1);
 62                 return;
 63             }
 64             if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
 65                 // 當前線程負責的槽位段全部完成,把擴容線程數-1
 66                 if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
 67                     return;
 68 
 69                 // finishing為true才會走到上面的if條件
 70                 finishing = advance = true;
 71                 // i重新賦值為n
 72                 // 這樣會再重新遍歷一次table數組,看看是不是都遷移完成了
 73                 // 也就是第二次遍歷都會走到下面的(fh = f.hash) == MOVED這個條件
 74                 i = n; // recheck before commit
 75             }
 76         }
 77         else if ((f = tabAt(tab, i)) == null)
 78             // 如果對應的槽位為空,直接放入ForwardingNode標記該槽位已遷移
 79             advance = casTabAt(tab, i, null, fwd);
 80         else if ((fh = f.hash) == MOVED)
 81             // 說明它是ForwardingNode節點,也就是該槽位已遷移
 82             advance = true; // already processed
 83         else {
 84             // 鎖定該槽位並遷移元素
 85             synchronized (f) {
 86                 // 再次判斷當前槽位第一個元素是否有修改
 87                 // 也就是可能其它線程先一步遷移了元素
 88                 if (tabAt(tab, i) == f) {
 89                     // 把一個鏈表分化成兩個鏈表
 90                     // 規則是槽位中各元素的hash與數組長度n進行與操作
 91                     // 等於0的放到低位鏈表(low)中,不等於0的放到高位鏈表(high)中
 92                     // 其中低位鏈表遷移到新數組中的位置相對舊數組不變
 93                     // 高位鏈表遷移到新數組中的位置正好是其在舊數組的位置加舊數組的長度n
 94                     // 這也正是為什么擴容時容量要變成兩倍的原因
 95                     Node<K,V> ln, hn;
 96                     if (fh >= 0) {
 97                         // 第一個元素的hash值大於等於0
 98                         // 說明該槽位中元素是以鏈表形式存儲的
 99                         // 這里與HashMap遷移算法基本類似
100                         // 唯一不同的是多了一步尋找lastRun
101                         // 這里的lastRun是提取出鏈表后面不用處理再特殊處理的子鏈表
102                         // 比如所有元素的hash值與桶大小n與操作后的值分別為 0 0 4 4 0 0 0
103                         // 則最后后面三個0對應的元素肯定還是在同一個槽位中
104                         // 這時lastRun對應的就是倒數第三個節點
105                         // 至於為啥要這樣處理,我也沒太搞明白
106                         int runBit = fh & n;
107                         Node<K,V> lastRun = f;
108                         for (Node<K,V> p = f.next; p != null; p = p.next) {
109                             int b = p.hash & n;
110                             if (b != runBit) {
111                                 runBit = b;
112                                 lastRun = p;
113                             }
114                         }
115                         // 看看最后這幾個元素歸屬於低位鏈表還是高位鏈表
116                         if (runBit == 0) {
117                             ln = lastRun;
118                             hn = null;
119                         }
120                         else {
121                             hn = lastRun;
122                             ln = null;
123                         }
124                         // 遍歷鏈表,把hash&n為0的放在低位鏈表中
125                         // 不為0的放在高位鏈表中
126                         for (Node<K,V> p = f; p != lastRun; p = p.next) {
127                             int ph = p.hash; K pk = p.key; V pv = p.val;
128                             if ((ph & n) == 0)
129                                 ln = new Node<K,V>(ph, pk, pv, ln);
130                             else
131                                 hn = new Node<K,V>(ph, pk, pv, hn);
132                         }
133                         // 低位鏈表的位置不變
134                         setTabAt(nextTab, i, ln);
135                         // 高位鏈表的位置是原位置加n
136                         setTabAt(nextTab, i + n, hn);
137                         // 標記當前槽位已遷移
138                         setTabAt(tab, i, fwd);
139                         // advance為true,返回上面進行--i操作
140                         advance = true;
141                     }
142                     else if (f instanceof TreeBin) {
143                         // 如果第一個元素是樹節點
144                         // 也是一樣,分化成兩顆樹
145                         // 也是根據hash&n為0放在低位樹中
146                         // 不為0放在高位樹中
147                         TreeBin<K,V> t = (TreeBin<K,V>)f;
148                         TreeNode<K,V> lo = null, loTail = null;
149                         TreeNode<K,V> hi = null, hiTail = null;
150                         int lc = 0, hc = 0;
151                         // 遍歷整顆樹,根據hash&n是否為0分化成兩顆樹
152                         for (Node<K,V> e = t.first; e != null; e = e.next) {
153                             int h = e.hash;
154                             TreeNode<K,V> p = new TreeNode<K,V>
155                                     (h, e.key, e.val, null, null);
156                             if ((h & n) == 0) {
157                                 if ((p.prev = loTail) == null)
158                                     lo = p;
159                                 else
160                                     loTail.next = p;
161                                 loTail = p;
162                                 ++lc;
163                             }
164                             else {
165                                 if ((p.prev = hiTail) == null)
166                                     hi = p;
167                                 else
168                                     hiTail.next = p;
169                                 hiTail = p;
170                                 ++hc;
171                             }
172                         }
173                         // 如果分化的樹中元素個數小於等於6,則退化成鏈表
174                         ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
175                                 (hc != 0) ? new TreeBin<K,V>(lo) : t;
176                         hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
177                                 (lc != 0) ? new TreeBin<K,V>(hi) : t;
178                         // 低位樹的位置不變
179                         setTabAt(nextTab, i, ln);
180                         // 高位樹的位置是原位置加n
181                         setTabAt(nextTab, i + n, hn);
182                         // 標記該槽位已遷移
183                         setTabAt(tab, i, fwd);
184                         // advance為true,返回上面進行--i操作
185                         advance = true;
186                     }
187                 }
188             }
189         }
190     }
191 }
View Code

說實話,擴容的邏輯應該是ConcurrentHashMap中除了紅黑樹相關的邏輯之外最難懂的了,我也只是理解了一部分,大致邏輯就是每一個參與擴容的線程都會分得一段槽位完成遷移,分派槽位區間是從table數組得尾部往頭部進行的,完成遷移得槽位會放置一個ForwardingNode節點標記該槽位已經被遷移過了,鏈表節點與紅黑樹節點都各自實現了不同的遷移邏輯,但都會將原鏈表/紅黑樹拆分成兩個鏈表/紅黑樹,然后分別把這兩部分放置於新數組的原位置和原位置+n的位置,n為舊數組的長度,這里為什么要將原來的鏈表或紅黑樹拆成兩個我就不是很明白了,大概是為了充分利用擴容出來的空間,並且將長鏈表和紅黑樹拆分的小一點可用加快查詢搜索速度吧。

完成整個數組的所有槽位的遷移之后,再將新數組的引用指向table數組,整個擴容擴充即結束。

獲取元素個數---size()/mappingCount

 1 public int size() {
 2     // 調用sumCount()計算元素個數
 3     long n = sumCount();
 4     return ((n < 0L) ? 0 :
 5             (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
 6                     (int)n);
 7 }
 8 
 9 final long sumCount() {
10     // 計算CounterCell所有段及baseCount的數量之和
11     CounterCell[] as = counterCells; CounterCell a;
12     long sum = baseCount;
13     if (as != null) {
14         for (int i = 0; i < as.length; ++i) {
15             if ((a = as[i]) != null)
16                 sum += a.value;
17         }
18     }
19     return sum;
20 }
View Code

size的實現也很講究啊,使用了LongAdder的實現原理,由一個基礎計數器 和一個散列數組構成,當計數器在基礎計數器或散列數組的同一個索引槽位產生競爭時都將會把數據記錄到其它CounterCell數組的不同槽位中,以消除競爭。但是其作者推薦使用mappingCount(),拋棄使用size方法,因為size方法返回值是一個int類型的,在元素個數超出int的最大范圍時將無法正常工作,而mappingCount方法返回的是long類型的,而且它們的實現都是一樣的。

其它一些部分方法

putIfAbsent(Key, Value) 只在不存在相應的鍵使才插入該鍵值對。

remove(Key, Value) 僅在存在鍵Key,並且值等於給定的value時才刪除該鍵值對。

replace(Key, oldValue, newValue) 僅在存在鍵Key,並且值等於給定的value時才替換其value為新值。

replace(Key, Value) 只在不存在相應的鍵才替換該鍵值為新的值。

computeIfAbsent(key, Function)僅在不存在相應的鍵Key時才通過給定的函數用該指定的key計算出一個值,若非null則將其與key作為鍵值對插入Map

computeIfPresent(K key, BiFunction)若存在相應的鍵Key時通過給定的函數用該指定的key及其value計算出一個值,若非null則將其與key作為鍵值對插入Map,若為nul則刪除原鍵值對。

static <K> KeySetView<K,Boolean> newKeySet()  返回一個用ConcurrentHashMap實現的值為布爾型的KetSet視圖。

forEach(parallelismThreshold,BiFunction<? super K, ? super V, ? extends U> transformer,Consumer<? super U> action) 對每個(鍵、值)按指定的transformer轉換的非空結果執行給定的操作action。

search(parallelismThreshold,BiFunction)通過對每個(鍵、值)或空(如果沒有)應用給定的搜索函數返回非空結果。

reduce(long parallelismThreshold,BiFunction<? super K, ? super V, ? extends U> transformer, BiFunction<? super U, ? super U, ? extends U> reducer)返回使用給定的轉換器transformer轉換每一個鍵值對並使用給定的reducer進行累計的結果。

reduceToXXX(long parallelismThreshold, ToLongBiFunction<? super K, ? super V> transformer,X basis,LongBinaryOperator reducer)返回使用給定的轉換器transformer轉換每一個鍵值對並使用給定的reducer進行累計的到指定的基准值basis上的結果。

forEach/reduce/searchKey(...)則是上面forEach, search, reduce方法僅僅作用於Key時的重載。

forEach/reduce/searchValue(...)則是上面forEach, search, reduce方法僅僅作用於Value時的重載。

forEach/reduce/searchEntries(...)則是上面forEach, search, reduce方法作用於Entry條目時的重載。

迭代器/可拆分迭代器

ConcurrentHashMap作為Map無法直接迭代,只能對各自視圖進行迭代,例如KeySet,ValueSet,entrySet()等,它們相應的內部類迭代器KeyIterator,ValueIterator,EntryIterator都繼承了內部類BaseIterator,BaseIterator又繼承自內部類Traverser,它們的迭代器都是弱一致性的,不會拋出ConcurrentModificationException。但是,迭代器被設計成一次只能被一個線程使用。迭代器反映了ConcurrentHashMap在迭代器創建時或創建后的狀態的元素,即擴容之后的元素狀態也會反映到迭代器。

同樣的,對於可拆分迭代器spliterator,也僅僅只能對各自視圖進行迭代,例如KeySet,ValueSet,entrySet()等,它們相應的內部類可拆分迭代器KeySpliterator,EntrySpliterator,ValueSpliterator都繼承了內部類Traverser,它們的拆分按每一次拆分一半的方式進行。

總結

ConcurrentHashMap是線程安全的HashMap實現,它的內部數據結構與對應JDK的HashMap的數據一致,在JDK8以前HashMap是采用數組+鏈表的方式實現,而ConcurrentHashMap在其基礎上采用分段鎖實現了線程安全,而JDK8的HashMap采用了數組+短鏈表+紅黑樹的數據結構實現,這改善了之前版本的長鏈表的查詢低效問題,而對應的ConcurrentHashMap在此基礎上也摒棄了粗粒度的分段鎖實現,采用了每一個數組槽位一個鎖這種更細粒度的分段鎖,並且拋棄了ReentrantLock改用synchronized + CAS鎖,這也改善了上一個版本ReentrantLock的空間浪費。

ConcurrentHashMap的內部實現非常精妙,但紅黑樹的部門確實有點難度,我並沒有對紅黑樹的轉換過程做深入的探究,本文只對ConcurrentHashMap作了粗略的了解,若想了解紅黑樹轉換過程,可參考【死磕Java並發】—–J.U.C之ConcurrentHashMap紅黑樹轉換分析 一文。除了初始化的時候,會按照指定的加載因子創建可容納指定數量的元素而不觸發數組擴容之外,ConcurrentHashMap內部維護了一個0.75的加載因子,也就是每當內部的數組占用率達到75%的時候就會將原來的數組擴容至原來的2倍大小,並將原來的所有元素拷貝到新數組中,拷貝的時候為了充分里面多出來的空間,和提高查詢搜索速度,會將一些長鏈表或紅黑樹拆分成兩個體積更小的鏈表或紅黑樹分別存放與新數組的原位置和原位置+原數組長度的位置,由於擴容操作是一個非常耗時的過程,ConcurrentHashMap對這一塊做了精妙的設計使擴容可以由多個參與線程一起輔助完成,從而減小時間消耗,但擴容本身還是是開銷比較大操作,能夠在使用ConcurrentHashMap之前就確定其大概需要的容量將有效減少擴容的消耗。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM