經過前面幾篇文章的鋪墊,今天我們終於要看看AQS
的廬山真面目了,建議第一次看AbstractQueuedSynchronizer
類源碼的朋友可以先看下我前面幾篇文章:
分析源碼是非常枯燥乏味的一件事,其實代碼本身其實就是最好的說明了,因此基本都是貼出一些代碼加上一些注釋, 因為AbstractQueuedSynchronizer
上千行代碼不可能也不需要每行都要分析,所以只撿一些關鍵的地方或 比較難理解的地方做說明,有一些地方可能我理解的有出入,歡迎大家指正。 詳細的注釋我都放在了GitHub上。
前面提到AQS
是基於CLH lock queue的,AbstractQueuedSynchronizer
是通過一個內部類Node
實現了一個變種。 前面基本說明了Node的主要內容,但這個類還有一些其他重要的字段:
//標記當前結點是共享模式 static final Node SHARED = new Node(); //標記當前結點是獨占模式 static final Node EXCLUSIVE = null; //結點的等待狀態。 volatile int waitStatus; //擁有當前結點的線程。 volatile Thread thread;
其中waitStatus
很重要,用來控制線程的阻塞/喚醒,以及避免不必要的調用LockSupport的park/unpark方法。 它主要有以下幾個取值:
//代表線程已經被取消 static final int CANCELLED = 1; //代表后續節點需要喚醒 static final int SIGNAL = -1; //代表線程在condition queue中,等待某一條件 static final int CONDITION = -2; //代表后續結點會傳播喚醒的操作,共享模式下起作用 static final int PROPAGATE = -3;
出隊操作
只要設置新的head
結點就可以了。
private void setHead(Node node) { head = node; node.thread = null; node.prev = null; }
入隊操作
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; // 這個if分支其實是一種優化:CAS操作失敗的話才進入enq中的循環。 if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
獨占模式獲取
public final void acquire(int arg) { // tryAcquire 由子類實現本身不會阻塞線程,如果返回 true,則線程繼續, // 如果返回 false 那么就 加入阻塞隊列阻塞線程,並等待前繼結點釋放鎖。 if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // acquireQueued返回true,說明當前線程被中斷喚醒后獲取到鎖, // 重置其interrupt status為true。 selfInterrupt(); }
一旦tryAcquire成功則立即返回,否則線程會加入隊列 線程可能會反復的被阻塞和喚醒直到tryAcquire成功,這是因為線程可能被中斷, 而acquireQueued方法中會保證忽視中斷,只有tryAcquire成功了才返回。中斷版本的獨占獲取是acquireInterruptibly
這個方法, doAcquireInterruptibly
這個方法中如果線程被中斷則acquireInterruptibly
會拋出InterruptedException
異常。
addWaiter
方法只是入隊操作,acquireQueued
方法是主要邏輯,需要重點理解。
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; // 等待前繼結點釋放鎖 // 自旋re-check for (;;) { // 獲取前繼 final Node p = node.predecessor(); // 前繼是head,說明next就是node了,則嘗試獲取鎖。 if (p == head && tryAcquire(arg)) { // 前繼出隊,node成為head setHead(node); p.next = null; // help GC failed = false; return interrupted; } // p != head 或者 p == head但是tryAcquire失敗了,那么 // 應該阻塞當前線程等待前繼喚醒。阻塞之前會再重試一次,還需要設置前繼的waitStaus為SIGNAL。 // 線程會阻塞在parkAndCheckInterrupt方法中。 // parkAndCheckInterrupt返回可能是前繼unpark或線程被中斷。 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) // 說明當前線程是被中斷喚醒的。 // 注意:線程被中斷之后會繼續走到if處去判斷,也就是會忽視中斷。 // 除非碰巧線程中斷后acquire成功了,那么根據Java的最佳實踐, // 需要重新設置線程的中斷狀態(acquire.selfInterrupt)。 interrupted = true; } } finally { // 出現異常 if (failed) cancelAcquire(node); } }
基本每行都有注釋,但得結合shouldParkAfterFailedAcquire
和parkAndCheckInterrupt
這兩個方法來一起理解會更 容易些。shouldParkAfterFailedAcquire方法的作用是:
- 確定后繼是否需要park;
- 跳過被取消的結點;
- 設置前繼的waitStatus為SIGNAL.
int ws = pred.waitStatus; if (ws == Node.SIGNAL)// 前繼結點已經准備好unpark其后繼了,所以后繼可以安全的park /* * This node has already set status asking a release to signal it, * so it can safely park. */ return true; if (ws > 0) {// CANCELLED // 跳過被取消的結點。 do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else {// 0 或 PROPAGATE (CONDITION用在ConditonObject,這里不會是這個值) /** * waitStatus 等於0(初始化)或PROPAGATE。說明線程還沒有park,會先重試 * 確定無法acquire到再park。 */ // 更新pred結點waitStatus為SIGNAL compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false;
parkAndCheckInterrupt就是用LockSupport來阻塞當前線程,很簡單:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
線程被喚醒只可能是:被unpark
,被中斷或偽喚醒。被中斷會設置interrupted
,acquire方法返回前會 selfInterrupt
重置下線程的中斷狀態,如果是偽喚醒的話會for循環re-check。
獨占模式釋放
比較簡單只要直接喚醒后續結點就可以了,后續結點會從parkAndCheckInterrupt
方法中返回。
public final boolean release(int arg) { // tryReease由子類實現,通過設置state值來達到同步的效果。 if (tryRelease(arg)) { Node h = head; // waitStatus為0說明是初始化的空隊列 if (h != null && h.waitStatus != 0) // 喚醒后續的結點 unparkSuccessor(h); return true; } return false; }
共享模式獲取
acquireShared
方法是用來共享模式獲取。
public final void acquireShared(int arg) { //如果沒有許可了則入隊等待 if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { // 添加隊列 final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; // 等待前繼釋放並傳遞 for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg);// 嘗試獲取 if (r >= 0) { // 獲取成功則前繼出隊,跟獨占不同的是 // 會往后面結點傳播喚醒的操作,保證剩下等待的線程能夠盡快 獲取到剩下的許可。 setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } // p != head || r < 0 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
核心是這個doAcquireShared
方法,跟獨占模式的acquireQueued
很像,主要區別在setHeadAndPropagate
方法中, 這個方法會將node設置為head。如果當前結點acquire到了之后發現還有許可可以被獲取,則繼續釋放自己的后繼, 后繼會將這個操作傳遞下去。這就是PROPAGATE
狀態的含義。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * 嘗試喚醒后繼的結點:<br /> * propagate > 0說明許可還有能夠繼續被線程acquire;<br /> * 或者 之前的head被設置為PROPAGATE(PROPAGATE可以被轉換為SIGNAL)說明需要往后傳遞;<br /> * 或者為null,我們還不確定什么情況。 <br /> * 並且 后繼結點是共享模式或者為如上為null。 * <p> * 上面的檢查有點保守,在有多個線程競爭獲取/釋放的時候可能會導致不必要的喚醒。<br /> * */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; // 后繼結是共享模式或者s == null(不知道什么情況) // 如果后繼是獨占模式,那么即使剩下的許可大於0也不會繼續往后傳遞喚醒操作 // 即使后面有結點是共享模式。 if (s == null || s.isShared()) // 喚醒后繼結點 doReleaseShared(); } } private void doReleaseShared() { for (;;) { Node h = head; // 隊列不為空且有后繼結點 if (h != null && h != tail) { int ws = h.waitStatus; // 不管是共享還是獨占只有結點狀態為SIGNAL才嘗試喚醒后繼結點 if (ws == Node.SIGNAL) { // 將waitStatus設置為0 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h);// 喚醒后繼結點 // 如果狀態為0則更新狀態為PROPAGATE,更新失敗則重試 } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } // 如果過程中head被修改了則重試。 if (h == head) // loop if head changed break; } }
共享模式釋放
主要邏輯也就會doReleaseShared
。
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
獨占和共享模式除了對應的中斷版本,還有超時版本,整體代碼相差不大,具體再贅述了。提前前面文章 中提到的自旋
,好像目前整個AQS中都沒用到這個功能,accquire中for循環主要作用不是為了自旋,那么 它用在什么地方呢?AQS中有一個變量:
static final long spinForTimeoutThreshold = 1000L;
這個變量用在doAcquireNanos
方法,也就是支持超時的獲取版本。
private boolean doAcquireNanos(int arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); boolean failed = true; try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return true; } if (nanosTimeout <= 0)// 超時 return false; // nanosTimeout > spinForTimeoutThreshold // 如果超時時間很短的話,自旋效率會更高。 if (shouldParkAfterFailedAcquire(p, node) && nanosTimeout > spinForTimeoutThreshold) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; if (Thread.interrupted()) throw new InterruptedException(); } } finally { if (failed) cancelAcquire(node); } }
AQS的的主要內容其實差不多看完了,但是上面的邏輯中waitStatus中有一個狀態還沒涉及到那就是CONDITION
, 下一篇博客《Java並發包源碼學習之AQS框架(五)ConditionObject源碼分析》中會介紹它。