更新日志(2018年8月18日):這篇博客的隊列部分犯了個低級錯誤:入隊和出隊在同在隊列尾端進行。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿勢(修正之前博客中的錯誤)
並發容器是線程安全的容器。它在實現容器基本功能的前提下,還提供了並發控制能力,使得容器在被多線程並發訪問的情況下還能表現出正確的行為。通常我們使用獨占鎖的悲觀策略來進行並發控制,因為其實現相對簡單,正確性易於判斷且絕大部分情況下都能表現出不錯的性能;當然,也可以使用CAS算法來進行並發控制,這是一種無鎖的樂觀策略,盡管其有着實現比較復雜,正確性較難判定等缺點,但相比獨占鎖的方式,它至少有以下幾方面的優勢:
- 因為不需要加鎖,根本上避免了死鎖的產生。
- 避免了線程對鎖的競爭產生的開銷,比如阻塞,喚醒以及線程的調度。
- 加鎖本質上是將部分代碼的執行由並行變串行,這會導致程序的並行化比例降低;而無鎖的CAS算法避免了這種情況發生。根據AMdahl定律,這意味着,隨着計算資源的增加,基於CAS算法構建的並發容器往往有更好的性能提升,盡管其有因為CAS競爭失敗導致的重試問題。
總的來說,無鎖的並發容器更加安全,大部分情況下吞吐量也更高,但是也有着實現較為復雜不太好理解的缺點。通過自己動手實現無鎖的線程安全的棧和隊列,就能深刻體會到這一點。完整的代碼已經放到github上beautiful-concurrent
2. 基於CAS算法構建無鎖的並發棧
棧通常有兩種實現方式,一種是使用數組,另一種是使用鏈表。首先我們定義一個無鎖的棧的接口,該接口內部只有兩個方法push()和pop(),如下圖所示
/**
* @author: takumiCX
* @create: 2018-08-09
**/
public interface LockFreeStack<E> {
boolean push(E e);
E pop();
}
接下來我們分別就數組和鏈表兩種實現方式,來探討如何基於CAS算法構建無鎖的並發棧。
2.1 數組實現
/**
* @author: takumiCX
* @create: 2018-08-08
*
* 基於數組實現的無鎖的並發棧
**/
public class LockFreeArrayStack<E> implements LockFreeStack<E>{
//不支持擴容
final Object[] elements;
//容量,一旦確定不可更改
final int capacity;
//記錄棧頂元素在數組中的下標,初始值為-1
AtomicInteger top = new AtomicInteger(-1);
public LockFreeArrayStack(int capacity) {
this.capacity = capacity;
elements = new Object[capacity];
}
/**
* 入棧
*
* @param e
* @return true:入棧成功 false:入棧失敗(棧已滿)
*/
public boolean push(E e) {
//死循環,保證多次CAS嘗試后能得到結果
for (; ; ) {
//當前棧頂元素在數組中的下標
int curTop = top.get();
//棧已滿,返回false
if (curTop + 1 >= capacity) {
return false;
} else {
//首先將元素放入棧中
elements[curTop + 1] = e;
//基於CAS更新棧頂指針,這里是top值
if (top.compareAndSet(curTop, curTop + 1)) {
return true;
}
}
}
}
/**
* 出棧
*
* @return 棧頂元素, 若棧為空返回null
*/
public E pop() {
//死循環,保證多次CAS嘗試后能得到結果
for (; ; ) {
//當前棧頂元素在數組中的下標
int curTop = top.get();
//棧為空,返回null
if (curTop == -1) {
return null;
} else {
//CAS更新棧頂指針,這里是top值
if (top.compareAndSet(curTop, curTop - 1)) {
return (E) elements[curTop];
}
}
}
}
}
為了突出CAS算法實現入棧出棧的過程,同時也是為了簡化代碼實現的復雜度,基於數組實現的棧不支持擴容,最大容量capacity在構造函數中確定。top為棧頂元素"指針",這里為棧頂元素在數組中的下標值。因為top值的更新依賴於前值,所以這里不能使用volatile關鍵字,而應該使用原子變量,保證在更新top值之前top值沒有被其他線程改變。棧的pop()方法實現比較簡單,只要在棧不為空的情況下,原子的把top值更新為top-1;而棧的push()方法相對來說比較復雜,它包含了兩步:
- 1.將入棧元素放入數組的top+1位置
- 2.然后原子的更新top值為top+1
這里可能有人會疑問:這難道不會產生並發問題嗎,比如說一個線程執行了1后,另一個線程也執行了1,這不是把前面的結果覆蓋了嗎?我們可以畫圖來演繹下整個過程:
可以看到,盡管1在多線程環境下會產生元素覆蓋問題,但是對於最后一個覆蓋的線程而言,2的CAS更新是必然會成功的,不管這個CAS更新是由該線程自己執行還是其他線程替他執行,一旦某線程CAS更新成功,其他線程將因CAS失敗重新執行for循環。
2.2 鏈表實現
基於鏈表實現無鎖的棧會更靈活,不用考慮棧擴容或者棧空間滿的問題,而且實現同樣簡單。
/**
* @author: takumiCX
* @create: 2018-08-09
*
* 基於鏈表實現的無鎖的並發棧
**/
public class LockFreeLinkedStack<E> implements LockFreeStack<E>{
//棧頂指針
AtomicReference<Node<E>> top = new AtomicReference<Node<E>>();
/**
* @param e 入棧元素
* @return true:入棧成功 false:入棧失敗
*/
public boolean push(E e) {
//構造新結點
Node<E> newNode = new Node<E>(e);
//死循環,保證CAS失敗重試后能入棧成功
for (; ; ) {
//當前棧頂結點
Node<E> curTopNode = top.get();
//新結點的next指針指向原棧頂結點
newNode.next = curTopNode;
//CAS更新棧頂指針
if (top.compareAndSet(curTopNode, newNode)) {
return true;
}
}
}
/**
*
* @return 返回棧頂結點中的值,若棧為空返回null
*/
public E pop() {
//死循環,保證出棧成功
for (; ; ) {
//當前棧頂結點
Node<E> curTopNode = top.get();
//棧為空,返回null
if (curTopNode == null) {
return null;
} else {
//獲得原棧頂結點的后繼結點
Node<E> nextNode = curTopNode.next;
//CAS更新棧頂指針
if (top.compareAndSet(curTopNode, nextNode)) {
return curTopNode.item;
}
}
}
}
//定義鏈表結點
private static class Node<E> {
public E item;
public Node<E> next;
public Node(E item) {
this.item = item;
}
}
}
由定義可知,該鏈表為單鏈表,且不帶哨兵。為了操作方便,新增結點的插入采用頭插法。如下圖所示:
因為棧頂元素的更新依賴於前值(我們要保證更新棧頂指針前沒有其他線程對其進行更改),故而使用原子引用,基於原子引用的CAS算法可以保證只有當更新前的引用為預期值時,當前更新才能成功(注意棧頂指針的初始化方式,至於為何不能直接給top賦值為null后面隊列部分會解釋)。其入棧出棧流程也十分簡單,僅需對棧頂指針top進行CAS同步。
- 入棧時:1.獲取當前棧頂結點 2.新結點的next指針指向獲取的棧頂結點 3.CAS更新棧頂指針。CAS競爭失敗的線程將會重復這一流程。
- 出棧時:1. 獲取當前棧頂結點 2.若棧頂結點為null,說明棧為空,返回null,否則CAS更新棧頂指針為原棧頂結點的下一個結點 3.返回原棧頂結點的元素 。CAS失敗的線程將會重復這一流程。
2.3 性能測試
以JDK中的Stack為基准進行性能測試,由於JDK中的Stack是線程不安全的,在測試時通過手動加鎖的方式保證線程安全。
開啟10個線程,每個線程混合進行10000次push和pop操作。分別以每種容器進行100次上述操作,計算出每次的平均執行時間(單位毫秒)如下。測試環境的處理器核數為4。這里就不貼測試代碼了,想看的點這里github
可以看到基於CAS算法的棧確實比基於鎖的棧表現出了更好的性能。
3. 基於CAS算法構建無鎖的並發隊列
隊列其實也有數組(循環數組)和鏈表兩種實現方式,這里為了向並發大神Doug Lea致敬(笑),僅就隊列的鏈表實現進行討論。棧的操作只在棧頂進行,只需要有一個棧頂指針,並保證其更新的原子性即可。但是隊列是先進先出的,其入隊和出隊分別在隊列的兩端,所以必須有兩個指針分別指向隊列的頭部和尾部,對它們的更新不僅需要保證是原子性的,還要避免其相互沖突,一旦涉及到兩項CAS操作,且它們相互間又存在一定的制約關系,無鎖算法的實現就會一下子變得復雜起來。不過不用急,通過合理的分析和設計,總能找到正確的辦法。先來看無鎖的隊列的接口定義:
/**
* @author: takumiCX
* @create: 2018-08-10
*
* 隊列接口,僅包含入隊和出隊抽象方法
**/
public interface LockFreeQueue<E> {
//入隊
boolean enqueue(E e);
//出隊
E dequeue();
}
該接口中僅定義了入隊和出隊的方法。
鏈表有雙向鏈表和單向鏈表之分,通過前面閱讀reentrantlock的源碼我們知道同步隊列就是一種雙向鏈表,並且在為我們保留了更多信息的情況下實現也並不復雜,借鑒下這種經驗,我們也選擇雙向鏈表來實現隊列。鏈表的結點定義如下:
private static class Node<E> {
//指向前一個節點的指針
public volatile Node pre;
//指向后一個結點的指針
public volatile Node next;
//真正要存儲在隊列中的值
public E item;
public Node(E item) {
this.item = item;
}
@Override
public String toString() {
return "Node{" +
"item=" + item +
'}';
}
}
因為可能有多線程環境對結點指針的並發訪問,所以pre和next指針都用volatile修飾保證可見性。
隊列的頭尾指針分別定義如下,我們采用了不帶哨兵結點的方式,即頭尾指針在初始化時指向的元素值為null,又因為我們需要原子的更新引用,故聲明為AtomicReference,其內部指針指向的類型為Node
//指向隊列頭結點的原子引用
private AtomicReference<Node<E>> head = new AtomicReference<>(null);
//指向隊列尾結點的原子引用
private AtomicReference<Node<E>> tail = new AtomicReference<>(null);
注意這里初始化的寫法,AtomicReference其實也是一個對象,其內部有一個指向真正元素的引用以及一些原子方法。我們不能直接寫head=tail=null來表示初始化狀態,而應該聲明一個AtomicReference的空對象。整個隊列的結構如下圖所示。
可以看到隊列的頭指針和尾指針分別指向的是一個AtomicReference對象,然后再由AtomicReference內部的指針去指向真正的頭尾結點。AtomicReference對象不僅使我們的頭尾指針間接找到了隊列的頭尾結點,還提供了原子的更新這種指向的方法。后面所講的更新頭尾指針其實更新的並不是真正的頭尾指針,而是它們指向的AtomicReference對象內部的指針。當然思考的時候不用考慮這層抽象。
3.1 入隊方法
/**
*
* @param e 要入隊的元素
* @return true:入隊成功 false:入隊失敗
*/
public boolean enqueue(E e) {
//創建一個包含入隊元素的新結點
Node<E> newNode = new Node<>(e);
//死循環,保證最后都能進入隊列
for (; ; ) {
//當前尾結點
Node<E> taild = tail.get();
//當前尾結點為null,說明隊列為空
if (taild == null) {
//CAS方式更新隊列頭指針
if (head.compareAndSet(null, newNode)) {
//非同步方式更新尾指針
tail.set(newNode);
return true;
}
} else {
//新結點的pre指針指向原尾結點
newNode.pre = taild;
//CAS方式將尾指針指向新的結點
if (tail.compareAndSet(taild, newNode)) {
//非同步方式更新
taild.next = newNode;
return true;
}
}
}
}
有沒有覺得這部分代碼很眼熟?如果你認真閱讀過AQS中線程加入同步隊列等待部分的源碼,就會發現只是在它的基礎上做了些小改動。整個過程只有更新頭尾指針時進行了CAS同步,所以並發環境下性能很好。至於為什么整個過程是線程安全的可以參考我在從源碼角度徹底理解ReentrantLock(重入鎖)里3.3小節的講解。
3.2 出隊方法
/**
* 將隊列首元素從隊列中移除並返回該元素,若隊列為空則返回null
* @return
*/
public E dequeue() {
//死循環,保證最后都能出隊成功
for (;;) {
//當前頭結點
Node<E> tailed = tail.get();
//當前尾結點
Node<E> headed = head.get();
if (tailed == null) { //尾結點為null,說明隊列為空,直接返回null
return null;
} else if (headed == tailed) { //尾結點和頭結點相同,說明隊列中只有一個元素,此時要更新頭尾指針
//CAS方式更新尾指針為null
if (tail.compareAndSet(tailed,null)) {
// head.compareAndSet(headed, null);
//頭指針更新為null
head.set(null);
return tailed.item;
}
} else { //此時隊列中元素個數大於1,頭尾指針指向不同結點,出隊操作只需要更新尾指針
Node preNode = tailed.pre;
//CAS方式更新尾指針指向原尾結點的前一個節點
if (tail.compareAndSet(tailed, preNode)) {
preNode.next = null; //help gc
return tailed.item;
}
}
}
}
更新日志(2018年8月18日):最后一個else分句中的出隊邏輯有問題:錯誤的在隊列尾部進行出隊。正確的實現方式見基於雙向鏈表實現無鎖隊列的正確姿勢(修正之前博客中的錯誤)中的2.2小節。
首先獲取當前隊列的頭尾結點,然后根據尾結點是否為null以及頭尾結點是否相等分3種情況討論:
- 1.隊列為空,此時頭尾指針都不用更新,直接返回null
- 2.隊列只有一個元素,需要同時更新頭尾指針
- 3.隊列元素個數大於1,只需更新尾指針
3.3 性能測試
以JDK中的LinkedBlockingQueue和ConcurrentLinkedQueue為基准,前者通過加鎖的方式實現了線程安全,后者也是基於CAS方式實現的,不過邏輯相對復雜,因為進行了很多優化。同樣開啟10個線程,每個線程混合進行10000次入隊和出隊操作。重復進行100次上述操作計算出平均執行時間並進行比較。測試代碼見github
由上面的結果可知,同樣是基於CAS實現,ConcurrentLinkedQueue的性能比我們自己構建的無鎖隊列好不少,一個原因是其內部做了不少優化,比如尾指針並不是在每次插入時都會更新,另一個原因可能是其隊列是由單鏈表構成,少了很多指針操作。LinkedBlockingQueue性能和LockFreeLinkedQueue差不多,如果算上因為測試需要將LockFreeLinkedQueue加了一層適配器的包裝導致的方法調用的額外開銷,可能后者性能稍微還要更好些。看來JDK確實對鎖做了很多優化,通過加鎖的方式
實現線程安全並不會導致性能下降很多,所以大部分並發容器都使用鎖來保證線程安全。
4. 總結
在閱讀ReentrentLock源碼領略過Doug Lea精湛的並發技藝后趁熱打鐵,自己動手構建了無鎖的線程安全的棧和隊列。整個過程雖有不少挑戰,但最終獲益匪淺,對原子變量和CAS算法有了更深的理解,也鍛煉了分析和解決問題的能力。