若有不正之處請多多諒解,並歡迎批評指正。
請尊重作者勞動成果,轉載請標明原文鏈接:
http://www.cnblogs.com/go2sea/p/5618628.html
AbstractQueuedSynchronizer(AQS)是一個同步器框架,在實現鎖的時候,一般會實現一個繼承自AQS的內部類sync,作為我們的自定義同步器。AQS內部維護了一個state成員和一個隊列。其中state標識了共享資源的狀態,隊列則記錄了等待資源的線程。以下這五個方法,在AQS中實現為直接拋出異常,這是我們自定義同步器需要重寫的方法:
①isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
②tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗返回false。
③tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗返回false。
④tryAcquireShared(int):共享方式。嘗試獲取資源。成功返回true,失敗返回false。
⑤tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗返回false。
其中isHeldExclusively需要在使用Condition時重寫,他在AQS中的調用全部發生在其內部類ConditionObject的方法中。②③和④⑤分別對應了AQS定義的兩種資源共享的方式:Exclusive&share,例如ReentrantLock就是一種獨占鎖,CountDownLatch和Semaphore是共享鎖。與CountDownLatch有一定相似性的CyclicBarrier並沒有自己的共享同步器,而是使用Lock和Condition來實現的(關於CyclicBarrier的詳解可以參考本人的另一篇博文http://www.cnblogs.com/go2sea/p/5615531.html)。
下面是一個簡單的獨占鎖的實現,它是不可重入的。它重寫了AQS的tryAcquire方法和tryRelease方法:
1 import java.io.Serializable; 2 import java.util.concurrent.TimeUnit; 3 import java.util.concurrent.locks.AbstractQueuedSynchronizer; 4 import java.util.concurrent.locks.Condition; 5 import java.util.concurrent.locks.Lock; 6 7 class Mutex implements Lock, Serializable { 8 //自定義同步器,繼承自AQS 9 private static class Sync extends AbstractQueuedSynchronizer { 10 //試圖獲取鎖,當state為0時能成功獲取, 11 public boolean tryAcquire(int acquires) { 12 assert acquires == 1; //這是一個對於state進行操作的量,含義自定義 13 if (compareAndSetState(0, 1)) { //注意:這是一個原子操作 14 setExclusiveOwnerThread(Thread.currentThread()); 15 return true; 16 } 17 return false; 18 } 19 //釋放鎖,此時state應為1,Mutex處於被獨占狀態 20 protected boolean tryRelease(int releases) { 21 assert releases == 1; // Otherwise unused 22 if (getState() == 0) throw new IllegalMonitorStateException(); 23 setExclusiveOwnerThread(null); 24 setState(0); 25 return true; 26 } 27 //返回一個Condition 28 Condition newCondition() { return new ConditionObject(); } 29 } 30 31 private final Sync sync = new Sync(); 32 33 public void lock() { sync.acquire(1); } 34 public boolean tryLock() { return sync.tryAcquire(1); } 35 public void unlock() { sync.release(1); } 36 public Condition newCondition() { return sync.newCondition(); } 37 public void lockInterruptibly() throws InterruptedException { 38 sync.acquireInterruptibly(1); 39 } 40 public boolean tryLock(long timeout, TimeUnit unit) 41 throws InterruptedException { 42 return sync.tryAcquireNanos(1, unit.toNanos(timeout)); 43 } 44 }
我們可以看到,利用AQS實現一個簡單的自定義鎖看上去並不復雜,讓我們以此為例,來學習一下AQS的內部原理吧。
一、acquire 獲取鎖
我們先來看一下Mutex重寫的tryAcquire方法:
//試圖獲取鎖,當state為0時能成功獲取, public boolean tryAcquire(int acquires) { assert acquires == 1; //這是一個對於state進行操作的量,含義自定義 if (compareAndSetState(0, 1)) { //注意:這是一個原子操作 setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; }
注意:當我們初始化一個Sync的時候,如果沒有指定state的初值(無參數),那么state的默認初值是0。可以看到,方法開頭首先有一個斷言acquires==1,參數acquires代表要在state上做的改變的量(減去或增加),在Mutex中,我們定義state只有兩個狀態:0或1,0代表共享資源可以被獲取,1表示共享資源正在被占用,因此Mutex是不可重入的。實際上,自定義同步器通過重寫tryAcquire和tryRelease來定義state代表的意義和資源的共享方式,這是同步器的主要任務。Mutex的tryAcquire使用一個原子操作compareAndSetState來試圖獲取資源,這個原子操作由上層的AQS提供,如果成功,將當前線程設置為獨占線程並返回true。
Mutex的lock方法調用了sync的acquire方法,acquire方法實現為:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
它首先調用tryAquire去獲取共享資源,如果失敗,調用addWaiter將當前線程放入等待隊列,返回持有當前線程的Node對象,然后調用acquireQueued方法來監視等待隊列並獲取資源。acquireQueued方法會阻塞,直到成功獲取。注意,acquire方法不能及時響應中斷,只能在成功獲取鎖之后,再來處理。中斷當前線程的操作跑出的異常在acquireQueued方法中被捕獲,外部調用者沒能看到這個異常,因此調用selfInterrupt來重置中斷標識。
我們需要詳細了解addWaiter方法和acquireQueued方法,之后再來回顧acquire的過程,才能對整個獲取鎖的流程有比較詳細的了解。
我們先來看addWaiter方法:
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
addWaiter首先將當前線程包裝在一個Node對象node中,然后獲取了一下隊列的尾節點,如果隊列不為空(tail不為null)的話,調用一個CAS函數試圖將node放入等待隊列的尾部,注意,此時可能發生競爭,如果有另外一個線程在兩個if之間搶先更新的隊列的尾節點,CAS操作將會失敗,這時會調用enq方法,繼續試圖將node放入隊列:
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
enq方法會循環檢測隊列,如果隊列為空,則調用CAS函數初始化隊列(此時node==head==tail),否則調用CAS函數將node放入隊列尾。注意,這兩個CAS是由AQS提供的原子操作。如果CAS失敗,enq會繼續循環檢測,直到成功將node入列。enq方法的這種方式有一個專用的名詞:CAS自旋,這種方式在AQS中有多處應用。這里有一個隱含的知識點,即tail是一個volatile成員,確保某個線程更新隊列后對其他線程的可見性。
注意:隊列為空的時候,第一個線程進入隊列的情況有點tricky:第一個發現隊列為空並初始化隊列(head節點)的線程不一定優先拿到資源。head節點被初始化后,當前線程需要下一次旋轉才有機會進入隊列,在這期間,完全有可能半路殺出程咬金,將當前線程與它初始化出的head節點無情分開。我們來總結一下,當隊列只有一個節點時(head=tail),有兩種情況:第一種是這個隊列剛剛被初始化,head並沒有持有任何線程對象。這個狀態不會持續太久,初始化隊列的線程有很大機會在下次自旋時把自己接到隊尾。第二種情況是,所有等待線程都已經獲得資源並繼續執行下去了,隊列僅有的節點是最后一個獲取共享資源的線程,等到下一個線程到達等待隊列並將它踢出隊列之后,它才有機會被回收。
enq執行完畢,我們已經成功把當前線程放入等待隊列,接下來的任務就是監視隊列,等待獲取資源。這個過程由acquireQueued方法實現:
final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
acquireQueued方法是一個很重要的方法,在分析這個方法之前,我們先來說一下AQS中的那個等待隊列。這個隊列實際上是一個CLH隊列,它保證了競爭資源的線程按到達順序來獲取資源,避免了飢餓的發生。CLH隊列的工作過程,就是acquireQueued方法的工作過程。很明顯,這又是一個自旋。首先,我們調用predecessor方法獲取當前線程的前驅節點,如果這個前驅是head節點,就緊接着調用tryAcquire去獲取共享資源,當然這是有可能失敗的,因為head節點可能剛剛“上位”,還沒有釋放資源。如果很幸運,我們拿到了資源,就調用setHead將node設置為隊列的頭結點,setHead方法同時會將node的prev置為null,緊接着將原先head的next也置為null,顯然這是為了讓其后續被回收。注意:acquireQueued方法在自旋過程中是不可被中斷的,當然它會檢測到中斷(在parkAndCheckInterrupt方法中檢測中斷標志),但並不會因此結束自旋,只能在獲得資源退出方法后,反饋給上層的方法:我剛剛被中斷了。還記得acquire方法中的selfInterrupt的調用嗎,就是為了“補上”這里沒有響應的中斷。
好,我們繼續往下。獲取資源失敗后(原因有二,head與我之間還有等待線程或者head節點的線程正在使用資源),調用shouldParkAfterFailedAcquire方法檢測是否該去“休息”下,畢竟一直自旋很累嘛。如果可以休息就調用parkAndCheckInterrupt放心去休息。我們先來看一下shuldParkAfterFailedAcquire:
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) /* * This node has already set status asking a release * to signal it, so it can safely park. */ return true; if (ws > 0) { /* * Predecessor was cancelled. Skip over predecessors and * indicate retry. */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking. */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
我們首先了解一下waitStatus。Node對象維護了一個int成員waitStatus,他的可能取值如下:
static final int CANCELLED = 1; static final int SIGNAL = -1; static final int CONDITION = -2; static final int PROPAGATE = -3;
下面解釋一下每個值的含義
CANCELLED:因為超時或者中斷,結點會被設置為取消狀態,被取消狀態的結點不應該去競爭鎖,只能保持取消狀態不變,不能轉換為其他狀態。處於這種狀態的結點會被踢出隊列,被GC回收;
SIGNAL:表示這個結點的繼任結點被阻塞了,到時需要通知它;
CONDITION:表示這個結點在條件隊列中,因為等待某個條件而被阻塞;
PROPAGATE:使用在共享模式頭結點有可能處於這種狀態,表示鎖的下一次獲取可以無條件傳播;
0:None of the above,新結點會處於這種狀態。
在我們的Mutex的例子中,節點的waitStatus只可能有CANCELLED、SIGNAL和0三中狀態(事實上,獨占模式下所有不使用Condition的同步器都是這樣)。
我們繼續來分析shouldParkAfterFailedAcquire方法:
首先檢測下node的前驅節點pred,如果pred狀態已經被置為SIGNAL,直接返回true。否則,從node的前驅繼續往前找,直到找到一個waitStatus小於等於0的節點,設置該點為node的前驅(注意:此時node與這個節點之間的節點從等待隊列中被“摘下”,等待被回收了)並返回false。返回之后,上層的acquireQueued方法繼續自旋,再次進入shouldParkAfterFailedAcquire方法之后,如果發現node前驅不是取消狀態且waitStatus不等於SIGNAL,調用CAS函數進行注冊。注意:這個操作可能失敗,因此不能直接返回true,而是返回false由上層的自旋再次調用shouldParkAfterFailedAcquire直到確認注冊成功。
歷盡曲折,我們終於可以安心休息了:
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
parkAndCheckInterrupt方法十分簡單,他調用LockSupport的靜態方法park阻塞當前線程,直到被中斷,這次中斷會被acquireQueued記錄,但不會立即響應,直到自旋完成。注意:返回操作中的interrupted方法會將中斷標志復位,因此我們在上層需要將這個中斷“補上”,再一次:還記得大明湖邊的selfInterrupt嗎?
二、release 釋放鎖
我們先來看一下Mutex中重寫的tryRelease方法:
//釋放鎖,此時state應為1,Mutex處於被獨占狀態 protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); setState(0); return true; }
邏輯比較簡單,首先將獨占線程置為null,緊接着將state設置為0,這里不會發生資源競爭,因此不需要用CAS去設置state值,直接置0即可。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
好,我們開始分析release方法。首先調用tryRelease試圖釋放共享資源,緊接着檢測自己的waitStatus是否為SIGNAL,如果是的話,調用unparkSuccessor喚醒隊列中的下一個線程。獨占模式下,waitStatus!=0與waitStatus==-1等價(這里waitStatus不會為CANCELLED,因為已經獲取資源了)。如果不為SIGNAL,說明如果有下個等待線程,它正在自旋。所以直接返回true即可。我們來看下unparkSuccessor方法:
private void unparkSuccessor(Node node) { /* * If status is negative (i.e., possibly needing signal) try * to clear in anticipation of signalling. It is OK if this * fails or if status is changed by waiting thread. */ int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0); /* * Thread to unpark is held in successor, which is normally * just the next node. But if cancelled or apparently null, * traverse backwards from tail to find the actual * non-cancelled successor. */ Node s = node.next; if (s == null || s.waitStatus > 0) { s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread); }
unparkSuccessor方法也會在共享模式的工作流程中被調用,因此方法開始做的判斷是有必要的。對於獨占模式而言,ws應該都是0。然后找到下一個需要被喚醒的線程並調用LockSupport的靜態方法unpark喚醒等待線程。
至此,我們比較詳細地了解了acquire&release的工作流程。
三、acquireShared 獲取鎖
下面,我們來學習下共享模式下的獲取&釋放鎖的工作流程。
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
acquireShared方法首先調用tryAcquireShared試圖獲取共享資源。tryAcquireShared的返回值表示剩余資源個數,負值表示獲取失敗,0表示獲取成功但已無剩余資源。如果獲取失敗,調用doAcquireShared方法完成獨占模式下類似的操作,后面我們會詳細分析。注意,doAcquireShared方法在等待資源的過程中也是不響應中斷的,它能覺察到中斷,但在成功獲取資源之前不會處理。
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
doAcquireShared方法與acquireQueued方法相似,不同的地方在於,共享模式下成功獲取資源並將head指向自己之后,要檢查並試圖喚醒之后的等待線程。因為共享資源可能剩余,可以被后面的等待線程獲取。
private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
setHeadAndPropagate中有一個長長的if,來判斷是否應該去試圖喚醒后面的線程。其中h==null的判斷筆者始終不能理解,因為查看代碼發現,之后隊列尚未初始化的時候為空,后續都不可能為空了。關於這點希望各位看官不吝指教。其他情況,propagate大於0,表示尚有資源可被獲取,顯然應該繼續判斷;而當h.waitStatus小於0時,它有兩種取值可能,SIGNAL和PROPAGATE,我們將在后面看到,這兩種情況都是應該繼續判斷。后續是對node的后繼進行的判斷,注意,node此時可能已經不是head節點了,因為這是共享模式,所以可能有一個node的后繼成功獲取資源后,把自己設為head,將node踢出了隊列。這種情況下node的后繼s是可能為null的,但貌似這種情況doReleaseShared的調用沒有意義。s.isShared的判斷主要是考慮到讀寫鎖的情況,在讀寫鎖的使用過程中,申請寫鎖(獨占模式)和申請讀鎖(共享模式)的線程可能同時存在,這個判斷發現后即線程是共享模式的時候,調用soReleaseShared方法喚醒他。
但總之,我們十分保守謹慎地調用了doReleaseShared方法試圖喚醒后繼線程:
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
又是一個自旋。我們首先獲取head節點h,然后檢查它的waitStatus是否為SIGNAL,如果是的話,調用CAS將h的waitStatus設置為0,並調用unparkSuccessor喚醒下一個等待線程。注意,這里調用CAS方法而不是直接賦值,是因為在共享模式下,這里可能發生競爭。doReleaseShared方法可能由head節點在使用完共享資源后主動調用(后續在releaseShared方法中可以看到),也可能由剛剛“上位”的等待線程調用,在上位之后,原來的head線程已被踢出隊列。
因此,doReleaseShared方法的執行情況變得比較復雜,需要細致分析。
第一種情況,只有剛剛釋放資源的head線程調用,這時候沒有競爭,waitStatus是SIGNAL,就去喚醒下個線程,是0,就重置為PROPAGATE。
第二種情況,剛剛釋放完資源的舊head,和剛剛上位的新head同時調用doReleaseShared方法,這時候最新的head獲取的都是自己,若干被踢出的舊head獲取的可能是舊head,也可能是新head,這些被踢出的舊head線程也在根據自己獲取的head(不管新舊)的狀態進行CAS操作和unparkSuccessor操作,幸運的是(必須幸運啊。。),這些操作不會造成錯誤,只是多了一些喚醒而已(這些喚醒可能導致一個線程獲得資源,也可能是一個“虛晃”)。
我們可以發現,不管head引用怎樣更迭,最終新head的waitStatus都會被順利處理。注意,可能有多個舊head同時參與這個過程,都不影響正確性。
我們注意到,一個新head,在他剛上位的時候有機會調用一次setHeadAndPropagate進而調用doReleaseShared,在他釋放資源之后,又一次調用doReleaseShared(這次是必然的)。第一次調用時,不管新head的waitStatus是0還是SIGNAL,最終狀態都被PROPAGATE(當然,被踢出隊列的head可能還沒來得及設置成PROPAGATE,但新上位的head最終會被設置),這也符合PROPAGATE的語義:使用在共享模式頭結點有可能處於這種狀態,表示鎖的下一次獲取可以無條件傳播。
還有一個問題,它是由SIGNAL-->0-->PROPAGATE變化而來的,為什么不是SIGNAL-->PROPAGA這樣直接變化呢?原因是unparkSuccessor方法會試圖將當前node的waitStatus復位成0,如果我們直接SIGNAL-->PROPAGA后,那么又被復位成0,還需要一次CAS操作置為PROPAGATE。
四、releaseShared 釋放鎖
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
我們可以看到,調用tryReleaseShared成功釋放共享資源之后,最終要再次調用doReleaseShared試圖喚醒后面的等待線程。
五、ConditionObject
關於Condition的內容請看筆者的另一篇博文Condition源碼學習筆記。
至此,我們對獨占模式和共享模式下、不響應中斷的、沒有等待時間參數的獲取資源和釋放資源的流程有了初步了解。這時去看JUC包中的鎖的源碼,相信會有更深的理解。