AQS同步器的實現原理


1.什么是AQS?

     AQS的核心思想是基於volatile int state這樣的volatile變量,配合Unsafe工具對其原子性的操作來實現對當前鎖狀態進行修改。同步器內部依賴一個FIFO的雙向隊列來完成資源獲取線程的排隊工作。

2.同步器的應用

 同步器主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法來管理同步狀態,對同步狀態的修改或者訪問主要通過同步器提供的3個方法:

  • getState() 獲取當前的同步狀態
  • setState(int newState) 設置當前同步狀態
  • compareAndSetState(int expect,int update) 使用CAS設置當前狀態,該方法能夠保證狀態設置的原子性。

     同步器可以支持獨占式的獲取同步狀態,也可以支持共享式的獲取同步狀態,這樣可以方便實現不同類型的同步組件。

     同步器也是實現鎖的關鍵,在鎖的實現中聚合同步器,利用同步器實現鎖的語義。

3.AQS同步隊列

   同步器AQS內部的實現是依賴同步隊列(一個FIFO的雙向隊列,其實就是數據結構雙向鏈表)來完成同步狀態的管理。

   當前線程獲取同步狀態失敗時,同步器AQS會將當前線程和等待狀態等信息構造成為一個節點(node)加入到同步隊列,同時會阻塞當前線程;

   當同步狀態釋放的時候,會把首節點中的線程喚醒,使首節點的線程再次嘗試獲取同步狀態。AQS是獨占鎖和共享鎖的實現的父類。   

 

4.AQS鎖的類別:分為獨占鎖和共享鎖兩種。

  • 獨占鎖:鎖在一個時間點只能被一個線程占有。根據鎖的獲取機制,又分為“公平鎖”和“非公平鎖”。等待隊列中按照FIFO的原則獲取鎖,等待時間越長的線程越先獲取到鎖,這就是公平的獲取鎖,即公平鎖。而非公平鎖,線程獲取的鎖的時候,無視等待隊列直接獲取鎖。ReentrantLock和ReentrantReadWriteLock.Writelock是獨占鎖。
  • 共享鎖:同一個時候能夠被多個線程獲取的鎖,能被共享的鎖。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享鎖。

  JUC包中的鎖的包括:Lock接口,ReadWriteLock接口;Condition條件,LockSupport阻塞原語。      

  AbstractOwnableSynchronizer/AbstractQueuedSynchronizer/AbstractQueuedLongSynchronizer三個抽象類,

  ReentrantLock獨占鎖,ReentrantReadWriteLock讀寫鎖。CountDownLatch,CyclicBarrier和Semaphore也是通過AQS來實現的。

  下面是AQS和使用AQS實現的一些鎖,以及通過AQS實現的一些工具類的架構圖:

 

                         圖 1.依賴AQS實現的鎖和工具類

                                                                                

 

5.AQS同步器的結構:同步器擁有首節點(head)和尾節點(tail)。同步隊列的基本結構如下:

 

                                                       圖 1.同步隊列的基本結構 compareAndSetTail(Node expect,Node update)

  • 同步隊列設置尾節點(未獲取到鎖的線程加入同步隊列): 同步器AQS中包含兩個節點類型的引用:一個指向頭結點的引用(head),一個指向尾節點的引用(tail),當一個線程成功的獲取到鎖(同步狀態),其他線程無法獲取到鎖,而是被構造成節點(包含當前線程,等待狀態)加入到同步隊列中等待獲取到鎖的線程釋放鎖。這個加入隊列的過程,必須要保證線程安全。否則如果多個線程的環境下,可能造成添加到隊列等待的節點順序錯誤,或者數量不對。因此同步器提供了CAS原子的設置尾節點的方法(保證一個未獲取到同步狀態的線程加入到同步隊列后,下一個未獲取的線程才能夠加入)。  如下圖,設置尾節點:

 圖 2.尾節點的設置  節點加入到同步隊列

  •  同步隊列設置首節點(原頭節點釋放鎖,喚醒后繼節點):同步隊列遵循FIFO,頭節點是獲取鎖(同步狀態)成功的節點,頭節點在釋放同步狀態的時候,會喚醒后繼節點,而后繼節點將會在獲取鎖(同步狀態)成功時候將自己設置為頭節點。設置頭節點是由獲取鎖(同步狀態)成功的線程來完成的,由於只有一個線程能夠獲取同步狀態,則設置頭節點的方法不需要CAS保證,只需要將頭節點設置成為原首節點的后繼節點 ,並斷開原頭結點的next引用。如下圖,設置首節點:

