ConditionObject是AQS中的內部類,提供了條件鎖的同步實現,實現了Condition接口,並且實現了其中的await(),signal(),signalALL()等方法。
AbstractQueuedSynchronizer(AQS)的分析點此
ConditionObject主要是為並發編程中的同步提供了等待通知的實現方式,可以在不滿足某個條件的時候掛起線程等待。直到滿足某個條件的時候在喚醒線程。
使用方式如下:
Lock lock = new ReentrantLock(); Condition condition = lock.newCondition();//創建和該鎖關聯的條件鎖 public void conditionWait() throws InterruptedException{ lock.lock(); try { condition.await(); }finally { lock.unlock(); } } public void ConditionSignal() throws InterruptedException{ lock.lock(); try { condition.signal(); }finally { lock.unlock(); } }
lock()和unlock()在AQS一文中已經分析過其實現方式了,這里主要分析ConditionObject中的await()和signal()的實現分析。
在一個AQS同步器中,可以定義多個Condition,只需要多次lock.newCondition(),每次都會返回一個新的ConditionObject對象。
在ConditionObject中,通過一個等待隊列來維護條線等待的線程。所以在一個同步器中可以有多個等待隊列,他們等待的條件是不一樣的。
等待隊列
等待隊列是一個FIFO的隊列,在隊列的每個節點都包含了一個線程引用。該線程就是在Condition對象上等待的線程。這里的節點和AQS中的同步隊列中的節點一樣,使用的都是AbstractQueuedSynchronizer.Node類。每個調用了condition.await()的線程都會進入到等待隊列中去。
在Condition中包含了firstWaiter和lastWaiter,每次加入到等待隊列中的線程都會加入到等待隊列的尾部,來構成一個FIFO的等待隊列。
下面看看await()方法的具體實現
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); //把當前線程的節點加入到等待隊列中 int savedState = fullyRelease(node); //由於調用await()方法的線程是已經獲取了鎖的,所以在加入到等待隊列之后,需要去釋放鎖,並且喚醒后繼節點線程 int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); //掛起當前線程,當別的線程調用了signal(),並且是當前線程被喚醒的時候才從park()方法返回 if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //當被喚醒后,該線程會嘗試去獲取鎖,只有獲取到了才會從await()方法返回,否則的話,會掛起自己 interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
可以看到這個方法是會響應中斷的。
private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { //首先判斷lastWaiter節點是否為空,或者是否是處於條件等待,如果不是的話則把它從等待隊列中刪除。 unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) //把當前線程構建的節點加入到等待隊列中去,並且返回當前節點 firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; }
final int fullyRelease(Node node) { boolean failed = true; try { int savedState = getState(); if (release(savedState)) { failed = false; return savedState; } else { throw new IllegalMonitorStateException(); } } finally { if (failed) node.waitStatus = Node.CANCELLED; } }
在看看signal()方法的具體實現:
private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && //從first開始遍歷等待隊列,把第一個非空、沒取消的節點transfer到同步隊列 (first = firstWaiter) != null); } public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }
signal()方法首先會判斷當前線程是不是獨占的持有鎖,然后喚醒等待隊列中的第一個等待線程。
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) */ final boolean transferForSignal(Node node) { /* * If cannot change waitStatus, the node has been cancelled. */ if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); //返回的是node的前一個節點 int ws = p.waitStatus; if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); //喚醒剛加入到同步隊列的線程,被喚醒之后,該線程才能從await()方法的park()中返回。 return true; }