一概述
談論到並發,不得不談論鎖,而談論到鎖而言,又離不開ReentrantLock.ReentrantLock是鎖鎖的一種實現方式,對於鎖而言,我們這里就需要討論到AQS,即上面的AbstractQueuedLongSynchronize。我直接翻譯過來就叫做抽象隊列同步器。它規定了多線程訪問並發資源的策略,或者提供了一種多線程訪問資源的機制,也可以認為是規定多線程訪問共享資源的框架。
二框架
AbstractOwnableSynchronizer:在談論本文的主角之前,我們先來看看AQS的結構體系:
public abstract class AbstractQueuedLongSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
AQS繼承AOS,我將AOS稱之為抽象擁有同步器。哈哈,這樣翻譯過來可能不是太准確。我么看看AOS中都有什么。
public abstract class AbstractOwnableSynchronizer implements java.io.Serializable { /**序列化版本號*/
private static final long serialVersionUID = 3737899427754241961L; /**無參數的構造器*/
protected AbstractOwnableSynchronizer() { } /**獨占模式、鎖的持有者*/
private transient Thread exclusiveOwnerThread; /**設置該線程為鎖的持有者(獨占模式)*/
protected final void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } /**獲取獨占模式下鎖的持有者對象*/
protected final Thread getExclusiveOwnerThread() { return exclusiveOwnerThread; } }
從上面的AOS可以看出,主要有兩個方法和一個成員變量。都是以獨占模式下的關於線程獲取鎖和設置鎖的一些操作。但是並沒有實現,而是讓子類自行去實現。看如下的示意圖:
在上圖中我們可以看到存在一個變量的狀態。在AQS中維護了一個長型的變量狀態以及一個FIFO(先進先出的隊列)的隊列,當多線程爭共享資源時,收到阻塞會進入該隊列。
private volatile long state;
關於狀態值狀態的訪問存在三種形式:
- getState():
protected final long getState() { return state; }
- setState(long newState):
protected final void setState(long newState) { state = newState; }
- compareAndSetState(long expect,long update):
protected final boolean compareAndSetState(long expect, long update) { // See below for intrinsics setup to support this
return unsafe.compareAndSwapLong(this, stateOffset, expect, update); }
這三個方法很容易理解,的getState()是獲取當前可用資源的個數,的setState()為是當前可用資源重新設置值。左右的一個方法CAS方法是多線程在並發修改狀態值時做的原子操作,允許某一時刻只有一個線程修改成功。從另外一個角度來看,也就是第一個和第二個方法是單個線程在做的操作,CAS方法是多線程操作,但是只有一個會修改成功,是一個輕量級的鎖,關於CAS論述,在這里不再贅述。
AQS定義了兩種訪問資源的規則:
1 exclusive(獨占鎖),獨占式訪問,可以認為是在某某時刻只允許一個線程來操作。比如說寫鎖,在寫的時候不允許其它線程寫或者讀.2 share(共享鎖)。允許多個線程對同一個資源做操作。使得串行化的任務並行執行。在並發包下的Semaphore,CountDownLatch,以及ReentrantReadWriteLock都是可以共享執行的。獨占鎖中的ReentrantLock的則是一個典型的排它鎖或者獨占鎖,和前面的幾個恰好相反。不同的自定義同步器爭用共享資源的方式也不同。自定義同步器在實現時只需要實現共享資源狀態的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法(這些方法是采用的模板方法來設計的,需要繼承的類去實現):
- isHeldExclusively():該線程是否正在獨占資源。只有用到條才才需要去實現它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗; 0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
-
tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
以ReentrantLock為例,state初始化為0,表示未鎖定狀態.A線程lock()時,會調用tryAcquire()獨占該鎖並將狀態+ 1。此后,其他線程再的tryAcquire()時就會失敗,直到甲線程解鎖()到狀態= 0(即釋放鎖)為止,其它線程才有機會獲取該鎖。當然,釋放鎖之前,A線程自己是可以重復獲取此鎖的(狀態會累加),這就是可重入的概念。但要注意,獲取多少次就要釋放多么次,這樣才能保證狀態是能回到零態的。再以CountDownLatch以例,任務分為Ñ個子線程去執行,狀態也初始化為N(注意Ñ要與線程個數一致)。這Ñ個子線程是並行執行的,每個子線程執行完后COUNTDOWN()一次,狀態會CAS減1.等到所有子線程都執行完后(即狀態= 0),會取消駐留()主調用線程,然后主調用線程就會從AWAIT()函數返回,繼續后余動作。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現的tryAcquire-tryRelease,tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如的ReentrantReadWriteLock。
三自定義同步器實現
實現原則:要想去實現一個同步器的實現,那么我么的類應該去繼續AQS這個類,然后重寫其中的方法,對於獨占鎖而言,要實現tryAcquire,tryRelease(),如果要實現共享鎖,那么就要實現tryAcquireShared(),tryReleaseShared()這些方法。最后,在我們的組件中調用AQS中的模板方法就可以了,而這些模板方法是會調用到我們之前重寫的那些方法的。也就是說,我們只需要很小的工作量就可以實現自己的同步組件,重寫的那些方法,僅僅是一些簡單的對於共享資源狀態的獲取和釋放操作,至於像是獲取資源失敗,線程需要阻塞之類的操作,自然是AQS幫我們完成了。
設計思想:對於使用者來講,我們無需關心獲取資源失敗,線程排隊,線程阻塞/喚醒等一系列復雜的實現,這些都在AQS中為我們處理好了。我們只需要負責好自己的那個環節就好,也就是獲取/釋放共享資源狀態 姿勢T_T。很經典的模板方法設計模式的應用,AQS為我們定義好頂級邏輯的骨架,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現即可。
public class Mutex implements Serializable { private static final long serialVersionUID = -7213470713366791047L; //同步器對象
private final Sync sync = new Sync(); /** * 自定義同步器實現 * @author gosaint * */
private static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = 1L; //線程是否持有鎖,返回true表示鎖定狀態
@Override protected boolean isHeldExclusively() { return getState()==1; } /** * 以獨占的方式獲取鎖 */ @Override protected boolean tryAcquire(int acquires) { //當狀態為0 的時候開始獲取鎖,CAS成功之后狀態值修改為1
if(compareAndSetState(0, 1)){ //設置為當前線程為獨占鎖定狀態
setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } @Override protected boolean tryRelease(int releases) { assert releases == 1; // Otherwise unused 斷言執行
if (getState() == 0) throw new IllegalMonitorStateException(); setExclusiveOwnerThread(null); //設置共享資源的狀態為0,即釋放鎖的狀態
setState(0); return true; } }
實現解讀:對於上述的代碼,定義了一個內部類同步,繼承了類AQS,然后實現了其中的isHeldExclusively(),的tryAcquire(),tryRelease()下面的幾個方法包括鎖(),開鎖()等方法是鎖鎖中的方法,在這里我沒有實現鎖定接口看下調用過程吧:sync.acquire(1),調用這個方法之后,看看這個方法都干了些什么,看如下的代碼:
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } // acquireQueued(addWaiter(Node.EXCLUSIVE),arg)指定了獨占模式下的資源鎖定。調用了tryAcquire()方法,我們接着看看這個方法干了些什么? protected boolean tryAcquire(int arg) { throw new UnsupportedOperationException(); }
什么?這個方法里面竟然沒有任何的實現,我是看錯了嗎?沒有,其實這正是我第一次的內心的想法。我以為自己看錯了。經過反復的閱讀之后發現這樣的設計簡直是合情合理,回憶起來曾經觀看任小龍老師的視頻時講的模板方法。在AQS中不做任何的實現,因為AQS定義了或者規定了多線程訪問共享資源的策略。它有獨占鎖定和共享鎖定。因此方法的設計並不是抽象的,而是受保護的。它讓具體的實現自行選擇對應的策略去實現具體的方法,這正是模板方法閃閃發光的地方。這位設計者正是大名鼎鼎的Doug Lea的
同步器代碼測試:定義了30個線程,每個線程自增10000,正常情況下就是300000其實這種同步可以使用原子包下的AutomicInteger來完成的這里使用自定義同步器實現,為的是說明問題。
public class TestMutex { private static CyclicBarrier barrier = new CyclicBarrier(31); private static int a = 0; private static Mutex mutex = new Mutex(); public static void main(String []args) throws Exception { //說明:我們啟用30個線程,每個線程對i自加10000次,同步正常的話,最終結果應為300000; //未加鎖前
for(int i=0;i<30;i++){ Thread t = new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10000;i++){ increment1();//沒有同步措施的a++;
} try { barrier.await();//等30個線程累加完畢啟動線程
} catch (Exception e) { e.printStackTrace(); } } }); t.start(); } barrier.await(); System.out.println("加鎖前,a="+a); //加鎖后
barrier.reset();//重置CyclicBarrier
a=0; for(int i=0;i<30;i++){ new Thread(new Runnable() { @Override public void run() { for(int i=0;i<10000;i++){ increment2();//a++采用Mutex進行同步處理
} try { barrier.await();//等30個線程累加完畢
} catch (Exception e) { e.printStackTrace(); } } }).start(); } barrier.await(); System.out.println("加鎖后,a="+a); } /** * 沒有同步措施的a++ * @return
*/
public static void increment1(){ a++; } /** * 使用自定義的Mutex進行同步處理的a++ */
public static void increment2(){ mutex.lock(); a++; mutex.unlock(); } }
看如下的結果::
加鎖前,A = 283378
加鎖后,A = 300000
四源碼解析
CLH隊列:我們先來簡單描述下AQS的基本實現,前面我們提到過,AQS維護一個共享資源的狀態,通過內置的FIFO來完成獲取資源線程的排隊工作(這個內置的同步隊列稱為” CLH “隊列)。該隊列由一個一個的節點結點組成,每個節點結點維護一個先前引用和下一個引用,分別指向自己的前驅和后繼結點.AQS維護兩個指針,分別指向隊列頭部的頭和尾部尾這個在文章開始之前我就已經粘貼了一張圖片下面的圖片是我主要是針對這個線程排隊的雙向鏈表的示意圖。:
實質其就是一個雙向的鏈表
當線程通過的tryAcquire()設置狀態失敗的時候,就會進入CLH隊列,當持有同步狀態的線程釋放同步狀態時,就會喚醒后繼節點,然后此節點就會繼續假如。到同步狀態的爭奪當中在AQS中維持着一個節點的節點對象,來定義一個節點:
static final class Node { /** 等待狀態的值,表示線程已經被中斷或者取消 */
static final int CANCELLED = 1; /** waitStatus值,表示后續線程需要被喚醒 */
static final int SIGNAL = -1; /** waitStatus值,表示節點在condition上,當被signal后,會從等待隊列轉移到同步到隊列中 */
static final int CONDITION = -2; /**等待狀態的值,初始值為0*/
volatile int waitStatus; /**當前結點的前驅結點*/
volatile Node prev; /** 當前結點的后繼結點 */
volatile Node next; /** 與當前結點關聯的排隊中的線程 */
volatile Thread thread; }
獨占式
取同步狀態:
public final void acquire(long arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
一般的,我們的鎖會直接調用獲取()方法去獲取鎖,也就是同步狀態。此時會調用的tryAcquire()方法,我們一般自定義同步器的時候會重寫這個方法,去執行邏輯,獲取同步狀態。
流程::
1如果調用此時會調用的tryAcquire()方法獲取鎖的同步狀態成功,那么直接返回真,真就是假的,后續的邏輯不再執行如果獲取同步狀態失敗,那么進入2步驟!
2 acquireQueued (addWaiter(Node.EXCLUSIVE),arg)就是執行段代碼,當獲取同步狀態失敗之后,構造獨占式同步節點,並通過addWaiter()添加到同步隊列的尾部。(此時可能有多個線程需要添加至此隊列,這里通過的是CAS保證了添加過程中的安全性)
3該結點以在隊列中嘗試獲取同步狀態,若獲取不到,則阻塞結點線程,直到被前驅結點喚醒或者被中斷。
addWaiter
獲取同步狀態失敗的線程都要添加的CLH隊列。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
流程::
1個節點node =新節點(Thread.currentThread(),模式);構造獨占式節點
2節點預解碼值=尾;將當前節點作為尾節點,並且判斷原來的尾節點是否為空,不為空,
嘗試CAS的快速插入3否則進入enq()方法
private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize
if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
ENQ內部是個死循環,通過CAS設置尾結點,不成功就一直重試。很經典的CAS自旋的用法,我們在之前關於原子類的源碼分析中也提到過。這是一種樂觀的並發策略。
最后,看下acquireQueued方法
final boolean acquireQueued(final Node node, long arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC
failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
至此,關於獲取的方法源碼已經分析完畢,我們來簡單總結下
一個首先的tryAcquire獲取同步狀態,成功則直接返回;否則,進入下一環節;
B線程獲取同步狀態失敗,就構造一個結點,加入同步隊列中,這個過程要保證線程安全;
加入隊列中的結點線程進入自旋狀態,若是老二結點(即前驅結點為頭結點),才有機會嘗試去獲取同步狀態;否則,當其前驅結點的狀態為信號時,線程便可安心休息,進入阻塞狀態,直到被中斷或者被前驅結點喚醒。
釋放同步狀態-release(): :
當前線程執行完自己的邏輯之后,需要釋放同步狀態,來看看釋放方法的邏輯
private void unparkSuccessor(Node node) { //獲取wait狀態
int ws = node.waitStatus; if (ws < 0) compareAndSetWaitStatus(node, ws, 0);// 將等待狀態waitStatus設置為初始值0
Node s = node.next;//后繼結點
if (s == null || s.waitStatus > 0) {//若后繼結點為空,或狀態為CANCEL(已失效),則從后尾部往前遍歷找到一個處於正常阻塞狀態的結點 進行喚醒
s = null; for (Node t = tail; t != null && t != node; t = t.prev) if (t.waitStatus <= 0) s = t; } if (s != null) LockSupport.unpark(s.thread);//使用LockSupprot喚醒結點對應的線程
}
發布的同步狀態相對簡單,需要找到頭結點的后繼結點進行喚醒,若后繼結點為空或處於取消狀態,從后向前遍歷找尋一個正常的結點,喚醒其對應線程。
共享式
共享式:共享式地獲取同步狀態對於獨占式同步組件來講,同一時刻只有一個線程能獲取到同步狀態,其他線程都得去排隊等待,其待重寫的嘗試獲取同步狀態的方法的tryAcquire返回值為布爾值,這很容易理解。對於共享式同步組件來講,同一時刻可以有多個線程同時獲取到同步狀態,這也是“共享”的意義所在其待重寫的嘗試獲取同步狀態的方法tryAcquireShared返回值為int類型。
protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); }
1.當返回值大於0時,表示獲取同步狀態成功,同時還有剩余同步狀態可供其他線程獲取;
2.當返回值等於0時,表示獲取同步狀態成功,但沒有可用同步狀態了;
3 。當返回值小於0時,表示獲取同步狀態失敗。
獲取同步狀態-acquireShared ::
public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0)//返回值小於0,獲取同步狀態失敗,排隊去;獲取同步狀態成功,直接返回去干自己的事兒。
doAcquireShared(arg); }
doAcquireShared:
private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED);//構造一個共享結點,添加到同步隊列尾部。若隊列初始為空,先添加一個無意義的傀儡結點,再將新節點添加到隊列尾部。
boolean failed = true;//是否獲取成功
try { boolean interrupted = false;//線程parking過程中是否被中斷過
for (;;) {//死循環
final Node p = node.predecessor();//找到前驅結點
if (p == head) {//頭結點持有同步狀態,只有前驅是頭結點,才有機會嘗試獲取同步狀態
int r = tryAcquireShared(arg);//嘗試獲取同步裝填
if (r >= 0) {//r>=0,獲取成功
setHeadAndPropagate(node, r);//獲取成功就將當前結點設置為頭結點,若還有可用資源,傳播下去,也就是繼續喚醒后繼結點
p.next = null; // 方便GC
if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) &&//是否能安心進入parking狀態
parkAndCheckInterrupt())//阻塞線程
interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
釋放同步狀態releaseShared:
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared();//釋放同步狀態
return true; } return false; }
doReleaseShared:
private void doReleaseShared() { for (;;) {//死循環,共享模式,持有同步狀態的線程可能有多個,采用循環CAS保證線程安全
Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; unparkSuccessor(h);//喚醒后繼結點
} else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; } if (h == head) break; } }
代碼邏輯比較容易理解,需要注意的是,共享模式,釋放同步狀態也是多線程的,此處采用了CAS自旋來保證。
總結:
關於AQS的介紹及源碼分析到此為止了.AQS
是JUC 中很多同步組件的構建基礎,簡單來講,它內部實現主要是狀態變量state和一個FIFO隊列來完成,同步隊列的頭結點是當前獲取到同步狀態的結點,獲取同步狀態狀態失敗的線程,會被構造成一個結點(或共享式或獨占式)加入到同步隊列尾部(采用自旋CAS來保證此操作的線程安全),隨后線程會阻塞;釋放時喚醒頭結點的后繼結點,使其加入對同步狀態的爭奪中
AQS為我們定義好了頂層的處理實現邏輯,我們在使用AQS構建符合我們需求的同步組件時,只需重寫的tryAcquire,tryAcquireShared,tryRelease,tryReleaseShared幾個方法,來決定同步狀態的釋放和獲取即可,至於背后復雜的線程排隊,線程阻塞/喚醒,如何保證線程安全,都由AQS為我們完成了,這也是非常典型的模板方法的應用.AQS定義好頂級邏輯的骨 ,並提取出公用的線程入隊列/出隊列,阻塞/喚醒等一系列復雜邏輯的實現,將部分簡單的可由使用者決定的操作邏輯延遲到子類中去實現。