圖 3.首節點的設置

 6.獨占式的鎖的獲取:調用同步器的acquire(int arg)方法可以獲取同步狀態,該方法對中斷不敏感,即線程獲取同步狀態失敗后進入同步隊列,后續對線程進行中斷操作時,線程不會從同步隊列中移除。

    (1) 當前線程實現通過tryAcquire()方法嘗試獲取鎖,獲取成功的話直接返回,如果嘗試失敗的話,進入等待隊列排隊等待,可以保證線程安全(CAS)的獲取同步狀態。

    (2) 如果嘗試獲取鎖失敗的話,構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法,將節點加入到同步隊列的隊列尾部。

    (3) 最后調用acquireQueued(final Node node, int args)方法,使該節點以死循環的方式獲取同步狀態,如果獲取不到,則阻塞節點中的線程。acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點的時候才能嘗試獲取鎖(同步狀態)( p == head && tryAcquire(arg))。

    原因是:1.頭結點是成功獲取同步狀態的節點,而頭結點的線程釋放鎖以后,將喚醒后繼節點,后繼節點線程被喚醒后要檢查自己的前驅節點是否為頭結點。

                2.維護同步隊列的FIFO原則,節點進入同步隊列以后,就進入了一個自旋的過程,每個節點(后者說每個線程)都在自省的觀察。

 下圖為節點自旋檢查自己的前驅節點是否為頭結點:

                              圖 4 節點自旋獲取同步狀態

 

獨占式的鎖的獲取源碼:

acquire方法源碼如下


 1 /**
 2      * Acquires in exclusive(互斥) mode, ignoring(忽視) interrupts.  Implemented
 3      * by invoking at least once {@link #tryAcquire},
 4      * returning on success.  Otherwise the thread is queued(排隊), possibly
 5      * repeatedly(反復的) blocking and unblocking, invoking {@link
 6      * #tryAcquire} until success.  This method can be used
 7      * to implement method {@link Lock#lock}.
 8      *
 9      * @param arg the acquire argument.  This value is conveyed(傳達) to
10      *        {@link #tryAcquire} but is otherwise uninterpreted and
11      *        can represent anything you like.
12      *        
13      *  獨占式的獲取同步狀態      
14      *        
15      */
16     public final void acquire(int arg) {
17         if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
18             selfInterrupt();
19     }

嘗試獲取鎖:tryAcquire方法:如果獲取到了鎖,tryAcquire返回true,反之,返回false。

 1 //方法2:
 2     protected final boolean tryAcquire(int acquires) {
 3         // 獲取當前線程
 4         final Thread current = Thread.currentThread();
 5         // 獲取“獨占鎖”的狀態,獲取父類AQS的標志位
 6         int c = getState();
 7         //c == 0 意思是鎖(同步狀態)沒有被任何線程所獲取
 8         //1.當前線程是否是同步隊列中頭結點Node,如果是的話,則獲取該鎖,設置鎖的狀態,並設置鎖的擁有者為當前線程
 9         if (c == 0) {
10             if (!hasQueuedPredecessors() &&
11                 
12                // 修改下狀態為,這里的acquires的值是1,是寫死的調用子類的lock的方法的時候傳進來的,如果c == 0,compareAndSetState操作會更新成功為1.
13                 compareAndSetState(0, acquires)) {
14                 // 上面CAS操作更新成功為1,表示當前線程獲取到了鎖,因為將當前線程設置為AQS的一個變量中,代表這個線程拿走了鎖。
15                 setExclusiveOwnerThread(current);
16                 return true;
17             }
18         }
19         //2.如果c不為0,即狀態不為0,表示鎖已經被拿走。
20         //因為ReetrantLock是可重入鎖,是可以重復lock和unlock的,所以這里還要判斷一次,獲取鎖的線程是否為當前請求鎖的線程。
21         else if (current == getExclusiveOwnerThread()) {
22             //如果是,state繼續加1,這里nextc的結果就會 > 1,這個判斷表示獲取到的鎖的線程,還可以再獲取鎖,這里就是說的可重入的意思
23             int nextc = c + acquires;
24             if (nextc < 0)
25                 throw new Error("Maximum lock count exceeded");
26             setState(nextc);
27             return true;
28         }
29         return false;
30     }
addWaiter方法的源碼:回到aquire方法,如果嘗試獲取同步狀態(鎖)失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),
通過addWaiter(Node node,int args)方法
將該節點加入到同步隊列的隊尾。
 1 /**
 2     * Creates and enqueues node for current thread and given mode.
 3     *
 4     * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
 5     * @return the new node
 6     * 
 7     * 
 8     * 如果嘗試獲取同步狀態失敗的話,則構造同步節點(獨占式的Node.EXCLUSIVE),通過addWaiter(Node node,int args)方法將該節點加入到同步隊列的隊尾。
 9     * 
10     */
11     private Node addWaiter(Node mode) {
12         // 用當前線程夠着一個Node對象,mode是一個表示Node類型的字段,或者說是這個節點是獨占的還是共享的,或者說AQS的這個隊列中,哪些節點是獨占的,哪些節點是共享的。
13         Node node = new Node(Thread.currentThread(), mode);
14         // Try the fast path of enq; backup to full enq on failure
15         Node pred = tail;
16         //隊列不為空的時候
17         if (pred != null) {
18             node.prev = pred;
19             // 確保節點能夠被線程安全的添加,使用CAS方法
20             // 嘗試修改為節點為最新的節點,如果修改失敗,意味着有並發,這個時候進入enq中的死循環,進行“自旋”的方式修改
21             if (compareAndSetTail(pred, node)) {
22                 pred.next = node;
23                 return node;
24             }
25         }
26         //進入自旋
27         enq(node);
28         return node;
29     }
enq方法的源碼:同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則
當前線程不斷的嘗試設置。

enq方法將並發添加節點的請求通過CAS變得“串行化”了。

 1 /**
 2      * Inserts node into queue, initializing if necessary. See picture above.
 3      * @param node the node to insert
 4      * @return node's predecessor
 5      * 
 6      * 同步器通過死循環的方式來保證節點的正確添加,在“死循環” 中通過CAS將節點設置成為尾節點之后,當前線程才能從該方法中返回,否則當前線程不斷的嘗試設置。
 7      * enq方法將並發添加節點的請求通過CAS變得“串行化”了。
 8      * 
 9      */
10     private Node enq(final Node node) {
11         for (;;) {
12             Node t = tail;
13             if (t == null) { // Must initialize
14                 if (compareAndSetHead(new Node()))
15                     tail = head;
16             } else {
17                 node.prev = t;
18                 if (compareAndSetTail(t, node)) {
19                     t.next = node;
20                     return t;
21                 }
22             }
23         }
24     }
acquireQueued方法:在隊列中的線程獲取鎖的過程:
 1 /**
 2     * Acquires in exclusive uninterruptible mode for thread already in
 3     * queue. Used by condition wait methods as well as acquire.
 4     *
 5     * @param node the node
 6     * @param arg the acquire argument
 7     * @return {@code true} if interrupted while waiting
 8     * 
 9     * acquireQueued方法當前線程在死循環中獲取同步狀態,而只有前驅節點是頭節點才能嘗試獲取同步狀態(鎖)( p == head && tryAcquire(arg))
10     *     原因是:1.頭結點是成功獲取同步狀態(鎖)的節點,而頭節點的線程釋放了同步狀態以后,將會喚醒其后繼節點,后繼節點的線程被喚醒后要檢查自己的前驅節點是否為頭結點。
11     *           2.維護同步隊列的FIFO原則,節點進入同步隊列之后,就進入了一個自旋的過程,每個節點(或者說是每個線程)都在自省的觀察。
12     * 
13     */
14     final boolean acquireQueued(final Node node, int arg) {
15         boolean failed = true;
16         try {
17             boolean interrupted = false;
18             //死循環檢查(自旋檢查)當前節點的前驅節點是否為頭結點,才能獲取鎖
19             for (;;) {
20                 // 獲取節點的前驅節點
21                 final Node p = node.predecessor();
22                 if (p == head && tryAcquire(arg)) {//節點中的線程循環的檢查,自己的前驅節點是否為頭節點
23                     //將當前節點設置為頭結點,移除之前的頭節點
24                     setHead(node);
25                     p.next = null; // help GC
26                     failed = false;
27                     return interrupted;
28                 }
29                 // 否則檢查前一個節點的狀態,看當前獲取鎖失敗的線程是否要掛起
30                 if (shouldParkAfterFailedAcquire(p, node) &&
31                     //如果需要掛起,借助JUC包下面的LockSupport類的靜態方法park掛起當前線程,直到被喚醒
32                     parkAndCheckInterrupt())
33                     interrupted = true;
34             }
35         } finally {
36             //如果有異常
37             if (failed)
38                 //取消請求,將當前節點從隊列中移除
39                 cancelAcquire(node);
40         }
41     }

獨占式的獲取同步狀態的流程如下: 

圖5 獨占式的獲取同步狀態的流程

 7.獨占鎖的釋放:下面直接看源碼:

 1 /* 
 2   1. unlock():unlock()是解鎖函數,它是通過AQS的release()函數來實現的。
 3  * 在這里,“1”的含義和“獲取鎖的函數acquire(1)的含義”一樣,它是設置“釋放鎖的狀態”的參數。
 4  * 由於“公平鎖”是可重入的,所以對於同一個線程,每釋放鎖一次,鎖的狀態-1。
 5 
 6     unlock()在ReentrantLock.java中實現的,源碼如下:
 7  */
 8     public void unlock() {
 9         sync.release(1);
10     }

release()會調用tryRelease方法嘗試釋放當前線程持有的鎖(同步狀態),成功的話喚醒后繼線程,並返回true,否則直接返回false

 1 /**
 2     * Releases in exclusive mode.  Implemented by unblocking one or
 3     * more threads if {@link #tryRelease} returns true.
 4     * This method can be used to implement method {@link Lock#unlock}.
 5     *
 6     * @param arg the release argument.  This value is conveyed to
 7     *        {@link #tryRelease} but is otherwise uninterpreted and
 8     *        can represent anything you like.
 9     * @return the value returned from {@link #tryRelease}
10     * 
11     * 
12     * 
13     */
14     public final boolean release(int arg) {
15         if (tryRelease(arg)) {
16             Node h = head;
17             if (h != null && h.waitStatus != 0)
18                 unparkSuccessor(h);
19             return true;
20         }
21         return false;
22     }
 1 // tryRelease() 嘗試釋放當前線程的同步狀態(鎖)
 2   protected final boolean tryRelease(int releases) {
 3             //c為釋放后的同步狀態
 4           int c = getState() - releases;
 5           //判斷當前釋放鎖的線程是否為獲取到鎖(同步狀態)的線程,不是拋出異常(非法監視器狀態異常)
 6           if (Thread.currentThread() != getExclusiveOwnerThread())
 7               throw new IllegalMonitorStateException();
 8           boolean free = false;
 9           //如果鎖(同步狀態)已經被當前線程徹底釋放,則設置鎖的持有者為null,同步狀態(鎖)變的可獲取
10           if (c == 0) {
11               free = true;
12               setExclusiveOwnerThread(null);
13           }
14           setState(c);
15           return free;
16       }

釋放鎖成功后,找到AQS的頭結點,並喚醒它即可:

 1 // 4. 喚醒頭結點的后繼節點
 2      
 3      private void unparkSuccessor(Node node) {
 4          //獲取頭結點(線程)的狀態
 5         int ws = node.waitStatus;
 6         //如果狀態<0,設置當前線程對應的鎖的狀態為0
 7         if (ws < 0)
 8             compareAndSetWaitStatus(node, ws, 0);
 9             
10         Node s = node.next;
11         
12          //解釋:Thread to unpark is held in successor, which is normally just the next node. 
13          //But if cancelled or apparently(顯然) null, traverse backwards(向后遍歷) from tail to find the actual(實際的) non-cancelled successor(前繼節點).
14          //從隊列尾部開始往前去找最前面的一個waitStatus小於0的節點。
15         if (s == null || s.waitStatus > 0) {
16             s = null;
17             for (Node t = tail; t != null && t != node; t = t.prev)
18                 if (t.waitStatus <= 0)
19                     s = t;
20         }
21         //喚醒后繼節點對應的線程
22         if (s != null)
23             LockSupport.unpark(s.thread);
24     }

 上面說的是ReentrantLock的公平鎖獲取和釋放的AQS的源碼,唯獨還剩下一個非公平鎖NonfairSync沒說,其實,它和公平鎖的唯一區別就是獲取鎖的方式不同,公平鎖是按前后順序一次獲取鎖,非公平鎖是搶占式的獲取鎖,那ReentrantLock中的非公平鎖NonfairSync是怎么實現的呢?

 1 /**
 2      * Sync object for non-fair locks
 3      */
 4     static final class NonfairSync extends Sync {
 5         private static final long serialVersionUID = 7316153563782823691L;
 6 
 7         /**
 8          * Performs lock.  Try immediate barge, backing up to normal
 9          * acquire on failure.
10          */
11         final void lock() {
12             if (compareAndSetState(0, 1))
13                 setExclusiveOwnerThread(Thread.currentThread());
14             else
15                 acquire(1);
16         }
17 
18         protected final boolean tryAcquire(int acquires) {
19             return nonfairTryAcquire(acquires);
20         }
21     }

非公平鎖的lock的時候多了上面加粗的代碼:在lock的時候先直接用cas判斷state變量是否為0(嘗試獲取鎖),成功的話更新成1,表示當前線程獲取到了鎖,不需要在排隊,從而直接搶占的目的。而對於公平鎖的lock方法是一開始就走AQS的雙向隊列排隊獲取鎖。更詳細的關於ReentrantLock的實現請看后面寫的一篇文章:http://www.cnblogs.com/200911/p/6035765.html

 

 總結:在獲取同步狀態的時候,同步器維護一個同步隊列,獲取失敗的線程會被加入到隊列中並在隊列中自旋;移除隊列(或停止自旋)的條件是前驅節點為頭結點並且獲取到了同步狀態。在釋放同步狀態時,同步器調用tryRelease(int args)方法釋放同步狀態,然后喚醒頭結點的后繼節點。AQS的實現思路其實並不復雜,用一句話准確的描述的話,其實就是使用標志狀態位status(volatile int state)和 一個雙向隊列的入隊和出隊來實現。AQS維護一個線程何時訪問的狀態,它只是對狀態負責,而這個狀態的含義,子類可以自己去定義。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM