AQS實現原理分析——ReentrantLock


在Java並發包java.util.concurrent中可以看到,不少源碼是基於AbstractQueuedSynchronizer(以下簡寫AQS)這個抽象類,因為它是Java並發包的基礎工具類,是實現ReentrantLock、CountDownLatch、Semaphore、FutureTask 等類的基礎。 
AQS的主要使用方式是繼承,子類通過繼承AQS並實現它的抽象方法來管理同步狀態,在抽象方法中免不了要對同步狀態進行更改,這時就需要使用AQS提供的3個方法(getState()、setState(int newState)和compareAndSetState(int expect,int update) 來進行操作,因為他們能夠保證狀態的改變是安全的 。

 

1 AQS原理介紹

在 AQS 內部,通過維護一個FIFO隊列來管理多線程的排隊工作。在公平競爭的情況下,無法獲取同步狀態的線程將會被封裝成一個節點,置於隊列尾部。入隊的線程將會通過自旋的方式獲取同步狀態,若在有限次的嘗試后,仍未獲取成功,線程則會被阻塞住。大致示意圖如下: 
此處輸入圖片的描述 
當頭結點釋放同步狀態后,且后繼節點對應的線程被阻塞,此時頭結點線程將會去喚醒后繼節點線程。后繼節點線程恢復運行並獲取同步狀態后,會將舊的頭結點從隊列中移除,並將自己設為頭結點。大致示意圖如下: 
此處輸入圖片的描述

 

2 AQS結構

 

2.1 AQS屬性

先來看看 AQS 有哪些變量,搞清楚這些基本就知道 AQS 是什么套路了,畢竟可以猜嘛!

 
  1. // 頭結點,你直接把它當做 當前持有鎖的線程 可能是最好理解的
  2. private transient volatile Node head;
  3. // 阻塞的尾節點,每個新的節點進來,都插入到最后,也就形成了一個隱視的鏈表
  4. private transient volatile Node tail;
  5. // 這個是最重要的,不過也是最簡單的,代表當前鎖的狀態,0代表沒有被占用,大於0代表有線程持有當前鎖
  6. // 之所以說大於0,而不是等於1,是因為鎖可以重入嘛,每次重入都加上1
  7. private volatile int state;
  8. // 代表當前持有獨占鎖的線程,舉個最重要的使用例子,因為鎖可以重入
  9. // reentrantLock.lock()可以嵌套調用多次,所以每次用這個來判斷當前線程是否已經擁有了鎖
  10. // if (currentThread == getExclusiveOwnerThread()) {state++}
  11. private transient Thread exclusiveOwnerThread; //繼承自AbstractOwnableSynchronizer

在並發的情況下,AQS 會將未獲取同步狀態的線程將會封裝成節點,並將其放入同步隊列尾部。同步隊列中的節點除了要保存線程,還要保存等待狀態。不管是獨占式還是共享式,在獲取狀態失敗時都會用到節點類。所以這里我們要先看一下節點類的實現,為后面的源碼分析進行簡單鋪墊。既然AQS通過一個雙向鏈表來維護所有的的節點,那么先看一下每一個節點的結構:

 
  1. static final class Node {
  2. /** Marker to indicate a node is waiting in shared mode */
  3. // 標識節點當前在共享模式下
  4. static final Node SHARED = new Node();
  5. /** Marker to indicate a node is waiting in exclusive mode */
  6. // 標識節點當前在獨占模式下
  7. static final Node EXCLUSIVE = null;
  8. // ======== 下面的幾個int常量是給waitStatus用的 ===========
  9. /** waitStatus value to indicate thread has cancelled */
  10. // 代碼此線程取消了爭搶這個鎖
  11. static final int CANCELLED = 1;
  12. /** waitStatus value to indicate successor's thread needs unparking */
  13. // 官方的描述是,其表示當前node的后繼節點對應的線程需要被喚醒
  14. static final int SIGNAL = -1;
  15. /** waitStatus value to indicate thread is waiting on condition */
  16. // 本文不分析condition,所以略過吧,下一篇文章會介紹這個
  17. static final int CONDITION = -2;
  18. /**
  19. * waitStatus value to indicate the next acquireShared should
  20. * unconditionally propagate
  21. */
  22. // 同樣的不分析,略過吧
  23. static final int PROPAGATE = -3;
  24. // =====================================================
  25. // 取值為上面的1、-1、-2、-3,或者0(以后會講到)
  26. // 這么理解,暫時只需要知道如果這個值 大於0 代表此線程取消了等待,
  27. // 也許就是說半天搶不到鎖,不搶了,ReentrantLock是可以指定timeouot的。。。
  28. volatile int waitStatus;
  29. // 前驅節點的引用
  30. volatile Node prev;
  31. // 后繼節點的引用
  32. volatile Node next;
  33. // 這個就是線程本尊
  34. volatile Thread thread;
  35. }

Node 的數據結構其實也挺簡單的,就是 thread + waitStatus + pre + next 四個屬性而已。

 

2.1 AQS方法

本節將介紹三組重要的方法,通過使用這三組方法即可實現一個同步組件。

第一組方法是用於訪問/設置同步狀態的,如下:

方法 描述
int getState() 獲取同步狀態
void setState() 設置同步狀態
boolean compareAndSetState(int expect, int update) 通過 CAS 設置同步狀態

第二組方需要由同步組件覆寫。如下:

方法 描述
boolean tryAcquire(int arg) 獨占式獲取同步狀態
boolean tryRelease(int arg) 獨占式釋放同步狀態
int tryAcquireShared(int arg) 共享式獲取同步狀態
boolean tryReleaseShared(int arg) 共享式私房同步狀態
boolean isHeldExclusively() 檢測當前線程是否獲取獨占鎖

第三組方法是一組模板方法,同步組件可直接調用。如下:

方法 描述
void acquire(int arg) 獨占式獲取同步狀態,該方法將會調用 tryAcquire 嘗試獲取同步狀態。獲取成功則返回,獲取失敗,線程進入同步隊列等待。
void acquireInterruptibly(int arg) 響應中斷版的 acquire
boolean tryAcquireNanos(int arg,long nanos) 超時+響應中斷版的 acquire
void acquireShared(int arg) 共享式獲取同步狀態,同一時刻可能會有多個線程獲得同步狀態。比如讀寫鎖的讀鎖就是就是調用這個方法獲取同步狀態的。
void acquireSharedInterruptibly(int arg) 響應中斷版的 acquireShared
boolean tryAcquireSharedNanos(int arg,long nanos) 超時+響應中斷版的 acquireShared
boolean release(int arg) 獨占式釋放同步狀態
boolean releaseShared(int arg) 共享式釋放同步狀態
 

3 ReentrantLock加鎖

ReentrantLock 在內部用了內部類 Sync 來管理鎖,所以真正的獲取鎖和釋放鎖是由 Sync 的實現類來控制的。 
Sync 有兩個實現,分別為 NonfairSync(非公平鎖)和 FairSync(公平鎖),我們看 NonfairSync 部分。

 

3.1 NonfairSync鎖

加鎖的一步如下:

 
  1. public void lock() {
  2. sync.lock();
  3. }

對於非公平鎖,則執行如下流程:

 
  1. final void lock() {
  2. // 如果鎖沒有被任何線程鎖定且加鎖成功則設定當前線程為鎖的擁有者
  3. if (compareAndSetState(0, 1))
  4. setExclusiveOwnerThread(Thread.currentThread());
  5. else
  6. acquire(1);
  7. }

1)我們假設在這個時候,還沒有任務線程獲取鎖,這個時候,第一個線程過來了(我們使用的是非公平鎖),那么第一個線程thread1會去獲取鎖,這時它會調用下面的方法,通過CAS的操作,將當前AQS的state由0變成1,證明當前thread1已經獲取到鎖,並且將AQS的exclusiveOwnerThread設置成thread1,證明當前持有鎖的線程是thread1。 
2)如果此時來了第二個線程thread2,並且我們假設thread1還沒有釋放鎖,因為我們使用的是非公平鎖,那么thread2首先會進行搶占式的去獲取鎖,調用NonFairSync.lock方法獲取鎖。NonFairSync.lock方法的第一個分支是通過CAS操作獲取鎖,很明顯,這一步肯定會失敗,因為此時thread1還沒有釋放鎖。那么thread2將會走NonFairSync.lock方法的第二個分支,進行acquire(1)操作。acquire(1)其實是AQS的方法,acquire(1)方法內部首先調用tryAcquire方法,ReentrantLock.NonFairLock重寫了tryAcquire方法,並且ReentrantLock.NonFairLock的tryAcquire方法又調用了ReentrantLock.Sync的nonfairTryAcquire方法,nonfairTryAcquire方法如下:

 
  1. // 該方法來自父類AQS,我直接貼過來這邊,下面分析的時候同樣會這樣做,不會給讀者帶來閱讀壓力
  2. // 我們看到,這個方法,如果tryAcquire(arg) 返回true, 也就結束了。
  3. // 否則,acquireQueued方法會將線程壓到隊列中
  4. public final void acquire(int arg) {
  5. // 首先調用tryAcquire(1)一下,名字上就知道,這個只是試一試
  6. // 因為有可能直接就成功了呢,也就不需要進隊列排隊了,
  7. // 對於公平鎖的語義就是:本來就沒人持有鎖,根本沒必要進隊列等待(又是掛起,又是等待被喚醒的)
  8. if (!tryAcquire(arg) &&
  9. acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  10. selfInterrupt();
  11. }
  12. protected final boolean tryAcquire(int acquires) {
  13. //直接調用下面方法
  14. return nonfairTryAcquire(acquires);
  15. }
  16. /**
  17. * Performs non-fair tryLock. tryAcquire is implemented in
  18. * subclasses, but both need nonfair try for trylock method.
  19. */
  20. // 嘗試直接獲取鎖,返回值是boolean,代表是否獲取到鎖
  21. // 返回true:1.沒有線程在等待鎖;2.重入鎖,線程本來就持有鎖,也就可以理所當然可以直接獲取
  22. final boolean nonfairTryAcquire(int acquires) {
  23. final Thread current = Thread.currentThread();
  24. int c = getState();
  25. // state == 0 此時此刻沒有線程持有鎖
  26. if (c == 0) {
  27. // 用CAS嘗試一下,成功了就獲取到鎖了,
  28. // 不成功的話,只能說明一個問題,就在剛剛幾乎同一時刻有個線程搶先了 =_=
  29. // 因為剛剛還沒人的,我判斷過了???
  30. if (compareAndSetState(0, acquires)) {
  31. // 到這里就是獲取到鎖了,標記一下,告訴大家,現在是我占用了鎖
  32. setExclusiveOwnerThread(current);
  33. return true;
  34. }
  35. }
  36. else if (current == getExclusiveOwnerThread()) {
  37. // 會進入這個else if分支,說明是重入了,需要操作:state=state+1
  38. int nextc = c + acquires;
  39. if (nextc < 0) // overflow
  40. throw new Error("Maximum lock count exceeded");
  41. setState(nextc);
  42. return true;
  43. }
  44. // 如果到這里,說明前面的if和else if都沒有返回true,說明沒有獲取到鎖
  45. // 回到上面一個外層調用方法繼續看:
  46. // if (!tryAcquire(arg)
  47. // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  48. // selfInterrupt();
  49. return false;
  50. }

nonfairTryAcquire方法的執行邏輯如下: 
1. 獲取當前將要去獲取鎖的線程,在此時的情況下,也就是我們的thread2線程。 
2. 獲取當前AQS的state的值。如果此時state的值是0,那么我們就通過CAS操作獲取鎖,然后設置AQS的exclusiveOwnerThread為thread2。很明顯,在當前的這個執行情況下,state的值是1不是0,因為我們的thread1還沒有釋放鎖。 
3. 如果當前將要去獲取鎖的線程等於此時AQS的exclusiveOwnerThread的線程,則此時將state的值加1,很明顯這是重入鎖的實現方式。在此時的運行狀態下,將要去獲取鎖的線程不是thread1,也就是說這一步不成立。 
4. 以上操作都不成立的話,我們直接返回false。 
既然返回了false,那么之后就會調用addWaiter方法,這個方法負責把當前無法獲取鎖的線程包裝為一個Node添加到隊尾。通過下面的代碼片段我們就知道調用邏輯:

 
  1. // 假設tryAcquire(arg) 返回false,那么代碼將執行:
  2. // acquireQueued(addWaiter(Node.EXCLUSIVE), arg),
  3. // 這個方法,首先需要執行:addWaiter(Node.EXCLUSIVE)
  4. /**
  5. * Creates and enqueues node for current thread and given mode.
  6. *
  7. * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
  8. * @return the new node
  9. */
  10. // 此方法的作用是把線程包裝成node,同時進入到隊列中
  11. // 參數mode此時是Node.EXCLUSIVE,代表獨占模式
  12. private Node addWaiter(Node mode) {
  13. Node node = new Node(Thread.currentThread(), mode);
  14. // Try the fast path of enq; backup to full enq on failure
  15. //以下幾行代碼想把當前node加到鏈表的最后面去,也就是進到阻塞隊列的最后
  16. Node pred = tail;
  17. // tail!=null => 隊列不為空(tail==head的時候,其實隊列是空的,不過不管這個吧)
  18. if (pred != null) {
  19. // 設置自己的前驅 為當前的隊尾節點
  20. node.prev = pred;
  21. // 用CAS把自己設置為隊尾, 如果成功后,tail == node了
  22. if (compareAndSetTail(pred, node)) {
  23. // 進到這里說明設置成功,當前node==tail, 將自己與之前的隊尾相連,
  24. // 上面已經有 node.prev = pred
  25. // 加上下面這句,也就實現了和之前的尾節點雙向連接了
  26. pred.next = node;
  27. // 線程入隊了,可以返回了
  28. return node;
  29. }
  30. // 仔細看看上面的代碼,如果會到這里,
  31. // 說明 pred==null(隊列是空的) 或者 CAS失敗(有線程在競爭入隊)
  32. // 讀者一定要跟上思路,如果沒有跟上,建議先不要往下讀了,往回仔細看,否則會浪費時間的
  33. enq(node);
  34. return node;
  35. }

很明顯在addWaiter內部: 
第一步:將當前將要去獲取鎖的線程也就是thread2和獨占模式封裝為一個node對象。並且我們也知道在當前的執行環境下,線程阻塞隊列是空的,因為thread1獲取了鎖,thread2也是剛剛來請求鎖,所以線程阻塞隊列里面是空的。很明顯,這個時候隊列的尾部tail節點也是null,那么將直接進入到enq方法。 
第二步:我們首先看下enq方法的內部實現。首先內部是一個自旋循環。

 
  1. /**
  2. * Inserts node into queue, initializing if necessary. See picture above.
  3. * @param node the node to insert
  4. * @return node's predecessor
  5. */
  6. // 采用自旋的方式入隊
  7. // 之前說過,到這個方法只有兩種可能:等待隊列為空,或者有線程競爭入隊,
  8. // 自旋在這邊的語義是:CAS設置tail過程中,競爭一次競爭不到,我就多次競爭,總會排到的
  9. private Node enq(final Node node) {
  10. for (;;) {
  11. Node t = tail;
  12. // 之前說過,隊列為空也會進來這里
  13. if (t == null) { // Must initialize
  14. // 初始化head節點
  15. // 細心的讀者會知道原來head和tail初始化的時候都是null,反正我不細心
  16. // 還是一步CAS,你懂的,現在可能是很多線程同時進來呢
  17. if (compareAndSetHead(new Node()))
  18. // 給后面用:這個時候head節點的waitStatus==0, 看new Node()構造方法就知道了
  19. // 這個時候有了head,但是tail還是null,設置一下,
  20. // 把tail指向head,放心,馬上就有線程要來了,到時候tail就要被搶了
  21. // 注意:這里只是設置了tail=head,這里可沒return哦,沒有return,沒有return
  22. // 所以,設置完了以后,繼續for循環,下次就到下面的else分支了
  23. tail = head;
  24. } else {
  25. // 下面幾行,和上一個方法 addWaiter 是一樣的,
  26. // 只是這個套在無限循環里,反正就是將當前線程排到隊尾,有線程競爭的話排不上重復排
  27. node.prev = t;
  28. if (compareAndSetTail(t, node)) {
  29. t.next = node;
  30. return t;
  31. }
  32. }
  33. }
  34. }

第一次循環:t為null,隨后我們new出了一個空的node節點,並且通過CAS操作設置了線程的阻塞隊列的head節點就是我們剛才new出來的那個空的node節點,其實這是一個“假節點”,那么什么是“假節點”呢?那就是節點中不包含線程。設置完head節點后,同時又將head節點賦值給尾部tail節點,到此第一次循環結束。此時的節點就是如下: 
此處輸入圖片的描述 
第二次循環:現在判斷維度tail已經不是null了,那么就走第二個分支了,將尾部tail節點賦值給我們傳遞進來的節點Node的前驅節點,此時的結構如下: 
此處輸入圖片的描述 
然后再通過CAS的操作,將我們傳遞進來的節點node設置成尾部tail節點,並且將我們的node節點賦值給原來的老的尾部節點的后繼節點,此時的結構如下: 
此處輸入圖片的描述 
這個時候代碼中使用了return關鍵字,也就是證明我們經過了2次循環跳出了這個自懸循環體系。

在執行addWaiter將節點加入阻塞隊列后,接下來將會調用acquireQueued方法,主要是判斷當前節點的前驅節點是不是head節點,如果是的話,就再去嘗試獲取鎖,如果不是,就掛起當前線程。這里可能有人疑問了,為什么判斷當前節點的前驅節點是head節點的話就去嘗試獲取鎖呢?因為我們知道head節點是一個假節點,如果當前的節點的前驅節點是頭節點即是假節點的話,那么這個假節點的后繼節點就有可能有獲取鎖的機會,所以我們需要去嘗試。 
現在我們看下acquireQueued方法內部,我們也可以清楚的看到,這個方法的內部也是一個自懸循環。:

 
  1. // 現在,又回到這段代碼了
  2. // if (!tryAcquire(arg)
  3. // && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
  4. // selfInterrupt();
  5. // 下面這個方法,參數node,經過addWaiter(Node.EXCLUSIVE),此時已經進入阻塞隊列
  6. // 注意一下:如果acquireQueued(addWaiter(Node.EXCLUSIVE), arg))返回true的話,
  7. // 意味着上面這段代碼將進入selfInterrupt(),所以正常情況下,下面應該返回false
  8. // 這個方法非常重要,應該說真正的線程掛起,然后被喚醒后去獲取鎖,都在這個方法里了
  9. /**
  10. * Acquires in exclusive uninterruptible mode for thread already in
  11. * queue. Used by condition wait methods as well as acquire.
  12. *
  13. * @param node the node
  14. * @param arg the acquire argument
  15. * @return {@code true} if interrupted while waiting
  16. */
  17. final boolean acquireQueued(final Node node, int arg) {
  18. boolean failed = true;
  19. try {
  20. boolean interrupted = false;
  21. for (;;) {
  22. final Node p = node.predecessor();
  23. // p == head 說明當前節點雖然進到了阻塞隊列,但是是阻塞隊列的第一個,因為它的前驅是head
  24. // 注意,阻塞隊列不包含head節點,head一般指的是占有鎖的線程,head后面的才稱為阻塞隊列
  25. // 所以當前節點可以去試搶一下鎖
  26. // 這里我們說一下,為什么可以去試試:
  27. // 首先,它是隊頭,這個是第一個條件,其次,當前的head有可能是剛剛初始化的node,
  28. // enq(node) 方法里面有提到,head是延時初始化的,而且new Node()的時候沒有設置任何線程
  29. // 也就是說,當前的head不屬於任何一個線程,所以作為隊頭,可以去試一試,
  30. // tryAcquire已經分析過了, 忘記了請往前看一下,就是簡單用CAS試操作一下state
  31. if (p == head && tryAcquire(arg)) {
  32. setHead(node);
  33. p.next = null; // help GC
  34. failed = false;
  35. return interrupted;
  36. }
  37. // 到這里,說明上面的if分支沒有成功,要么當前node本來就不是隊頭,
  38. // 要么就是tryAcquire(arg)沒有搶贏別人,繼續往下看
  39. if (shouldParkAfterFailedAcquire(p, node) &&
  40. parkAndCheckInterrupt())
  41. interrupted = true;
  42. }
  43. } finally {
  44. if (failed)
  45. cancelAcquire(node);
  46. }
  47. }

第一次循環:獲取我們傳入node的前驅節點,判斷是否是head節點,現在我們的狀態是: 
此處輸入圖片的描述 
很明顯滿足當前node節點的前驅節點是head節點,那么現在我們就要去調用tryAcquire方法,也就是NonfairSync類的tryAcquire方法,而這個方法又調用了ReentrantLock.Sync.nonfairTryAcquire方法。

很明顯thread1占用鎖,所以thread2獲取鎖是失敗的,直接返回false。按照調用流程,現在進入了當前節點的前驅節點的shouldParkAfterFailedAcquire方法,檢查當前節點的前驅節點的waitstatus。shouldParkAfterFailedAcquire方法內部如下:

 
  1. /**
  2. * Checks and updates status for a node that failed to acquire.
  3. * Returns true if thread should block. This is the main signal
  4. * control in all acquire loops. Requires that pred == node.prev.
  5. *
  6. * @param pred node's predecessor holding status
  7. * @param node the node
  8. * @return {@code true} if thread should block
  9. */
  10. private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
  11. int ws = pred.waitStatus;
  12. // 前驅節點的 waitStatus == -1 ,說明前驅節點狀態正常,當前線程需要掛起,直接可以返回true
  13. if (ws == Node.SIGNAL)
  14. /*
  15. * This node has already set status asking a release
  16. * to signal it, so it can safely park.
  17. */
  18. return true;
  19. // 前驅節點 waitStatus大於0 ,之前說過,大於0 說明前驅節點取消了排隊。這里需要知道這點:
  20. // 進入阻塞隊列排隊的線程會被掛起,而喚醒的操作是由前驅節點完成的。
  21. // 所以下面這塊代碼說的是將當前節點的prev指向waitStatus<=0的節點,
  22. // 簡單說,就是為了找個好爹,因為你還得依賴它來喚醒呢,如果前驅節點取消了排隊,
  23. // 找前驅節點的前驅節點做爹,往前循環總能找到一個好爹的
  24. if (ws > 0) {
  25. /*
  26. * Predecessor was cancelled. Skip over predecessors and
  27. * indicate retry.
  28. */
  29. do {
  30. node.prev = pred = pred.prev;
  31. } while (pred.waitStatus > 0);
  32. pred.next = node;
  33. } else {
  34. /*
  35. * waitStatus must be 0 or PROPAGATE. Indicate that we
  36. * need a signal, but don't park yet. Caller will need to
  37. * retry to make sure it cannot acquire before parking.
  38. */
  39. // 仔細想想,如果進入到這個分支意味着什么
  40. // 前驅節點的waitStatus不等於-1和1,那也就是只可能是0,-2,-3
  41. // 在我們前面的源碼中,都沒有看到有設置waitStatus的,所以每個新的node入隊時,waitStatu都是0
  42. // 用CAS將前驅節點的waitStatus設置為Node.SIGNAL(也就是-1)
  43. compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
  44. }
  45. return false;
  46. }

上面的流程如下: 
1 如果當前節點的前驅節點的waitStatus=-1,就直接返回true; 
2 如果當前節點的前驅節點的waitStatus>0,也就是前驅節點被取消了,那么就從阻塞隊列中刪除前驅節點; 
3 如果以上情況都不滿足,則通過CAS操作將前驅節點設置為-1(SIGNAL)。 
此時,阻塞隊列中,會將head節點的waitStatus由0變為-1(初始化節點的waitStatus都是0),然后返回false; 
此處輸入圖片的描述 
然后回到acquireQueued執行第二次循環:很明顯滿足當前node節點的前驅節點是head節點,那么現在我們就要去調用tryAcquire方法,也就是NonfairSync類的tryAcquire方法,而這個方法又調用了ReentrantLock.Sync.nonfairTryAcquire方法。很明顯此時thread2獲取鎖是失敗的,直接返回false。按照調用流程,現在進入了當前節點的前驅節點的shouldParkAfterFailedAcquire方法,檢查當前節點的前驅節點的waitstatus。此時waitstatus為-1,這個方法返回true。shouldParkAfterFailedAcquire返回true后,就會調用parkAndCheckInterrupt方法,直接將當前線程thread2中斷。

 
  1. // 1. 如果shouldParkAfterFailedAcquire(p, node)返回true,
  2. // 那么需要執行parkAndCheckInterrupt():
  3. // 這個方法很簡單,因為前面返回true,所以需要掛起線程,這個方法就是負責掛起線程的
  4. // 這里用了LockSupport.park(this)來掛起線程,然后就停在這里了,等待被喚醒=======
  5. private final boolean parkAndCheckInterrupt() {
  6. LockSupport.park(this);
  7. return Thread.interrupted();
  8. }

仔細看這個方法acquireQueued方法,是無限循環,感覺如果p == head && tryAcquire(arg)條件不滿足循環將永遠無法結束,在這里,當然不會出現死循環。因為parkAndCheckInterrupt會把當前線程掛起,從而阻塞住線程的調用棧。分析到這里,我們的thread2線程已經被中斷了,這個線程不會再繼續執行下去了。

3)假設現在我們的thread1還沒有釋放鎖,而現在又來了一個線程thread3。 
thread3首先調用lock方法獲取鎖,首先去搶占鎖,因為我們知道thread1還沒有釋放鎖,這個時候thread3肯定搶占失敗,於是又調用了acquire方法,接着又失敗。接着會去調用addWaiter方法,將當前線程thread3封裝成node加入到線程阻塞隊列的尾部。現在的結構如下: 
此處輸入圖片的描述 
然后,調用addWaiter方法后,第一步,將當前將要去獲取鎖的線程也就是thread3和獨占模式封裝為一個node對象。並且我們也知道在當前的執行環境下,線程阻塞隊列不是空的,因為thread2獲取了鎖,thread2已經加入了隊列。很明顯,這個時候隊列的尾部tail節點也不是null,那么將直接進入到if分支。將尾部tail節點賦值給我們傳入的node節點的前驅節點。如下: 
此處輸入圖片的描述
第二步:通過CAS將我們傳遞進來的node節點設置成tail節點,並且將新tail節點設置成老tail節點的后繼節點。 
此處輸入圖片的描述
在執行addWaiter方法,將thread3插入到阻塞隊列尾部后,然后繼續調用acquireQueued方法,這是一個自循環方法。

 
  1. final boolean acquireQueued(final Node node, int arg) {
  2. boolean failed = true;
  3. try {
  4. boolean interrupted = false;
  5. for (;;) {
  6. final Node p = node.predecessor();
  7. if (p == head && tryAcquire(arg)) {
  8. setHead(node);
  9. p.next = null; // help GC
  10. failed = false;
  11. return interrupted;
  12. }
  13. if (shouldParkAfterFailedAcquire(p, node) &&
  14. parkAndCheckInterrupt())
  15. interrupted = true;
  16. }
  17. } finally {
  18. if (failed)
  19. cancelAcquire(node);
  20. }
  21. }

第一次循環:獲取thread3節點的前驅節點,判斷是否是head節點,現在阻塞隊列的結果如下: 
此處輸入圖片的描述
由於thread3的節點的前驅節點是thread2,不是head,所以會直接調用shouldParkAfterFailedAcquire方法: 
1 判斷thread3節點的前驅節點的waitStatus,狀態為-1,直接返回true; 
2 如果thread3節點的前驅節點的waitStatus大於0,說明這個節點被CANCEL了,一直循環向前查找,直到找到waitStatus<=0; 
3 如果都不是以上的情況,就通過CAS操作將這個前驅節點設置成-1(SIGHNAL)。 
此時的結構如下,主要是thread2節點的waitStatus由0變成了-1。 
此處輸入圖片的描述
第二次循環:獲取thread3節點的前驅節點,判斷是否是head節點,由於明顯不是head節點,那么直接進入調用shouldParkAfterFailedAcquire方法,此時,thread3節點的前驅節點的waitStatus為-1,因為返回ture,所以接下來會調用parkAndCheckInterrupt方法,直接將當前線程thread3線程中斷。現在thread2和thread3線程都被中斷了。

 

3.2 FairSync鎖

其實,公平鎖和非公平鎖,不同點只存在兩個地方: 
1 對於非公平鎖,獲取鎖的第一步就是通過CAS設置state的狀態,如果成功,則直接獲取了鎖; 
2 對於公平是和非公平鎖,都會調用tryAcquire方法來獲取鎖,但是二者是有區別的:

 
  1. //非公平鎖
  2. /**
  3. * Performs non-fair tryLock. tryAcquire is implemented in
  4. * subclasses, but both need nonfair try for trylock method.
  5. */
  6. final boolean nonfairTryAcquire(int acquires) {
  7. final Thread current = Thread.currentThread();
  8. int c = getState();
  9. if (c == 0) {
  10. // 非公平鎖,直接搶占鎖
  11. if (compareAndSetState(0, acquires)) {
  12. setExclusiveOwnerThread(current);
  13. return true;
  14. }
  15. }
  16. else if (current == getExclusiveOwnerThread()) {
  17. int nextc = c + acquires;
  18. if (nextc < 0) // overflow
  19. throw new Error("Maximum lock count exceeded");
  20. setState(nextc);
  21. return true;
  22. }
  23. return false;
  24. }
  25. //公平鎖
  26. /**
  27. * Fair version of tryAcquire. Don't grant access unless
  28. * recursive call or no waiters or is first.
  29. */
  30. protected final boolean tryAcquire(int acquires) {
  31. final Thread current = Thread.currentThread();
  32. int c = getState();
  33. if (c == 0) {
  34. // 雖然此時此刻鎖是可以用的,但是這是公平鎖,既然是公平,就得講究先來后到,
  35. // 看看有沒有別人在隊列中等了半天了
  36. if (!hasQueuedPredecessors() &&
  37. compareAndSetState(0, acquires)) {
  38. setExclusiveOwnerThread(current);
  39. return true;
  40. }
  41. }
  42. else if (current == getExclusiveOwnerThread()) {
  43. int nextc = c + acquires;
  44. if (nextc < 0)
  45. throw new Error("Maximum lock count exceeded");
  46. setState(nextc);
  47. return true;
  48. }
  49. return false;
  50. }
  51. }
 

4 ReentrantLock 釋放鎖

現在thread1要開始釋放鎖了。調用unlock方法,unlock方法又調用了內部的release方法:

 
  1. /**
  2. * Releases in exclusive mode. Implemented by unblocking one or
  3. * more threads if {@link #tryRelease} returns true.
  4. * This method can be used to implement method {@link Lock#unlock}.
  5. *
  6. * @param arg the release argument. This value is conveyed to
  7. * {@link #tryRelease} but is otherwise uninterpreted and
  8. * can represent anything you like.
  9. * @return the value returned from {@link #tryRelease}
  10. */
  11. public final boolean release(int arg) {
  12. if (tryRelease(arg)) {
  13. Node h = head;
  14. if (h != null && h.waitStatus != 0)
  15. unparkSuccessor(h);
  16. return true;
  17. }
  18. return false;
  19. }
  20. protected final boolean tryRelease(int releases) {
  21. int c = getState() - releases;
  22. if (Thread.currentThread() != getExclusiveOwnerThread())
  23. throw new IllegalMonitorStateException();
  24. // 是否完全釋放鎖
  25. boolean free = false;
  26. // 其實就是重入的問題,如果c==0,也就是說沒有嵌套鎖了,可以釋放了,否則還不能釋放掉
  27. if (c == 0) {
  28. free = true;
  29. setExclusiveOwnerThread(null);
  30. }
  31. setState(c);
  32. return free;
  33. }

調用tryRelease方法釋放鎖,獲取當前AQS的state,並減去1,判斷當前線程是否等於AQS的exclusiveOwnerThread,如果不是,就拋異常,這就保證了加鎖和釋放鎖必須是同一個線程。如果(state-1)的結果不為0,說明鎖被重入了,需要多次unlock。如果(state-1)等於0,我們就將AQS的ExclusiveOwnerThread設置為null。如果上述操作成功了,也就是tryRelase方法返回了true,那么就會判斷當前隊列中的head節點,當前結構如下: 
此處輸入圖片的描述
如果head節點不為null,並且head節點的waitStatus不為0,我們就調用unparkSuccessor方法去喚醒head節點的后繼節點。

 
  1. /**
  2. * Wakes up node's successor, if one exists.
  3. *
  4. * @param node the node
  5. */
  6. // 喚醒后繼節點
  7. // 從上面調用處知道,參數node是head頭結點
  8. private void unparkSuccessor(Node node) {
  9. /*
  10. * If status is negative (i.e., possibly needing signal) try
  11. * to clear in anticipation of signalling. It is OK if this
  12. * fails or if status is changed by waiting thread.
  13. */
  14. int ws = node.waitStatus;
  15. // 如果head節點當前waitStatus<0, 將其修改為0
  16. if (ws < 0)
  17. compareAndSetWaitStatus(node, ws, 0);
  18. /*
  19. * Thread to unpark is held in successor, which is normally
  20. * just the next node. But if cancelled or apparently null,
  21. * traverse backwards from tail to find the actual
  22. * non-cancelled successor.
  23. */
  24. // 下面的代碼就是喚醒后繼節點,但是有可能后繼節點取消了等待(waitStatus==1)
  25. // 從隊尾往前找,找到waitStatus<=0的所有節點中排在最前面的
  26. Node s = node.next;
  27. if (s == null || s.waitStatus > 0) {
  28. s = null;
  29. // 從后往前找,仔細看代碼,不必擔心中間有節點取消(waitStatus==1)的情況
  30. for (Node t = tail; t != null && t != node; t = t.prev)
  31. if (t.waitStatus <= 0)
  32. s = t;
  33. }
  34. if (s != null)
  35. // 喚醒線程
  36. LockSupport.unpark(s.thread);
  37. }

第一步:獲取head節點的waitStatus,如果小於0,就通過CAS操作將head節點的waitStatus修改為0,現在是: 
此處輸入圖片的描述
第二步:尋找head節點的下一個節點,如果這個節點的waitStatus小於0,就喚醒這個節點,否則遍歷下去,找到第一個waitStatus<=0的節點,並喚醒。現在thread2線程被喚醒了,我們知道剛才thread2在acquireQueued被中斷,現在繼續執行,又進入了for循環,當前節點的前驅節點是head並且調用tryAquire方法獲得鎖並且成功。那么設置當前Node為head節點,將里面的thead和prev設置為null。 
此處輸入圖片的描述
此時,thread2獲取了鎖,並成為頭節點,原來的頭節點釋放掉,等待被回收。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM