或許我可以把AQS講清楚


AQS是JUC包中許多類的實現根基,這篇文章基於個人理解的前提下完成,所以在結構上跟其他AQS文章有些差異。

1 AQS內臟圖

tips:如果只是想看AQS的實現的話可以從第三節開始看,前面只是講結構和使用

1.1 整體結構

  在開始了解AQS之前,先看下AQS的內部結構,這樣在看實現代碼的時候至少有一個整體的概念,重點要記住的是Node類幾種狀態的作用,其他結構有個概念就行。

1575639520339

  如上,在AQS中大致有:

  1. state變量:內部維護了一個volatile修飾的資源變量state,可以簡單的理解為鎖,拿到資源就是拿到鎖。
  2. 同步隊列(CLH):所有關於資源的搶奪都是在這個隊列中發生的;在AQS中只存了頭尾節點,本質上就是一種雙向鏈表,隊列為先進先出隊列(FIFO),也就是說對於資源state的爭奪都是按照隊列中的順序來的,另外能參與資源爭奪的隊列只有有效的節點(節點狀態不為取消或者同步)
  3. 等待隊列:跟同步隊列類似,只有頭尾節點,不同是的其在一個內部類ConditionObjet中,也就是說一個ConditionObject對象就是一個等待隊列,所以允許有多個。處於等待隊列中的節點不會參與資源的競爭,其狀態為CONDITION,當節點被標記為CONDITION時(await方法)其會從同步隊列中移除,加入對應的等待隊列,而如果等待隊列中的節點被喚醒(例如調用condition.signalAll())時會節點重新被放入同步隊列尾部參與資源的競爭(ReentrantLock按組喚醒線程的實現原理就是這個)。

1.2 內部類Node

  在AQS中,內部類有兩個:NodeConditionObjectNode是隊列的實現根基,里面存放了許多重要的信息,如操作的線程、線程競爭的狀態(特別重要)等;而ConditionObject則是Condition接口的實現類,用來實現喚醒指定線程組的(等待隊列)。

  關系如下圖(下方的Waiter節點也是Node節點,這里為了便於區分取名不同):

1575905757547

Node內部類AQS兩個隊列的實現節點。

  • waitStatus :節點狀態,取值為-3~1(整數)。當狀態為1時表示沒用了,其他狀態表示是有用的。

    0:初始狀態或者不代表任何意義時的取值。

    SIGNAL(-1):這個狀態一般由下一個節點來設置,代表的意思是當前節點在釋放了資源后將后續節點的線程喚醒。(大白話就是后續節點拜托前方的大哥東西用完了叫他,他先去睡會兒)

    CONDITION(-2):表示節點處於等待隊列中,等待隊列中的節點不會參與資源競爭,必須從等待隊列出來后重新加入同步隊列才能參與競爭。

    PROPAGATE(-3):在共享模式的時候用到。共享模式下,不僅只是喚醒下個節點,還可能喚醒下下個節點(根據當前剩余資源state的值能否滿足最近節點的需求決定)。

    CANCELLED(1):表示該節點沒用了,可能是等太久了,也可能是其他原因,總之就是廢了,處於該狀態的節點不會再改變,所以AQS中經常會判斷節點狀態是否大於0來檢查節點是否還有用。

  • thread:爭奪資源的線程,存放在節點當中。
  • prev:同步隊列中的上一個節點。
  • next:同步隊列的下一個節點。
  • nextWaiter:下一個等待節點,用來實現等待隊列。

2 簡單的使用AQS

  現在對AQS有了模模糊糊的了解,來看看要如何使用這個框架。其采用模板設計模式實現,定義了許多頂級方法如acquirerelease等,這些方法子類不能重寫但是可以調用,而要正確的使用這些方法則要按照其要求重寫一些方法如tryAcquire頂級方法內部調用了開放方法)。

  可以重寫的方法有tryAcquiretryReleasetryAcquireSharedtryReleaseSharedisHeldExclusively共五種,每個方法里面沒有具體的實現,反而是直接拋出了異常,但是不一定要全部重寫,比方說只重寫tryAcquiretryRelease則表示要實現的是獨占模式的鎖。

protected boolean tryAcquire(int arg) {
    throw new UnsupportedOperationException();
}

  這些方法表示嘗試去獲取資源或者釋放資源,其實現必須要跟state資源狀態相關,舉個例子,tryAcquire方法表示以獨占的方式嘗試獲取資源,如果獲取到了那么其他線程不得操作其資源,其中入參的arg則表示想要獲取到的資源數量,例如我tryAcquire(5)成功了,那么狀態變量state變量則增加5,如果tryRelease(5)成功則state狀態變量減少5,等到state==0的時候則表示資源被釋放,即可以理解為鎖被釋放。

  如果只是使用AQS的話,再加上幾個變更狀態的方法就可以了,我們不需要了解更多的東西,如同AQS的文檔給出的案例一般,簡單的重寫幾個方法便可以實現一種鎖,如下,一個不可重入鎖的簡單實現。

class Mutex implements Lock, java.io.Serializable {

   // 同步內部類,鎖的真正操作都是通過該類的操作 
   private static class Sync extends AbstractQueuedSynchronizer {
     // 檢查當前是否已經處於鎖定的狀態
     protected boolean isHeldExclusively() {
       return getState() == 1;
     }

     // 如果資源變量為0,則獲取鎖(資源)
     public boolean tryAcquire(int acquires) {
       // acquires的值只能是1,否則的話不進入下面代碼
       assert acquires == 1;
       if (compareAndSetState(0, 1)) {
         // 設置持有當前鎖的線程
         setExclusiveOwnerThread(Thread.currentThread());
         return true;
       }
       return false;
     }

     // 通過將狀態變量state設定為0來表示鎖的釋放
     protected boolean tryRelease(int releases) {
       // 傳入的參數只能是1,否則是無效操作
       assert releases == 1; 
       // 如果狀態狀態等於0,說明不是鎖定狀態
       if (getState() == 0) throw new IllegalMonitorStateException();
       setExclusiveOwnerThread(null);
       setState(0);
       return true;
     }

     // 提供Condition,返回其AQS內部類ConditionObject
     Condition newCondition() { return new ConditionObject(); }

     // 反序列化
     private void readObject(ObjectInputStream s)
         throws IOException, ClassNotFoundException {
       s.defaultReadObject();
       setState(0); // reset to unlocked state
     }
   }

   // 內部類已經實現了所有需要的方法,我們只要封裝一層就行
   private final Sync sync = new Sync();

   public void lock()                { sync.acquire(1); }
   public boolean tryLock()          { return sync.tryAcquire(1); }
   public void unlock()              { sync.release(1); }
   public Condition newCondition()   { return sync.newCondition(); }
   public boolean isLocked()         { return sync.isHeldExclusively(); }
   public boolean hasQueuedThreads() { return sync.hasQueuedThreads(); }
   public void lockInterruptibly() throws InterruptedException {
     sync.acquireInterruptibly(1);
   }
   public boolean tryLock(long timeout, TimeUnit unit)
       throws InterruptedException {
     return sync.tryAcquireNanos(1, unit.toNanos(timeout));
   }
 }

進行一個小測試

public static void main(String[] args) {
    Lock lock = new Mutex();
    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.err.println("3秒過去....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "釋放鎖");
        }
    }).start();

    new Thread(() -> {
        lock.lock();
        try {
            System.err.println("獲得鎖線程名:" + Thread.currentThread().getName());
            TimeUnit.SECONDS.sleep(3);
            System.err.println("3秒過去....");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
            System.err.println(Thread.currentThread().getName() + "釋放鎖");
        }
    }).start();
}

最終的結果圖如下

1575643000409

  這樣就實現了一個不可重入鎖,是不是看起來很簡單?

3 AQS的內部實現

  首先要先明白的是AQS分為兩種模式——獨占模式共享模式。一般來說只會用到其中一種,兩種模式的資源競爭都是在同步隊列中發生的,不要跟等待隊列混淆。

獨占模式:每次只能允許一個節點獲取到資源,每次釋放資源之后也只會喚醒后驅節點。

共享模式:每次可以允許多個節點按照順序獲取資源,每次釋放頭節點資源后可能會喚醒后驅的后驅。(下方講實現的時候有解釋)

3.1 獨占式釋放資源——acquire

來看acquire方法(如果講的不是容易讓人理解,可以結合后方的流程圖一起),ReentrantLocklock就是這個方法,可以類比理解。

  在看代碼需要明確知道的是,tryAcquiretryRelease這些操作才是對資源的獲取和釋放AQS中的頂級方法如acquire的作用只是對資源獲取操作之后的處理。

// 代碼邏輯不復雜,首先嘗試獲取資源,如果成功則直接返回,失敗則加入同步隊列爭奪資源
public final void acquire(int arg) {
    // 嘗試獲得鎖,如果失敗了則增加節點放入等待隊列中
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  可以看到整體的方法十分簡單,就在一個if條件中調用了3個方法,tryAcquire就不說了,先說下addWaiter做了什么,addWaiter方法將當前線程封裝成一個節點放入同步隊列的尾部,如果失敗就不斷的嘗試直到成功為止,其方法代碼如下。

private Node addWaiter(Node mode) {
    // 將當前線程封裝入一個節點之中,mode代表共享模式還是獨占模式
    Node node = new Node(Thread.currentThread(), mode);
    
    // 首先嘗試一次快速的尾隨其后,如果失敗的話則采用正常方式入隊
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 入隊操作
    enq(node);
    return node;
}

再看下正常的入隊操作

private Node enq(final Node node) {
    // 自旋
    for (;;) {
        Node t = tail;
        // 如果同步隊列是空的話則進行隊列的初始化
        if (t == null) { 
            // 這里注意初始化的時候head是一個新增的Node,其waitStatus為0
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            // 否則的話嘗試設置尾節點,失敗的話重新循環
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

  可以看出正常入隊比快速入隊也就多出來了自旋和初始化操作,其他的大致邏輯都是相似的。再看看acquire中的另一個方法acquireQueued

final boolean acquireQueued(final Node node, int arg) {
    // 默認獲取失敗
    boolean failed = true;
    try {
        /*
         * 線程打斷標識,我們知道使用interrupt()方法只是改變了線程中的打斷標識變量,
         * 並不能打斷正在運行的線程,而對於這個打斷變量的處理一般有兩種方式,
         * 一種是記錄下來,一種是拋出異常,這里選擇前者,而可打斷的acquire則是選擇后者
         */
        boolean interrupted = false;
        // 自旋
        for (;;) {
            // 拿到前驅節點
            final Node p = node.predecessor();
            // 如果前驅節點為頭節點則嘗試一次獲取
            // 再次強調下,獲取資源的操作是在tryAcquire中
            if (p == head && tryAcquire(arg)) {
                // 設置當前節點為頭節點,然后設置prev節點為null
                setHead(node);
                p.next = null; // help GC
                failed = false;
                // 返回中斷標識
                return interrupted;
            }
            // 獲取資源失敗了,判斷當前線程的節點是否應該休息
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                // 如果是因為中斷被喚醒的,要記錄下來,之后acquire方法要補上中斷
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 看看是否應該去休息這個方法中做了啥
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
    	// 如果前節點狀態為SIGNAL,那么表示可以安興去休息了,到了時候前驅節點會叫醒你的,返回true
        if (ws == Node.SIGNAL)
            return true;
        if (ws > 0) {
            /*
             * 狀態大於0,則表示節點已經取消作廢,那么需要一直往前找直到找到有效的節點
             * 這時還不能去休息,要是前驅節點是頭結點又恰好頭結點釋放了資源,那么你不就
             * 不用掛起就可以拿到資源了,所以返回false,再循環一次
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * 其他情況則表示前驅節點有效,將前驅節點狀態設置尾SIGNAL,表示麻煩他到時候
             * 叫醒你。這里還不可以去休息,因為有可能前驅節點剛好變成了頭結點又剛好執行完
             * 釋放了資源,這時去休息豈不是虧了,所以返回false
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
}

// 如果上面的方法判斷需要休息,那么將線程掛起
private final boolean parkAndCheckInterrupt() {
    	// 使用park方法將線程掛起
        LockSupport.park(this);
    	// 在上面我們提到線程的打斷標識,interrupted()方法返回后會重置這個標識
        return Thread.interrupted();
}

  光看代碼可能有點繞(整個流程可以看下方的流程圖),重新理一下邏輯:

​ 首先明確這個方法是不斷自旋不會退出的,除非成功拿到資源,如果拿不到資源就掛起等待。(不考慮特殊情況)

整個流程的邏輯:

  1. 判斷前驅節點是否為頭節點,如果是頭節點則嘗試獲取資源,成功了返回中斷標識,失敗了進行2。使用前驅節點判斷的原因是因為頭結點 不會進到這個方法來;不是頭結點還要去獲取資源是因為要是在這個過程中剛好頭結點釋放了資源,那么你就不用再去掛起傻傻等待了,節省了系統資源消耗。
  2. 進入shouldParkAfterFailedAcquire方法,這個方法的作用就是判斷你當前這個線程能不能去休息(掛起),而可以去休息的標志就是前驅節點的狀態為SIGNAL,這個狀態代表前驅節點釋放資源后會喚醒你。
    • 1 判斷前驅節點狀態是否為SIGNAL,如果是直接返回true,可以去休息了
    • 2 如果前驅節點狀態>0,表示作廢,那么將一直往前找直到找到一個有效的節點,然后進行連接,這時還不能去休息,要是前驅節點是頭結點呢是吧,所以返回false。也就是在這個階段中清理了同步隊列中那些沒用的節點,因為他們引用斷了,之后GC會回收它們。
    • 3 將前驅節點的狀態設置為SIGNAL,表示你准備去休息了要麻煩他叫醒你,然后先別休息,要是前驅節點這時候變成了頭結點又進行了資源釋放,那就可以省去掛起的操作直接獲取資源了,所以要再循環一次看看,返回false
  3. 根據是否應該休息方法shouldParkAfterFailedAcquire的結果判斷是否把線程掛起,如果返回true那么執行parkAndCheckInterrupt方法把線程掛起,如果是false那么則再循環一次。parkAndCheckInterrupt方法的作用是掛起線程,然后醒來的時候返回是否是因為被中斷而醒來的,如果是的話,那么將interrupted字段賦值為true,在整個acquire方法結束的時候會根據這個標識來決定是否進行線程的自我中斷

再回來看下acquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        // 根據返回的中斷標識決定是否執行下方的自我中斷
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

  整個acquire的流程大致為

img

  獨占式獲取資源的主要方法差不多就是這樣,還有可打斷的獨占式獲取方法acquireInterruptibly,代碼如下,其實現基本相同,只是對於我們方才說的打斷標識的處理從記錄改成了拋出異常,所以才是可打斷的,有興趣可以自己再看下,基本邏輯相同,看起來也就耗費不了多少時間。

public final void acquireInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        // 拋出異常處理
        throw new InterruptedException();
    if (!tryAcquire(arg))
        doAcquireInterruptibly(arg);
}

3.2 獨占式釋放資源——release

  了解完獲取資源自然知道釋放資源的過程,相對來說釋放資源要相對容易一些,大致邏輯為嘗試釋放資源,如果成功了,則改變節點的狀態並且喚醒下一個可用節點(一般是下一個,但是可能出現下一個節點已經被取消的情況)

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            // 修改線程的狀態,並且喚醒下一個節點進行資源競爭
            unparkSuccessor(h);
        return true;
    }
    return false;
}


private void unparkSuccessor(Node node) {
    // 改變節點狀態
    int ws = node.waitStatus;
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    /*
     * 喚醒下一個可用節點,一般來說是下一個節點,但是可能出現下個節點被取消
     * 或者為空的情況,這個時候就要從尾結點向前遍歷直到找到有效的節點(從尾節點向前遍歷
     * 是因為無論下個節點是空還是取消的節點,正向遍歷都不可能走得通了,取消的節點的next
     * 就是其本身,所以只能從后面開始往前遍歷)
     */
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 找到下個節點之后將其喚醒
    if (s != null)
        LockSupport.unpark(s.thread);
}

  release的流程圖如下:

img

3.3 共享模式——acquireShared

  在上面我們講的都是獨占模式的獲取和釋放處理,那接下來看看共享模式是怎么實現的。首先理解AQS中共享模式的概念,其代表資源可以被隊列中的多個節點按照順序獲得,什么意思呢?

  舉個例子,我們設置資源變量為3(state=3),首先頭結點使用tryAcquireShared(1)獲取到了一個資源,那么還剩下2個,這兩個可以給頭結點的后驅節點使用,如果后驅節點的需求是2那么獲取成功並將自己設置為頭結點同時斷開跟原頭結點的連接,但是如果需求是3的話則進入等待狀態直到可獲取的資源量達到其要求為止,這時就算后續的需求量是1也不會給后續節點,這就是按照順序獲得的意思。例子圖如下:

img

  okay,那來看下共享模式下的實現,先看acquireShared方法:判斷資源是否獲取成功,是的話直接結束,不是的話進入隊列進行資源競爭。需要注意的是tryAcquireShared返回值的語義:負值代表失敗,其他代表成功並且當前還可獲取的資源量。

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
}

看看doAcquireShared做了什么

// 還是強調一次,這些方法只是善后處理,資源的獲取還是在tryAcquireShared方法
private void doAcquireShared(int arg) {
    /*
     * 整個流程跟acquire方法有些類似,不同點是其獲取到資源后
     * 會喚醒后驅線程
     */
    
    // 加入隊列尾,不再贅述
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        // 同樣記錄一個打斷標識
        boolean interrupted = false;
        for (;;) {
            // 前驅節點
            final Node p = node.predecessor();
            if (p == head) {
                // 如果前驅節點是頭結點,那么嘗試一次獲取資源,根據其返回的值判斷執行不同操作
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    // 非負值代表資源獲取成功,將自己設為頭結點后喚醒后驅節點爭取資源
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    // 跟acquire不同的是,其補打斷的地方在方法內層,不再放外面
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    // 處理結束后就退出了
                    return;
                }
            }
            // 這里跟acquire一樣,判斷是否可以休息,休息后被喚醒后補充interrupted標識
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

看看獲取資源成功后對后續節點的操作

/**
 * @param node 當前節點
 * @param propagate 當前剩余的資源量
 */
private void setHeadAndPropagate(Node node, int propagate) {
    // 記錄原頭結點
    Node h = head; 
    // 注意這里設置頭結點的變化,這里要結合3.3一開始的例子圖來理解
    /** setHead方法體:
      * head = node;
      * node.thread = null;
      * node.prev = null;
      */
    setHead(node);
    
    // 此時頭結點已經變為當前節點了
    
    /*
     * 存在以下三種情況時喚醒當前節點后驅節點
     * 1.剩余資源量>0
     * 2.node的原前驅節點(即原頭節點)釋放了資源, == null表示釋放完被回收了,<0則表示PROPAGATION
     *   狀態,釋放之后會將節點狀態設置為PROPAGATION
     * 3.頭結點可能再次發生了改變並且也釋放了資源(競爭激烈的時候發生)
     */
    if (propagate > 0 || h == null || h.waitStatus < 0 ||
        (h = head) == null || h.waitStatus < 0) {
        Node s = node.next;
        if (s == null || s.isShared())
            // 叫醒后續節點爭奪資源,這個方法是釋放方法的主要方法,放在下節講
            doReleaseShared();
    }
}

  okay,到這里就是共享模式的acquireShared方法,總結一下邏輯:

  1. 嘗試獲取鎖是否成功,是則結束,否則進入2
  2. 同acquire一樣先來個自旋,判斷前驅節點是否為頭結點,不是的話掛起線程等待喚醒,是的話進入3
  3. 嘗試獲取資源,成功了喚醒后續線程,方法結束;失敗了掛起線程等待喚醒

  線程被喚醒后重復2操作,以下是流程圖:

img

3.4 共享模式——releaseShared

  直接上代碼吧

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        // 這個方法理解為喚醒,不要理解為釋放資源
        doReleaseShared();
        return true;
    }
    return false;
}

看看喚醒方法做了啥子

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 根據節點狀態判斷執行什么操作
            if (ws == Node.SIGNAL) {
                /*
                 * 如果是SIGNAL那么表示其后驅節點處於掛起的狀態
                 * 使用CAS改變狀態后喚醒后驅節點,失敗則再次循環(說明被其他線程先執行了該方法)
                 */
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            
                // 喚醒線程,前面已經說過,不再贅述
                unparkSuccessor(h);
            }
            // 將當前節點設置為PROPAGATE,失敗則再次循環
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                
        }
        // 如果頭節點改變了,說明喚醒操作是其他線程做的,此時要再次循環
        if (h == head)                   
            break;
    }
}

  共享模式的release方法在我們看過之前的方法后就簡單得多了,這里就不再畫流程圖了,到此AQS的兩個模式和實現暫時告一段落。

總結

  整篇文章可能有些長,先講了AQS內部的一些結構,然后使用AQS實現了簡易的不可重入鎖,接着接下來將AQS的兩個模式和實現。

  兩個模式的實現思路大致是相同的,但是方式不同,獨占模式每次只允許一個節點獲取到資源,而共享模式則允許多個節點按照順序獲取;雙方釋放后的善后操作也不同,獨占模式只喚醒后驅節點,而共享模式則可能喚醒后驅的后驅(資源充足的情況)。

沖!沖!沖!

參考:https://www.cnblogs.com/waterystone/p/4920797.html

https://snailclimb.gitee.io/javaguide/#/docs/java/Multithread/AQS


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM