本文將從 ReentrantLock 的公平鎖源碼出發,分析下 AbstractQueuedSynchronizer 這個類是怎么工作的,希望能給大家提供一些簡單的幫助。
AQS 結構
先來看看 AQS 有哪些屬性,搞清楚這些基本就知道 AQS 是什么套路了!
// 頭結點,你直接把它當做 當前持有鎖的線程 private transient volatile Node head; // 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個隱視的鏈表 private transient volatile Node tail; // 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被占用,大於0代表有線程持有當前鎖 // 之所以說大於0,而不是等於1,是因為鎖可以重入嘛,每次重入都加上1 private volatile int state; // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入 // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖 // if (currentThread == getExclusiveOwnerThread()) {state++} private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer
AbstractQueuedSynchronizer 的等待隊列示意如下所示,注意了,之后分析過程中所說的 queue,也就是阻塞隊列不包含 head,因為head表示當前持有鎖的線程,並沒有在等待獲取鎖。
等待隊列中每個線程被包裝成一個 node,數據結構是鏈表,一起看看源碼吧:
static final class Node { /** Marker to indicate a node is waiting in shared mode */ // 標識節點當前在共享模式下 static final Node SHARED = new Node(); /** Marker to indicate a node is waiting in exclusive mode */ // 標識節點當前在獨占模式下 static final Node EXCLUSIVE = null; // ======== 下面的幾個int常量是給waitStatus用的 =========== /** waitStatus value to indicate thread has cancelled */ // 代碼此線程取消了爭搶這個鎖 static final int CANCELLED = 1; /** waitStatus value to indicate successor's thread needs unparking */ // 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒 static final int SIGNAL = -1; /** waitStatus value to indicate thread is waiting on condition */ // 本文不分析condition,所以略過吧,下一篇文章會介紹這個 static final int CONDITION = -2; /** * waitStatus value to indicate the next acquireShared should * unconditionally propagate */ // 同樣的不分析,略過吧 static final int PROPAGATE = -3; // ===================================================== // 取值為上面的1、-1、-2、-3,或者0(以后會講到) // 這么理解,暫時只需要知道如果這個值 大於0 代表此線程取消了等待, // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。 volatile int waitStatus; // 前驅節點的引用 volatile Node prev; // 后繼節點的引用 volatile Node next; // 這個就是線程本尊 volatile Thread thread; // 這個是在condition中用來構建單向鏈表,同樣下一篇文章中介紹 Node nextWaiter; }
Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已,如果大家對LinkedList熟悉的話,那就更簡單了,如果想了解LinkedList,可以看看我前面的文章JDK1.8源碼(二)——java.util.LinkedList。
下面,我們開始說 ReentrantLock 的公平鎖,首先,我們先看下 ReentrantLock 的使用方式。
// 我用個web開發中的service概念吧 public class OrderService { // 使用static,這樣每個線程拿到的是同一把鎖 private static ReentrantLock reentrantLock = new ReentrantLock(true); public void createOrder() { // 比如我們同一時間,只允許一個線程創建訂單 reentrantLock.lock(); // 通常,lock 之后緊跟着 try 語句 try { // 這塊代碼同一時間只能有一個線程進來(獲取到鎖的線程), // 其他的線程在lock()方法上阻塞,等待獲取到鎖,再進來 // 執行代碼... } finally { // 釋放鎖 // 釋放鎖必須要在finally里,確保鎖一定會被釋放,如果寫在try里面,發生異常,則有可能不會執行,就會發生死鎖 reentrantLock.unlock(); } } }
ReentrantLock 在內部用了內部類 Sync 來管理鎖,所以真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。
abstract static class Sync extends AbstractQueuedSynchronizer { }
Sync 有兩個實現,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖)
公平鎖:每個線程搶占鎖的順序為先后調用lock方法的順序依次獲取鎖,類似於排隊吃飯。
非公平鎖:每個線程搶占鎖的順序不定,誰運氣好,誰就獲取到鎖,和調用lock方法的先后順序無關。
我們看 FairSync 部分。
public ReentrantLock(boolean fair) { sync = fair ? new FairSync() : new NonfairSync(); }
線程搶鎖
我們來看看lock方法的實現
1 static final class FairSync extends Sync { 2 private static final long serialVersionUID = -3000897897090466540L; 3 // 爭鎖 4 final void lock() { 5 acquire(1); 6 } 7 // 來自父類AQS,我直接貼過來這邊 8 // 如果tryAcquire(arg) 返回true,表示嘗試獲取鎖成功,獲取到鎖,也就結束了。 9 // 否則,acquireQueued方法會將線程壓到隊列中 10 public final void acquire(int arg) { // 此時 arg == 1 11 // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試 12 // 因為有可能直接就成功了呢,也就不需要進隊列排隊了 13 if (!tryAcquire(arg) && 14 // tryAcquire(arg)沒有成功,這個時候需要把當前線程掛起,放到阻塞隊列中。 15 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 16 selfInterrupt(); 17 } 18 } 19 //先看下tryAcquire方法:使用protected修飾,留空了,是想留給子類去實現 20 protected boolean tryAcquire(int arg) { 21 throw new UnsupportedOperationException(); 22 } 23 24 //看FairSync的tryAcquire方法: 25 // 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖 26 // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程本來就持有鎖,也就可以理所當然可以直接獲取 27 protected final boolean tryAcquire(int acquires) { 28 final Thread current = Thread.currentThread(); 29 int c = getState(); 30 // state == 0 此時此刻沒有線程持有鎖 31 if (c == 0) { 32 // 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來后到, 33 // 看看有沒有別人在隊列中等了半天了,如果在隊列中有等待的線程,則這里就不能獲取到鎖 34 if (!hasQueuedPredecessors() && 35 // 如果沒有線程在等待,那就用CAS嘗試一下,嘗試將state的狀態從0改成1,成功了就獲取到鎖了, 36 // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 37 // 有其他線程同時進入到了這一步,並且執行CAS改變state狀態成功 38 compareAndSetState(0, acquires)) { 39 // 到這里就是獲取到鎖了,標記一下,告訴大家,現在是我占用了鎖 40 setExclusiveOwnerThread(current); 41 return true; 42 } 43 } 44 // 會進入這個else if分支,說明是重入了,需要操作:state=state+1 45 else if (current == getExclusiveOwnerThread()) { 46 int nextc = c + acquires; 47 if (nextc < 0) 48 throw new Error("Maximum lock count exceeded"); 49 setState(nextc); 50 return true; 51 } 52 // 如果到這里,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖 53 return false; 54 } 55 }
我們來看看 tryAcquire 里面的幾個方法
public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; //如果隊列中有等待的線程,則返回true,否則返回false return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); } //我們看看第38行CAS改變state狀態的方法 //compareAndSetState(0, acquires)) acquires=1 protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this //此處調用了sun.misc.Unsafe類方法進行CAS操作 //this表示AbstractQueuedSynchronizer對象,stateOffset表示state的偏移量,expect此時為0,update為1 //此方法表示比較state的stateOffset處內存位置中的值和期望的值,如果相同則更新。 //此時表示把state的值從0改為1,成功返回true;但是同時有可能其他線程也來修改了state的值,已經不為0了,則返回false return unsafe.compareAndSwapInt(this, stateOffset, expect, update); } private static final Unsafe unsafe = Unsafe.getUnsafe(); //AbstractQueuedSynchronizer中state屬性的偏移量 private static final long stateOffset; //AbstractQueuedSynchronizer中head屬性的偏移量,后面以此類推 private static final long headOffset; private static final long tailOffset; private static final long waitStatusOffset; private static final long nextOffset; static { try { //在類加載的時候會通過sun.misc.Unsafe類方法獲取AbstractQueuedSynchronizer中各個屬性的偏移量,方便后面各種CAS操作 stateOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("state")); headOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("head")); tailOffset = unsafe.objectFieldOffset (AbstractQueuedSynchronizer.class.getDeclaredField("tail")); waitStatusOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("waitStatus")); nextOffset = unsafe.objectFieldOffset (Node.class.getDeclaredField("next")); } catch (Exception ex) { throw new Error(ex); } } /** * 比較obj的offset處內存位置中的值和期望的值,如果相同則更新。此更新是不可中斷的。 * * @param obj 需要更新的對象 * @param offset obj中整型field的偏移量 * @param expect 希望field中存在的值 * @param update 如果期望值expect與field的當前值相同,設置filed的值為這個新值 * @return 如果field的值被更改返回true */ public native boolean compareAndSwapInt(Object obj, long offset, int expect, int update); //等待隊列里沒有線程等待,並且CAS改變state成功,則進入第40行代碼 setExclusiveOwnerThread(current); protected final void setExclusiveOwnerThread(Thread thread) { //就是對exclusiveOwnerThread賦值 exclusiveOwnerThread = thread; }
由此我們清楚了tryAcquire(arg)方法的作用,就是改變把state的狀態改為1或者加1,並將 exclusiveOwnerThread 賦值為當前線程,如果獲取鎖成功,則lock()方法結束,主線程里面的業務代碼繼續往下執行。
如果不tryAcquire(arg)返回false,則要執行 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)),將當前線程掛起,放到阻塞隊列中
這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE)
1 /** 2 * Creates and enqueues node for current thread and given mode. 3 * 4 * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared 5 * @return the new node 6 */ 7 // 此方法的作用是把線程包裝成node,同時進入到隊列中 8 // 參數mode此時是Node.EXCLUSIVE,代表獨占模式 9 private Node addWaiter(Node mode) { 10 Node node = new Node(Thread.currentThread(), mode); 11 // Try the fast path of enq; backup to full enq on failure 12 // 以下幾行代碼想把當前node加到鏈表的最后面去,也就是進到阻塞隊列的最后 13 Node pred = tail; 14 15 // tail!=null => 隊列不為空 16 if (pred != null) { 17 // 設置自己的前驅 為當前的隊尾節點 18 node.prev = pred; 19 // 用CAS把自己設置為隊尾,就是更新tail的值,pred表示tail原始值,node表示期望更新的值, 如果成功后,tail == node了 20 if (compareAndSetTail(pred, node)) { 21 // 進到這里說明設置成功,當前node==tail, 將自己與之前的隊尾相連 22 // 上面已經有 node.prev = pred 23 // 加上下面這句,也就實現了和之前的尾節點雙向連接了 24 // pred為臨時變量,表示之前的隊尾節點,現在將隊尾節點的next指向node,則將node添加到隊尾了 25 pred.next = node; 26 // 線程入隊了,可以返回了 27 return node; 28 } 29 } 30 // 仔細看看上面的代碼,如果會到這里, 31 // 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊) 32 enq(node); 33 return node; 34 } 35 36 /** 37 * Inserts node into queue, initializing if necessary. See picture above. 38 * @param node the node to insert 39 * @return node's predecessor 40 */ 41 // 采用自旋的方式入隊 42 // 之前說過,到這個方法只有兩種可能:等待隊列為空,或者有線程競爭入隊, 43 // 自旋在這邊的語義是:CAS設置tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的 44 private Node enq(final Node node) { 45 for (;;) { 46 Node t = tail; 47 // 之前說過,隊列為空也會進來這里 48 if (t == null) { // Must initialize 49 // 初始化head節點 50 // 還是一步CAS,你懂的,現在可能是很多線程同時進來呢 51 if (compareAndSetHead(new Node())) 52 // 給后面用:這個時候head節點的waitStatus==0 53 // 這個時候有了head,但是tail還是null,設置一下, 54 // 注意:這里只是設置了tail=head,這里還沒return 55 // 所以,設置完了以后,繼續for循環,下次就到下面的else分支了 56 tail = head; 57 } else { 58 // 下面幾行,和上一個方法 addWaiter 是一樣的, 59 // 只是這個套在無限循環里,反正就是將當前線程排到隊尾,有線程競爭的話排不上重復排 60 node.prev = t; 61 if (compareAndSetTail(t, node)) { 62 t.next = node; 63 return t; 64 } 65 } 66 } 67 } 68 69 70 // 現在,又回到這段代碼了 71 // if (!tryAcquire(arg) 72 // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 73 // selfInterrupt(); 74 75 // 下面這個方法,參數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列 76 // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話, 77 // 意味着上面這段代碼將進入selfInterrupt(),所以正常情況下,下面應該返回false 78 // 這個方法非常重要,應該說真正的線程掛起,然后被喚醒后去獲取鎖,都在這個方法里了 79 final boolean acquireQueued(final Node node, int arg) { 80 boolean failed = true; 81 try { 82 boolean interrupted = false; 83 for (;;) { 84 //獲取node的prev節點(node的上一個節點) 85 final Node p = node.predecessor(); 86 // p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是head 87 // 注意,阻塞隊列不包含head節點,head一般指的是占有鎖的線程,head后面的才稱為阻塞隊列 88 // 所以當前節點可以去試搶一下鎖 89 // 這里我們說一下,為什么可以去試試: 90 // 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node, 91 // enq(node) 方法里面有提到,head是延時初始化的,而且new Node()的時候沒有設置任何線程 92 // 也就是說,當前的head不屬於任何一個線程,所以作為隊頭,可以去試一試, 93 // tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下state 94 if (p == head && tryAcquire(arg)) { 95 //到這里說明剛加入到等待隊列里面的node只有一個,並且此時獲取鎖成功,設置head為node 96 setHead(node); 97 //將之前的head的next設為null方便jvm垃圾回收 98 p.next = null; // help GC 99 failed = false; 100 //此時interrupted = false; 101 return interrupted; 102 } 103 // 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭, 104 // 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看 105 if (shouldParkAfterFailedAcquire(p, node) && 106 parkAndCheckInterrupt()) 107 interrupted = true; 108 } 109 } finally { 110 if (failed) 111 cancelAcquire(node); 112 } 113 } 114 115 /** 116 * Checks and updates status for a node that failed to acquire. 117 * Returns true if thread should block. This is the main signal 118 * control in all acquire loops. Requires that pred == node.prev 119 * 120 * @param pred node's predecessor holding status 121 * @param node the node 122 * @return {@code true} if thread should block 123 */ 124 // 剛剛說過,會到這里就是沒有搶到鎖唄,這個方法說的是:"當前線程沒有搶到鎖,是否需要掛起當前線程?" 125 // 第一個參數是前驅節點,第二個參數才是代表當前線程的節點 126 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 127 int ws = pred.waitStatus; 128 // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程需要掛起,直接可以返回true 129 if (ws == Node.SIGNAL) 130 /* 131 * This node has already set status asking a release 132 * to signal it, so it can safely park. 133 */ 134 return true; 135 136 // 前驅節點 waitStatus大於0 ,之前說過,大於0 說明前驅節點取消了排隊。這里需要知道這點: 137 // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。 138 // 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點, 139 // 簡單說,如果前驅節點取消了排隊, 140 // 找前驅節點的前驅節,往前循環總能找到一個waitStatus<=0的節點 141 if (ws > 0) { 142 /* 143 * Predecessor was cancelled. Skip over predecessors and 144 * indicate retry. 145 */ 146 do { 147 node.prev = pred = pred.prev; 148 } while (pred.waitStatus > 0); 149 pred.next = node; 150 } else { 151 /* 152 * waitStatus must be 0 or PROPAGATE. Indicate that we 153 * need a signal, but don't park yet. Caller will need to 154 * retry to make sure it cannot acquire before parking. 155 */ 156 // 仔細想想,如果進入到這個分支意味着什么 157 // 前驅節點的waitStatus不等於-1和1,那也就是只可能是0,-2,-3 158 // 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0 159 // 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1) 160 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 161 } 162 return false; 163 } 164 165 // private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) 166 // 這個方法結束根據返回值我們簡單分析下: 167 // 如果返回true, 說明前驅節點的waitStatus==-1,是正常情況,那么當前線程需要被掛起,等待以后被喚醒 168 // 我們也說過,以后是被前驅節點喚醒,就等着前驅節點拿到鎖,然后釋放鎖的時候叫你好了 169 // 如果返回false, 說明當前不需要被掛起,為什么呢?往后看 170 171 // 跳回到前面是這個方法 172 // if (shouldParkAfterFailedAcquire(p, node) && 173 // parkAndCheckInterrupt()) 174 // interrupted = true; 175 176 // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true, 177 // 那么需要執行parkAndCheckInterrupt(): 178 179 // 這個方法很簡單,因為前面返回true,所以需要掛起線程,這個方法就是負責掛起線程的 180 // 這里用了LockSupport.park(this)來掛起線程,然后就停在這里了,等待被喚醒======= 181 private final boolean parkAndCheckInterrupt() { 182 LockSupport.park(this); 183 return Thread.interrupted(); 184 } 185 186 // 2. 接下來說說如果shouldParkAfterFailedAcquire(p, node)返回false的情況 187 188 // 仔細看shouldParkAfterFailedAcquire(p, node),我們可以發現,其實第一次進來的時候,一般都不會返回true的,原因很簡單,前驅節點的waitStatus=-1是依賴於后繼節點設置的。 189 //intwaitStatus默認值為0,也就是說,我都還沒給前驅設置-1呢,怎么可能是true呢,但是要看到,這個方法是套在循環里的,所以第二次進來的時候狀態就是-1了。 190 191 // 為什么shouldParkAfterFailedAcquire(p, node)返回false的時候不直接掛起線程: 192 // 如果 head后面的節點 if (ws > 0)這里有多個節點的waitStatus都為1,這里多次循環之后,node的prev指向了head,此時還需要掛起嗎?當然是不需要了,下一次for循環,就能獲取到鎖了。
解鎖操作
最后,就是還需要介紹下喚醒的動作了。我們知道,正常情況下,如果線程沒獲取到鎖,線程會被 LockSupport.park(this);
掛起停止,等待被喚醒。
// 喚醒的代碼還是比較簡單的,你如果上面加鎖的都看懂了,下面都不需要看就知道怎么回事了 public void unlock() { sync.release(1); } public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } // 回到ReentrantLock看tryRelease方法 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 是否完全釋放鎖 boolean free = false; // 其實就是重入的問題,如果c==0,也就是說沒有嵌套鎖了,可以釋放了,否則還不能釋放掉 if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /** * Wakes up node's successor, if one exists. * * @param node the node */ // 喚醒后繼節點 // 從上面調用處知道,參數node是head頭結點 private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; // 如果head節點當前waitStatus<0, 將其修改為0 if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ // 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1) // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的一個 Node s = node.next; //如果頭節點后面的第一個節點狀態為-1,並沒有被取消,是不會進入到下面的方法中 if (s == null || s.waitStatus > 0) { s = null; // 從后往前找,仔細看代碼,不必擔心中間有節點取消(waitStatus==1)的情況 for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) // 喚醒線程 LockSupport.unpark(s.thread); }
但是為什么要從后面開始遍歷尋找waitStatus<=0的所有節點中排在最前面的一個,為什么不從前面的節點開始找呢?
這個問題的答案在 addWaiter(Node mode)方法中,看下面的代碼:
1 Node pred = tail; 2 if (pred != null) { 3 node.prev = pred; 4 // 1. 先設置的 tail 5 if (compareAndSetTail(pred, node)) { 6 // 2. 設置前驅節點的后繼節點為當前節點 7 pred.next = node; 8 return node; 9 } 10 }
這里存在並發問題:從前往后尋找不一定能找到剛剛加入隊列的后繼節點。
如果此時正有一個線程加入等待隊列的尾部,執行到上面第7行,第7行還未執行,解鎖操作如果從前面開始找 頭節點后面的第一個節點狀態為-1的節點,此時是找不到這個新加入的節點的,因為尾節點的next 還未指向新加入的node,但是從后面開始遍歷的話,那就不存在這種情況。
喚醒線程以后,被喚醒的線程將從以下代碼中繼續往前走:我們剛才是找到head后面第一個狀態為-1的節點里面的線程進行喚醒
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); // 剛剛線程被掛起在這里了 return Thread.interrupted(); } // 又回到這個方法了:acquireQueued(final Node node, int arg),這個時候,node的前驅是head了
此時第一個等待節點已經被喚醒,則第一個等待節點里面的線程繼續執行 acquireQueued ,此時acquireQueued 方法中 85行處p已經是head節點了,94行處就可以繼續嘗試獲取鎖了。依次循環,這個節點獲取到鎖,解鎖后,等待隊列head節點后第一個節點進行喚醒獲取鎖。
總結
在並發環境下,加鎖和解鎖需要以下三個部件的協調:
- 鎖狀態。我們要知道鎖是不是被別的線程占有了,這個就是 state 的作用,它為 0 的時候代表沒有線程占有鎖,可以去爭搶這個鎖,用 CAS 將 state 設為 1,如果 CAS 成功,說明搶到了鎖,這樣其他線程就搶不到了,如果鎖重入的話,state進行+1 就可以,解鎖就是減 1,直到 state 又變為 0,代表釋放鎖,所以 lock() 和 unlock() 必須要配對啊。然后喚醒等待隊列中的第一個線程,讓其來占有鎖。
- 線程的阻塞和解除阻塞。AQS 中采用了 LockSupport.park(thread) 來掛起線程,用 unpark 來喚醒線程。
- 阻塞隊列。因為爭搶鎖的線程可能很多,但是只能有一個線程拿到鎖,其他的線程都必須等待,這個時候就需要一個 queue 來管理這些線程,AQS 用的是一個 FIFO 的隊列,就是一個鏈表,每個 node 都持有后繼節點的引用。