AQS全解析


 什么是AQS?

  AQS是JUC內容的基石,它本質上是一個抽象類,定義了多線程下資源爭奪與釋放的規則和過程,許多實現類都是繼承於AQS,使用AQS的骨架。

 

 

 AQS的原理

  AQS總體上來看是由一個FIFO(先進先出)隊列和一個state屬性配合組成的。FIFO用來存儲線程結點的,state屬性用來表示資源的狀態,如果為0表示空閑,如果資源被某個線程獲取到,那么這個state就會+1,釋放-1。當其他線程試圖爭奪資源時會檢查state值,如果發現不為0就會放棄爭奪。

當然這只是總體上的原理,如果想要了解其中的細節,還需要閱讀相應的源碼才能徹底弄清楚其中的細節。

 

源碼剖析

結構

  要想完整知道AQS的原理,需要從它的源碼出發,查看它的內部結構。這里只針對幾個重要內部類和屬性說明。

   

  從左圖可以看出在AQS內部含有一個內部類Node,這個Node就是上面提到的隊列中存儲的線程結點對象對應的類,可以看到它包含prev,next屬性,所以可以看出這是一個雙向鏈表結構形成的隊列。waitStatus表示當前結點對應線程的狀態,它的值也在屬性中設置了,就是1,-1,-2,-3那幾個常量屬性。1表示線程的請求已經被取消了,-1表示線程已經喚醒,正在資源釋放,-2表示線程還在阻塞狀態中,等待喚醒,-3表示鎖是共享鎖。

  右圖是AQS的屬性,head表示隊列的頭結點,tail表示隊列的尾結點,state表示資源的狀態。

 

過程

  這里從ReentrantLock為例,查看它的lock、unlock方法的源碼過程。

  首先需要知道ReentrantLock的繼承關系。

  

   sync、FairSync、NonfairSync都是ReentrantLock的內部類,其中Sync是直接繼承AQS的,而ReentrantLock在定義時可以聲明為公平鎖或者是非公平鎖,所以內部設置了兩個內部類,一個 FairSync 表示公平鎖,一個 NonfairSync 表示非公平鎖,這兩個類又是繼承Sync,實際執行的方法會根據鎖性質的不同而選擇執行這兩個類中對應的實現方法。

 

lock()

public void lock() {
        sync.lock();
}







abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -5179523762034025860L;

        /**
         * Performs {@link Lock#lock}. The main reason for subclassing
         * is to allow fast path for nonfair version.
         */
        abstract void lock();
        ....
}
View Code

 可以看到這里直接跳轉到一個sync的抽象方法,上面也說了,這里會根據鎖的性質來選擇不同的實現執行。

 1 static final class FairSync extends Sync {
 2         private static final long serialVersionUID = -3000897897090466540L;
 3 
 4         final void lock() {
 5             acquire(1);
 6         }
 7    ....
 8 }
 9 
10 
11 
12 
13 static final class NonfairSync extends Sync {
14         private static final long serialVersionUID = 7316153563782823691L;
15 
16         /**
17          * Performs lock.  Try immediate barge, backing up to normal
18          * acquire on failure.
19          */
20         final void lock() {
21             if (compareAndSetState(0, 1))
22                 setExclusiveOwnerThread(Thread.currentThread());
23             else
24                 acquire(1);
25         }
26     ....
27 }

  可以看出公平鎖的實現是比較簡單的,因為公平鎖是需要遵守隊列秩序,按順序執行就可以了,而非公平鎖則沒有那么 "老實" ,它會先嘗試獲取鎖,如果之前獲取資源的線程正好執行完了或者調用wait等方法釋放鎖了,那么就會 "插隊" 直接奪取資源執行。這里就看一下更復雜的非公平鎖是如何執行的。

1、compareAndSetState 方法

  根據NonfairSync對lock方法的實現可以看到,第一步會執行 compareAndSetState 方法。

 1 protected final boolean compareAndSetState(int expect, int update) {
 2     // See below for intrinsics setup to support this
 3     return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
 4 }
 5 
 6 
 7 
 8 
 9 
10 public final native boolean compareAndSwapInt(Object var1, long var2, int var4, int var5);

發現這里最終調用的是一個本地方法,其實這個方法就是一個CAS樂觀鎖方法,compareAndSwapInt 方法的四個參數分別是對象,屬性偏移地址(是unsafe類中直接操作特定內存數據的方式,unsafe是CAS實現的核心類),期望值,更新值。當修改時檢查該對象屬性值等於期望值就更新成功,否則就失敗。而這里的 stateOffset 又是哪個屬性呢?

 1 private static final Unsafe unsafe = Unsafe.getUnsafe();
 2     private static final long stateOffset;
 3     private static final long headOffset;
 4     private static final long tailOffset;
 5     private static final long waitStatusOffset;
 6     private static final long nextOffset;
 7 
 8     static {
 9         try {
10             stateOffset = unsafe.objectFieldOffset
11                 (AbstractQueuedSynchronizer.class.getDeclaredField("state"));
12             headOffset = unsafe.objectFieldOffset
13                 (AbstractQueuedSynchronizer.class.getDeclaredField("head"));
14             tailOffset = unsafe.objectFieldOffset
15                 (AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
16             waitStatusOffset = unsafe.objectFieldOffset
17                 (Node.class.getDeclaredField("waitStatus"));
18             nextOffset = unsafe.objectFieldOffset
19                 (Node.class.getDeclaredField("next"));
20 
21         } catch (Exception ex) { throw new Error(ex); }
22 }

可以看出這個 stateOffset 屬性就是 AQS 的 state 屬性。所以在lock方法里首先是嘗試將state改成1,如果成功就繼續執行條件代碼塊中的代碼。也就是 setExclusiveOwnerThread 方法,這個方法實現是這樣的。

1 protected final void setExclusiveOwnerThread(Thread thread) {
2     exclusiveOwnerThread = thread;
3 }

 關於這個方法和這個屬性可以看到是屬於 AbstractOwnableSynchronizer 這個類的,而這個類又是AQS的父類,所以也是從 AbstractOwnableSynchronizer 繼承而來的,這個屬性就是表示當前占用資源的線程。所以第一步是直接使用CAS嘗試搶占鎖,如果成功就修改相關屬性,然后結束。如果失敗就執行 acquire 方法。

 

2、acquire 方法

  這個方法是AQS中的方法。在這個方法里面又包含許多小的方法。首先先看一下源碼。

1 public final void acquire(int arg) {
2   if (!tryAcquire(arg) &&
3     acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
4     selfInterrupt();
5 }

  

  2.1、tryAcquire 方法:嘗試獲取鎖資源以及判斷是否是當前線程已獲取到鎖資源並重復加鎖

   這個是一個抽象方法。下面是 NonFairSync 實現的相關代碼。

 1 protected final boolean tryAcquire(int acquires) {
 2   return nonfairTryAcquire(acquires);
 3 }
 4 
 5 
 6 
 7 
 8 final boolean nonfairTryAcquire(int acquires) {
 9   final Thread current = Thread.currentThread();
10   int c = getState();
11   if (c == 0) {
12     if (compareAndSetState(0, acquires)) {
13       setExclusiveOwnerThread(current);
14       return true;
15           }
16        }
17   else if (current == getExclusiveOwnerThread()) {
18     int nextc = c + acquires;
19     if (nextc < 0) // overflow
20       throw new Error("Maximum lock count exceeded");
21       setState(nextc);
22       return true;
23         }
24   return false;
25 }

  這個方法首先是判斷當前資源是否空閑(state=0),如果空閑就將相關屬性進行修改(還是上面說得 exclusiveOwnerThread 屬性),然后結束,返回 true(這是針對之前占用資源的線程剛好釋放鎖的情況);否則檢查當前線程是否和占用資源的線程屬性一致,如果一致就將state+傳參值(一般情況下是+1),然后結束,返回 true(這是針對當前線程在已占用資源的情況下再次加鎖(可重入鎖));負責返回 false (獲取鎖失敗)。 

 

  2.2、addWriter 方法:執行隊列初始化以及Node結點插入操作並返回這個結點

 1   private Node addWaiter(Node mode) {
 2     Node node = new Node(Thread.currentThread(), mode);
 3     // Try the fast path of enq; backup to full enq on failure
 4     Node pred = tail;       // 獲取尾結點
 5     if (pred != null) {
 6       node.prev = pred;
 7       if (compareAndSetTail(pred, node)) {        // 判讀尾結點是否為空,如果不為空就直接將當前結點新增至尾結點之后作為尾結點
 8         pred.next = node;
 9         return node;
10       }
11     }
12     enq(node);      // 隊列初始化以及執行插入操作
13     return node;
14  }

 

  enp方法:初始化方法

 private AbstractQueuedSynchronizer.Node enq(final AbstractQueuedSynchronizer.Node node) {
   for (;;) {
     Node t = tail;
     if (t == null) { // 隊列初始化,如果尾結點為空就新建一個空結點作為頭結點,並且因為是for循環所以在初始化隊列后還會繼續執行插入操作
       if (compareAndSetHead(new Node()))
         tail = head;
       } else {
         node.prev = t;
         if (compareAndSetTail(t, node)) {       // 執行插入操作,然后return 返回
          t.next = node;
          return t;
       }
    }
  }
}

 從addWriter方法的源碼可以知道,這個方法就是執行隊列初始化以及Node結點插入操作的,並且在隊列的頭結點會是一個空結點(哨兵結點)。

 

  2.3、acquireQueued 方法:控制線程的阻塞

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();      // 獲取當前結點的前一個結點
                if (p == head && tryAcquire(arg)) {     // 如果是頭結點且嘗試獲取鎖資源成功,就將當前結點設為頭結點(哨兵結點),
                    // 然后將之前的頭結點引用全部消除,讓它順利回收,再返回中斷狀態false 。  
            // 這里的代碼是當前線程獲取到鎖后執行的(是非公平鎖,在還沒加入隊列正好碰上占用線程釋放了鎖資源或者是正常在隊列中收到阻塞喚醒,也就是其他線程執行了unlock方法)
setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 先判斷修改前一個線程結點的waitStatus(防止中途跳出取消等待),
                               // 如果符合再進行線程的阻塞,通過后將中斷狀態設為true(因為獲取到資源),執行后面的出隊操作
interrupted = true; } } finally { if (failed) cancelAcquire(node); // 如果線程發生異常,避免沒有執行線程出隊的代碼所以這里使用finally強制執行,將線程從隊列中移除 } }

  shouldParkAfterFailedAcquire:檢查前面一個線程的waitStatus狀態,如果不是1(取消執行),那么就將當前線程正式加入阻塞隊列

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)      // 表示線程准備就緒,直接返回true
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
            return true;
        if (ws > 0) {       // 表示線程請求取消了,將跳過該線程往后找直到<=0
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     // 找到后將其設為-1(就緒)
        }
        return false;       // 雖然這里返回false,但是上一個方法是for循環,所以下一個循環還是會返回true來繼續執行后面的判斷代碼
    }

  parkAndCheckInterrupt:進行線程的阻塞並清除該線程的中斷。

private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);     // 將當前線程阻塞,效果與wait()、notify()一樣,不過更靈活,
        // 不需要在同步代碼塊中也不需要按照先加鎖后解鎖的方式,它是通過“通行證”完成的
        return Thread.interrupted();
    }

  到這里如果線程沒有釋放資源的話,那么當前線程就會因為LockSupport的park方法進入阻塞,正式進入阻塞隊列等待資源釋放。而讓它解除阻塞就是靠unlock()方法,unlock()方法下面也會說到。這個方法在最后調用了interrupted()方法,這個方法的作用在多線程基礎中說過,是返回當前線程的中斷狀態,同時清除這個中斷。這里清除的目的是防止當前線程不能被重新阻塞,因為當前鎖是非公平鎖,雖然現在調用了unlock()后當前線程准備就緒了,但是還是有可能被其他線程中途搶占資源的,這時候當前線程就需要重新阻塞,這里如果沒有清除,那么假如本身是中斷狀態后就不會被阻塞,那么這個非公平鎖機制就會被破壞。acquireQueued方法的返回值就是是否消除了當前線程的中斷狀態,如果是true,就需要再添加一個中斷。

 

  2.4、SelfInterrupt :為該線程設置一個中斷

static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

interrupt()方法就是為當前線程設置一個中斷狀態,執行了這段代碼就說明前面執行了消除線程的中斷狀態,所以這里需要再補回去。

 

unlock方法

  這里調用的是內部sync的release方法。

public void unlock() {
        sync.release(1);
    }





    public final boolean release(int arg) {
        if (tryRelease(arg)) {      // 判斷資源是否空閑
            Node h = head;
            if (h != null && h.waitStatus != 0)     // 如果頭結點不為空且狀態不是初始值就喚醒隊列中第一個有效線程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

 

1、tryRelease 方法:更新state等屬性並返回資源空閑狀態

  ReentrantLock的實現方法:

 protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }




protected final boolean tryRelease(int releases) {
        int c = getState() - releases;      // 讓state減去參數值
        if (Thread.currentThread() != getExclusiveOwnerThread())    // 如果當前線程不是占用資源的線程,就拋出異常
            throw new IllegalMonitorStateException();
        boolean free = false;       // 當前資源是否空閑
        if (c == 0) {
            free = true;        // 如果state變成0,就設為true,再將當前占用線程屬性設為null
            setExclusiveOwnerThread(null);
        }
        setState(c);        // 更新state
        return free;
    }

 

2、unparkSuccessor:將頭結點的waitStatus設為初始值,並喚醒隊列第一個有效結點對應的線程(如果頭結點下一個結點不符合條件就從隊尾開始找到第一個合適的線程)
private void unparkSuccessor(Node node) {   // 因為傳來的頭結點,所以這里的node就是頭結點
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);       // 將頭結點的waitStatus設為0

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {    // 下一個線程結點不滿足條件執行(結點為空或者取消請求了)
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)     // 從隊列尾部循環遍歷,找到前一個有效結點
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);   // 將其釋放
    }

在執行完這段代碼后隊列中第一個有效線程就會被喚醒,隨后成為新的哨兵結點,而前一個線程的引用也會被斷開。需要注意的是當頭結點后面一個結點不符合條件,是從隊尾開始遍歷直到找到第一個合適的線程喚醒的,而不是從頭部開始遍歷。至於原因可以參考 https://www.zhihu.com/question/50724462 ,其中有一個解釋比較有道理,那就是在上面 addWrite 方法中的 enq 方法中,是先執行 "node.prev=t " 以及 "compareAndSetTail(t, node)" 的,然后才執行后一句 "t.next = node;"  在此之間可能就會發生鎖的釋放,如果是從head開始向后遍歷,那么因為 "t.next=node" 還沒有執行,所以新加的結點就遍歷不到,這就會影響總體的性能。

 

而在 unlock()喚醒合適的線程之后,上面lock中的代碼就會繼續往后執行。

 

下面是總體上大概的流程圖:

如果不清晰,可以查看原圖 https://www.processon.com/view/5fa68f20f346fb686799d4b4 。

 

 

總結

  AQS是JUC並發編程的基石,它定義了線程執行的過程。總體上來看其原理主要通過state和一個FIFO隊列組成。state展示資源的占用狀態,隊列用來存儲排隊的線程(頭結點是哨兵結點)。每個線程結點包含一個等待狀態屬性waitStatus,用來表示對應線程的等待狀態。需要注意的是,1.是非公平鎖時隊列並一定是先進先出的,當順數第一個線程中斷了等待且沒有其他線程搶奪資源時,就會從隊列尾部遍歷找到第一個沒有中斷的線程喚醒執行。2.隊列頭結點並不是下一個會檢查執行的線程結點,而是一個哨兵結點,下一個會檢查第二個。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM