Java並發包源碼學習之AQS框架(四)AbstractQueuedSynchronizer源碼分析


經過前面幾篇文章的鋪墊,今天我們終於要看看AQS的廬山真面目了,建議第一次看AbstractQueuedSynchronizer 類源碼的朋友可以先看下我前面幾篇文章:

分析源碼是非常枯燥乏味的一件事,其實代碼本身其實就是最好的說明了,因此基本都是貼出一些代碼加上一些注釋, 因為AbstractQueuedSynchronizer上千行代碼不可能也不需要每行都要分析,所以只撿一些關鍵的地方或 比較難理解的地方做說明,有一些地方可能我理解的有出入,歡迎大家指正。 詳細的注釋我都放在了GitHub上

前面提到AQS是基於CLH lock queue的,AbstractQueuedSynchronizer是通過一個內部類Node實現了一個變種。 前面基本說明了Node的主要內容,但這個類還有一些其他重要的字段:

//標記當前結點是共享模式
static final Node SHARED = new Node();

//標記當前結點是獨占模式
static final Node EXCLUSIVE = null;

//結點的等待狀態。
volatile int waitStatus;

//擁有當前結點的線程。
volatile Thread thread;

 其中waitStatus很重要,用來控制線程的阻塞/喚醒,以及避免不必要的調用LockSupport的park/unpark方法。 它主要有以下幾個取值:

//代表線程已經被取消
static final int CANCELLED = 1;

//代表后續節點需要喚醒
static final int SIGNAL = -1;

//代表線程在condition queue中,等待某一條件
static final int CONDITION = -2;

//代表后續結點會傳播喚醒的操作,共享模式下起作用
static final int PROPAGATE = -3;

 

出隊操作

只要設置新的head結點就可以了。

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

 

入隊操作

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 這個if分支其實是一種優化:CAS操作失敗的話才進入enq中的循環。
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
} 

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

 

獨占模式獲取

public final void acquire(int arg) {
    // tryAcquire 由子類實現本身不會阻塞線程,如果返回 true,則線程繼續,
    // 如果返回 false 那么就 
    加入阻塞隊列阻塞線程,並等待前繼結點釋放鎖。
    if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // acquireQueued返回true,說明當前線程被中斷喚醒后獲取到鎖,
        // 重置其interrupt status為true。
        selfInterrupt();
}

 一旦tryAcquire成功則立即返回,否則線程會加入隊列 線程可能會反復的被阻塞和喚醒直到tryAcquire成功,這是因為線程可能被中斷, 而acquireQueued方法中會保證忽視中斷,只有tryAcquire成功了才返回。中斷版本的獨占獲取是acquireInterruptibly這個方法, doAcquireInterruptibly這個方法中如果線程被中斷則acquireInterruptibly會拋出InterruptedException異常。

 

addWaiter方法只是入隊操作,acquireQueued方法是主要邏輯,需要重點理解。

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 等待前繼結點釋放鎖
        // 自旋re-check
        for (;;) {
            // 獲取前繼
            final Node p = node.predecessor();
            // 前繼是head,說明next就是node了,則嘗試獲取鎖。
            if (p == head && tryAcquire(arg)) {
                // 前繼出隊,node成為head
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }

            // p != head 或者 p == head但是tryAcquire失敗了,那么
            // 應該阻塞當前線程等待前繼喚醒。阻塞之前會再重試一次,還需要設置前繼的waitStaus為SIGNAL。
            
        // 線程會阻塞在parkAndCheckInterrupt方法中。
            // parkAndCheckInterrupt返回可能是前繼unpark或線程被中斷。
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                // 說明當前線程是被中斷喚醒的。
                // 
                注意:線程被中斷之后會繼續走到if處去判斷,也就是會忽視中斷。
                // 除非碰巧線程中斷后acquire成功了,那么根據Java的最佳實踐,
                // 需要重新設置線程的中斷狀態(acquire.selfInterrupt)。
                interrupted = true;
        }
    }
    finally {
        // 出現異常
        if (failed)
            cancelAcquire(node);
    }
}

基本每行都有注釋,但得結合shouldParkAfterFailedAcquireparkAndCheckInterrupt這兩個方法來一起理解會更 容易些。shouldParkAfterFailedAcquire方法的作用是:

  • 確定后繼是否需要park;
  • 跳過被取消的結點;
  • 設置前繼的waitStatus為SIGNAL.
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)// 前繼結點已經准備好unpark其后繼了,所以后繼可以安全的park
    /*
     * This node has already set status asking a release to signal it,
     * so it can safely park.
     */
    return true;
if (ws > 0) {// CANCELLED
    // 跳過被取消的結點。
    do {
        node.prev = pred = pred.prev;
    } while (pred.waitStatus > 0);
    pred.next = node;
} else {// 0 或 PROPAGATE (CONDITION用在ConditonObject,這里不會是這個值)
    /**
     * waitStatus 等於0(初始化)或PROPAGATE。說明線程還沒有park,會先重試 
     * 確定無法acquire到再park。
     */

    // 更新pred結點waitStatus為SIGNAL
    compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;

parkAndCheckInterrupt就是用LockSupport來阻塞當前線程,很簡單:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}

線程被喚醒只可能是:被unpark,被中斷或偽喚醒。被中斷會設置interrupted,acquire方法返回前會 selfInterrupt重置下線程的中斷狀態,如果是偽喚醒的話會for循環re-check。

 

獨占模式釋放

比較簡單只要直接喚醒后續結點就可以了,后續結點會從parkAndCheckInterrupt方法中返回。

public final boolean release(int arg) {
    // tryReease由子類實現,通過設置state值來達到同步的效果。
    if (tryRelease(arg)) {
        Node h = head;
        // waitStatus為0說明是初始化的空隊列
        if (h != null && h.waitStatus != 0)
            // 喚醒后續的結點
            unparkSuccessor(h);
        return true;
    }
    return false;
}

 

共享模式獲取

acquireShared方法是用來共享模式獲取。

public final void acquireShared(int arg) {
    //如果沒有許可了則入隊等待
    if (tryAcquireShared(arg) < 0)
        doAcquireShared(arg);
} 

private void doAcquireShared(int arg) {
    // 添加隊列
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        boolean interrupted = false;
        // 等待前繼釋放並傳遞
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);// 嘗試獲取
                if (r >= 0) {
                    // 獲取成功則前繼出隊,跟獨占不同的是
                    // 會往后面結點傳播喚醒的操作,保證剩下等待的線程能夠盡快 獲取到剩下的許可。
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    if (interrupted)
                        selfInterrupt();
                    failed = false;
                    return;
                }
            }

            // p != head || r < 0
            if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                interrupted = true;
        }
    }
    finally {
        if (failed)
            cancelAcquire(node);
    }
}

核心是這個doAcquireShared方法,跟獨占模式的acquireQueued很像,主要區別在setHeadAndPropagate方法中, 這個方法會將node設置為head。如果當前結點acquire到了之后發現還有許可可以被獲取,則繼續釋放自己的后繼, 后繼會將這個操作傳遞下去。這就是PROPAGATE狀態的含義。

private void setHeadAndPropagate(Node node, int propagate) {
    Node h = head; // Record old head for check below
    setHead(node);
    /*
     * 嘗試喚醒后繼的結點:<br />
     * propagate > 0說明許可還有能夠繼續被線程acquire;<br />
     * 或者 之前的head被設置為PROPAGATE(PROPAGATE可以被轉換為SIGNAL)說明需要往后傳遞;<br />
     * 或者為null,我們還不確定什么情況。 <br />
     * 並且 后繼結點是共享模式或者為如上為null。
     * <p>
     * 上面的檢查有點保守,在有多個線程競爭獲取/釋放的時候可能會導致不必要的喚醒。<br />
     * 
     */
    if (propagate > 0 || h == null || h.waitStatus < 0) {
        Node s = node.next;
        // 后繼結是共享模式或者s == null(不知道什么情況)
        // 如果后繼是獨占模式,那么即使剩下的許可大於0也不會繼續往后傳遞喚醒操作
        // 即使后面有結點是共享模式。
        if (s == null || s.isShared())
            // 喚醒后繼結點
            doReleaseShared();
    }
} 

private void doReleaseShared() {
    for (;;) {
        Node h = head;
        // 隊列不為空且有后繼結點
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            // 不管是共享還是獨占只有結點狀態為SIGNAL才嘗試喚醒后繼結點
            if (ws == Node.SIGNAL) {
                // 將waitStatus設置為0
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue; // loop to recheck cases
                unparkSuccessor(h);// 喚醒后繼結點
                // 如果狀態為0則更新狀態為PROPAGATE,更新失敗則重試
            } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue; // loop on failed CAS
        }
        // 如果過程中head被修改了則重試。
        if (h == head) // loop if head changed
            break;
    }
}

 

共享模式釋放

主要邏輯也就會doReleaseShared

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

獨占和共享模式除了對應的中斷版本,還有超時版本,整體代碼相差不大,具體再贅述了。提前前面文章 中提到的自旋,好像目前整個AQS中都沒用到這個功能,accquire中for循環主要作用不是為了自旋,那么 它用在什么地方呢?AQS中有一個變量:

static final long spinForTimeoutThreshold = 1000L;

這個變量用在doAcquireNanos方法,也就是支持超時的獲取版本。

private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException {
    long lastTime = System.nanoTime();
    final Node node = addWaiter(Node.EXCLUSIVE);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return true;
            }
            if (nanosTimeout <= 0)// 超時
                return false;
            // nanosTimeout > spinForTimeoutThreshold
            // 如果超時時間很短的話,自旋效率會更高。
            if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold)
                LockSupport.parkNanos(this, nanosTimeout);
            long now = System.nanoTime();
            nanosTimeout -= now - lastTime;
            lastTime = now;
            if (Thread.interrupted())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

 

AQS的的主要內容其實差不多看完了,但是上面的邏輯中waitStatus中有一個狀態還沒涉及到那就是CONDITION, 下一篇博客《Java並發包源碼學習之AQS框架(五)ConditionObject源碼分析》中會介紹它。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM