一、引子
Java.util.concurrent包都是Doug Lea寫的,來混個眼熟
是的,就是他,提出了JSR166(Java Specification RequestsJava 規范提案),該規范的核心就是AbstractQueuedSynchronizer同步器框架(AQS)。這個框架為構造同步器提供一種通用的機制,並且被j.u.c包中大部分類使用。
包結構如下圖,其中AbstractOwnableSynchronizer是其父類,而AbstractQueuedLongSynchronizer是其32位狀態的升級版64位的實現,適用於多級屏障(CyclicBarrier
)。
AQS的繼承關系如下圖,可見老李頭對它多重視了。老李頭的論文解析飛機票:《The java.util.concurrent Synchronizer Framework》 JUC同步器框架(AQS框架)原文翻譯 。
二、AQS架構設計原理
2.1 需求分析
為了使框架能得到廣泛應用,AQS同步器定義兩種資源共享方式:
Exclusive:獨占模式,同時只有一個線程能執行,如ReentrantLock
Share:共享模式,多個線程可同時執行,如Semaphore/CountDownLatch。
一般來說,自定義同步器要么是獨占方法,要么是共享方式,他們也只需實現tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一種即可。但AQS也支持自定義同步器同時實現獨占和共享兩種方式,如ReentrantReadWriteLock。
自定義同步器在實現時只需要實現共享資源state的獲取與釋放方式即可,至於具體線程等待隊列的維護(如獲取資源失敗入隊/喚醒出隊等),AQS已經在頂層實現好了。自定義同步器實現時主要實現以下幾種方法:
- isHeldExclusively():該線程是否正在獨占資源。只有用到condition才需要去實現它。
- tryAcquire(int):獨占方式。嘗試獲取資源,成功則返回true,失敗則返回false。
- tryRelease(int):獨占方式。嘗試釋放資源,成功則返回true,失敗則返回false。
- tryAcquireShared(int):共享方式。嘗試獲取資源。負數表示失敗;0表示成功,但沒有剩余可用資源;正數表示成功,且有剩余資源。
- tryReleaseShared(int):共享方式。嘗試釋放資源,成功則返回true,失敗則返回false。
AQS為了實現上述操作,需要下面三個基本組件的相互協作:
- 同步狀態的原子性管理;
- 線程的阻塞與解除阻塞;
- 隊列的管理;
2.2 同步狀態的原子性管理
state字段, 用於同步線程之間的共享狀態。通過 CAS 和 volatile 保證其原子性和可見性。
如下圖:
1)volatile修飾state:
線程內的工作內存修改數據后會強制刷新到主存中去,且使其他線程中的工作內存中的該變量失效,下次只能從主存讀取。實現了多線程數據可見性。
2)CAS操作state:
unsafe.compareAndSwapInt(this, stateOffset, expect, update); 根據對象的state同步狀態偏移量是否和expect值相同,相同則更新。標准的CAS操作。unsafe飛機票:在openjdk8下看Unsafe源碼
2.3 線程的阻塞與解除阻塞
利用LockSupport.park() 和 LockSupport.unpark() 實現線程的阻塞和喚醒(底層調用Unsafe的native park和unpark實現),同時支持超時時間。
2.4 隊列的管理
根據論文里描述, AQS 里將阻塞線程封裝到一個內部類 Node 里。並維護一個 CLH Node FIFO 隊列。 CLH隊列是一個非阻塞的 FIFO 隊列,也就是說往里面插入或移除一個節點的時候,在並發條件下不會阻塞,而是通過自旋鎖和 CAS 保證節點插入和移除的原子性。AQS里的CLH是一個雙向鏈表,數據結構如下圖:
node數據結構,后續加上。
三、AQS源碼實現
本節開始講解AQS的源碼實現。依照acquire-release、acquireShared-releaseShared的次序來。
3.1 acquire(int)獨占模式獲取資源
此方法是獨占模式下線程獲取共享資源的頂層入口。如果獲取到資源,線程直接返回,否則進入等待隊列,直到獲取到資源為止,且整個過程忽略中斷的影響。這也正是lock()的語義,當然不僅僅只限於lock()。獲取到資源后,線程就可以去執行其臨界區代碼了。下面是acquire()的源碼:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
函數流程如下:
-
- tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- addWaiter()將該線程加入等待隊列的尾部,並標記為獨占模式;
- acquireQueued()使線程在等待隊列中獲取資源,一直獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
這時單憑這4個抽象的函數來看流程還有點朦朧,不要緊,看完接下來的分析后,你就會明白了。就像《大話西游》里唐僧說的:等你明白了舍生取義的道理,你自然會回來和我唱這首歌的。
3.1.1 tryAcquire(int)
此方法嘗試去獲取獨占資源。如果獲取成功,則直接返回true,否則直接返回false。這也正是tryLock()的語義,還是那句話,當然不僅僅只限於tryLock()。如下是tryAcquire()的源碼:
1 protected boolean tryAcquire(int arg) { 2 throw new UnsupportedOperationException(); 3 }
什么?直接throw異常?說好的功能呢?好吧,還記得概述里講的AQS只是一個框架,具體資源的獲取/釋放方式交由自定義同步器去實現嗎?就是這里了!!!AQS這里只定義了一個接口,具體資源的獲取交由自定義同步器去實現了(通過state的get/set/CAS)!!!至於能不能重入,能不能加塞,那就看具體的自定義同步器怎么去設計了!!!當然,自定義同步器在進行資源訪問時要考慮線程安全的影響。
這里之所以沒有定義成abstract,是因為獨占模式下只用實現tryAcquire-tryRelease,而共享模式下只用實現tryAcquireShared-tryReleaseShared。如果都定義成abstract,那么每個模式也要去實現另一模式下的接口。說到底,Doug Lea還是站在咱們開發者的角度,盡量減少不必要的工作量。
3.1.2 addWaiter(Node)
此方法用於將當前線程加入到等待隊列的隊尾,並返回當前線程所在的結點。還是上源碼吧:
1 private Node addWaiter(Node mode) { 2 //以給定模式構造結點。mode有兩種:EXCLUSIVE(獨占)和SHARED(共享) 3 Node node = new Node(Thread.currentThread(), mode); 4 5 //嘗試快速方式直接放到隊尾。 6 Node pred = tail; 7 if (pred != null) { 8 node.prev = pred; 9 if (compareAndSetTail(pred, node)) { 10 pred.next = node; 11 return node; 12 } 13 } 14 15 //上一步失敗則通過enq入隊。 16 enq(node); 17 return node; 18 }
不用再說了,直接看注釋吧。
3.1.2.1 enq(Node)
此方法用於將node加入隊尾。源碼如下:
1 private Node enq(final Node node) { 2 //CAS"自旋",直到成功加入隊尾 3 for (;;) { 4 Node t = tail; 5 if (t == null) { // 隊列為空,創建一個空的標志結點作為head結點,並將tail也指向它。 6 if (compareAndSetHead(new Node())) 7 tail = head; 8 } else {//正常流程,放入隊尾 9 node.prev = t; 10 if (compareAndSetTail(t, node)) { 11 t.next = node; 12 return t; 13 } 14 } 15 } 16 }
如果你看過AtomicInteger.getAndIncrement()函數源碼,那么相信你一眼便看出這段代碼的精華。CAS自旋volatile變量,是一種很經典的用法。還不太了解的,自己去百度一下吧。
3.1.3 acquireQueued(Node, int)
OK,通過tryAcquire()和addWaiter(),該線程獲取資源失敗,已經被放入等待隊列尾部了。聰明的你立刻應該能想到該線程下一部該干什么了吧:進入等待狀態休息,直到其他線程徹底釋放資源后喚醒自己,自己再拿到資源,然后就可以去干自己想干的事了。沒錯,就是這樣!是不是跟醫院排隊拿號有點相似~~acquireQueued()就是干這件事:在等待隊列中排隊拿號(中間沒其它事干可以休息),直到拿到號后再返回。這個函數非常關鍵,還是上源碼吧:
1 final boolean acquireQueued(final Node node, int arg) { 2 boolean failed = true;//標記是否成功拿到資源 3 try { 4 boolean interrupted = false;//標記等待過程中是否被中斷過 5 6 //又是一個“自旋”! 7 for (;;) { 8 final Node p = node.predecessor();//拿到前驅 9 //如果前驅是head,即該結點已成老二,那么便有資格去嘗試獲取資源(可能是老大釋放完資源喚醒自己的,當然也可能被interrupt了)。 10 if (p == head && tryAcquire(arg)) { 11 setHead(node);//拿到資源后,將head指向該結點。所以head所指的標桿結點,就是當前獲取到資源的那個結點或null。 12 p.next = null; // setHead中node.prev已置為null,此處再將head.next置為null,就是為了方便GC回收以前的head結點。也就意味着之前拿完資源的結點出隊了! 13 failed = false; 14 return interrupted;//返回等待過程中是否被中斷過 15 } 16 17 //如果自己可以休息了,就進入waiting狀態,直到被unpark() 18 if (shouldParkAfterFailedAcquire(p, node) && 19 parkAndCheckInterrupt()) 20 interrupted = true;//如果等待過程中被中斷過,哪怕只有那么一次,就將interrupted標記為true 21 } 22 } finally { 23 if (failed) 24 cancelAcquire(node); 25 } 26 }
到這里了,我們先不急着總結acquireQueued()的函數流程,先看看shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()具體干些什么。
3.1.3.1 shouldParkAfterFailedAcquire(Node, Node)
此方法主要用於檢查狀態,看看自己是否真的可以去休息了(進入waiting狀態,如果線程狀態轉換不熟,可以參考本人上一篇寫的Thread詳解),萬一隊列前邊的線程都放棄了只是瞎站着,那也說不定,對吧!
1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { 2 int ws = pred.waitStatus;//拿到前驅的狀態 3 if (ws == Node.SIGNAL) 4 //如果已經告訴前驅拿完號后通知自己一下,那就可以安心休息了 5 return true; 6 if (ws > 0) { 7 /* 8 * 如果前驅放棄了,那就一直往前找,直到找到最近一個正常等待的狀態,並排在它的后邊。 9 * 注意:那些放棄的結點,由於被自己“加塞”到它們前邊,它們相當於形成一個無引用鏈,稍后就會被保安大叔趕走了(GC回收)! 10 */ 11 do { 12 node.prev = pred = pred.prev; 13 } while (pred.waitStatus > 0); 14 pred.next = node; 15 } else { 16 //如果前驅正常,那就把前驅的狀態設置成SIGNAL,告訴它拿完號后通知自己一下。有可能失敗,人家說不定剛剛釋放完呢! 17 compareAndSetWaitStatus(pred, ws, Node.SIGNAL); 18 } 19 return false; 20 }
整個流程中,如果前驅結點的狀態不是SIGNAL,那么自己就不能安心去休息,需要去找個安心的休息點,同時可以再嘗試下看有沒有機會輪到自己拿號。
3.1.3.2 parkAndCheckInterrupt()
如果線程找好安全休息點后,那就可以安心去休息了。此方法就是讓線程去休息,真正進入等待狀態。
1 private final boolean parkAndCheckInterrupt() { 2 LockSupport.park(this);//調用park()使線程進入waiting狀態 3 return Thread.interrupted();//如果被喚醒,查看自己是不是被中斷的。 4 }
park()會讓當前線程進入waiting狀態。在此狀態下,有兩種途徑可以喚醒該線程:1)被unpark();2)被interrupt()。(再說一句,如果線程狀態轉換不熟,可以參考本人寫的Thread詳解)。需要注意的是,Thread.interrupted()會清除當前線程的中斷標記位。
3.1.3.3 小結
OK,看了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt(),現在讓我們再回到acquireQueued(),總結下該函數的具體流程:
- 結點進入隊尾后,檢查狀態,找到安全休息點;
- 調用park()進入waiting狀態,等待unpark()或interrupt()喚醒自己;
- 被喚醒后,看自己是不是有資格能拿到號。如果拿到,head指向當前結點,並返回從入隊到拿到號的整個過程中是否被中斷過;如果沒拿到,繼續流程1。
3.1.4 小結
OKOK,acquireQueued()分析完之后,我們接下來再回到acquire()!再貼上它的源碼吧:
1 public final void acquire(int arg) { 2 if (!tryAcquire(arg) && 3 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) 4 selfInterrupt(); 5 }
再來總結下它的流程吧:
- 調用自定義同步器的tryAcquire()嘗試直接去獲取資源,如果成功則直接返回;
- 沒成功,則addWaiter()將該線程加入等待隊列的尾部,並標記為獨占模式;
- acquireQueued()使線程在等待隊列中休息,有機會時(輪到自己,會被unpark())會去嘗試獲取資源。獲取到資源后才返回。如果在整個等待過程中被中斷過,則返回true,否則返回false。
- 如果線程在等待過程中被中斷過,它是不響應的。只是獲取資源后才再進行自我中斷selfInterrupt(),將中斷補上。
由於此函數是重中之重,我再用流程圖總結一下:
至此,acquire()的流程終於算是告一段落了。這也就是ReentrantLock.lock()的流程,不信你去看其lock()源碼吧,整個函數就是一條acquire(1)!!!
3.2 release(int)獨占模式釋放資源
上一小節已經把acquire()說完了,這一小節就來講講它的反操作release()吧。此方法是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。這也正是unlock()的語義,當然不僅僅只限於unlock()。下面是release()的源碼:
1 public final boolean release(int arg) { 2 if (tryRelease(arg)) { 3 Node h = head;//找到頭結點 4 if (h != null && h.waitStatus != 0) 5 unparkSuccessor(h);//喚醒等待隊列里的下一個線程 6 return true; 7 } 8 return false; 9 }
邏輯並不復雜。它調用tryRelease()來釋放資源。有一點需要注意的是,它是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自定義同步器在設計tryRelease()的時候要明確這一點!!
3.2.1 tryRelease(int)
此方法嘗試去釋放指定量的資源。下面是tryRelease()的源碼:
1 protected boolean tryRelease(int arg) { 2 throw new UnsupportedOperationException(); 3 }
跟tryAcquire()一樣,這個方法是需要獨占模式的自定義同步器去實現的。正常來說,tryRelease()都會成功的,因為這是獨占模式,該線程來釋放資源,那么它肯定已經拿到獨占資源了,直接減掉相應量的資源即可(state-=arg),也不需要考慮線程安全的問題。但要注意它的返回值,上面已經提到了,release()是根據tryRelease()的返回值來判斷該線程是否已經完成釋放掉資源了!所以自義定同步器在實現時,如果已經徹底釋放資源(state=0),要返回true,否則返回false。
3.2.2 unparkSuccessor(Node)
此方法用於喚醒等待隊列中下一個線程。下面是源碼:
1 private void unparkSuccessor(Node node) { 2 //這里,node一般為當前線程所在的結點。 3 int ws = node.waitStatus; 4 if (ws < 0)//置零當前線程所在的結點狀態,允許失敗。 5 compareAndSetWaitStatus(node, ws, 0); 6 7 Node s = node.next;//找到下一個需要喚醒的結點s 8 if (s == null || s.waitStatus > 0) {//如果為空或已取消 9 s = null; 10 for (Node t = tail; t != null && t != node; t = t.prev) 11 if (t.waitStatus <= 0)//從這里可以看出,<=0的結點,都是還有效的結點。 12 s = t; 13 } 14 if (s != null) 15 LockSupport.unpark(s.thread);//喚醒 16 }
這個函數並不復雜。一句話概括:用unpark()喚醒等待隊列中最前邊的那個未放棄線程,這里我們也用s來表示吧。此時,再和acquireQueued()聯系起來,s被喚醒后,進入if (p == head && tryAcquire(arg))的判斷(即使p!=head也沒關系,它會再進入shouldParkAfterFailedAcquire()尋找一個安全點。這里既然s已經是等待隊列中最前邊的那個未放棄線程了,那么通過shouldParkAfterFailedAcquire()的調整,s也必然會跑到head的next結點,下一次自旋p==head就成立啦),然后s把自己設置成head標桿結點,表示自己已經獲取到資源了,acquire()也返回了!!And then, DO what you WANT!
3.2.3 小結
release()是獨占模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。
3.3 acquireShared(int)共享模式獲取資源
此方法是共享模式下線程獲取共享資源的頂層入口。它會獲取指定量的資源,獲取成功則直接返回,獲取失敗則進入等待隊列,直到獲取到資源為止,整個過程忽略中斷。下面是acquireShared()的源碼:
1 public final void acquireShared(int arg) { 2 if (tryAcquireShared(arg) < 0) 3 doAcquireShared(arg); 4 }
這里tryAcquireShared()依然需要自定義同步器去實現。但是AQS已經把其返回值的語義定義好了:負值代表獲取失敗;0代表獲取成功,但沒有剩余資源;正數表示獲取成功,還有剩余資源,其他線程還可以去獲取。所以這里acquireShared()的流程就是:
-
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待隊列,直到獲取到資源為止才返回。
3.3.1 doAcquireShared(int)
此方法用於將當前線程加入等待隊列尾部休息,直到其他線程釋放資源喚醒自己,自己成功拿到相應量的資源后才返回。下面是doAcquireShared()的源碼:
1 private void doAcquireShared(int arg) { 2 final Node node = addWaiter(Node.SHARED);//加入隊列尾部 3 boolean failed = true;//是否成功標志 4 try { 5 boolean interrupted = false;//等待過程中是否被中斷過的標志 6 for (;;) { 7 final Node p = node.predecessor();//前驅 8 if (p == head) {//如果到head的下一個,因為head是拿到資源的線程,此時node被喚醒,很可能是head用完資源來喚醒自己的 9 int r = tryAcquireShared(arg);//嘗試獲取資源 10 if (r >= 0) {//成功 11 setHeadAndPropagate(node, r);//將head指向自己,還有剩余資源可以再喚醒之后的線程 12 p.next = null; // help GC 13 if (interrupted)//如果等待過程中被打斷過,此時將中斷補上。 14 selfInterrupt(); 15 failed = false; 16 return; 17 } 18 } 19 20 //判斷狀態,尋找安全點,進入waiting狀態,等着被unpark()或interrupt() 21 if (shouldParkAfterFailedAcquire(p, node) && 22 parkAndCheckInterrupt()) 23 interrupted = true; 24 } 25 } finally { 26 if (failed) 27 cancelAcquire(node); 28 } 29 }
有木有覺得跟acquireQueued()很相似?對,其實流程並沒有太大區別。只不過這里將補中斷的selfInterrupt()放到doAcquireShared()里了,而獨占模式是放到acquireQueued()之外,其實都一樣,不知道Doug Lea是怎么想的。
跟獨占模式比,還有一點需要注意的是,這里只有線程是head.next時(“老二”),才會去嘗試獲取資源,有剩余的話還會喚醒之后的隊友。那么問題就來了,假如老大用完后釋放了5個資源,而老二需要6個,老三需要1個,老四需要2個。因為老大先喚醒老二,老二一看資源不夠自己用繼續park(),也更不會去喚醒老三和老四了。獨占模式,同一時刻只有一個線程去執行,這樣做未嘗不可;但共享模式下,多個線程是可以同時執行的,現在因為老二的資源需求量大,而把后面量小的老三和老四也都卡住了。
3.3.1.1 setHeadAndPropagate(Node, int)
1 private void setHeadAndPropagate(Node node, int propagate) { 2 Node h = head; 3 setHead(node);//head指向自己 4 //如果還有剩余量,繼續喚醒下一個鄰居線程 5 if (propagate > 0 || h == null || h.waitStatus < 0) { 6 Node s = node.next; 7 if (s == null || s.isShared()) 8 doReleaseShared(); 9 } 10 }
此方法在setHead()的基礎上多了一步,就是自己蘇醒的同時,如果條件符合(比如還有剩余資源),還會去喚醒后繼結點,畢竟是共享模式!
doReleaseShared()我們留着下一小節的releaseShared()里來講。
3.3.2 小結
OK,至此,acquireShared()也要告一段落了。讓我們再梳理一下它的流程:
- tryAcquireShared()嘗試獲取資源,成功則直接返回;
- 失敗則通過doAcquireShared()進入等待隊列park(),直到被unpark()/interrupt()並成功獲取到資源才返回。整個等待過程也是忽略中斷的。
其實跟acquire()的流程大同小異,只不過多了個自己拿到資源后,還會去喚醒后繼隊友的操作(這才是共享嘛)。
3.4 releaseShared()共享模式釋放資源
上一小節已經把acquireShared()說完了,這一小節就來講講它的反操作releaseShared()吧。此方法是共享模式下線程釋放共享資源的頂層入口。它會釋放指定量的資源,如果徹底釋放了(即state=0),它會喚醒等待隊列里的其他線程來獲取資源。下面是releaseShared()的源碼:
1 public final boolean releaseShared(int arg) { 2 if (tryReleaseShared(arg)) {//嘗試釋放資源 3 doReleaseShared();//喚醒后繼結點 4 return true; 5 } 6 return false; 7 }
此方法的流程也比較簡單,一句話:釋放掉資源后,喚醒后繼。跟獨占模式下的release()相似,但有一點稍微需要注意:獨占模式下的tryRelease()在完全釋放掉資源(state=0)后,才會返回true去喚醒其他線程,這主要是基於可重入的考量;而共享模式下的releaseShared()則沒有這種要求,一是共享的實質--多線程可並發執行;二是共享模式基本也不會重入吧(至少我還沒見過),所以自定義同步器可以根據需要決定返回值。
3.4.1 doReleaseShared()
此方法主要用於喚醒后繼。下面是它的源碼:
1 private void doReleaseShared() { 2 for (;;) { 3 Node h = head; 4 if (h != null && h != tail) { 5 int ws = h.waitStatus; 6 if (ws == Node.SIGNAL) {//如果頭結點狀態是signal,即需要喚醒后繼節點 7 if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//CAS一下如果當前狀態是signal則重置為0,否則退出當前循環進入下次循環 8 continue; 9 unparkSuccessor(h);//喚醒后繼 10 } 11 else if (ws == 0 && 12 !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//如果頭結點狀態是0且CAS成功狀態重置為傳播失敗了,退出當前循環進入下次循環 13 continue; 14 } 15 if (h == head)// head發生變化 16 break; 17 } 18 }
3.5 小結
本節我們詳解了獨占和共享兩種模式下獲取-釋放資源(acquire-release、acquireShared-releaseShared)的源碼,相信大家都有一定認識了。值得注意的是,acquire()和acquireSahred()兩種方法下,線程在等待隊列中都是忽略中斷的。AQS也支持響應中斷的,acquireInterruptibly()/acquireSharedInterruptibly()即是,這里相應的源碼跟acquire()和acquireSahred()差不多,這里就不再詳解了。
四、簡單應用
下面我們就以AQS源碼里的Mutex為例,講一下AQS的簡單應用。
同步類自己(Mutex)則實現某個接口,對外服務。同步類在實現時一般都將自定義同步器(sync)定義為內部類,只用實現state的獲取-釋放方式tryAcquire-tryRelelase,至於線程的排隊、等待、喚醒等,上層的AQS都已經實現好了,我們不用關心。
1 class Mutex implements Lock, java.io.Serializable { 2 // 自定義同步器 3 private static class Sync extends AbstractQueuedSynchronizer { 4 // 判斷是否鎖定狀態 5 protected boolean isHeldExclusively() { 6 return getState() == 1; 7 } 8 9 // 嘗試獲取資源,立即返回。成功則返回true,否則false。 10 public boolean tryAcquire(int acquires) { 11 assert acquires == 1; // 這里限定只能為1個量 12 if (compareAndSetState(0, 1)) {//state為0才設置為1,不可重入! 13 setExclusiveOwnerThread(Thread.currentThread());//設置為當前線程獨占資源 14 return true; 15 } 16 return false; 17 } 18 19 // 嘗試釋放資源,立即返回。成功則為true,否則false。 20 protected boolean tryRelease(int releases) { 21 assert releases == 1; // 限定為1個量 22 if (getState() == 0)//既然來釋放,那肯定就是已占有狀態了。只是為了保險,多層判斷! 23 throw new IllegalMonitorStateException(); 24 setExclusiveOwnerThread(null); 25 setState(0);//釋放資源,放棄占有狀態 26 return true; 27 } 28 } 29 30 // 真正同步類的實現都依賴繼承於AQS的自定義同步器! 31 private final Sync sync = new Sync(); 32 33 //lock<-->acquire。兩者語義一樣:獲取資源,即便等待,直到成功才返回。 34 public void lock() { 35 sync.acquire(1); 36 } 37 38 //tryLock<-->tryAcquire。兩者語義一樣:嘗試獲取資源,要求立即返回。成功則為true,失敗則為false。 39 public boolean tryLock() { 40 return sync.tryAcquire(1); 41 } 42 43 //unlock<-->release。兩者語文一樣:釋放資源。 44 public void unlock() { 45 sync.release(1); 46 } 47 48 //鎖是否占有狀態 49 public boolean isLocked() { 50 return sync.isHeldExclusively(); 51 } 52 }
五、總結
公共方法 |
子類需要自定義的方法(AQS中默認返回異常,子類覆蓋實現) |
子類可直接使用的方法 |
|
獨占模式 | CAS操作節點、state同步狀態 compareAndSetState 設置同步狀態 compareAndSetHead 設置head節點
|
protected boolean tryAcquire(int arg)獲取資源 protected boolean tryRelease(int arg) 釋放資源 protected boolean isHeldExclusively()該線程是否正在獨占資源。 |
AbstractOwnableSynchronizer是AQS的父類,繼承AQS類自然繼承了AbstractOwnableSynchronizer, 方法: protected final Thread getExclusiveOwnerThread()獲取當前獨占線程 |
共享模式 | protected int tryAcquireShared(int arg)獲取資源 protected boolean tryReleaseShared(int arg) 釋放資源 |
了解了老李頭的AQS,再去看JUC下的類就簡單明了啦,如下:
1.獨占模式:
ReentrantLock:可重入鎖。state=0獨占鎖,或者同一線程可多次獲取鎖(獲取+1,釋放-1)。
Worker(java.util.concurrent.ThreadPoolExecutor類中的內部類)線程池類。shutdown關閉空閑工作線程,中斷worker工作線程是獨占的,互斥的。
2.共享模式:
Semaphore:信號量。 控制同時有多少個線程可以進入代碼段。(互斥鎖的拓展)
CountDownLatch:倒計時器。 初始化一個值,多線程減少這個值,直到為0,倒計時完畢,執行后續代碼。
3.獨占+共享模式:
ReentrantReadWriteLock:可重入讀寫鎖。獨占寫+共享讀,即並發讀,互斥寫。
========參考=================
1.《The java.util.concurrent Synchronizer Framework》
2.http://singleant.iteye.com/blog/1418580