AQS系列(一)- ReentrantLock的加鎖


前言

    AQS即AbstractQueuedSynchronizer,是JUC包中的一個核心抽象類,JUC包中的絕大多數功能都是直接或間接通過它來實現的。本文是AQS系列的第一篇,后面會持續更新多篇,爭取將JUC包中AQS相關的常用功能講清楚,一方面鞏固自己的知識體系,一方面亦可與各位園友互相學習。寒冷的冬天,要用技術來溫暖自己。

一、AQS與ReentrantLock的關系

    先奉上一張自制的丑陋類圖

 

 

 

     從下往上看,ReentrantLock類內部有兩個靜態內部類FairSync和NonfairSync,分別代表了公平鎖和非公平鎖(注意ReentrantLock實現的鎖是可重入排它鎖)。這兩個靜態內部類又共同繼承了ReentrantLock的一個內部靜態抽象類Sync,此抽象類繼承AQS。

    類的關系搞清楚了,我們下面一起看一下源碼。

二、源碼解讀

    ReentrantLock的默認構造方法創建的是非公平鎖,也可以通過傳入true來指定生成公平鎖。下面我們以公平鎖的加鎖過程為例,進行解讀源碼。在解讀源碼之前需要先明確一下AQS中的state屬性,它是int類型,state=0表示當前lock沒有被占用,state=1表示被占用,如果是重入狀態,則重入了幾次state就是幾。

 1 public class JucLockDemo1 {
 2     public static void main(String[] args){
 3         ReentrantLock lock = new ReentrantLock(true);
 4         Thread t1 = new Thread(() -> {
 5             lock.lock();
 6             // 業務邏輯
 7             lock.unlock();
 8         });
 9         t1.start();
10         System.out.println("main end");
11     }
12 }

    其中第5行lock方法點進去的代碼:

1 public void lock() {
2         sync.lock();
3     }

    直接調了sync的lock方法,sync下面的lock方法是抽象方法,方法邏輯取決於具體的實現類,因為我們這里創建的是公平鎖,所以進FairSync看它的lock方法實現:

1 final void lock() {
2             acquire(1);
3         }

    FairSync中的lock方法很簡單,直接調用了acquire方法,參數是1,繼續跟蹤:

1 public final void acquire(int arg) {
2         if (!tryAcquire(arg) &&
3             acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4             selfInterrupt();
5     }

    acquire方法位於AQS中,很重要,雖然只有短短的三行,但是里面的內容非常多。下面對里面的方法分別進行解讀。

方法1:tryAcquire(arg)

    此方法在FairSync中進行了實現,代碼如下所示:

 1 protected final boolean tryAcquire(int acquires) {
 2             final Thread current = Thread.currentThread();
 3             int c = getState();
 4             // 判斷state狀態,如果是0表示鎖空閑,可以去嘗試獲取
 5             if (c == 0) {
 6                 if (!hasQueuedPredecessors() &&
 7                     compareAndSetState(0, acquires)) {
 8                     setExclusiveOwnerThread(current);
 9                     return true;
10                 }
11             }// exclusiceOwnerThread存放的是當前運行的獨占線程,如果此處判斷為true,說明是當前線程第二次加鎖,可以重入,只是要將state+1
12             else if (current == getExclusiveOwnerThread()) {
13                 int nextc = c + acquires;
14                 if (nextc < 0)
15                     throw new Error("Maximum lock count exceeded");
16                 setState(nextc);
17                 return true;
18             }
19             return false;
20         }

    第二個if判斷很好理解,是ReentrantLock對重入和排他的支持(所以說它是可重入排他鎖),但是判斷c==0之后的邏輯就比較麻煩了。

    首先理解一下當前的邏輯:如果state=0說明lock空閑,又因為是公平鎖,所以要先判斷當前AQS隊列中還有沒有排隊的任務,如果沒有的話,就走一個CAS將state改成1,然后設置排他的執行線程,獲取執行權;如果隊列中有任務,那么acquire方法只能先返回false了。那么可以推斷出,hasQueuedPredecessors方法就是用來判斷隊列中是否有排隊的

    點進去看看Lea大神的實現邏輯吧。

 1 public final boolean hasQueuedPredecessors() {
 2         // The correctness of this depends on head being initialized
 3         // before tail and on head.next being accurate if the current
 4         // thread is first in queue.
 5         Node t = tail; // Read fields in reverse initialization order
 6         Node h = head;
 7         Node s;
 8         return h != t &&
 9             ((s = h.next) == null || s.thread != Thread.currentThread());
10     }

    代碼不多,但表達的意思比較晦澀。第一個判斷h!=t,如果h=t,說明隊列是空的,這時這個判斷條件是false,方法直接就返回了,這時外面的if取反是true,會繼續走CAS搶占state和排他線程,獲取鎖,這種情況的路就走完了。如果h!=t為true,說明現在隊列中有任務,這時進入后面的大括號 ((s = h.next) == null || s.thread != Thread.currentThread()) ,在隊列中有任務的情況下,還有兩種可能,一種是隊列中的第一個任務就是當前線程,另一種是第一個任務不是當前線程。因為是公平鎖,如果第一個任務時當前線程的話,那么它有權再去申請一下獲取鎖,如果第一個任務不是當前線程,那么當前線程就乖乖排隊吧,等前面的執行完了才能輪到你。后面的大括號就是對這兩種情況進行了區分,我們用反向邏輯來分析。方法hasQueuedPredecessors表示如果當前線程可以去競爭鎖則返回false,不能競爭鎖則返回true后面大括號結果為false的話當前線程才會去搶占鎖,一個或運算怎樣才能是false?或的兩邊都是false,就是說要(s = h.next) != null && s.thread == Thread.currentThread(),意思就是隊列中第一個任務不為空且第一個任務就是當前線程,而這個&&的非與上述源碼中的||在邏輯上是等價的,所以到這里意思就清楚了,return的&&連接的兩個條件意思是:判斷是否隊列不為空且(第一個任務為空或者不是當前線程)。

    hasQueuedPredecessors方法講完,tryAcquire方法就沒有什么難點了,這時我們回到上面開始的acquire(int arg)方法。如果tryAcquire返回的是true,說明獲取到了鎖,那么就不會再走后面的流程了;如果返回的是false,則進入acquireQueue。但我們先看里面的addWaiter方法。

方法2:  addWaiter(Node.EXCLUSIVE), arg)

     此方法用於生成當前線程的node節點並把它放在隊尾,方法源碼:

 1 private Node addWaiter(Node mode) {
 2         Node node = new Node(Thread.currentThread(), mode);// 創建當前線程的node節點
 3         // Try the fast path of enq; backup to full enq on failure
 4         Node pred = tail;
 5         if (pred != null) { // 判斷隊尾是否為空,如果不為空則將node節點拼接在后面
 6             node.prev = pred; // 將node節點連接到隊尾節點
 7             if (compareAndSetTail(pred, node)) { // 通過CAS將node節點放到隊尾
 8                 pred.next = node; // 如果CAS操作成功了,那么將原隊尾節點的next連接到node節點,組成雙向隊列
 9                 return node;
10             }
11         }
12         enq(node); // 能到這里的話分兩種情況:1、隊尾是空的;2、隊尾不是空的,但是進行CAS操作時由於被其他線程搶占導致失敗;
13         return node;
14     }

通過注解大家應該能梳理清楚邏輯,下面着重說一下enq(node)方法的實現:

 1 private Node enq(final Node node) {
 2         for (;;) {
 3             Node t = tail;
 4             if (t == null) { // Must initialize 隊尾是null,符合前面說的第一種情況
 5                 if (compareAndSetHead(new Node())) // 設置隊首
 6                     tail = head; // 隊首隊尾都初始化成空node
 7             } else { // 隊尾不為空,是前面說的第二種情況,此種情況的處理邏輯同上面對pred != null的處理
 8                 node.prev = t;
 9                 if (compareAndSetTail(t, node)) {
10                     t.next = node;
11                     return t;
12                 }
13             }
14         }
15     }

    可以看到此方法無限循環,直到執行完else中的邏輯。此處需要注意的一點是,如果剛開始時隊列是空的,即tail是null,會觸發隊首隊尾的初始化,初始化之后再一次循環會進入else中,將node放到原隊尾的后面,返回t。注意返回的t沒有用到,是在其他場景的方法中用的。

 方法3:acquireQueued(final Node node, int arg)

     該方法用於獲取鎖,返回值表示當前獲取到鎖的線程在獲取鎖的過程中是否中斷過,下面先看源碼:

 1 final boolean acquireQueued(final Node node, int arg) {
 2         boolean failed = true;
 3         try {
 4             boolean interrupted = false;
 5             for (;;) {
 6                 final Node p = node.predecessor(); // 獲取當前節點的前一個節點
 7                 if (p == head && tryAcquire(arg)) { // 如果p==head說明node是第一個任務,那么就可以通過tryAcquire去獲取鎖
 8                     setHead(node); // 獲取鎖成功,則將node放到隊首位置,並將thread和prev置為null
 9                     p.next = null; // help GC 再將p的next置為null,切斷與外界的一切聯系
10                     failed = false;
11                     return interrupted;
12                 }// 下面if中的兩個方法很重要,着重講解
13                 if (shouldParkAfterFailedAcquire(p, node) &&
14                     parkAndCheckInterrupt())
15                     interrupted = true;
16             }
17         } finally {
18             if (failed)
19                 cancelAcquire(node);
20         }
21     }

    通過注解,相信對第一個if中的邏輯能理解清楚,下我們着重講解第二個if中的兩個方法。

    第一個是 shouldParkAfterFailedAcquire(p, node) 方法,此方法的邏輯為:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2         int ws = pred.waitStatus; // 1、對於新建的Node節點,此狀態都為0(只有addConditionWaiter新建node節點時才不是0)
 3         if (ws == Node.SIGNAL)
 4             // 3、在2中將ws置為-1后,該方法返回false,外層for循環再走一圈,第二次進入此方法時會進入這里,直接返回true。 -1的狀態表示可以將當前線程park
 5             return true;
 6         if (ws > 0) {
 7 
 8             do {
 9                 node.prev = pred = pred.prev;
10             } while (pred.waitStatus > 0);
11             pred.next = node;
12         } else {
13             // 2、是ws=0的話會進入這里,將ws置為-1,0的狀態表示還不能park
14             compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
15         }
16         return false;
17     }

    如果返回的是true,則進入第二個方法將當前線程暫停:

1 private final boolean parkAndCheckInterrupt() {
2         LockSupport.park(this);
3         return Thread.interrupted();
4     }

    當前面的線程執行完畢,喚醒這個線程的時候,就會從第三行開始繼續執行for循環中獲取鎖的邏輯,直到獲取鎖。

 

    到這里,ReentrantLock的lock方法便結束了,整體流程就是這樣。看JUC包中的源碼,可以看到寫的很簡潔,有時一兩個簡單的判斷條件卻代表了非常多的意思,充分顯示了編程者縝密又舉重若輕的實力,讀這樣的源碼,有一種看本格推理小說般的思維上的愉悅感。

    下一節我們將介紹unlock方法的原理,與本節最后一個方法就能接上了,下期再會!

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM