AQS同步隊列器之一:使用和原理


一、簡介

   JDK1.5之前都是通過synchronized關鍵字實現並發同步,而JDK1.5以后Doug Lea大師開發了current包下的類,通過Java代碼實現了synchronized關鍵字的語義。

   然而在current包下的這些類的實現大部分都離不開一個基礎組件----AQS(AbstractQueuedSynchronizer)也就是同步隊列器。

   AQS,AbstractQueuedSynchronizer,即隊列同步器。它是構建鎖或者其他同步組件的基礎框架(如ReentrantLock、ReentrantReadWriteLock、Semaphore等),JUC並發包的作者期望它能夠成為實現大部分同步需求的基礎。它是JUC並發包中的核心基礎組件。AQS解決了子類實現同步器時涉及當的大量細節問題,例如獲取同步狀態、FIFO同步隊列。基於AQS來構建同步器可以帶來很多好處。它不僅能夠極大地減少實現工作,而且也不必處理在多個位置上發生的競爭問題。在基於AQS構建的同步器中,只能在一個時刻發生阻塞,從而降低上下文切換的開銷,提高了吞吐量。同時在設計AQS時充分考慮了可伸縮行,因此J.U.C中所有基於AQS構建的同步器均可以獲得這個優勢。

   AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態。

二、簡單使用示例

   在使用AQS基礎組件前,先了解一下內部的基本的方法,這些方法可以分為兩類:

      第一類:子類實現的方法,AQS不作處理(模板方法)

        tryAcquire(int arg):獨占式的獲取鎖,返回值是boolean類型的,true代表獲取鎖,false代表獲取失敗。

        tryRelease(int arg):釋放獨占式同步狀態,釋放操作會喚醒其后繼節點獲取同步狀態。

        tryAcquireShared(int arg):共享式的獲取同步狀態,返回大於0代表獲取成功,否則就是獲取失敗。

        tryReleaseShared(int arg):共享式的釋放同步狀態。

        isHeldExclusively():判斷當前的線程是否已經獲取到了同步狀態。

這些方法是子類實現時必須實現的方法,通過上面的這些方法來判斷是否獲取了鎖,然后再通過AQS本身的方法執行獲取鎖與未獲取鎖的過程。

      第二類:AQS本身的實現的方法,定義給子類通用實現的方法

        acquire(int arg):獨占式的獲取鎖操作,獨占式獲取同步狀態都調用這個方法,通過子類實現的tryAcquire方法判斷是否獲取了鎖。

        acquireShared(int arg):共享式的獲取鎖操作,在讀寫鎖中用到,通過tryAcquireShared方法判斷是否獲取到了同步狀態。

        release(int arg):獨占式的釋放同步狀態,通過tryRelease方法判斷是否釋放了獨占式同步狀態。

        releaseShared(int arg):共享式的釋放同步狀態,通過tryReleaseShared方法判斷是否已經釋放了共享同步狀態。

從這兩類方法可以看出,AQS為子類定義了一套獲取鎖和釋放鎖以后的操作,而具體的如何判斷是否獲取鎖和釋放鎖都是交由不同的子類自己去實現其中的邏輯,這也是Java設計模式之一:模板模式的實現。有了AQS我們就可以實現一個屬於自己的Lock,下面就是一個AQS源碼層的一個Demo:

 1    public class Mutex implements Lock,java.io.Serializable{
       //內部自定義實現的隊列同步器
2 private static class Sync extends AbstractQueuedSynchronizer{
           //判斷是否同步狀態已經被占用了
3 protected boolean isHeldExclusively(){ 4 return getState() == 1; 5 } 6          //獲取鎖的操作 7 public boolean tryAcquire(int acquires){ 8 if(compareAndSetState(0,1)){//CAS操作獲取鎖狀態 9 setExclusiveOwnerThread(Thread.currentThread());//將當前線程設置為獲取同步狀態的線程 10 return true; 11 } 12 return false; 13 }
           //釋放鎖操作
14 protected boolean tryRelease(int releases){ 15 if(getState() == 0){//當前同步狀態值為0代表已經釋放 16 setExclusiveOwnerThread(null); 17 setState(0); 18 return true; 19 } 20 } 21 public void lock(){ sync.acquire(1);}//最終調用AQS中的acquire方法 22 public boolean tryLock(){return sync.tryAcquire(1);} 23 public void unlock(){ sync.release(1);} 24 public Boolean isLocked(){ return sync.isHeldExclusively();} 25 }

上面的Mutex是自定義實現的一個獨占式鎖,通過tryAcquire操作判斷線程是否獲取到了同步狀態,這個方法是Mutex自身實現的一個方法。通過tryRelease方法判斷是否釋放了同步狀態。通過子類自定義實現獲取和釋放的操作最終調用AQS中的方法實現鎖操作。

三、源碼分析以及原理

AQS類結構

從圖中可以看出來,AbstractQueuedSynchronizer內部維護了一個Node節點類和一個ConditionObject內部類。Node內部類是一個雙向的FIFO隊列,用來保存阻塞中的線程以及獲取同步狀態的線程,而ConditionObject對應的是下一篇要講的Lock中的等待和通知機制。

 node類結構

同步隊列

除了Node節點的這個FIFO隊列,還有一個重要的概念就是waitStatus一個volatile關鍵字修飾的節點等待狀態。在AQS中waitstatus有五種值:

    SIGNAL 值為-1、后繼節點的線程處於等待的狀態、當前節點的線程如果釋放了同步狀態或者被取消、會通知后繼節點、后繼節點會獲取鎖並執行(當一個節點的狀態為SIGNAL時就意味着在等待獲取同步狀態,前節點是頭節點也就是獲取同步狀態的節點)

    CANCELLED 值為1、因為超時或者中斷,結點會被設置為取消狀態,被取消狀態的結點不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態。處於這種狀態的結點會被踢出隊列,被GC回收(一旦節點狀態值為1說明被取消,那么這個節點會從同

步隊列中刪除)

    CONDITION 值為-2、節點在等待隊列中、節點線程等待在Condition、當其它線程對Condition調用了singal()方法該節點會從等待隊列中移到同步隊列中

    PROPAGATE 值為-3、表示下一次共享式同步狀態獲取將會被無條件的被傳播下去(讀寫鎖中存在的狀態,代表后續還有資源,可以多個線程同時擁有同步狀態)

    initial 值為0、表示當前沒有線程獲取鎖(初始狀態)

了解了節點等待的狀態以及同步隊列的作用,AQS中還通過了一個volatile關鍵字修飾的status對象用來管理鎖的狀態並提供了getState()、setState()、compareAndSetStatus()三個方法改變status的狀態。知道了這些就可以開始真正看AQS是如何處理沒有獲取鎖的線程的。在真正了解底層實現AQS之前還要介紹一下獨占鎖和共享鎖:

    獨占鎖:在同一個時刻只能有一個線程獲得同步狀態,一旦這個線程獲取同步狀態,其它線程就無法再獲取將會進入阻塞的狀態。

    共享鎖:在同一個時刻可以存在多個線程獲取到同步狀態。

接下來就從源碼的角度了解AQS中的鎖操作機制:

acquire(int arg):獨占式的獲取鎖,此方法不響應中斷,在這過程中中斷,線程不會從同步隊列中移除也不會立馬中斷

1       public final void acquire(int arg){ 2               if(!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE))){ 3                          selfInterrupt();//如果這個過程中出現中斷,在整個過程結束后再自我中斷
4  } 5          }

acquire方法代碼很少,但是它做了很多事,首先前面介紹過tryAcquire()方法是子類實現的具體獲取鎖的方法,當鎖獲取到了就會立刻退出if條件也就代表獲取鎖具體的就是啥也不干。那么看鎖獲取失敗具體干了啥呢。首先是addWaiter(Node.EXCLUSIVE)方法

 

addWaiter(Node mode):往同步隊列中添加元素

 1     private Node addWaiter(Node mode){  2       //通過當前線程和鎖模式創建了一個Node節點  3 Node node = new Node(Thread.currentThread(),mode);  4       //獲取尾節點  5 Node pred = tail;  6 if(pred != null){  7 node.prev = pred;//新增的節點每次都是加在同步隊列的尾部  8 //通過CAS操作設置尾節點防止線程不安全  9 if(compareAndSetTail(pred,node)){ 10 pred.next = node; 11 return node; 12  } 13  } 14 enq(node);//防止CAS操作失敗,再次處理 15 return node; 16 }

addWaiter方法主要做的就是創建一個節點,如果通過CAS操作成功就直接將節點加入同步隊列的尾部,否則需要enq方法的幫忙再次進行處理。設置尾節點的操作必須是CAS類型的,因為會有多個線程同時去獲取同步狀態防止並發不安全。

添加到隊列尾節點操作 

enq(Node node):在addWaiter方法處理失敗的時候進一步進行處理

 1      private Node enq(final Node node){  2          //死循環【發現很多的底層死循環都是這么寫不知道是不是有什么優化點】
 3            for(;;){  4               Node t = tail;  5               if(t == null){//如果尾節點為null
 6                 if(compareAndSetHead(new Node())){//創建一個新的節點並添加到隊列中初始化
 7                     tail = head;  8                 }else{  9                     node.prev = t; 10              //還是通過CAS操作添加到尾部
11                     if(compareAndSetTail(t,node)){ 12                          t.next = node; 13                          return t; 14    } 15    } 16  } 17  } 18         }

enq方法就是通過死循環,不斷的通過CAS操作設置尾節點,直到添加成功才返回。

 

acquireQueued(final Node node,int arg):當線程獲取鎖失敗並加入同步隊列以后,就進入了一個自旋的狀態,如果獲取到了這個狀態就退出阻塞狀態否則就一直阻塞

 1      final boolean acquireQueued(final Node node,int arg){  2            boolean failed = true;//用來判斷是否獲取了同步狀態
 3               try{  4                    boolean interrupted = false;//判斷自旋過程中是否被中斷過
 5                    for(;;){  6                        final Node p = node.predecessor();//獲取前繼節點
 7                        if(p == head && tryAcquire(arg)){//如果當前的這個節點的前繼節點是頭節點就去嘗試獲取了同步狀態
 8                            setHead(node);//設為頭節點
 9                            p.next = null; 10                            failed = false;//代表獲取了同步狀態
11                            return interrupted; 12  } 13               //判斷自己是否已經阻塞了檢查這個過程中是否被中斷過
14                        if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt() ){ 15                            interrupted = true; 16  } 17                    }finally{ 18                        if(failed){ 19                           cancelAcquired(node);
20  } 21  } 22  } 23         }

acquireQueued方法主要是讓線程通過自旋的方式去獲取同步狀態,當然也不是每個節點都有獲取的資格,因為是FIFO先進先出隊列,acquireQueued方法保證了只有頭節點的后繼節點才有資格去獲取同步狀態,如果線程可以休息了就讓該線程休息然后記錄下這個過程中是否被中斷過,當線程獲取了同步狀態就會從這個同步隊列中移除這個節點。同時還會設置獲取同步狀態的線程為頭節點,在設置頭節點的過程中不需要任何的同步操作,因為獨占式鎖中能獲取同步狀態的必定是同一個線程。

設置頭節點操作

同步隊列中節點自旋操作

shouldParkAfterFailedAcquire(Node node,Node node):判斷一個線程是否阻塞

 1      private static boolean shouldPArkAfterFailedAcquire(Node pred,Node node){  2             int ws = pred.waitStatus;//獲取節點的等待狀態
 3             if(ws == Node.SIGNAL){//如果是SIGNAL就代表當頭節點釋放后,這個節點就會去嘗試獲取狀態
 4                    return true;//代表阻塞中
 5  }  6             if(ws > 0){//代表前繼節點放棄了
 7               do {  8                   node.prev = pred = pred.prev;//循環不停的往前找知道找到節點的狀態是正常的
 9               }while(pred.waitStatus > 0 ); 10                pred.next = node; 11               }else{ 12                  compareAndSetWaitStatus(pred,ws,Node.SIGNAL);//通過CAS操作設置狀態為SIGNAL
13  } 14               return false; 15         }

整個流程中,如果前驅結點的狀態不是SIGNAL,那么自己就不能安心去休息,也就是只有當前驅節點為SIGNAL時這個線程才可以進入等待狀態。

 

parkAndCheckInterrupt():前面的方法是判斷是否阻塞,而這個方法就是真正的執行阻塞的方法同時返回中斷狀態

1       private final boolean parkAndCheckInterupt(){
3             LockSupport.park(this);//阻塞當前線程
5             return Thread.interrupted();//返回中斷狀態
7         }

經過了上面的這么多方法,再次回頭看acquire方法的時候。會發現其實整個流程也沒有想象中的那么難以理解。acquire方法流程

    首先通過子類判斷是否獲取了鎖,如果獲取了就什么也不干。

    如果沒有獲取鎖、通過線程創建節點加入同步隊列的隊尾。

    當線程在同步隊列中不斷的通過自旋去獲取同步狀態,如果獲取了鎖,就把其設為同步隊列中的頭節點,否則在同步隊列中不停的自旋等待獲取同步狀態。

    如果在獲取同步狀態的過程中被中斷過最后自行調用interrupted方法進行中斷操作。

這里可以看一下acquire也就是獨占式獲取鎖的整個流程

AQS之aquire獨占式獲取鎖流程 

release(int arg):獨占式的釋放鎖 

 1      public final boolean release(int arg){
 3           if(tryRelease(arg)){//子類自定義實現
 4               Node h = head;  5               if(h != null && h.waitStatus != 0){  6                    unparkSuccessor(h);//喚醒下一個節點
 7  }  8               return true;  9  } 10           return false; 11         }

釋放鎖的流程很簡單,首先子類自定義的方法如果釋放了同步狀態,如果頭節點不為空並且頭節點的等待狀態不為0就喚醒其后繼節點。主要依賴的就是子類自定義實現的釋放操作。

 

unparkSuccessor(Node node):喚醒后繼節點獲取同步狀態

 1      private void unparkSuccessor(Node node){  2            //獲取頭節點的狀態
 3            int ws = node.waitStatus;  4            if(ws < 0){  5               compareAndSetWaitStatus(node,ws,0);//通過CAS將頭節點的狀態設置為初始狀態
 6  }  7            Node s = node.next;//后繼節點
 8            if(s == null || s.waitStatus >0){//不存在或者已經取消
 9               s = null; 10               for(Node t = tail;t != null && t != node;t = t.prev){//從尾節點開始往前遍歷,尋找離頭節點最近的等待狀態正常的節點
11                  if(t.waitStatus <= 0){ 12                     s = t; 13  } 14  } 15  } 16            if(s != null){ 17               LockSupport.unpark(s.thread);//真正的喚醒操作
18  } 19         }

喚醒操作,通過判斷后繼節點是否存在,如果不存在就尋找等待時間最長的適合的節點將其喚醒喚醒操作通過LockSupport中的unpark方法喚醒底層也就是unsafe類的操作。

以上就是獨占式的獲取鎖以及釋放鎖的過程總結的來說:線程獲取鎖,如果獲取了鎖就啥也不干,如果沒獲取就創造一個節點通過compareAndSetTail(CAS操作)操作的方式將創建的節點加入同步隊列的尾部,在同步隊列中的節點通過自旋的操作不斷去獲取同步狀態【當然由於FIFO先進先出的特性】等待時間越長就越先被喚醒。當頭節點釋放同步狀態的時候,首先查看是否存在后繼節點,如果存在就喚醒自己的后繼節點,如果不存在就獲取等待時間最長的符合條件的線程。

 

acquireShared(int arg):共享式的獲取鎖 

1          public final void acquireShared(int arg){ 2            //子類自定義實現的獲取狀態【也就是當返回為>=0的時候就代表獲取鎖】
3                  if(tryAcquireShared(arg) < 0){ 4                          doAcquiredShared(arg);//具體的處理沒有獲取鎖的線程的方法
5  } 6          }

 

doAcquiredShared(int arg):處理未獲取同步狀態的線程

 1       private void doAcquire(int arg){  2 final Node node = addWaiter(Node.SHARED);//創建一個節點加入同步隊列尾部  3 boolean failed = true;//判斷獲取狀態  4 try{  5 boolean interrupted = false;//是否被中斷過  6 for(;;){  7 final Node p =node.predecessor();//獲取前驅節點  8 if(p == head){  9 int r = tryAcquireShared(arg);//獲取同步狀態 10 if(r >= 0 ){//大於0代表獲取到了 11 setHeadAndPropagate(node,r);//設置為頭節點並且如果有多余資源一並喚醒 12 p.next = null; 13 if(interrupted){ 14   selfInterrupted();//自我中斷 15    } 16   failed = false; 17   return; 18   } 19    } 20            //判斷線程是否可以進行休息如果可以休息就調用park方法 21   if(shouldParkAfterFailedAcquire(p,node) && parkAndCheckInterrupt()){ 22   interrupted = true; 23    }
            }
24  }finally{ 25   if(failed){ 26     cancelAcquire(node); 27    } 28      } 29      }

共享式獲取鎖和獨占式唯一的區別在於setHeadAndPropagate這個方法,獨占式的鎖會去判斷是否為后繼節點,只有后繼節點才有資格在頭節點釋放了同步狀態以后獲取到同步狀態而共享式的實現依靠着setHeadAndPropagate這個方法

 

setHeadAndPorpagate(Node node,int arg):獲取共享同步狀態以后的操作

 1       private void setHeadAndPropaGate(Node node,int propagate){  2                  Node h = head;  3                  setHead(node);//設置為頭節點
 4                 if(propagate >0 || h == null || h.waitStatus < 0){//大於0代表還有其他資源一並可以喚醒
 5                          Node s = node.next;//下一個節點
 6                           if(s == null || s.isShared()){  7  doReleaseShared();  8  }  9  } 10          }

這個方法主要的目的就是將獲取到同步狀態的節點設置為頭節點、如果存在多個資源就將多個資源一並喚醒

 

doReleaseShared():喚醒后繼節點

 1      private void doReleaseShared(int arg){  2            for(;;){  3               Node h = head;  4               if(h != null && h != tail){  5                   int ws = h.waitStatus;//獲取頭節點的等待狀態
 6                   if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)){//設置不成功就一直進行設置
 7                        continue;  8  }  9                   unparkSuccessor(h);//喚醒后繼節點
10               }else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) 11                    continue; 12   } 13               if (h == head) 14                  break; 15         }            

 OK,至此,共享式的獲取鎖也研究過了。讓我們再梳理一下它的流程

  1. tryAcquireShared()嘗試獲取資源,成功則直接返回;
  2. 失敗則通過doAcquireShared()進入同步隊列中,直到頭節點釋放同步狀態后喚醒后繼節點並成功獲取到資源才返回。整個等待過程也是忽略中斷的。

其實跟acquire()的流程大同小異,只不過多了個自己拿到資源后,還會去喚醒后繼隊友的操作(這才是共享嘛)

 

releaseShared():釋放共享同步狀態

1       public final boolean releaseShared(int arg){ 2         //子類自定義釋放鎖操作true代表釋放
3             if(tryReleaseShared(arg)){ 4                 doReleaseShared();//處理釋放的操作
5                 return true; 6  } 7          }

通過子類自定義實現的釋放鎖操作判斷,如果未釋放就什么也不干,而doReleased方法就是去喚醒當前的后繼節點

四、總結

    AQS在並發中是一個非常重要的基礎類,它定義了很多同步組件需要的方法。通過這些方法開發者可以簡單的實現一個相關的鎖。我們詳解了獨占和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認識了。值得注意的是,acquire()和acquireSahred()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這里相應的源碼跟acquire()和acquireSahred()差不多,這里就簡單闡述一下。

   對於響應中斷的獲取同步狀態操作而言:其會判斷獲取同步狀態的線程是否處於被中斷的狀態,如果處於被中斷的操作就會拋出InterruptedException異常

   對於超時響應的獲取同步狀態而言:內部多了一個時間判斷。其實這些都是在最基礎的獲取鎖上做了一些加強基本的原理還是相同的。

 

 

  

================================================================================== 

不管歲月里經歷多少辛酸和艱難,告訴自己風雨本身就是一種內涵,努力的面對,不過就是一場命運的漂流,既然在路上,那么目的地必然也就是前方。


==================================================================================

    

    

  

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM