AQS-等待隊列


  AQS的原理在於,每當有新的線程請求資源時,該線程會進入一個等待隊列(Waiter Queue),只有當持有鎖的線程釋放資源后,該線程才能持有資源。該等待隊列的實現方式是雙向鏈表,線程會被包裹在鏈表節點Node中。Node即隊列的節點對象,它封裝了各種等待狀態(典型的狀態機模式),前驅和后繼節點信息,以及它對應的線程。

  AQS定義兩種資源共享方式:Exclusive(獨占,在特定時間內,只有一個線程能夠執行,如ReentrantLock)和share(共享,多個線程可以同時執行,如ReadLock、Semaphore、CountDownLatch),可見不同的實現方式征用共享資源的方式不同,由此,自定義同步器在實現時要根據需求來實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經實現好了。

Node節點

 1 //標記節點為共享模式
 2 static final Node SHARED = new Node();  3 //標記節點為獨占模式
 4 static final Node EXCLUSIVE = null;  5 //等待狀態
 6 volatile int waitStatus;  7 //前驅結點
 8 volatile Node prev;  9 //后繼節點
10 volatile Node next; 11 //線程
12 volatile Thread thread;

自定義同步器時主要需要實現以下幾種方法: 

  • isHeldExclusively():該線程是否正在獨占資源,只有用到condition時才需要去使用它。
  • tryAcquire(int):獨占方式,嘗試獲取資源,返回boolean。
  • tryRelease(int):獨占方式,嘗試釋放資源,返回boolean。
  • tryAcquireShared(int):共享方式,嘗試獲取資源,返回int。
  • tryReleaseShared(int):共享方式,嘗試釋放資源,返回boolean。

等待隊列節點對象Node有四種不同的狀態:

  • CANCELLED(1)已取消
  • SIGNAL(-1)競爭獲勝需要喚醒
  • CONDITION(-2)在condition隊列中等待
  • PROPAGATE(-3)后續節點傳播喚醒操作,共享模式下使用

acquire方法執行流程

1 public final void acquire(int arg) { 2     if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) { 3  selfInterrupt(); 4  } 5 }

1)      嘗試獲取鎖tryAcquire,返回值表示當前線程是否獲取鎖。 

2)      如果獲取成功,那么說明當前對象已經持有鎖,執行中斷操作,中斷操作會解除線程阻塞。

3)      如果獲取失敗,那么把當前線程封裝為Waiter節點,等待隊列沒有節點時初始化隊列,有則使用compareAndSetTail()添加進Waiter隊列尾端。

4)      acquiredQueue自旋獲取資源,並且返回Waiter節點持有的線程的應當具備的中斷狀態。

5)      根據返回結果來確定是否需要執行線程中斷操作。

 1 private Node addWaiter(Node mode) {  2     //封裝當前節點為node節點
 3     Node node = new Node(Thread.currentThread(), mode);  4     Node pred = tail;  5     if(pred != null) {  6         //將node節點的前驅節點設置為tail
 7         node.prev = pred;  8         //多線程環境下,tail可能已經被其它線程修改了,這里校驗pred是否依然是為節點  9         //如果是,那么將node設置為尾結點,原尾結點的后繼節點設置為node,返回node
10         if(compareAndSelfTail(pred, node)) { 11             pred.next = node; 12             return node; 13  } 14  } 15     //執行到這里,說明tail為null,或者tail已經發生了變動
16  enq(node); 17     return node; 18 }    
 1 private Node enq(final Node node) {  2     //下面這個死循環用於把node節點插入到隊尾,由於多線程環境下,tail節點可能  3     //隨時變動,必須不停的嘗試,讓下面兩個操作不會被其它線程干涉。  4     //1,node.prev必須為當前尾結點  5     //2,node設置為新的尾結點
 6     for(;;) {  7         Node t = tail;  8         //tail為空,也說明head為空,此時初始化隊列
 9         if(t == null) { 10             //CAS方式初始化隊頭
11             if(compareAndSetHead(new Node())) 12                 tail = head; 13         } else { 14             //設置node.prev為當前尾結點
15             node.prev = t; 16            //多線程環境下,此時尾結點可能已經被其它訪問修改了,需要CAS來進行比較 17             //如果t依然是尾結點,那么node設置為尾結點、
18             if(compareAndSetTail(t, node)) { 19                 t.next = node; 20                 return t; 21  } 22  } 23  } 24 }

  acquiredQueued(Node)方法會接收addWaiter封裝好的Node對象,該方法的本質在於以自旋的方式獲取資源,即自旋鎖。它做了兩件事,如果指定節點的前驅節點時頭結點,那么再次嘗試獲取鎖,反之,嘗試阻塞當前線程。自旋不能構成死循環,否則會浪費大量CPU資源,在AQS中如果p==head&&tryAcquire(arg)條件不足時不會一直循環下去。通常,在p==head之前,必然會有一個線程得到鎖,此時tryAcquire()通過,循環結束。如果發生了極端情況,那么node.predecessor()也會在node==head的情況下拋出空指針異常,循環結束。shouldParkAfterFailedAcquire(p,node)檢測前驅節點的等待狀態,需要阻塞則調用partAndCheckInterrupt()方法會阻塞當前線程,該循環也不會無限制的消耗資源。

 1 final boolean acquireQueued(final Node, int arg) {  2     boolean failed = true;  3     try {  4         boolean interrupted = false;  5         for(;;) {  6             //找到node的前驅節點,如果node已經為head,那么會拋出空指針異常  7             //空指針異常說明整個等待隊列都沒有能夠獲取鎖的線程。
 8             final Node p = node.predecessor();  9             //前驅節點為頭結點時,當前線程嘗試獲取鎖 10             //如果獲取成功,那么node會成為新的頭結點,這個過程會清空node的線程信息。
11             if(p == head && tryAcquire(arg)) { 12  setHead(node); 13                p.next = null; 14                 failed = false; 15                 return interrupted; 16  } 17             //當前線程不能獲取鎖,則說明該節點需要阻塞 18             //shouldParkAfterFailedAcquire()用於檢查和設置節點阻塞狀態 19             //如果為通過檢查,那么說明沒有阻塞,parkAndCheckInterrupt()用於阻塞當前線程。
20             if(shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) 21                interrupted = true; 22  } 23             finally { 24                 if(failed) cancelAcquire(node); 25  } 26  } 27 }                    
 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {  2     //前驅節點的等待狀態
 3     int ws = pred.waitStatus;  4     if(ws == Node.SIGNAL) {  5         //SIGNAL表示前驅節點需要被喚醒,此時node是一定可以安全阻塞的,所以返回true
 6         return true;  7  }  8     if(ws > 0) {  9         //大於0的等待狀態只有CANCELLED,從隊列里移除所有前置的CANCELLED節點。
10         do { 11             node.prev = pred = pred.prev; 12         } while (pred.waitStatus > 0); 13         pred.next = node; 14     } else { 15         //運行到這里,說明前驅節點處於0、CONDITION或者PROPAGATE狀態下 16         //此時該節點需要被置為SIGNAL狀態,等待被喚醒。
17  compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18  } 19 }
1 private final boolean parkAndCheckInterrupt() { 2     //LockSupport.park()用於阻塞當前線程
3     LockSupport.park(this); 4     return Thread.interruupted(); 5 }

  由此可以得出結論,當一個新的線程節點入隊之后,會檢查它的前驅節點,只要有一個節點的狀態是SIGNAL,就表示當前節點之前的節點正在被等待喚醒,那么當前線程就需要被阻塞,以等待RentrantLock.unlock()喚醒之前的線程。 

 

  在過程2中,node1剛剛入隊,沒有爭搶到鎖,此時head狀態為初始化的0狀態,於是調用了compareAndSetWaitStatus(pred,ws,Node.SIGNAL),這個方法會把head的狀態改為SIGNAL。

  在過程3中,acquired()方法里的for循環會在執行一次,此時,node1的前驅節點依然是head,如果它依然沒有競爭鎖,那么由於head的waitStatus屬性的值為SIGNAL,這會導致shouldParkAfterFailedAcquire()方法返回true,當前線程(node1持有的線程)被阻塞,代碼不在繼續往下執行。這樣就達到了讓等待隊列里的線程阻塞的目的,由此可以類推更多線程入隊的過程。由此可以類推更多線程入隊的過程:

 

       SIGNAL狀態由release()方法進行修改,這個方法首先調用tryRelease()方法嘗試釋放鎖,它返回的是鎖是否處於可用狀態,如果鎖可用,那么該方法也不負責中斷等待線程的阻塞,它僅僅把鎖的線程持有者設為null;然后,如果成功的釋放鎖,那么判斷隊頭狀態,隊頭為空則說明隊列沒有等待線程,不再做其它操作,反之再判斷隊頭的狀態waitStatus,只要它不為0,就說明等待隊列中有被阻塞的節點。

 1 public final boolean release(int arg) {  2     if(tryRelease(arg)) {  3         Node h = head;  4         if(h != null && h.waitStatus != 0) {  5  unparkSuccessor(h);  6  }  7         return true;  8  }  9     return false; 10 }
 1 private void unparkSuccessor(Node node) {  2     int ws = node.waitStatus;  3     //小於0的狀態waitStatus只有SIGNAL和CONDITION
 4     if(ws < 0) {  5         compareAndSetWaitStatus(node, ws, 0);  6  }  7     Node s = node.next;  8     //前驅查找需要喚醒的節點
 9     if(s == null || s.waitStatus > 0) { 10        s = null; 11         for(Node t = tail; t != null && t != node; t = t.prev) { 12             if(t.waitStatus <= 0) s = t; 13  } 14  } 15     if(s != null) { 16  LockSupport.unpark(s.thread); 17  } 18 }

       unparkSuccessor()負責確保中斷正確的線程阻塞。在ReentrantLock.unlock()的調用過程中,unparkSuccessor(Node node)的形參node始終為head節點,這個方法執行的主要操作為:

  • 首先把head節點的waitStatus設置為0,表示隊列里沒有需要中斷阻塞的線程。
  • 然后確定需要被喚醒的節點,該節點是隊列中第一個waitStatus小於等於0的節點。
  • 最后,調用LockSupport.unlock()方法中斷指定線程的阻塞狀態。

 

       需要注意的是,node1對應的線程此時已經中斷了阻塞,它會開始繼續執行AQS的AacquireQueued()方法中for循環的代碼final Node p = node.predecessor();顯然node1的前驅節點head由於鎖已經被釋放,隊列變化為

 

       這部分代碼比較巧妙,可以注意到,在釋放的過程中,代碼里並沒有改變head的waitStatus為SIGNAL,而是直接使用node1替代了原先的head。換言之,原本需要修改head/node2的前驅和后置,並且把head的waitStatus修改為SIGNAL,使用當前的代碼,只需要釋放node1的持有線程,然后移除head節點,這樣可以更快的到達隊列規整的目的。

AQS如何阻塞線程和中斷阻塞

       在acquired()方法中,當前線程嘗試獲取鎖,如果沒有獲得,那么會把線程加入等待隊列中,加入到隊列的線程會被阻塞。

       線程阻塞有三種常見的實現方式:Object.wait()、Thread.join()、或者Thread.sleep()。

       中斷阻塞則通過Thread.interrupt()方法來實現,這個方法會發出一個中斷信號量從而導致線程拋出中斷異常InterruptedException,已達到結束阻塞的目的。需要注意的是Interrupt不會中斷用戶循環體造成阻塞,它僅僅是拋出信號量,具體處理方式還是由用戶處理。Thread.isInterrupted可以得到中斷狀態。

       對於wait、sleep、join等會造成線程阻塞的方法,由於它們都會拋出Interrupted Exception,處理方式如下

1 try { 2     Thread.currentThread().sleep(500); 3 } catch (InterruptedException e) { 4     //中斷后拋出異常,在異常捕獲里可以對中斷定制處理
5 }

        對循環體處理方式如下表示:

1 //使用Thread.isInterrupted方法獲取中斷信號量
2 while(!Thread.currentThread().isInterrupted && 用戶自定義條件) { 3 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM