Java顯式鎖學習總結之六:Condition源碼分析


概述

先來回顧一下java中的等待/通知機制

我們有時會遇到這樣的場景:線程A執行到某個點的時候,因為某個條件condition不滿足,需要線程A暫停;等到線程B修改了條件condition,使condition滿足了線程A的要求時,A再繼續執行。

自旋實現的等待通知

最簡單的實現方法就是將condition設為一個volatile的變量,當A線程檢測到條件不滿足時就自旋,類似下面:

public class Test {
    private static volatile int condition = 0;

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!(condition == 1)) {
                    // 條件不滿足,自旋
                }
                System.out.println("a executed");
            }
        });

        A.start();
        Thread.sleep(2000);
        condition = 1;
    }

}

這種方式的問題在於自旋非常耗費CPU資源,當然如果在自旋的代碼塊里加入Thread.sleep(time)將會減輕CPU資源的消耗,但是如果time設的太大,A線程就不能及時響應condition的變化,如果設的太小,依然會造成CPU的消耗。

Object提供的等待通知

因此,java在Object類里提供了wait()和notify()方法,使用方法如下:

class Test1 {
    private static volatile int condition = 0;
    private static final Object lock = new Object();

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronized (lock) {
                    while (!(condition == 1)) {
                        try {
                            lock.wait();
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    }
                    System.out.println("a executed by notify");
                }
            }
        });
        A.start();
        Thread.sleep(2000);
        condition = 1;
        synchronized (lock) {
            lock.notify();
        }
    }
}

通過代碼可以看出,在使用一個對象的wait()、notify()方法前必須要獲取這個對象的鎖。

當線程A調用了lock對象的wait()方法后,線程A將釋放持有的lock對象的鎖,然后將自己掛起,直到有其他線程調用notify()/notifyAll()方法或被中斷。可以看到在lock.wait()前面檢測condition條件的時候使用了一個while循環而不是if,那是因為當有其他線程把condition修改為滿足A線程的要求並調用notify()后,A線程會重新等待獲取鎖,獲取到鎖后才從lock.wait()方法返回,而在A線程等待鎖的過程中,condition是有可能再次變化的。

因為wait()、notify()是和synchronized配合使用的,因此如果使用了顯示鎖Lock,就不能用了。所以顯示鎖要提供自己的等待/通知機制,Condition應運而生。

顯示鎖提供的等待通知

我們用Condition實現上面的例子:

class Test2 {
    private static volatile int condition = 0;
    private static Lock lock = new ReentrantLock();
    private static Condition lockCondition = lock.newCondition();

    public static void main(String[] args) throws InterruptedException {
        Thread A = new Thread(new Runnable() {
            @Override
            public void run() {
                lock.lock();
                try {
                    while (!(condition == 1)) {
                        lockCondition.await();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
                System.out.println("a executed by condition");
            }
        });
        A.start();
        Thread.sleep(2000);
        condition = 1;
        lock.lock();
        try {
            lockCondition.signal();
        } finally {
            lock.unlock();
        }
    }
}

可以看到通過 lock.newCondition() 可以獲得到 lock 對應的一個Condition對象lockCondition ,lockCondition的await()、signal()方法分別對應之前的Object的wait()和notify()方法。整體上和Object的等待通知是類似的。

應用舉例

上面我們看到了Condition實現的等待通知和Object的等待通知是非常類似的,而Condition提供的等待通知功能更強大,最重要的一點是,一個lock對象可以通過多次調用 lock.newCondition() 獲取多個Condition對象,也就是說,在一個lock對象上,可以有多個等待隊列,而Object的等待通知在一個Object上,只能有一個等待隊列。用下面的例子說明,下面的代碼實現了一個阻塞隊列,當隊列已滿時,add操作被阻塞有其他線程通過remove方法刪除元素;當隊列已空時,remove操作被阻塞直到有其他線程通過add方法添加元素。

public class BoundedQueue1<T> {
    public List<T> q; //這個列表用來存隊列的元素
    private int maxSize; //隊列的最大長度
    private Lock lock = new ReentrantLock();
    private Condition addConditoin = lock.newCondition();
    private Condition removeConditoin = lock.newCondition();

    public BoundedQueue1(int size) {
        q = new ArrayList<>(size);
        maxSize = size;
    }

    public void add(T e) {
        lock.lock();
        try {
            while (q.size() == maxSize) {
                addConditoin.await();
            }
            q.add(e);
            removeConditoin.signal(); //執行了添加操作后喚醒因隊列空被阻塞的刪除操作
        } catch (InterruptedException e1) {
            Thread.currentThread().interrupt();
        } finally {
            lock.unlock();
        }
    }

    public T remove() {
        lock.lock();
        try {
            while (q.size() == 0) {
                removeConditoin.await();
            }
            T e = q.remove(0);
            addConditoin.signal(); //執行刪除操作后喚醒因隊列滿而被阻塞的添加操作
            return e;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            lock.unlock();
        }
    }

}

 

源碼分析

下面來分析Condition源碼

概述

之前我們介紹AQS的時候說過,AQS的同步排隊用了一個隱式的雙向隊列,同步隊列的每個節點是一個AbstractQueuedSynchronizer.Node實例。

Node的主要字段有:

  1. waitStatus:等待狀態,所有的狀態見下面的表格。
  2. prev:前驅節點
  3. next:后繼節點
  4. thread:當前節點代表的線程
  5. nextWaiter:Node既可以作為同步隊列節點使用,也可以作為Condition的等待隊列節點使用(將會在后面講Condition時講到)。在作為同步隊列節點時,nextWaiter可能有兩個值:EXCLUSIVE、SHARED標識當前節點是獨占模式還是共享模式;在作為等待隊列節點使用時,nextWaiter保存后繼節點。
狀態 含義
CANCELLED 1 當前節點因為超時或中斷被取消同步狀態獲取,該節點進入該狀態不會再變化
SIGNAL -1 標識后繼的節點處於阻塞狀態,當前節點在釋放同步狀態或被取消時,需要通知后繼節點繼續運行。每個節點在阻塞前,需要標記其前驅節點的狀態為SIGNAL。
CONDITION -2 標識當前節點是作為等待隊列節點使用的。
PROPAGATE -3  
0 0 初始狀態

Condition實現等待的時候內部也有一個等待隊列,等待隊列是一個隱式的單向隊列,等待隊列中的每一個節點也是一個AbstractQueuedSynchronizer.Node實例。

每個Condition對象中保存了firstWaiter和lastWaiter作為隊列首節點和尾節點,每個節點使用Node.nextWaiter保存下一個節點的引用,因此等待隊列是一個單向隊列。

每當一個線程調用Condition.await()方法,那么該線程會釋放鎖,構造成一個Node節點加入到等待隊列的隊尾。

等待

Condition.await()方法的源碼如下:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter(); //構造一個新的等待隊列Node加入到隊尾
            int savedState = fullyRelease(node); //釋放當前線程的獨占鎖,不管重入幾次,都把state釋放為0
            int interruptMode = 0;
//如果當前節點沒有在同步隊列上,即還沒有被signal,則將當前線程阻塞
while (!isOnSyncQueue(node)) { LockSupport.park(this);
//后面的藍色代碼都是和中斷相關的,主要是區分兩種中斷:是在被signal前中斷還是在被signal后中斷,如果是被signal前就被中斷則拋出 InterruptedException,否則執行 Thread.currentThread().interrupt();
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中斷則直接退出自旋 break; }
//退出了上面自旋說明當前節點已經在同步隊列上,但是當前節點不一定在同步隊列隊首。acquireQueued將阻塞直到當前節點成為隊首,即當前線程獲得了鎖。然后await()方法就可以退出了,讓線程繼續執行await()后的代碼。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }


final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } } final boolean isOnSyncQueue(Node node) {
//如果當前節點狀態是CONDITION或node.prev是null,則證明當前節點在等待隊列上而不是同步隊列上。之所以可以用node.prev來判斷,是因為一個節點如果要加入同步隊列,在加入前就會設置好prev字段。
if (node.waitStatus == Node.CONDITION || node.prev == null) return false;
//如果node.next不為null,則一定在同步隊列上,因為node.next是在節點加入同步隊列后設置的
if (node.next != null) // If has successor, it must be on queue return true; return findNodeFromTail(node); //前面的兩個判斷沒有返回的話,就從同步隊列隊尾遍歷一個一個看是不是當前節點。 } private boolean findNodeFromTail(Node node) { Node t = tail; for (;;) { if (t == node) return true; if (t == null) return false; t = t.prev; } } 
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

 

通知

Condition.signal() 方法的源碼如下:

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException(); //如果同步狀態不是被當前線程獨占,直接拋出異常。從這里也能看出來,Condition只能配合獨占類同步組件使用。
            Node first = firstWaiter;
            if (first != null)
                doSignal(first); //通知等待隊列隊首的節點。
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&   //transferForSignal方法嘗試喚醒當前節點,如果喚醒失敗,則繼續嘗試喚醒當前節點的后繼節點。
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        //如果當前節點狀態為CONDITION,則將狀態改為0准備加入同步隊列;如果當前狀態不為CONDITION,說明該節點等待已被中斷,則該方法返回false,doSignal()方法會繼續嘗試喚醒當前節點的后繼節點
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);  //將節點加入同步隊列,返回的p是節點在同步隊列中的先驅節點
        int ws = p.waitStatus;
//如果先驅節點的狀態為CANCELLED(>0) 或設置先驅節點的狀態為SIGNAL失敗,那么就立即喚醒當前節點對應的線程,線程被喚醒后會執行acquireQueued方法,該方法會重新嘗試將節點的先驅狀態設為SIGNAL並再次park線程;如果當前設置前驅節點狀態為SIGNAL成功,那么就不需要馬上喚醒線程了,當它的前驅節點成為同步隊列的首節點且釋放同步狀態后,會自動喚醒它。
//其實筆者認為這里不加這個判斷條件應該也是可以的。只是對於CAS修改前驅節點狀態為SIGNAL成功這種情況來說,如果不加這個判斷條件,提前喚醒了線程,等進入acquireQueued方法了節點發現自己的前驅不是首節點,還要再阻塞,等到其前驅節點成為首節點並釋放鎖時再喚醒一次;而如果加了這個條件,線程被喚醒的時候它的前驅節點肯定是首節點了,線程就有機會直接獲取同步狀態從而避免二次阻塞,節省了硬件資源。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; }

Condition等待通知的本質

總的來說,Condition的本質就是等待隊列和同步隊列的交互:

當一個持有鎖的線程調用Condition.await()時,它會執行以下步驟:

  1. 構造一個新的等待隊列節點加入到等待隊列隊尾
  2. 釋放鎖,也就是將它的同步隊列節點從同步隊列隊首移除
  3. 自旋,直到它在等待隊列上的節點移動到了同步隊列(通過其他線程調用signal())或被中斷
  4. 阻塞當前節點,直到它獲取到了鎖,也就是它在同步隊列上的節點排隊排到了隊首。

當一個持有鎖的線程調用Condition.signal()時,它會執行以下操作:

從等待隊列的隊首開始,嘗試對隊首節點執行喚醒操作;如果節點CANCELLED,就嘗試喚醒下一個節點;如果再CANCELLED則繼續迭代。

對每個節點執行喚醒操作時,首先將節點加入同步隊列,此時await()操作的步驟3的解鎖條件就已經開啟了。然后分兩種情況討論:

  1. 如果先驅節點的狀態為CANCELLED(>0) 或設置先驅節點的狀態為SIGNAL失敗,那么就立即喚醒當前節點對應的線程,此時await()方法就會完成步驟3,進入步驟4.
  2. 如果成功把先驅節點的狀態設置為了SIGNAL,那么就不立即喚醒了。等到先驅節點成為同步隊列首節點並釋放了同步狀態后,會自動喚醒當前節點對應線程的,這時候await()的步驟3才執行完成,而且有很大概率快速完成步驟4.

總結  

如果知道Object的等待通知機制,Condition的使用是比較容易掌握的,因為和Object等待通知的使用基本一致。

對Condition的源碼理解,主要就是理解等待隊列,等待隊列可以類比同步隊列,而且等待隊列比同步隊列要簡單,因為等待隊列是單向隊列,同步隊列是雙向隊列。

以下是筆者對等待隊列是單向隊列、同步隊列是雙向隊列的一些思考,歡迎提出不同意見:

之所以同步隊列要設計成雙向的,是因為在同步隊列中,節點喚醒是接力式的,由每一個節點喚醒它的下一個節點,如果是由next指針獲取下一個節點,是有可能獲取失敗的,因為虛擬隊列每添加一個節點,是先用CAS把tail設置為新節點,然后才修改原tail的next指針到新節點的。因此用next向后遍歷是不安全的,但是如果在設置新節點為tail前,為新節點設置prev,則可以保證從tail往前遍歷是安全的。因此要安全的獲取一個節點Node的下一個節點,先要看next是不是null,如果是null,還要從tail往前遍歷看看能不能遍歷到Node。

而等待隊列就簡單多了,等待的線程就是等待者,只負責等待,喚醒的線程就是喚醒者,只負責喚醒,因此每次要執行喚醒操作的時候,直接喚醒等待隊列的首節點就行了。等待隊列的實現中不需要遍歷隊列,因此也不需要prev指針。

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM