為什么juc下的集合類是線程安全的


1. JUC 簡介

  在 Java 5.0 提供了 java.util.concurrent(簡稱JUC)包,在此包中增加了在並發編程中很常用的工具類,用於定義類似於線程的自定義子系統,包括線程池,異步 IO 和輕量級任務框架;還提供了設計用於多線程上下文中的 Collection 實現等。

2.並發容器類

  我們都知道在java包下的集合大多是線程不安全的,而Vector,stack,hashtable是線程安全的,它們的線程安全是依靠synchronized,效率低。雖然可以通過Collections工具類中的方法獲取java集合包對應的同步類,但是這些同步類的並發效率並不是很高。為了更好的支持高並發任務,並發大師Doug Lea在JUC(java.util.concurrent)包中添加了java集合包中單線程類的對應的支持高並發的類。

例如,ArrayList對應的高並發類是CopyOnWriteArrayList,HashMap對應的高並發類是ConcurrentHashMap,等等。

  JUC包在添加”java集合包“對應的高並發類時,為了保持API接口的一致性,使用了”Java集合包“中的框架。例如,CopyOnWriteArrayList實現了“Java集合包”中的List接口,ConcurrentHashMap繼承了“java集合包”中的AbstractMap類等等,是的我們便於理解。

 

List和Set

JUC集合包中的List和Set實現類包括: CopyOnWriteArrayList, CopyOnWriteArraySet和ConcurrentSkipListSet。CopyOnWriteArrayList 和 CopyOnWriteArraySet的框架如下圖所示:
在這里插入圖片描述

(01) CopyOnWriteArrayList相當於線程安全的ArrayList,它實現了List接口。CopyOnWriteArrayList是支持高並發的。
(02) CopyOnWriteArraySet相當於線程安全的HashSet,它繼承於AbstractSet類。CopyOnWriteArraySet內部包含一個CopyOnWriteArrayList對象,它是通過CopyOnWriteArrayList實現的,CopyOnWriteArraySet操作原理與CopyOnWriteArrayList類似,只是Set是無序不可重復,List是有序可重復。

CopyOnWriteArrayList。

1. CopyOnWriteArrayList實現了List接口,因此它是一個隊列。

2. CopyOnWriteArrayList包含了成員lock。每一個CopyOnWriteArrayList都和一個互斥鎖lock綁定,通過lock,實現了對CopyOnWriteArrayList的互斥訪問。
3. CopyOnWriteArrayList包含了成員array數組,這說明CopyOnWriteArrayList本質上通過數組實現的。
下面從“動態數組”和“線程安全”兩個方面進一步對CopyOnWriteArrayList的原理進行說明。
1. CopyOnWriteArrayList的“動態數組”機制 -- 它內部有個“volatile數組”(array)來保持數據。在“添加/修改/刪除”數據時,都會新建一個數組,並將更新后的數據拷貝到新建的數組中,最后再將該數組賦值給“volatile數組”。這就是它叫做CopyOnWriteArrayList的原因!CopyOnWriteArrayList就是通過這種方式實現的動態數組;不過正由於它在“添加/修改/刪除”數據時,都會新建數組,所以涉及到修改數據的操作,CopyOnWriteArrayList效率很低;但是單單只是進行遍歷查找的話,效率比較高。

2. CopyOnWriteArrayList的“線程安全”機制 -- 是通過volatile和互斥鎖來實現的。(01) CopyOnWriteArrayList是通過“volatile數組”來保存數據的。一個線程讀取volatile數組時,總能看到其它線程對該volatile變量最后的寫入;就這樣,通過volatile提供了“讀取到的數據總是最新的”這個機制的保證。
(02) CopyOnWriteArrayList通過互斥鎖來保護數據。在“添加/修改/刪除”數據時,會先“獲取互斥鎖”,再修改完畢之后,先將數據更新到“volatile數組”中,然后再“釋放互斥鎖”;這樣,就達到了保護數據的目的。 

我們來看看CopyOnWriteArrayList的核心的源碼:

 

public boolean add(E e) {
final ReentrantLock lock = this.lock;
// 獲取“鎖”
lock.lock();
try {
// 獲取原始”volatile數組“中的數據和數據長度。
Object[] elements = getArray();
int len = elements.length;
// 新建一個數組newElements,並將原始數據拷貝到newElements中;
// newElements數組的長度=“原始數組的長度”+1
Object[] newElements = Arrays.copyOf(elements, len + 1);
// 將“新增加的元素”保存到newElements中。
newElements[len] = e;
// 將newElements賦值給”volatile數組“。
setArray(newElements);
return true;
} finally {
// 釋放“鎖”
lock.unlock();
}
}

說明: 第一,在”添加操作“開始前,獲取獨占鎖(lock),若此時有需要線程要獲取鎖,則必須等待;在操作完畢后,釋放獨占鎖(lock),此時其它線程才能獲取鎖。通過獨占鎖,來防止多線程同時修改數據!lock的定義如下:transient final ReentrantLock lock = new ReentrantLock();

        第二,操作完畢時,會通過setArray()來更新”volatile數組“。而且,前面我們提過”即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入“;這樣,每次添加元素之后,其它線程都能看到新添加的元素。

 由於ReentrantLock是java.util.concurrent包下提供的一套互斥鎖,相比Synchronized,ReentrantLock類提供了一些高級功能:

1.等待可中斷,持有鎖的線程長期不釋放的時候,正在等待的線程可以選擇放棄等待,這相當於Synchronized來說可以避免出現死鎖的情況。

2.公平鎖,多個線程等待同一個鎖時,必須按照申請鎖的時間順序獲得鎖

3.鎖綁定多個條件,一個ReentrantLock對象可以同時綁定多個對象。ReenTrantLock提供了一個Condition(條件)類,用來實現分組喚醒需要喚醒的線程們,而不是像synchronized要么隨機喚醒一個線程要么喚醒全部線程。

簡單來說,ReenTrantLock的實現是一種自旋鎖,通過循環調用CAS操作來實現加鎖。它的性能比較好也是因為避免了使線程進入內核態的阻塞狀態。

 

Map

JUC集合包中Map的實現類包括: ConcurrentHashMap和ConcurrentSkipListMap。它們的框架如下圖所示:
在這里插入圖片描述

(01) ConcurrentHashMap是線程安全的哈希表(相當於線程安全的HashMap);它繼承於AbstractMap類,並且實現ConcurrentMap接口。ConcurrentHashMap是通過“鎖分段”來實現的,它支持並發。
(02) ConcurrentSkipListMap是線程安全的有序的哈希表(相當於線程安全的TreeMap); 它繼承於AbstractMap類,並且實現ConcurrentNavigableMap接口。ConcurrentSkipListMap是通過“跳表”來實現的,它支持並發。
(03) ConcurrentSkipListSet是線程安全的有序的集合(相當於線程安全的TreeSet);它繼承於AbstractSet,並實現了NavigableSet接口。ConcurrentSkipListSet是通過ConcurrentSkipListMap實現的,它也支持並發。

ConcurrentHashMap。

1、ConcurrentHashMap是線程安全的哈希表,它是通過“鎖分段”來實現的。ConcurrentHashMap中包括了“Segment(鎖分段)數組”,每個Segment就是一個哈希表,而且也是可重入的互斥鎖。第一,Segment是哈希表表現在,Segment包含了“HashEntry數組”,而“HashEntry數組”中的每一個HashEntry元素是一個單向鏈表。即Segment是通過鏈式哈希表。第二,Segment是可重入的互斥鎖表現在,Segment繼承於ReentrantLock,而ReentrantLock就是可重入的互斥鎖。
對於ConcurrentHashMap的添加,刪除操作,在操作開始前,線程都會獲取Segment的互斥鎖;操作完畢之后,才會釋放。而對於讀取操作,它是通過volatile去實現的,HashEntry數組是volatile類型的,而volatile能保證“即對一個volatile變量的讀,總是能看到(任意線程)對這個volatile變量最后的寫入”,即我們總能讀到其它線程寫入HashEntry之后的值。 以上這些方式,就是ConcurrentHashMap線程安全的實現原理。

(01) put()根據key獲取對應的哈希值,再根據哈希值找到對應的Segment片段。如果Segment片段不存在,則新增一個Segment。
(02) 將key-value鍵值對添加到Segment片段中。

final V put(K key, int hash, V value, boolean onlyIfAbsent) {
// tryLock()獲取鎖,成功返回true,失敗返回false。
// 獲取鎖失敗的話,則通過scanAndLockForPut()獲取鎖,並返回”要插入的key-value“對應的”HashEntry鏈表“。
HashEntry<K,V> node = tryLock() ? null :
scanAndLockForPut(key, hash, value);
V oldValue;
try {
// tab代表”當前Segment中的HashEntry數組“
HashEntry<K,V>[] tab = table;
// 根據”hash值“獲取”HashEntry數組中對應的HashEntry鏈表“
int index = (tab.length - 1) & hash;
HashEntry<K,V> first = entryAt(tab, index);
for (HashEntry<K,V> e = first;;) {
// 如果”HashEntry鏈表中的當前HashEntry節點“不為null,
if (e != null) {
K k;
// 當”要插入的key-value鍵值對“已經存在於”HashEntry鏈表中“時,先保存原有的值。
// 若”onlyIfAbsent“為true,即”要插入的key不存在時才插入”,則直接退出;
// 否則,用新的value值覆蓋原有的原有的值。
if ((k = e.key) == key ||
(e.hash == hash && key.equals(k))) {
oldValue = e.value;
if (!onlyIfAbsent) {
e.value = value;
++modCount;
}
break;
}
e = e.next;
}
else {
// 如果node非空,則將first設置為“node的下一個節點”。
// 否則,新建HashEntry鏈表
if (node != null)
node.setNext(first);
else
node = new HashEntry<K,V>(hash, key, value, first);
int c = count + 1;
// 如果添加key-value鍵值對之后,Segment中的元素超過閾值(並且,HashEntry數組的長度沒超過限制),則rehash;
// 否則,直接添加key-value鍵值對。
if (c > threshold && tab.length < MAXIMUM_CAPACITY)
rehash(node);
else
setEntryAt(tab, index, node);
++modCount;
count = c;
oldValue = null;
break;
}
}
} finally {
// 釋放鎖
unlock();
}
return oldValue;
}

 

說明:
put()的作用是將key-value鍵值對插入到“當前Segment對應的HashEntry中”,在插入前它會獲取Segment對應的互斥鎖,插入后會釋放鎖。具體的插入過程如下:
(01) 首先根據“hash值”獲取“當前Segment的HashEntry數組對象”中的“HashEntry節點”,每個HashEntry節點都是一個單向鏈表。
(02) 接着,遍歷HashEntry鏈表。
       若在遍歷HashEntry鏈表時,找到與“要key-value鍵值對”對應的節點,即“要插入的key-value鍵值對”的key已經存在於HashEntry鏈表中。則根據onlyIfAbsent進行判斷,若onlyIfAbsent為true,即“當要插入的key不存在時才插入”,則不進行插入,直接返回;否則,用新的value值覆蓋原始的value值,然后再返回。
       若在遍歷HashEntry鏈表時,沒有找到與“要key-value鍵值對”對應的節點。當node!=null時,即在scanAndLockForPut()獲取鎖時,已經新建了key-value對應的HashEntry節點,則”將HashEntry添加到Segment中“;否則,新建key-value對應的HashEntry節點,然后再“將HashEntry添加到Segment中”。 在”將HashEntry添加到Segment中“前,會判斷是否需要rehash。如果在添加key-value鍵值之后,容量會超過閾值,並且HashEntry數組的長度沒有超過限制,則進行rehash;否則,直接通過setEntryAt()將key-value鍵值對添加到Segment中。

 

2、之前是分段鎖的思想,通過采用分段鎖Segment減少熱點域來提高並發效率。1.8之后利用CAS+Synchronized來保證並發更新的安全,底層采用數組+鏈表+紅黑樹的存儲結構。

    • CAS(Compare-And-Swap) 算法是硬件對於並發的支持,針對多處理器操作而設計的處理器中的一種特殊指令,用於
      管理對共享數據的並發訪問;
    • CAS 是一種無鎖的非阻塞算法的實現;
    • CAS 包含了三個操作數:
      • 需要讀寫的內存值: V
      • 進行比較的預估值: A
      • 擬寫入的更新值: B
      • 當且僅當 V == A 時, V = B, 否則,將不做任何操作;

這里來看看ConcurrentHashMap的put源碼:

復制代碼
 
         

public V put(K key, V value) {
return putVal(key, value, false);
}

final V putVal(K key, V value, boolean onlyIfAbsent) {
//和hashMap不同的是,concurrentHashMap的key和value都不允許為null
//concurrenthashmap它們是用於多線程的,並發的 ,如果map.get(key)得到了null,
// 不能判斷到底是映射的value是null,還是因為沒有找到對應的key而為空,
// 而用於單線程狀態的hashmap卻可以用containKey(key) 去判斷到底是否包含了這個null。
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode());
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
//如果是第一次put,進行初始化
if (tab == null || (n = tab.length) == 0)
tab = initTable();
//根據(tab.length - 1) & hash 計算目標節點在數組中的位置
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
//如果為空,則通過cas 添加一個新建一個頭節點
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
//hash為-1 說明是一個forwarding nodes節點,表明正在擴容
else if ((fh = f.hash) == MOVED)
//幫助擴容
tab = helpTransfer(tab, f);
else {
V oldVal = null;
//對上面計算出來的節點進行加鎖
synchronized (f) {
//這里判斷下有沒有線程對數組進行了修改
if (tabAt(tab, i) == f) {
//這里如果hash值是大於等於0的說明是鏈表
if (fh >= 0) {
binCount = 1;
for (Node<K,V> e = f;; ++binCount) {
K ek;
//如果找到了目標節點,那么進行值替換
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break;
}
//如果循環到鏈表結尾還沒發現,那么進行插入操作
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break;
}
}
}
//如果是樹
else if (f instanceof TreeBin) {
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
//如果鏈表數量大於TREEIFY_THRESHOLD(8),開始執行轉換樹
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i);
if (oldVal != null)
return oldVal;
break;
}
}
}
//進行元素數量統計,和決定是否擴容
addCount(1L, binCount);
return null;
}

復制代碼

 ConcurrentHashMap 是設計為非阻塞的。就是一個線程的失敗或者掛起不應該影響其他線程的失敗或掛起的算法。

在更新時會局部鎖住某部分數據,但不會把整個表都鎖住。同步讀取操作則是完全非阻塞的。好處是在保證合理的同步前提下,效率很高。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM