並發編程(五)——AbstractQueuedSynchronizer 之 ReentrantLock源碼分析


本文將從 ReentrantLock 的公平鎖源碼出發,分析下 AbstractQueuedSynchronizer 這個類是怎么工作的,希望能給大家提供一些簡單的幫助。

AQS 結構

先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什么套路了!

// 頭結點,你直接把它當做 當前持有鎖的線程 
private transient volatile Node head;
// 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個隱視的鏈表
private transient volatile Node tail;
// 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被占用,大於0代表有線程持有當前鎖
// 之所以說大於0,而不是等於1,是因為鎖可以重入嘛,每次重入都加上1
private volatile int state;
// 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入
// reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖
// if (currentThread == getExclusiveOwnerThread()) {state++}
private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

AbstractQueuedSynchronizer 的等待隊列示意如下所示,注意了,之后分析過程中所說的 queue,也就是阻塞隊列不包含 head,因為head表示當前持有鎖的線程,並沒有在等待獲取鎖。

等待隊列中每個線程被包裝成一個 node,數據結構是鏈表,一起看看源碼吧:

static final class Node {
    /** Marker to indicate a node is waiting in shared mode */
    // 標識節點當前在共享模式下
    static final Node SHARED = new Node();
    /** Marker to indicate a node is waiting in exclusive mode */
    // 標識節點當前在獨占模式下
    static final Node EXCLUSIVE = null;

    // ======== 下面的幾個int常量是給waitStatus用的 ===========
    /** waitStatus value to indicate thread has cancelled */
    // 代碼此線程取消了爭搶這個鎖
    static final int CANCELLED =  1;
    /** waitStatus value to indicate successor's thread needs unparking */
    // 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒
    static final int SIGNAL    = -1;
    /** waitStatus value to indicate thread is waiting on condition */
    // 本文不分析condition,所以略過吧,下一篇文章會介紹這個
    static final int CONDITION = -2;
    /**
     * waitStatus value to indicate the next acquireShared should
     * unconditionally propagate
     */
    // 同樣的不分析,略過吧
    static final int PROPAGATE = -3;
    // =====================================================

    // 取值為上面的1、-1、-2、-3,或者0(以后會講到)
    // 這么理解,暫時只需要知道如果這個值 大於0 代表此線程取消了等待,
    // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。
    volatile int waitStatus;
    // 前驅節點的引用
    volatile Node prev;
    // 后繼節點的引用
    volatile Node next;
    // 這個就是線程本尊
    volatile Thread thread;
    // 這個是在condition中用來構建單向鏈表,同樣下一篇文章中介紹
    Node nextWaiter;

}

Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,如果大家對LinkedList熟悉的話,那就更簡單了,如果想了解LinkedList,可以看看我前面的文章JDK1.8源碼(二)——java.util.LinkedList

下面,我們開始說 ReentrantLock 的公平鎖,首先,我們先看下 ReentrantLock 的使用方式。

// 我用個web開發中的service概念吧
public class OrderService {
    // 使用static,這樣每個線程拿到的是同一把鎖
    private static ReentrantLock reentrantLock = new ReentrantLock(true);

    public void createOrder() {
        // 比如我們同一時間,只允許一個線程創建訂單
        reentrantLock.lock();
        // 通常,lock 之后緊跟着 try 語句
        try {
            // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程),
            // 其他的線程在lock()方法上阻塞,等待獲取到鎖,再進來
            // 執行代碼...
        } finally {
            // 釋放鎖
            // 釋放鎖必須要在finally里,確保鎖一定會被釋放,如果寫在try里面,發生異常,則有可能不會執行,就會發生死鎖
            reentrantLock.unlock();
        }
    }
}

ReentrantLock 在內部用了內部類 Sync 來管理鎖,所以真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。

abstract static class Sync extends AbstractQueuedSynchronizer {

}

Sync 有兩個實現,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖)

公平鎖:每個線程搶占鎖的順序為先后調用lock方法的順序依次獲取鎖,類似於排隊吃飯。

非公平鎖:每個線程搶占鎖的順序不定,誰運氣好,誰就獲取到鎖,和調用lock方法的先后順序無關。

我們看 FairSync 部分。

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

線程搶鎖

我們來看看lock方法的實現

 1 static final class FairSync extends Sync {
 2     private static final long serialVersionUID = -3000897897090466540L;
 3       // 爭鎖
 4     final void lock() {
 5         acquire(1);
 6     }
 7     // 來自父類AQS,我直接貼過來這邊
 8     // 如果tryAcquire(arg) 返回true,表示嘗試獲取鎖成功,獲取到鎖,也就結束了。
 9     // 否則,acquireQueued方法會將線程壓到隊列中
10     public final void acquire(int arg) { // 此時 arg == 1
11         // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試
12         // 因為有可能直接就成功了呢,也就不需要進隊列排隊了
13         if (!tryAcquire(arg) &&
14             // tryAcquire(arg)沒有成功,這個時候需要把當前線程掛起,放到阻塞隊列中。
15             acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) {
16               selfInterrupt();
17         }
18     }
19     //先看下tryAcquire方法:使用protected修飾,留空了,是想留給子類去實現
20     protected boolean tryAcquire(int arg) {
21         throw new UnsupportedOperationException();
22     }
23     
24     //看FairSync的tryAcquire方法:
25     // 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖
26     // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程本來就持有鎖,也就可以理所當然可以直接獲取
27     protected final boolean tryAcquire(int acquires) {
28         final Thread current = Thread.currentThread();
29         int c = getState();
30         // state == 0 此時此刻沒有線程持有鎖
31         if (c == 0) {
32             // 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來后到,
33             // 看看有沒有別人在隊列中等了半天了,如果在隊列中有等待的線程,則這里就不能獲取到鎖
34             if (!hasQueuedPredecessors() &&
35                 // 如果沒有線程在等待,那就用CAS嘗試一下,嘗試將state的狀態從0改成1,成功了就獲取到鎖了,
36                 // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了
37                 // 有其他線程同時進入到了這一步,並且執行CAS改變state狀態成功
38                 compareAndSetState(0, acquires)) {
39                 // 到這里就是獲取到鎖了,標記一下,告訴大家,現在是我占用了鎖
40                 setExclusiveOwnerThread(current);
41                 return true;
42             }
43         }
44           // 會進入這個else if分支,說明是重入了,需要操作:state=state+1
45         else if (current == getExclusiveOwnerThread()) {
46             int nextc = c + acquires;
47             if (nextc < 0)
48                 throw new Error("Maximum lock count exceeded");
49             setState(nextc);
50             return true;
51         }
52         // 如果到這里,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖
53         return false;
54     }
55 }

 我們來看看 tryAcquire 里面的幾個方法

public final boolean hasQueuedPredecessors() {
    // The correctness of this depends on head being initialized
    // before tail and on head.next being accurate if the current
    // thread is first in queue.
    Node t = tail; // Read fields in reverse initialization order
    Node h = head;
    Node s;
    //如果隊列中有等待的線程,則返回true,否則返回false
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}
//我們看看第38行CAS改變state狀態的方法 
//compareAndSetState(0, acquires)) acquires=1

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    //此處調用了sun.misc.Unsafe類方法進行CAS操作
    //this表示AbstractQueuedSynchronizer對象,stateOffset表示state的偏移量,expect此時為0,update為1
    //此方法表示比較state的stateOffset處內存位置中的值和期望的值,如果相同則更新。
    //此時表示把state的值從0改為1,成功返回true;但是同時有可能其他線程也來修改了state的值,已經不為0了,則返回false
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

private static final Unsafe unsafe = Unsafe.getUnsafe();
//AbstractQueuedSynchronizer中state屬性的偏移量
private static final long stateOffset;
//AbstractQueuedSynchronizer中head屬性的偏移量,后面以此類推
private static final long headOffset;
private static final long tailOffset;
private static final long waitStatusOffset;
private static final long nextOffset;

static {
    try {
        //在類加載的時候會通過sun.misc.Unsafe類方法獲取AbstractQueuedSynchronizer中各個屬性的偏移量,方便后面各種CAS操作
        stateOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
        headOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
        tailOffset = unsafe.objectFieldOffset
            (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
        waitStatusOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("waitStatus"));
        nextOffset = unsafe.objectFieldOffset
            (Node.class.getDeclaredField("next"));

    } catch (Exception ex) { throw new Error(ex); }
}
    
/**
* 比較obj的offset處內存位置中的值和期望的值,如果相同則更新。此更新是不可中斷的。
* 
* @param obj 需要更新的對象
* @param offset obj中整型field的偏移量
* @param expect 希望field中存在的值
* @param update 如果期望值expect與field的當前值相同,設置filed的值為這個新值
* @return 如果field的值被更改返回true
*/
public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update);

//等待隊列里沒有線程等待,並且CAS改變state成功,則進入第40行代碼 setExclusiveOwnerThread(current);
protected final void setExclusiveOwnerThread(Thread thread) {
    //就是對exclusiveOwnerThread賦值
    exclusiveOwnerThread = thread;
}

由此我們清楚了tryAcquire(arg)方法的作用,就是改變把state的狀態改為1或者加1,並將 exclusiveOwnerThread 賦值為當前線程,如果獲取鎖成功,則lock()方法結束,主線程里面的業務代碼繼續往下執行。

如果不tryAcquire(arg)返回false,則要執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),將當前線程掛起,放到阻塞隊列中

這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE)

  1 /**
  2  * Creates and enqueues node for current thread and given mode.
  3  *
  4  * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  5  * @return the new node
  6  */
  7 // 此方法的作用是把線程包裝成node,同時進入到隊列中
  8 // 參數mode此時是Node.EXCLUSIVE,代表獨占模式
  9 private Node addWaiter(Node mode) {
 10     Node node = new Node(Thread.currentThread(), mode);
 11     // Try the fast path of enq; backup to full enq on failure
 12     // 以下幾行代碼想把當前node加到鏈表的最后面去,也就是進到阻塞隊列的最后
 13     Node pred = tail;
 14 
 15     // tail!=null => 隊列不為空
 16     if (pred != null) { 
 17         // 設置自己的前驅 為當前的隊尾節點
 18         node.prev = pred; 
 19         // 用CAS把自己設置為隊尾,就是更新tail的值,pred表示tail原始值,node表示期望更新的值, 如果成功后,tail == node了
 20         if (compareAndSetTail(pred, node)) { 
 21             // 進到這里說明設置成功,當前node==tail, 將自己與之前的隊尾相連
 22             // 上面已經有 node.prev = pred
 23             // 加上下面這句,也就實現了和之前的尾節點雙向連接了
 24             // pred為臨時變量,表示之前的隊尾節點,現在將隊尾節點的next指向node,則將node添加到隊尾了
 25             pred.next = node;
 26             // 線程入隊了,可以返回了
 27             return node;
 28         }
 29     }
 30     // 仔細看看上面的代碼,如果會到這里,
 31     // 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊)
 32     enq(node);
 33     return node;
 34 }
 35 
 36 /**
 37  * Inserts node into queue, initializing if necessary. See picture above.
 38  * @param node the node to insert
 39  * @return node's predecessor
 40  */
 41 // 采用自旋的方式入隊
 42 // 之前說過,到這個方法只有兩種可能:等待隊列為空,或者有線程競爭入隊,
 43 // 自旋在這邊的語義是:CAS設置tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的
 44 private Node enq(final Node node) {
 45     for (;;) {
 46         Node t = tail;
 47         // 之前說過,隊列為空也會進來這里
 48         if (t == null) { // Must initialize
 49             // 初始化head節點
 50             // 還是一步CAS,你懂的,現在可能是很多線程同時進來呢
 51             if (compareAndSetHead(new Node()))
 52                 // 給后面用:這個時候head節點的waitStatus==0
 53                 // 這個時候有了head,但是tail還是null,設置一下,
 54                 // 注意:這里只是設置了tail=head,這里還沒return
 55                 // 所以,設置完了以后,繼續for循環,下次就到下面的else分支了
 56                 tail = head;
 57         } else {
 58             // 下面幾行,和上一個方法 addWaiter 是一樣的,
 59             // 只是這個套在無限循環里,反正就是將當前線程排到隊尾,有線程競爭的話排不上重復排
 60             node.prev = t;
 61             if (compareAndSetTail(t, node)) {
 62                 t.next = node;
 63                 return t;
 64             }
 65         }
 66     }
 67 }
 68 
 69 
 70 // 現在,又回到這段代碼了
 71 // if (!tryAcquire(arg) 
 72 //        && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 
 73 //     selfInterrupt();
 74 
 75 // 下面這個方法,參數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列
 76 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話,
 77 // 意味着上面這段代碼將進入selfInterrupt(),所以正常情況下,下面應該返回false
 78 // 這個方法非常重要,應該說真正的線程掛起,然后被喚醒后去獲取鎖,都在這個方法里了
 79 final boolean acquireQueued(final Node node, int arg) {
 80     boolean failed = true;
 81     try {
 82         boolean interrupted = false;
 83         for (;;) {
 84             //獲取node的prev節點(node的上一個節點)
 85             final Node p = node.predecessor();
 86             // p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是head
 87             // 注意,阻塞隊列不包含head節點,head一般指的是占有鎖的線程,head后面的才稱為阻塞隊列
 88             // 所以當前節點可以去試搶一下鎖
 89             // 這里我們說一下,為什么可以去試試:
 90             // 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node,
 91             // enq(node) 方法里面有提到,head是延時初始化的,而且new Node()的時候沒有設置任何線程
 92             // 也就是說,當前的head不屬於任何一個線程,所以作為隊頭,可以去試一試,
 93             // tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下state
 94             if (p == head && tryAcquire(arg)) {
 95                 //到這里說明剛加入到等待隊列里面的node只有一個,並且此時獲取鎖成功,設置head為node
 96                 setHead(node);
 97                 //將之前的head的next設為null方便jvm垃圾回收
 98                 p.next = null; // help GC
 99                 failed = false;
100                 //此時interrupted = false;
101                 return interrupted;
102             }
103             // 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭,
104             // 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看
105             if (shouldParkAfterFailedAcquire(p, node) &&
106                 parkAndCheckInterrupt())
107                 interrupted = true;
108         }
109     } finally {
110         if (failed)
111             cancelAcquire(node);
112     }
113 }
114 
115 /**
116  * Checks and updates status for a node that failed to acquire.
117  * Returns true if thread should block. This is the main signal
118  * control in all acquire loops.  Requires that pred == node.prev
119  *
120  * @param pred node's predecessor holding status
121  * @param node the node
122  * @return {@code true} if thread should block
123  */
124 // 剛剛說過,會到這里就是沒有搶到鎖唄,這個方法說的是:"當前線程沒有搶到鎖,是否需要掛起當前線程?"
125 // 第一個參數是前驅節點,第二個參數才是代表當前線程的節點
126 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
127     int ws = pred.waitStatus;
128     // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程需要掛起,直接可以返回true
129     if (ws == Node.SIGNAL)
130         /*
131          * This node has already set status asking a release
132          * to signal it, so it can safely park.
133          */
134         return true;
135 
136     // 前驅節點 waitStatus大於0 ,之前說過,大於0 說明前驅節點取消了排隊。這里需要知道這點:
137     // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。
138     // 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點,
139     // 簡單說,如果前驅節點取消了排隊,
140     // 找前驅節點的前驅節,往前循環總能找到一個waitStatus<=0的節點
141     if (ws > 0) {
142         /*
143          * Predecessor was cancelled. Skip over predecessors and
144          * indicate retry.
145          */
146         do {
147             node.prev = pred = pred.prev;
148         } while (pred.waitStatus > 0);
149         pred.next = node;
150     } else {
151         /*
152          * waitStatus must be 0 or PROPAGATE.  Indicate that we
153          * need a signal, but don't park yet.  Caller will need to
154          * retry to make sure it cannot acquire before parking.
155          */
156         // 仔細想想,如果進入到這個分支意味着什么
157         // 前驅節點的waitStatus不等於-1和1,那也就是只可能是0,-2,-3
158         // 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0
159         // 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1)
160         compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
161     }
162     return false;
163 }
164 
165 // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node)
166 // 這個方法結束根據返回值我們簡單分析下:
167 // 如果返回true, 說明前驅節點的waitStatus==-1,是正常情況,那么當前線程需要被掛起,等待以后被喚醒
168 // 我們也說過,以后是被前驅節點喚醒,就等着前驅節點拿到鎖,然后釋放鎖的時候叫你好了
169 // 如果返回false, 說明當前不需要被掛起,為什么呢?往后看
170 
171 // 跳回到前面是這個方法
172 // if (shouldParkAfterFailedAcquire(p, node) &&
173 //                parkAndCheckInterrupt())
174 //                interrupted = true;
175 
176 // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
177 // 那么需要執行parkAndCheckInterrupt():
178 
179 // 這個方法很簡單,因為前面返回true,所以需要掛起線程,這個方法就是負責掛起線程的
180 // 這里用了LockSupport.park(this)來掛起線程,然后就停在這里了,等待被喚醒=======
181 private final boolean parkAndCheckInterrupt() {
182     LockSupport.park(this);
183     return Thread.interrupted();
184 }
185 
186 // 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況
187 
188 // 仔細看shouldParkAfterFailedAcquire(p, node),我們可以發現,其實第一次進來的時候,一般都不會返回true的,原因很簡單,前驅節點的waitStatus=-1是依賴於后繼節點設置的。
189 //intwaitStatus默認值為0,也就是說,我都還沒給前驅設置-1呢,怎么可能是true呢,但是要看到,這個方法是套在循環里的,所以第二次進來的時候狀態就是-1了。
190 
191 // 為什么shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起線程:
192 // 如果 head后面的節點 if (ws > 0)這里有多個節點的waitStatus都為1,這里多次循環之后,node的prev指向了head,此時還需要掛起嗎?當然是不需要了,下一次for循環,就能獲取到鎖了。

解鎖操作

最后,就是還需要介紹下喚醒的動作了。我們知道,正常情況下,如果線程沒獲取到鎖,線程會被 LockSupport.park(this); 掛起停止,等待被喚醒。

// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了,下面都不需要看就知道怎么回事了
public void unlock() {
    sync.release(1);
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

// 回到ReentrantLock看tryRelease方法
protected final boolean tryRelease(int releases) {
    int c = getState() - releases;
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    // 是否完全釋放鎖
    boolean free = false;
    // 其實就是重入的問題,如果c==0,也就是說沒有嵌套鎖了,可以釋放了,否則還不能釋放掉
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    setState(c);
    return free;
}

/**
 * Wakes up node's successor, if one exists.
 *
 * @param node the node
 */
// 喚醒后繼節點
// 從上面調用處知道,參數node是head頭結點
private void unparkSuccessor(Node node) {
    /*
     * If status is negative (i.e., possibly needing signal) try
     * to clear in anticipation of signalling.  It is OK if this
     * fails or if status is changed by waiting thread.
     */
    int ws = node.waitStatus;
    // 如果head節點當前waitStatus<0, 將其修改為0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    /*
     * Thread to unpark is held in successor, which is normally
     * just the next node.  But if cancelled or apparently null,
     * traverse backwards from tail to find the actual
     * non-cancelled successor.
     */
    // 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1)
    // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的一個
    Node s = node.next;
    //如果頭節點后面的第一個節點狀態為-1,並沒有被取消,是不會進入到下面的方法中
    if (s == null || s.waitStatus > 0) {
        s = null;
        // 從后往前找,仔細看代碼,不必擔心中間有節點取消(waitStatus==1)的情況
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)
        // 喚醒線程
        LockSupport.unpark(s.thread);
}

但是為什么要從后面開始遍歷尋找waitStatus<=0的所有節點中排在最前面的一個,為什么不從前面的節點開始找呢?
這個問題的答案在 addWaiter(Node mode)方法中,看下面的代碼:

 1 Node pred = tail;
 2     if (pred != null) {
 3         node.prev = pred;
 4         // 1. 先設置的 tail
 5         if (compareAndSetTail(pred, node)) {
 6             // 2. 設置前驅節點的后繼節點為當前節點
 7             pred.next = node;
 8             return node;
 9         }
10 }

這里存在並發問題:從前往后尋找不一定能找到剛剛加入隊列的后繼節點。

如果此時正有一個線程加入等待隊列的尾部,執行到上面第7行,第7行還未執行,解鎖操作如果從前面開始找 頭節點后面的第一個節點狀態為-1的節點,此時是找不到這個新加入的節點的,因為尾節點的next 還未指向新加入的node,但是從后面開始遍歷的話,那就不存在這種情況。

 

喚醒線程以后,被喚醒的線程將從以下代碼中繼續往前走:我們剛才是找到head后面第一個狀態為-1的節點里面的線程進行喚醒

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this); // 剛剛線程被掛起在這里了
    return Thread.interrupted();
}
// 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了

此時第一個等待節點已經被喚醒,則第一個等待節點里面的線程繼續執行 acquireQueued ,此時acquireQueued 方法中 85行處p已經是head節點了,94行處就可以繼續嘗試獲取鎖了。依次循環,這個節點獲取到鎖,解鎖后,等待隊列head節點后第一個節點進行喚醒獲取鎖。

 

總結

在並發環境下,加鎖和解鎖需要以下三個部件的協調:

  1. 鎖狀態。我們要知道鎖是不是被別的線程占有了,這個就是 state 的作用,它為 0 的時候代表沒有線程占有鎖,可以去爭搶這個鎖,用 CAS 將 state 設為 1,如果 CAS 成功,說明搶到了鎖,這樣其他線程就搶不到了,如果鎖重入的話,state進行+1 就可以,解鎖就是減 1,直到 state 又變為 0,代表釋放鎖,所以 lock() 和 unlock() 必須要配對啊。然后喚醒等待隊列中的第一個線程,讓其來占有鎖。
  2. 線程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 來掛起線程,用 unpark 來喚醒線程。
  3. 阻塞隊列。因為爭搶鎖的線程可能很多,但是只能有一個線程拿到鎖,其他的線程都必須等待,這個時候就需要一個 queue 來管理這些線程,AQS 用的是一個 FIFO 的隊列,就是一個鏈表,每個 node 都持有后繼節點的引用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM