condition實現原理


  condition是對線程進行控制管理的接口,具體實現是AQS的一個內部類ConditionObject,主要功能是控制線程的啟/停(這么說並不嚴格,還要有鎖的競爭排隊)。
condition主要方法:
 
void await() throws InterruptedException
進入等待,直到被通知或中斷
void awaitUninterruptibly()
進入等待,直到被通知,不響應中斷
long awaitNanos(long nanosTimeout) throws InterruptedException
等待xxx納秒
boolean awaitUntil(Date deadline) throws InterruptedException
等待,直到某個時間點
void signal()
喚醒
void signalAll()
喚醒所有等待在condition上的線程,會從等待隊列挨個signal()
使用示例:
   通過實現一個有界隊列來深入理解Condition的使用方式。有界隊列:一種特殊的隊列,當隊列為空時,隊列的獲取操作將會阻塞獲取線程,直到隊列中有新增元素,當隊列已滿時,隊列的插入操作將會阻塞插入線程,
,直到隊列出現空位。
public class BoundedQueue<T> {
    private Object[] items;
    //添加的下標,刪除的下標和數組當前數量
    private int addIndex, removeIndex, count;
    private Lock lock = new ReentrantLock();
    private Condition empty = lock.newCondition();
    private Condition full = lock.newCondition();
    //構造方法
    public BoundedQueue(int size){
        items = new Object[size];
    }
    //添加元素,如果數組滿,則添加線程進入等待,直到有空位
    public void add(T t) throws InterruptedException{
        lock.lock();
        try {
            while (count == items.length)  //改成if會如何
                full.await();
            items[addIndex] = t;
            if(++addIndex == items.length)
                addIndex = 0;
            ++count;
            empty.signal();
        }finally {
            lock.unlock();
        }
    }

    //從頭部刪除一個元素,如果數組空,則刪除線程進入等待狀態,直到添加新元素
    public T remove() throws InterruptedException{
        lock.lock();
        try{
            while (count == 0)
                empty.await();
            Object x = items[removeIndex];
            if(++removeIndex == items.length)
                removeIndex = 0;
            --count;
            full.signal();
            return (T)x;
        }finally {
            lock.unlock();
        }
    }
}
  在這里,阻塞隊列的數據存放是在items數組中,注意幾個下標的賦值操作,當addIndex到頭的時候,因為之前可能有remove操作,故items數組的頭部或者中間位置可能是空的,如果繼續添加數據,數據應添加在頭部,相當於形成了一個“環”,這就是19-20行的含義。至於16行代碼中的while替換成if,書中給出的解釋是:使用while目的是為了防止過早或意外的通知,只有條件符合才能退出循環。這個地方沒有想出相應的場景,僅從目前的代碼邏輯來說,換成if也是可以的,但如果考慮異常導致等待線程被喚醒,那阻塞隊列就無法正常工作了。
原理分析:
  ConditionObject是同步器AQS的內部類,因為Condition的操作需要獲取相關聯的鎖,所以作為同步器的內部類也較為合理。每個Condition對象都包含着一個隊列(等待隊列),該隊列是condition對象實現等待/通知的功能的關鍵。
   等待隊列:
  等待隊列是一個FIFO的隊列,在隊列中的每個節點都包含了一個線程引用,該線程就是在Condition對象上等待的線程,如果一個線程調用了Condition.await()方法,那么該線程將會釋放鎖、構造成節點加入等待隊列並進入等待狀態。事實上,節點的定義復用了同步器中節點的定義, 也就是說,同步隊列和等待隊列中節點類型都是同步器的靜態內部類AbstractQueuedSynchronizer.Node

  如圖所示,Condition擁有首尾節點的引用,而新增節點只需要將原有的尾節點nextWaiter指向它,並且更新尾節點即可。 上述節點引用更新的過程並沒有使用CAS保證,原因在於調用await()方法的線程必定是獲取了鎖的線程,也就是說該過程是由鎖來保證線程安全的。
  在Object的監視器模型上,一個對象擁有一個同步隊列和等待隊列, 而並發包中的Lock實現類擁有一個同步隊列和多個等待隊列:

  如上圖所示,Condition的實現是同步器的內部類,因此每個Condition實例都能夠訪問同步器提供的方法,相當於每個Condition都擁有所屬同步器的引用。
等待:
  調用Condition的await()方法(或者以await開頭的方法),會使當前線程進入等待隊列並釋放鎖,同時線程狀態變為等待狀態。當從await()方法返回時,當前線程一定獲取了Condition相關聯的鎖。
  如果從隊列(同步隊列和等待隊列)的角度看await()方法,當調用await()方法時,相當於同步隊列的首節點(獲取了鎖的節點)移動到Condition的等待隊列中。
   Condition的await()方法:
public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter(); //當前線程加入等待隊列
    int savedState = fullyRelease(node); //釋放同步狀態,也就是釋放鎖
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null) // clean up if cancelled
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}
  調用該方法的線程成功獲取了鎖的線程,也就是同步隊列中的首節點,該方法會將當前線程構造成節點並加入等待隊列中,( 因為已經獲取了同步狀態,所以無需通過cas,在隊列尾部添加等待節點 )然后釋放同步狀態,喚醒同步隊列中的后繼節點,然后當前線程會進入等待狀態。
  當等待隊列中的節點被喚醒,則喚醒節點的線程開始嘗試獲取同步狀態。如果不是通過其他線程調用Condition.signal()方法喚醒,而是對等待線程進行中斷,則會拋出InterruptedException。
  如果從隊列的角度去看,當前線程加入Condition的等待隊列,如圖所示,同步隊列的首節點並不會直接加入等待隊列,而是通過addConditionWaiter()方法把當前線程構造成一個新的節點並將其加入等待隊列中:
通知:
  調用Condition的signal()方法,將會喚醒在等待隊列中等待時間最長的節點(首節點),在喚醒節點之前,會將節點移到同步隊列末尾。
  signal方法:
public final void signal() {
    if (!isHeldExclusively()) //是否獲取了鎖(重入鎖中是直接 return  獨占鎖線程==當前線程)
        throw new IllegalMonitorStateException();
    Node first = firstWaiter; //等待隊列頭節點
    if (first != null)
        doSignal(first);
}
private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null) //等待隊列首節點的后繼節點為空,說明只有一個節點,那么尾節點也置空
            lastWaiter = null; 
        first.nextWaiter = null;
    } while (!transferForSignal(first)  && (first = firstWaiter) != null); // 等待隊列首節點喚醒失敗,則喚醒下個節點
}
final boolean transferForSignal(Node node) {// 將節點加入同步隊列
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) // 設置節點狀態為0----(等待隊列狀態只能為-2,-3,用-2進行cas設置失敗,說明是-3)
        return false;
    Node p = enq(node); // 放入同步隊列尾部
    int ws = p.waitStatus;
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
        LockSupport.unpark(node.thread);
    return true;
}
  調用signal方法的前置條件是當前線程必須獲取了鎖,可以看到signal()方法進行了isHeldExclusively()檢查,也就是當前線程必須是獲取了鎖的線程。接着獲取等待隊列的首節點,將其移動到同步隊列並使用LockSupport喚醒節點中的線程。
  節點從等待隊列移動到同步隊列的過程如下圖所示:
  被喚醒后的線程,將從await()方法中的while循環中退出(isOnSyncQueue(Node node)方法返回true,節點已經在同步隊列中),進而調用同步器的acquireQueued()方法加入到獲取同步狀態的競爭中。成功獲取同步狀態之后,被喚醒的線程將從先前調用的await()方法返回,此時該線程已經成功地獲取了鎖。
  Condition的signalAll()方法,相當於對等待隊列中的每個節點均執行一次signal()方法,效果就是將等待隊列中所有節點全部移動到同步隊列中,並喚醒每個節點的線程。
-----------------------------------------------------------------------------------
想到的問題:
  1、wait,sleep,park都有啥區別,這個讓出cpu資源的操作底層實現是一樣的么?
  wait是Object的方法,會釋放鎖,原理是monitor機制(搶占對象頭信息的鎖標志位),是個native方法,也就是是java的一套機制,底層是用c編寫的,具體如何掛起線程的邏輯未知。
  sleep是Thread的方法,不會釋放鎖,也是個native方法,具體底層邏輯未知。
  park是unsafe類的方法,這個unsafe類提供了很多直接操作內存的方法,這個park也是個native方法,openjdk源碼顯示底層調用了操作系統的函數,停掉了線程。wait跟sleep應該也是類似原理。
  2、線程的喚醒是怎么實現的,在線程非常多的情況下,喚醒了就能馬上執行么?
  喚醒肯定也是最終調用os的函數來實現的,線程非常多的情況下,硬件線程數固定,操作系統或者jvm肯定也要有相應機制來調用線程,這個喚醒只是讓線程“醒”了而已,醒了!=執行,所以,喚醒了可能並不能馬上執行,而是要等待別的線程執行完后進行搶占,成功后才能執行。

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM