Java顯式鎖學習總結之三:AbstractQueuedSynchronizer的實現原理


概述

上一篇我們講了AQS的使用,這一篇講AQS的內部實現原理。

我們前面介紹了,AQS使用一個int變量state表示同步狀態,使用一個隱式的FIFO同步隊列(隱式隊列就是並沒有聲明這樣一個隊列,只是通過每個節點記錄它的上個節點和下個節點來從邏輯上產生一個隊列)來完成阻塞線程的排隊。

這里FIFO隊列的節點在AQS中被定義為一個內部類Node,Node的主要字段有:

  1. waitStatus:等待狀態,所有的狀態見下面的表格。
  2. prev:前驅節點
  3. next:后繼節點
  4. thread:當前節點代表的線程
  5. nextWaiter:Node既可以作為同步隊列節點使用,也可以作為Condition的等待隊列節點使用(將會在后面講Condition時講到)。在作為同步隊列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED標識當前節點是獨占模式還是共享模式;在作為等待隊列節點使用時,nextWaiter保存后繼節點。
狀態 含義
CANCELLED 1 當前節點因為超時或中斷被取消同步狀態獲取,該節點進入該狀態不會再變化
SIGNAL -1 標識后繼的節點處於阻塞狀態,當前節點在釋放同步狀態或被取消時,需要通知后繼節點繼續運行。每個節點在阻塞前,需要標記其前驅節點的狀態為SIGNAL。
CONDITION -2 標識當前節點是作為等待隊列節點使用的。
PROPAGATE -3  
0 0 初始狀態

隊列擁有首節點和尾節點,這兩個節點分別保存於AQS的兩個字段:head、tail。

同步隊列的基本結構:

當一個線程想要獲得同步狀態的時候,如果當前有其他線程持有同步狀態,當前線程將無法獲取,轉而被構造為一個Node添加到同步隊列的尾部,而這個加入的過程必須保證線程安全,因此同步器提供了一個基於CAS的設置隊尾的方法:compareAndSetTail(Node expect, Node update),它需要傳遞當前線程"認為"的隊尾。在一個Node被CAS設置為隊列之前,這個Node的prev已經被設置為之前的尾節點,而在這個Node被設置為隊尾之后,之前尾節點的next才會被指向這個Node。因此在任一時刻,從head向后遍歷隊列不一定能遍歷到tail,因為最后的tail可能還沒有被倒數第二個節點指為next,但是從tail向head遍歷一定能遍歷head。記住這個結論之后會用到。

在隊列中,首節點是當前獲取同步狀態成功的節點,首節點在釋放同步狀態時,會喚醒后繼節點,而后繼節點會在自己獲取同步狀態成功時,將自己設置為首節點。因為設置首節點是由持有同步狀態的線程來完成的,因此不需要使用CAS來保證線程安全,只需要持有同步狀態的線程將首節點設置為原首節點的后繼節點並斷開原首節點的next引用即可。

獨占式同步狀態獲取與釋放

獲取

獲取獨占同步狀態使用方法acquire(int arg):

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

其中tryAcquire(arg)方法是我們繼承AQS的子類重寫的方法,可以看到,如果tryAcquire返回了true代表獲取鎖成功,則acquire方法立即返回。如果tryAcquire返回了false我們知道AQS應該完成的操作是阻塞當前線程直到當前線程獲取到同步狀態。可以看出會依次調用方法addWaiter、acquireQueued。

先來看addWaiter:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode); // 將當前線程構造為Node,mode傳入值為Node.EXCLUSIVE,將保存在Node的nextWaiter字段,標識當前節點為獨占模式
        // 如果當前尾節點不為空,嘗試設置當前節點為尾節點,這塊是完整設置尾節點的一個簡單實現,如果這個能成功,不用調用完整設置尾節點的方法enq了,如果失敗,則調用enq方法。
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
   // 完整的設置尾節點方法,如果當前節點不為空,則把當前節點設為尾節點,並將原尾節點next指向當前節點;如果當前尾節點為空,即當前同步隊列為空,則新建一個傀儡節點作為首節點和尾節點,然后再將當前節點設為尾節點。
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

然后看acquireQueued:

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
   //如果當前節點的前驅是頭節點,說明即將輪到自己獲得同步狀態,再次調用tryAcquire檢查是否能獲取到同步狀態(這里之所以要再次檢查,有兩個原因:
一是因為盡管當前節點排到首節點后面,而且已經被首節點喚醒,但是首節點在喚醒當前節點后,並不是馬上釋放同步狀態;
二是因為如果此時有新的線程第一次嘗試獲取同步狀態正好趕在首節點釋放同步狀態,那么新的線程可能直接就不排隊了直接獲取到同步狀態。)
if (p == head && tryAcquire(arg)) { setHead(node); //獲取到同步狀態則將自己設為頭節點 p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } } private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus;
//如果前驅節點已經是SIGNAL狀態則前驅節點執行完成后會喚醒當前節點
if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true;
//前驅節點狀態為CANCELLED,則繼續查找前驅節點的前驅節點,因為當首節點喚醒時,會跳過CANCELLED節點
if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node;
//如果是0或PROPAGATE狀態,則用CAS設置為SIGNAL }
else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; } //該方法在 shouldParkAfterFailedAcquire 方法返回true后執行,shouldParkAfterFailedAcquire 方法返回true代表前驅節點已經被設置為SIGNAL狀態,
因此當前節點可以阻塞等待喚醒了,使用LockSupport.park(this)方法來阻塞。這個方法會一直阻塞直到首節點喚醒當前節點或當前節點被中斷,如果是被中斷,中斷標識將會被一直往上層方法傳,最終acquire方法會執行selfInterrupt。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }

釋放

釋放同步狀態通過方法release(int arg):

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

tryRelease方法是我們自己在AQS子類中重寫的方法,可以看到release方法的邏輯比較簡單,如果tryRelease方法返回false,那么release方法直接返回false;如果tryRelease方法返回true則通過unparkSuccessor方法喚醒后繼節點:

    private void unparkSuccessor(Node node) {
//如果頭節點的狀態是負值,將其歸0.如果失敗了也ok。
int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); //通常要喚醒的節點就是頭節點的直接后繼節點,但是如果直接后繼節點是null或狀態為CANCELLED,則從tail向前遍歷取離head最近的一個非CANCELLED狀態的節點。這里之所以要從tail向前遍歷,前面說過原因:最后的tail節點在構造的時候在某時刻可能只有其向前一個節點的prev引用,而沒有前一個節點向它的next引用。 Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); // 喚醒下一個節點。這里要注意理論上可能頭節點喚醒下一個節點的時候,下一個節點還沒有通過park方法阻塞,而LockSupport方法在這種情況的表現是:如果先調用了unpark方法,那么之后調用park時將不會阻塞。因此在這種情況下也不會有什么問題。 }

 

共享式同步狀態獲取與釋放

共享式同步狀態獲取/釋放和獨占式狀態獲取/釋放大體一致,只是加了釋放傳播:

我們舉個例子,比如我們要定義一個類似於Semaphore的同步組件,支持n個線程可以同時獲取同步狀態,超過n時則阻塞,假如AQS沒有給我們提供共享式的tryAcquireShared和tryReleaseShared方法,我們試圖用獨占式方法來實現這個組件,那么我們可能會這樣重寫tryAcquire和tryRelease(只貼出AQS的子類實現,其他代碼略):

    private static class SemaphoreSynchronizer extends AbstractQueuedSynchronizer {
        public SemaphoreSynchronizer(int arg) {
            setState(arg);  //用state表示當前可用許可數
        }

        @Override
        protected boolean tryAcquire(int arg) {
            for (;;) {
                int state = getState();
                int newState = state - 1;   //許可數-1

                //如果已經沒有許可可用,則返回false
                if (newState < 0) {
                    return false;
                }
             
                //如果有許可可用而且CAS成功,則返回true,否則循環重新判斷是否有許可可用
                if (compareAndSetState(state, newState)) {
                    return true;
                }
            }

        }

        @Override
        protected boolean tryRelease(int arg) {
            for (;;) {
                int current = getState();
                int newCount = current + 1;  //釋放成功則許可數+1
                //如果釋放成功返回true,否則循環重新釋放
                if (compareAndSetState(current, newCount)) {
                    return true;
                }
            }
        }

        protected Condition newCondition() {
            return new ConditionObject();
        }

    }

這樣實現的話仔細想就會發現有問題:

比如許可數設為3,當前正有t1、t2、t3這三個線程在運行,然后來了兩個線程t4、t5被阻塞了,因為t1、t2、t3是並發運行,因此假設t1和t2同時釋放許可,獨占式釋放同步狀態代碼如下:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

這兩個線程在並發條件下,Node h=head這句可能h同時指向t4,然后t4被喚醒2次,最終的結果就是t3和t4在執行,而t5在被阻塞,盡管有效許可數是3。

為了避免這個問題,需要在共享式同步狀態的釋放和獲取處都做一些工作。

釋放

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

    private void doReleaseShared() {
        for (;;) {
            Node h = head; //獲取首節點
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {  //如果首節點的狀態是SIGNAL,則CAS修改SIGNAL為0,如果成功就喚醒后繼節點,失敗則重新獲取首節點
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&   //如果首節點狀態是0,則將狀態改為PROPAGATE(傳播狀態)
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            if (h == head)                   // 這個是外面一層for循環的終止條件,外面一層循環的意義在於如果首節點在以上操作中發生了變化,那么可能有其他節點已經喚醒了之前獲取的首節點的后繼節點,於是當前線程要獲取新的首節點的后繼節點。
                break;
        }
    }

獲取

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }


    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);
       
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

和獨占式獲取相比,主要區別在於setHeadAndPropagate方法:

一個節點在獲取了同步狀態后,不僅把自己設置為頭節點,而且如果當前同步狀態>0||原head為null||原head的狀態<0||當前head為null||當前狀態<0,且下一個節點的類型為null(類型未知)||下一個節點類型為shared,則繼續喚醒下一個節點。

注:節點狀態<0意味着是SIGNAL或PROPAGATE。

 

總結

AQS的實現中,獨占式狀態獲取與釋放比較簡單容易理解,共享式狀態獲取與釋放比較復雜。不過就實際應用而言,相信我們也不需要了解里面的所有細節,只需要理解原理即可。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM