1、ConcurrentHashMap跟HashMap,HashTable的對比
※為了分析源碼的時候方便調試,把ConcurrentHashMap的源碼放在本地了,名字改為了ConcurrentHashMapDebug
由於源碼中的unsafe有很多限制,不能直接在本地使用,所以,在源碼的最后面的靜態代碼塊處修改了U的初始化方法。
private static final sun.misc.Unsafe U; static{ U = getUnsafe(); .... } static sun.misc.Unsafe getUnsafe() throws Exception { java.lang.reflect.Field field = Unsafe.class.getDeclaredField("theUnsafe"); field.setAccessible(true); Unsafe unsafe=(Unsafe) field.get(null); return unsafe; }
1、ConcurrentHashMap跟HashMap,HashTable的對比
我們都知道HashMap不是線程安全的,所以在處理並發的時候會出現問題。
而HashTable雖然是線程安全的,但是是通過整個來加鎖的方式,當一個線程在寫操作的時候,另外的線程則不能進行讀寫。
而ConcurrentHashMap則可以支持並發的讀寫。跟1.7版本相比,1.8版本又有了很大的變化,已經拋棄了Segment的概念,雖然源碼里面還保留了,也只是為了兼容性的考慮。
在ConcurrentHashMap中通過一個Node<K,V>[]數組來保存添加到map中的鍵值對,而在同一個數組位置是通過鏈表和紅黑樹的形式來保存的。但是這個數組只有在第一次添加元素的時候才會初始化,否則只是初始化一個ConcurrentHashMap對象的話,只是設定了一個sizeCtl變量,這個變量用來判斷對象的一些狀態和是否需要擴容,后面會詳細解釋。
第一次添加元素的時候,默認初期長度為16,當往map中繼續添加元素的時候,通過hash值跟數組長度取與來決定放在數組的哪個位置,如果出現放在同一個位置的時候,優先以鏈表的形式存放,在同一個位置的個數又達到了8個以上,如果數組的長度還小於64的時候,則會擴容數組。如果數組的長度大於等於64了的話,在會將該節點的鏈表轉換成樹。
通過擴容數組的方式來把這些節點給分散開。然后將這些元素復制到擴容后的新的數組中,同一個鏈表中的元素通過hash值的數組長度位來區分,是還是放在原來的位置還是放到擴容的長度的相同位置去 。在擴容完成之后,如果某個節點的是樹,同時現在該節點的個數又小於等於6個了,則會將該樹轉為鏈表。
取元素的時候,相對來說比較簡單,通過計算hash來確定該元素在數組的哪個位置,然后在通過遍歷鏈表或樹來判斷key和key的hash,取出value值。
往ConcurrentHashMap中添加元素的時候,里面的數據以數組的形式存放的樣子大概是這樣的:
這個時候因為數組的長度才為16,則不會轉化為樹,而是會進行擴容。
擴容后數組大概是這樣的:
需要注意的是,擴容之后的長度不是32,擴容后的長度在后面細說。
如果數組擴張后長度達到64了,且繼續在某個節點的后面添加元素達到8個以上的時候,則會出現轉化為紅黑樹的情況。
轉化之后大概是這樣:
下面是幾個重要的屬性
private static final int MAXIMUM_CAPACITY = 1 << 30; private static final int DEFAULT_CAPACITY = 16; static final int TREEIFY_THRESHOLD = 8; static final int UNTREEIFY_THRESHOLD = 6; static final int MIN_TREEIFY_CAPACITY = 64; static final int MOVED = -1; // 表示正在轉移 static final int TREEBIN = -2; // 表示已經轉換成樹 static final int RESERVED = -3; // hash for transient reservations static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash transient volatile Node<K,V>[] table;//默認沒初始化的數組,用來保存元素 private transient volatile Node<K,V>[] nextTable;//轉移的時候用的數組 /** * 用來控制表初始化和擴容的,默認值為0,當在初始化的時候指定了大小,這會將這個大小保存在sizeCtl中,大小為數組的0.75 * 當為負的時候,說明表正在初始化或擴張, * -1表示初始化 * -(1+n) n:表示活動的擴張線程 */ private transient volatile int sizeCtl;
幾個重要的類
Node<K,V>,這是構成每個元素的基本類。
static class Node<K,V> implements Map.Entry<K,V> { final int hash; //key的hash值 final K key; //key volatile V val; //value volatile Node<K,V> next; //表示鏈表中的下一個節點 Node(int hash, K key, V val, Node<K,V> next) { this.hash = hash; this.key = key; this.val = val; this.next = next; } public final K getKey() { return key; } public final V getValue() { return val; } public final int hashCode() { return key.hashCode() ^ val.hashCode(); } }
TreeNode,構造樹的節點
static final class TreeNode<K,V> extends Node<K,V> { TreeNode<K,V> parent; // red-black tree links TreeNode<K,V> left; TreeNode<K,V> right; TreeNode<K,V> prev; // needed to unlink next upon deletion boolean red; TreeNode(int hash, K key, V val, Node<K,V> next, TreeNode<K,V> parent) { super(hash, key, val, next); this.parent = parent; } }
TreeBin 用作樹的頭結點,只存儲root和first節點,不存儲節點的key、value值。
static final class TreeBin<K,V> extends Node<K,V> { TreeNode<K,V> root; volatile TreeNode<K,V> first; volatile Thread waiter; volatile int lockState; // values for lockState static final int WRITER = 1; // set while holding write lock static final int WAITER = 2; // set when waiting for write lock static final int READER = 4; // increment value for setting read lock }
ForwardingNode在轉移的時候放在頭部的節點,是一個空節點
static final class ForwardingNode<K,V> extends Node<K,V> { final Node<K,V>[] nextTable; ForwardingNode(Node<K,V>[] tab) { super(MOVED, null, null, null); this.nextTable = tab; }
}
在ConcurrentHashMap中使用了unSafe方法,通過直接操作內存的方式來保證並發處理的安全性,使用的是硬件的安全機制。
/* * 用來返回節點數組的指定位置的節點的原子操作 */ @SuppressWarnings("unchecked") static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) { return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); } /* * cas原子操作,在指定位置設定值 */ static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i, Node<K,V> c, Node<K,V> v) { return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v); } /* * 原子操作,在指定位置設定值 */ static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) { U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v); }
首先我們看看構造方法
//空的構造 public ConcurrentHashMapDebug() { } //如果在實例化對象的時候指定了容量,則初始化sizeCtl public ConcurrentHashMapDebug(int initialCapacity) { if (initialCapacity < 0) throw new IllegalArgumentException(); int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1)); this.sizeCtl = cap; } //當出入一個Map的時候,先設定sizeCtl為默認容量,在添加元素 public ConcurrentHashMapDebug(Map<? extends K, ? extends V> m) { this.sizeCtl = DEFAULT_CAPACITY; putAll(m); }
可以看到,在任何一個構造方法中,都沒有對存儲Map元素Node的table變量進行初始化。而是在第一次put操作的時候在進行初始化。
下面來看看數組的初始化方法initTable
/**
* 初始化數組table,
* 如果sizeCtl小於0,說明別的數組正在進行初始化,則讓出執行權
* 如果sizeCtl大於0的話,則初始化一個大小為sizeCtl的數組
* 否則的話初始化一個默認大小(16)的數組
* 然后設置sizeCtl的值為數組長度的3/4
*/
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0) { //第一次put的時候,table還沒被初始化,進入while if ((sc = sizeCtl) < 0) //sizeCtl初始值為0,當小於0的時候表示在別的線程在初始化表或擴展表 Thread.yield(); // lost initialization race; just spin else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //SIZECTL:表示當前對象的內存偏移量,sc表示期望值,-1表示要替換的值,設定為-1表示要初始化表了 try { if ((tab = table) == null || tab.length == 0) { int n = (sc > 0) ? sc : DEFAULT_CAPACITY; //指定了大小的時候就創建指定大小的Node數組,否則創建指定大小(16)的Node數組 @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; //初始化后,sizeCtl長度為數組長度的3/4 } break; } } return tab; }
下面看看put方法的源碼
/* * 單純的額調用putVal方法,並且putVal的第三個參數設置為false * 當設置為false的時候表示這個value一定會設置 * true的時候,只有當這個key的value為空的時候才會設置 */ public V put(K key, V value) { return putVal(key, value, false); }
再來看putVal
/* * 當添加一對鍵值對的時候,首先會去判斷保存這些鍵值對的數組是不是初始化了, * 如果沒有的話就初始化數組 * 然后通過計算hash值來確定放在數組的哪個位置 * 如果這個位置為空則直接添加,如果不為空的話,則取出這個節點來 * 如果取出來的節點的hash值是MOVED(-1)的話,則表示當前正在對這個數組進行擴容,復制到新的數組,則當前線程也去幫助復制 * 最后一種情況就是,如果這個節點,不為空,也不在擴容,則通過synchronized來加鎖,進行添加操作 * 然后判斷當前取出的節點位置存放的是鏈表還是樹 * 如果是鏈表的話,則遍歷整個鏈表,直到取出來的節點的key來個要放的key進行比較,如果key相等,並且key的hash值也相等的話, * 則說明是同一個key,則覆蓋掉value,否則的話則添加到鏈表的末尾 * 如果是樹的話,則調用putTreeVal方法把這個元素添加到樹中去 * 最后在添加完成之后,會判斷在該節點處共有多少個節點(注意是添加前的個數),如果達到8個以上了的話, * 則調用treeifyBin方法來嘗試將處的鏈表轉為樹,或者擴容數組 */ final V putVal(K key, V value, boolean onlyIfAbsent) { if (key == null || value == null) throw new NullPointerException();//K,V都不能為空,否則的話跑出異常 int hash = spread(key.hashCode()); //取得key的hash值 int binCount = 0; //用來計算在這個節點總共有多少個元素,用來控制擴容或者轉移為樹 for (Node<K,V>[] tab = table;;) { // Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0) tab = initTable(); //第一次put的時候table沒有初始化,則初始化table else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //通過哈希計算出一個表中的位置因為n是數組的長度,所以(n-1)&hash肯定不會出現數組越界 if (casTabAt(tab, i, null, //如果這個位置沒有元素的話,則通過cas的方式嘗試添加,注意這個時候是沒有加鎖的 new Node<K,V>(hash, key, value, null))) //創建一個Node添加到數組中區,null表示的是下一個節點為空 break; // no lock when adding to empty bin } /* * 如果檢測到某個節點的hash值是MOVED,則表示正在進行數組擴張的數據復制階段, * 則當前線程也會參與去復制,通過允許多線程復制的功能,一次來減少數組的復制所帶來的性能損失 */ else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { /* * 如果在這個位置有元素的話,就采用synchronized的方式加鎖, * 如果是鏈表的話(hash大於0),就對這個鏈表的所有元素進行遍歷, * 如果找到了key和key的hash值都一樣的節點,則把它的值替換到 * 如果沒找到的話,則添加在鏈表的最后面 * 否則,是樹的話,則調用putTreeVal方法添加到樹中去 * * 在添加完之后,會對該節點上關聯的的數目進行判斷, * 如果在8個以上的話,則會調用treeifyBin方法,來嘗試轉化為樹,或者是擴容 */ V oldVal = null; synchronized (f) { if (tabAt(tab, i) == f) { //再次取出要存儲的位置的元素,跟前面取出來的比較 if (fh >= 0) { //取出來的元素的hash值大於0,當轉換為樹之后,hash值為-2 binCount = 1; for (Node<K,V> e = f;; ++binCount) { //遍歷這個鏈表 K ek; if (e.hash == hash && //要存的元素的hash,key跟要存儲的位置的節點的相同的時候,替換掉該節點的value即可 ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) //當使用putIfAbsent的時候,只有在這個key沒有設置值得時候才設置 e.val = value; break; } Node<K,V> pred = e; if ((e = e.next) == null) { //如果不是同樣的hash,同樣的key的時候,則判斷該節點的下一個節點是否為空, pred.next = new Node<K,V>(hash, key, //為空的話把這個要加入的節點設置為當前節點的下一個節點 value, null); break; } } } else if (f instanceof TreeBin) { //表示已經轉化成紅黑樹類型了 Node<K,V> p; binCount = 2; if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key, //調用putTreeVal方法,將該元素添加到樹中去 value)) != null) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0) { if (binCount >= TREEIFY_THRESHOLD) //當在同一個節點的數目達到8個的時候,則擴張數組或將給節點的數據轉為tree treeifyBin(tab, i); if (oldVal != null) return oldVal; break; } } } addCount(1L, binCount); //計數 return null; }
在put方法的詳解中,我們可以看到,在同一個節點的個數超過8個的時候,會調用treeifyBin方法來看看是擴容還是轉化為一棵樹
同時在每次添加完元素的addCount方法中,也會判斷當前數組中的元素是否達到了sizeCtl的量,如果達到了的話,則會進入transfer方法去擴容
/** * Replaces all linked nodes in bin at given index unless table is * too small, in which case resizes instead. * 當數組長度小於64的時候,擴張數組長度一倍,否則的話把鏈表轉為樹 */ private final void treeifyBin(Node<K,V>[] tab, int index) { Node<K,V> b; int n, sc; if (tab != null) { System.out.println("treeifyBin方\t==>數組長:"+tab.length); if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 tryPresize(n << 1); // 數組擴容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //使用synchronized同步器,將該節點出的鏈表轉為樹 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //hd:樹的頭(head) for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) //把Node組成的鏈表,轉化為TreeNode的鏈表,頭結點任然放在相同的位置 hd = p; //設置head else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的鏈表放入容器TreeBin中 } } } } }
可以看到當需要擴容的時候,調用的時候tryPresize方法,看看trePresize的源碼
/** * 擴容表為指可以容納指定個數的大小(總是2的N次方) * 假設原來的數組長度為16,則在調用tryPresize的時候,size參數的值為16<<1(32),此時sizeCtl的值為12 * 計算出來c的值為64,則要擴容到sizeCtl≥為止 * 第一次擴容之后 數組長:32 sizeCtl:24 * 第二次擴容之后 數組長:64 sizeCtl:48 * 第二次擴容之后 數組長:128 sizeCtl:94 --> 這個時候才會退出擴容 */ private final void tryPresize(int size) { /* * MAXIMUM_CAPACITY = 1 << 30 * 如果給定的大小大於等於數組容量的一半,則直接使用最大容量, * 否則使用tableSizeFor算出來 * 后面table一直要擴容到這個值小於等於sizeCtrl(數組長度的3/4)才退出擴容 */ int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY : tableSizeFor(size + (size >>> 1) + 1); int sc; while ((sc = sizeCtl) >= 0) { Node<K,V>[] tab = table; int n; // printTable(tab); 調試用的 /* * 如果數組table還沒有被初始化,則初始化一個大小為sizeCtrl和剛剛算出來的c中較大的一個大小的數組 * 初始化的時候,設置sizeCtrl為-1,初始化完成之后把sizeCtrl設置為數組長度的3/4 * 為什么要在擴張的地方來初始化數組呢?這是因為如果第一次put的時候不是put單個元素, * 而是調用putAll方法直接put一個map的話,在putALl方法中沒有調用initTable方法去初始化table, * 而是直接調用了tryPresize方法,所以這里需要做一個是不是需要初始化table的判斷 */ if (tab == null || (n = tab.length) == 0) { n = (sc > c) ? sc : c; if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //初始化tab的時候,把sizeCtl設為-1 try { if (table == tab) { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = nt; sc = n - (n >>> 2); } } finally { sizeCtl = sc; } } } /* * 一直擴容到的c小於等於sizeCtl或者數組長度大於最大長度的時候,則退出 * 所以在一次擴容之后,不是原來長度的兩倍,而是2的n次方倍 */ else if (c <= sc || n >= MAXIMUM_CAPACITY) { break; //退出擴張 } else if (tab == table) { int rs = resizeStamp(n); /* * 如果正在擴容Table的話,則幫助擴容 * 否則的話,開始新的擴容 * 在transfer操作,將第一個參數的table中的元素,移動到第二個元素的table中去, * 雖然此時第二個參數設置的是null,但是,在transfer方法中,當第二個參數為null的時候, * 會創建一個兩倍大小的table */ if (sc < 0) { Node<K,V>[] nt; if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0) break; /* * transfer的線程數加一,該線程將進行transfer的幫忙 * 在transfer的時候,sc表示在transfer工作的線程數 */ if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) transfer(tab, nt); } /* * 沒有在初始化或擴容,則開始擴容 */ else if (U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2)) { transfer(tab, null); } } } }
在tryPresize方法中,並沒有加鎖,允許多個線程進入,如果數組正在擴張,則當前線程也去幫助擴容。
數組擴容的主要方法就是transfer方法
/** * Moves and/or copies the nodes in each bin to new table. See * above for explanation. * 把數組中的節點復制到新的數組的相同位置,或者移動到擴張部分的相同位置 * 在這里首先會計算一個步長,表示一個線程處理的數組長度,用來控制對CPU的使用, * 每個CPU最少處理16個長度的數組元素,也就是說,如果一個數組的長度只有16,那只有一個線程會對其進行擴容的復制移動操作 * 擴容的時候會一直遍歷,知道復制完所有節點,沒處理一個節點的時候會在鏈表的頭部設置一個fwd節點,這樣其他線程就會跳過他, * 復制后在新數組中的鏈表不是絕對的反序的 */ private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE) //MIN_TRANSFER_STRIDE 用來控制不要占用太多CPU stride = MIN_TRANSFER_STRIDE; // subdivide range //MIN_TRANSFER_STRIDE=16 /* * 如果復制的目標nextTab為null的話,則初始化一個table兩倍長的nextTab * 此時nextTable被設置值了(在初始情況下是為null的) * 因為如果有一個線程開始了表的擴張的時候,其他線程也會進來幫忙擴張, * 而只是第一個開始擴張的線程需要初始化下目標數組 */ if (nextTab == null) { // initiating try { @SuppressWarnings("unchecked") Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; nextTab = nt; } catch (Throwable ex) { // try to cope with OOME sizeCtl = Integer.MAX_VALUE; return; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; /* * 創建一個fwd節點,這個是用來控制並發的,當一個節點為空或已經被轉移之后,就設置為fwd節點 * 這是一個空的標志節點 */ ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab); boolean advance = true; //是否繼續向前查找的標志位 boolean finishing = false; // to ensure sweep(清掃) before committing nextTab,在完成之前重新在掃描一遍數組,看看有沒完成的沒 for (int i = 0, bound = 0;;) { Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) { advance = false; } else if ((nextIndex = transferIndex) <= 0) { i = -1; advance = false; } else if (U.compareAndSwapInt (this, TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0))) { bound = nextBound; i = nextIndex - 1; advance = false; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { //已經完成轉移 nextTable = null; table = nextTab; sizeCtl = (n << 1) - (n >>> 1); //設置sizeCtl為擴容后的0.75 return; } if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT) { return; } finishing = advance = true; i = n; // recheck before commit } } else if ((f = tabAt(tab, i)) == null) //數組中把null的元素設置為ForwardingNode節點(hash值為MOVED[-1]) advance = casTabAt(tab, i, null, fwd); else if ((fh = f.hash) == MOVED) advance = true; // already processed else { synchronized (f) { //加鎖操作 if (tabAt(tab, i) == f) { Node<K,V> ln, hn; if (fh >= 0) { //該節點的hash值大於等於0,說明是一個Node節點 /* * 因為n的值為數組的長度,且是power(2,x)的,所以,在&操作的結果只可能是0或者n * 根據這個規則 * 0--> 放在新表的相同位置 * n--> 放在新表的(n+原來位置) */ int runBit = fh & n; Node<K,V> lastRun = f; /* * lastRun 表示的是需要復制的最后一個節點 * 每當新節點的hash&n -> b 發生變化的時候,就把runBit設置為這個結果b * 這樣for循環之后,runBit的值就是最后不變的hash&n的值 * 而lastRun的值就是最后一次導致hash&n 發生變化的節點(假設為p節點) * 為什么要這么做呢?因為p節點后面的節點的hash&n 值跟p節點是一樣的, * 所以在復制到新的table的時候,它肯定還是跟p節點在同一個位置 * 在復制完p節點之后,p節點的next節點還是指向它原來的節點,就不需要進行復制了,自己就被帶過去了 * 這也就導致了一個問題就是復制后的鏈表的順序並不一定是原來的倒序 */ for (Node<K,V> p = f.next; p != null; p = p.next) { int b = p.hash & n; //n的值為擴張前的數組的長度 if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0) { ln = lastRun; hn = null; } else { hn = lastRun; ln = null; } /* * 構造兩個鏈表,順序大部分和原來是反的 * 分別放到原來的位置和新增加的長度的相同位置(i/n+i) */ for (Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0) /* * 假設runBit的值為0, * 則第一次進入這個設置的時候相當於把舊的序列的最后一次發生hash變化的節點(該節點后面可能還有hash計算后同為0的節點)設置到舊的table的第一個hash計算后為0的節點下一個節點 * 並且把自己返回,然后在下次進來的時候把它自己設置為后面節點的下一個節點 */ ln = new Node<K,V>(ph, pk, pv, ln); else /* * 假設runBit的值不為0, * 則第一次進入這個設置的時候相當於把舊的序列的最后一次發生hash變化的節點(該節點后面可能還有hash計算后同不為0的節點)設置到舊的table的第一個hash計算后不為0的節點下一個節點 * 並且把自己返回,然后在下次進來的時候把它自己設置為后面節點的下一個節點 */ hn = new Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } else if (f instanceof TreeBin) { //否則的話是一個樹節點 TreeBin<K,V> t = (TreeBin<K,V>)f; TreeNode<K,V> lo = null, loTail = null; TreeNode<K,V> hi = null, hiTail = null; int lc = 0, hc = 0; for (Node<K,V> e = t.first; e != null; e = e.next) { int h = e.hash; TreeNode<K,V> p = new TreeNode<K,V> (h, e.key, e.val, null, null); if ((h & n) == 0) { if ((p.prev = loTail) == null) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } /* * 在復制完樹節點之后,判斷該節點處構成的樹還有幾個節點, * 如果≤6個的話,就轉回為一個鏈表 */ ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0) ? new TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0) ? new TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true; } } } } } }
到這里,ConcurrentHashMap的put操作和擴容都介紹的差不多了,
下面的兩點一定要注意:
·復制之后的新鏈表不是舊鏈表的絕對倒序。
·在擴容的時候每個線程都有處理的步長,最少為16,在這個步長范圍內的數組節點只有自己一個線程來處理
相比put操作,get操作就顯得很簡單了。廢話少說,直接上源碼分析。
/* * 相比put方法,get就很單純了,支持並發操作, * 當key為null的時候回拋出NullPointerException的異常 * get操作通過首先計算key的hash值來確定該元素放在數組的哪個位置 * 然后遍歷該位置的所有節點 * 如果不存在的話返回null */ public V get(Object key) { Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek; int h = spread(key.hashCode()); if ((tab = table) != null && (n = tab.length) > 0 && (e = tabAt(tab, (n - 1) & h)) != null) { if ((eh = e.hash) == h) { if ((ek = e.key) == key || (ek != null && key.equals(ek))) return e.val; } else if (eh < 0) return (p = e.find(h, key)) != null ? p.val : null; while ((e = e.next) != null) { if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek)))) return e.val; } } return null; }
前面分析了下ConcurrentHashMap的源碼,那么,對於一個映射集合來說,ConcurrentHashMap是如果來做到並發安全,又是如何做到高效的並發的呢?
首先是讀操作,從源碼中可以看出來,在get操作中,根本沒有使用同步機制,也沒有使用unsafe方法,所以讀操作是支持並發操作的。
那么寫操作呢?
分析這個之前,先看看什么情況下會引起數組的擴容,擴容是通過transfer方法來進行的。而調用transfer方法的只有trePresize、helpTransfer和addCount三個方法。
這三個方法又是分別在什么情況下進行調用的呢?
·tryPresize是在treeIfybin和putAll方法中調用,treeIfybin主要是在put添加元素完之后,判斷該數組節點相關元素是不是已經超過8個的時候,如果超過則會調用這個方法來擴容數組或者把鏈表轉為樹。
·helpTransfer是在當一個線程要對table中元素進行操作的時候,如果檢測到節點的HASH值為MOVED的時候,就會調用helpTransfer方法,在helpTransfer中再調用transfer方法來幫助完成數組的擴容
·addCount是在當對數組進行操作,使得數組中存儲的元素個數發生了變化的時候會調用的方法。
所以引起數組擴容的情況如下:
·只有在往map中添加元素的時候,在某一個節點的數目已經超過了8個,同時數組的長度又小於64的時候,才會觸發數組的擴容。
·當數組中元素達到了sizeCtl的數量的時候,則會調用transfer方法來進行擴容
那么在擴容的時候,可以不可以對數組進行讀寫操作呢?
事實上是可以的。當在進行數組擴容的時候,如果當前節點還沒有被處理(也就是說還沒有設置為fwd節點),那就可以進行設置操作。
如果該節點已經被處理了,則當前線程也會加入到擴容的操作中去。
那么,多個線程又是如何同步處理的呢?
在ConcurrentHashMap中,同步處理主要是通過Synchronized和unsafe兩種方式來完成的。
·在取得sizeCtl、某個位置的Node的時候,使用的都是unsafe的方法,來達到並發安全的目的
·當需要在某個位置設置節點的時候,則會通過Synchronized的同步機制來鎖定該位置的節點。
·在數組擴容的時候,則通過處理的步長和fwd節點來達到並發安全的目的,通過設置hash值為MOVED
·當把某個位置的節點復制到擴張后的table的時候,也通過Synchronized的同步機制來保證現程安全
前面在講解tryifyBin的源碼的時候講到過,如果在當個bin上的元素超過了8個的時候,就會嘗試去擴容數組或者是將鏈表轉為紅黑樹。
源碼:
private final void treeifyBin(Node<K,V>[] tab, int index) { System.out.println("當前線程:"+Thread.currentThread().getName()+"進入treeifyBin方法"); Node<K,V> b; int n, sc; if (tab != null) { if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //MIN_TREEIFY_CAPACITY 64 tryPresize(n << 1); // 數組擴容 else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { synchronized (b) { //使用synchronized同步器,將該節點出的鏈表轉為樹 if (tabAt(tab, index) == b) { TreeNode<K,V> hd = null, tl = null; //hd:樹的頭(head) for (Node<K,V> e = b; e != null; e = e.next) { TreeNode<K,V> p = new TreeNode<K,V>(e.hash, e.key, e.val, null, null); if ((p.prev = tl) == null) //把Node組成的鏈表,轉化為TreeNode的鏈表,頭結點任然放在相同的位置 hd = p; //設置head else tl.next = p; tl = p; } setTabAt(tab, index, new TreeBin<K,V>(hd));//把TreeNode的鏈表放入容器TreeBin中 } } } } }
首先將Node的鏈表轉化為一個TreeNode的鏈表,然后將TreeNode鏈表的頭結點來構造一個TreeBin。
下面是TreeBin構造方法的源碼:
TreeBin(TreeNode<K,V> b) { super(TREEBIN, null, null, null); //創建的TreeBin是一個空節點,hash值為TREEBIN(-2) this.first = b; TreeNode<K,V> r = null; for (TreeNode<K,V> x = b, next; x != null; x = next) { next = (TreeNode<K,V>)x.next; x.left = x.right = null; if (r == null) { x.parent = null; x.red = false; r = x; }// else { K k = x.key; int h = x.hash; Class<?> kc = null; for (TreeNode<K,V> p = r;;) {//x代表的是轉換為樹之前的順序遍歷到鏈表的位置的節點,r代表的是根節點 int dir, ph; K pk = p.key; if ((ph = p.hash) > h) // dir = -1; else if (ph < h) dir = 1; else if ((kc == null && (kc = comparableClassFor(k)) == null) || (dir = compareComparables(kc, k, pk)) == 0) dir = tieBreakOrder(k, pk); //當key不可以比較,或者相等的時候采取的一種排序措施 TreeNode<K,V> xp = p; if ((p = (dir <= 0) ? p.left : p.right) == null) {//在這里判斷要放的left/right是否為空,不為空繼續用left/right節點來判斷 x.parent = xp; if (dir <= 0) xp.left = x; else xp.right = x; r = balanceInsertion(r, x); //每次插入一個元素的時候都調用balanceInsertion來保持紅黑樹的平衡 break; } } } } this.root = r; assert checkInvariants(root); }
轉化的過程大概如下:
接下來,用鏈表頭部的TreeNode來構造一個TreeBin,在TreeBin容器中,將鏈表轉化為紅黑樹。
首先是構造一個如下的TreeBin空節點。
構造完TreeBin這個空節點之后,就開始構造紅黑樹,首先是第一個節點,左右子節點設置為空,作為紅黑樹的root節點,設置為黑色,父節點為空。
接下來遍歷鏈表的后續節點,沒添加一個元素的時候,都會通過判斷hash值來決定是放在根節點的左節點還是有節點,如果左/右節點不為空,則繼續以左/右節點來重復判斷,直到左/右節點為空,則添加到左/右位置。
然后在每次添加完一個節點之后,都會調用balanceInsertion方法來維持這是一個紅黑樹的屬性和平衡性。紅黑樹所有操作的復雜度都是O(logn),所以當元素量比較大的時候,效率也很高。