申明:此篇文章轉載自:https://juejin.im/post/5c11d6376fb9a049e82b6253
寫的真的很棒,感謝老錢的分享。
打通 Java 任督二脈 —— 並發數據結構的基石
每一個 Java 的高級程序員在體驗過多線程程序開發之后,都需要問自己一個問題,Java 內置的鎖是如何實現的?最常用的最簡單的鎖要數 ReentrantLock,使用它加鎖時如果沒有立即加成功,就會阻塞當前的線程等待其它線程釋放鎖之后再重新嘗試加鎖,那線程是如何實現阻塞自己的?其它線程釋放鎖之后又是如果喚醒當前線程的?當前線程是如何得出自己沒有加鎖成功這一結論的?本篇內容將會從根源上回答上面提到的所有問題
線程阻塞原語
Java 的線程阻塞和喚醒是通過 Unsafe 類的 park 和 unpark 方法做到的。
public class Unsafe { ... public native void park(boolean isAbsolute, long time); public native void unpark(Thread t); ... } 復制代碼
這兩個方法都是 native 方法,它們本身是由 C 語言來實現的核心功能。park 的意思是停車,讓當前運行的線程 Thread.currentThread() 休眠,unpark 的意思是解除停車,喚醒指定線程。這兩個方法在底層是使用操作系統提供的信號量機制來實現的。具體實現過程要深究 C 代碼,這里暫時不去具體分析。park 方法的兩個參數用來控制休眠多長時間,第一個參數 isAbsolute 表示第二個參數是絕對時間還是相對時間,單位是毫秒。
線程從啟動開始就會一直跑,除了操作系統的任務調度策略外,它只有在調用 park 的時候才會暫停運行。鎖可以暫停線程的奧秘所在正是因為鎖在底層調用了 park 方法。
parkBlocker
線程對象 Thread 里面有一個重要的屬性 parkBlocker,它保存當前線程因為什么而 park。就好比停車場上停了很多車,這些車主都是來參加一場拍賣會的,等拍下自己想要的物品后,就把車開走。那么這里的 parkBlocker 大約就是指這場「拍賣會」。它是一系列沖突線程的管理者協調者,哪個線程該休眠該喚醒都是由它來控制的。
class Thread { ... volatile Object parkBlocker; ... } 復制代碼
當線程被 unpark 喚醒后,這個屬性會被置為 null。Unsafe.park 和 unpark 並不會幫我們設置 parkBlocker 屬性,負責管理這個屬性的工具類是 LockSupport,它對 Unsafe 這兩個方法進行了簡單的包裝。
class LockSupport { ... public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); U.park(false, 0L); setBlocker(t, null); // 醒來后置null } public static void unpark(Thread thread) { if (thread != null) U.unpark(thread); } } ... }
復制代碼
Java 的鎖數據結構正是通過調用 LockSupport 來實現休眠與喚醒的。線程對象里面的 parkBlocker 字段的值就是下面我們要講的「排隊管理器」。
排隊管理器
當多個線程爭用同一把鎖時,必須有排隊機制將那些沒能拿到鎖的線程串在一起。當鎖釋放時,鎖管理器就會挑選一個合適的線程來占有這個剛剛釋放的鎖。每一把鎖內部都會有這樣一個隊列管理器,管理器里面會維護一個等待的線程隊列。ReentrantLock 里面的隊列管理器是 AbstractQueuedSynchronizer,它內部的等待隊列是一個雙向列表結構,列表中的每個節點的結構如下。
class AbstractQueuedSynchronizer { volatile Node head; // 隊頭線程將優先獲得鎖 volatile Node tail; // 搶鎖失敗的線程追加到隊尾 volatile int state; // 鎖計數 } class Node { Node prev; Node next; Thread thread; // 每個節點一個線程 // 下面這兩個特殊字段可以先不去理解 Node nextWaiter; // 請求的是共享鎖還是獨占鎖 int waitStatus; // 精細狀態描述字 }
復制代碼
加鎖不成功時,當前的線程就會把自己納入到等待鏈表的尾部,然后調用 LockSupport.park 將自己休眠。其它線程解鎖時,會從鏈表的表頭取一個節點,調用 LockSupport.unpark 喚醒它。
AbstractQueuedSynchronizer 類是一個抽象類,它是所有的鎖隊列管理器的父類,JDK 中的各種形式的鎖其內部的隊列管理器都繼承了這個類,它是 Java 並發世界的核心基石。比如 ReentrantLock、ReadWriteLock、CountDownLatch、Semaphore、ThreadPoolExecutor 內部的隊列管理器都是它的子類。這個抽象類暴露了一些抽象方法,每一種鎖都需要對這個管理器進行定制。而 JDK 內置的所有並發數據結構都是在這些鎖的保護下完成的,它是JDK 多線程高樓大廈的地基。
鎖管理器維護的只是一個普通的雙向列表形式的隊列,這個數據結構很簡單,但是仔細維護起來卻相當復雜,因為它需要精細考慮多線程並發問題,每一行代碼都寫的無比小心。
JDK 鎖管理器的實現者是 Douglas S. Lea,Java 並發包幾乎全是他單槍匹馬寫出來的,在算法的世界里越是精巧的東西越是適合一個人來做。
Douglas S. Lea是紐約州立大學奧斯威戈分校計算機科學教授和現任計算機科學系主任,專門研究並發編程和並發數據結構的設計。他是Java Community Process的執行委員會成員,主持JSR 166,它為Java編程語言添加了並發實用程序。
后面我們將 AbstractQueuedSynchronizer 簡寫成 AQS。我必須提醒各位讀者,AQS 太復雜了,如果在理解它的路上遇到了挫折,這很正常。目前市場上並不存在一本可以輕松理解 AQS 的書籍,能夠吃透 AQS 的人太少太少,我自己也不算。
公平鎖與非公平鎖
公平鎖會確保請求鎖和獲得鎖的順序,如果在某個點鎖正處於自由狀態,這時有一個線程要嘗試加鎖,公平鎖還必須查看當前有沒有其它線程排在排隊,而非公平鎖可以直接插隊。聯想一下在肯德基買漢堡時的排隊場景。
也許你會問,如果某個鎖處於自由狀態,那它怎么會有排隊的線程呢?我們假設此刻持有鎖的線程剛剛釋放了鎖,它喚醒了等待隊列中第一個節點線程,這時候被喚醒的線程剛剛從 park 方法返回,接下來它就會嘗試去加鎖,那么從 park 返回到加鎖之間的狀態就是鎖的自由態,這很短暫,而這短暫的時間內還可能有其它線程也在嘗試加鎖。
其次還有一點需要注意,執行了 Lock.park 方法的線程自我休眠后,並不是非要等到其它線程 unpark 了自己才會醒來,它可能隨時會以某種未知的原因醒來。我們看源碼注釋,park 返回的原因有四種
- 其它線程 unpark 了當前線程
- 時間到了自然醒(park 有時間參數)
- 其它線程 interrupt 了當前線程
- 其它未知原因導致的「假醒」
文檔中沒有明確說明何種未知原因會導致假醒,它倒是說明了當 park 方法返回時並不意味着鎖自由了,醒過來的線程在重新嘗試獲取鎖失敗后將會再次 park 自己。所以加鎖的過程需要寫在一個循環里,在成功拿到鎖之前可能會進行多次嘗試。
計算機世界非公平鎖的服務效率要高於公平鎖,所以 Java 默認的鎖都使用了非公平鎖。不過現實世界似乎非公平鎖的效率會差一點,比如在肯德基如果可以不停插隊,你可以想象現場肯定一片混亂。為什么計算機世界和現實世界會有差異,大概是因為在計算機世界里某個線程插隊並不會導致其它線程抱怨。
public ReentrantLock() { this.sync = new NonfairSync(); } public ReentrantLock(boolean fair) { this.sync = fair ? new FairSync() : new NonfairSync(); }
復制代碼
共享鎖與排他鎖
ReentrantLock 的鎖是排他鎖,一個線程持有,其它線程都必須等待。而 ReadWriteLock 里面的讀鎖不是排他鎖,它允許多線程同時持有讀鎖,這是共享鎖。共享鎖和排他鎖是通過 Node 類里面的 nextWaiter 字段區分的。
class AQS { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; boolean isShared() { return this.nextWaiter == SHARED; } }
復制代碼
那為什么這個字段沒有命名成 mode 或者 type 或者干脆直接叫 shared?這是因為 nextWaiter 在其它場景還有不一樣的用途,它就像 C 語言聯合類型的字段一樣隨機應變,只不過 Java 語言沒有聯合類型。
條件變量
關於條件變量,需要提出的第一個問題是為什么需要條件變量,只有鎖還不夠么?考慮下面的偽代碼,當某個條件滿足時,才去干某件事
void doSomething() { locker.lock(); while(!condition_is_true()) { // 先看能不能搞事 locker.unlock(); // 搞不了就歇會再看看能不能搞 sleep(1); locker.lock(); // 搞事需要加鎖,判斷能不能搞事也需要加鎖 } justdoit(); // 搞事 locker.unlock(); }
復制代碼
當條件不滿足時,就循環重試(其它線程會通過加鎖來修改條件),但是需要間隔 sleep,不然 CPU 就會因為空轉而飆高。這里存在一個問題,那就是 sleep 多久不好控制。間隔太久,會拖慢整體效率,甚至會錯過時機(條件瞬間滿足了又立即被重置了),間隔太短,又回導致 CPU 空轉。有了條件變量,這個問題就可以解決了
void doSomethingWithCondition() { cond = locker.newCondition(); locker.lock(); while(!condition_is_true()) { cond.await(); } justdoit(); locker.unlock(); }
復制代碼
await() 方法會一直阻塞在 cond 條件變量上直到被另外一個線程調用了 cond.signal() 或者 cond.signalAll() 方法后才會返回,await() 阻塞時會自動釋放當前線程持有的鎖,await() 被喚醒后會再次嘗試持有鎖(可能又需要排隊),拿到鎖成功之后 await() 方法才能成功返回。
阻塞在條件變量上的線程可以有多個,這些阻塞線程會被串聯成一個條件等待隊列。當 signalAll() 被調用時,會喚醒所有的阻塞線程,讓所有的阻塞線程重新開始爭搶鎖。如果調用的是 signal() 只會喚醒隊列頭部的線程,這樣可以避免「驚群問題」。
await() 方法必須立即釋放鎖,否則臨界區狀態就不能被其它線程修改,condition_is_true() 返回的結果也就不會改變。 這也是為什么條件變量必須由鎖對象來創建,條件變量需要持有鎖對象的引用這樣才可以釋放鎖以及被 signal 喚醒后重新加鎖。創建條件變量的鎖必須是排他鎖,如果是共享鎖被 await() 方法釋放了並不能保證臨界區的狀態可以被其它線程來修改,可以修改臨界區狀態的只能是排他鎖。這也是為什么 ReadWriteLock.ReadLock 類的 newCondition 方法定義如下
public Condition newCondition() { throw new UnsupportedOperationException(); }
復制代碼
有了條件變量,sleep 不好控制的問題就解決了。當條件滿足時,調用 signal() 或者 signalAll() 方法,阻塞的線程可以立即被喚醒,幾乎沒有任何延遲。
條件等待隊列
當多個線程 await() 在同一個條件變量上時,會形成一個條件等待隊列。同一個鎖可以創建多個條件變量,就會存在多個條件等待隊列。這個隊列和 AQS 的隊列結構很接近,只不過它不是雙向隊列,而是單向隊列。隊列中的節點和 AQS 等待隊列的節點是同一個類,但是節點指針不是 prev 和 next,而是 nextWaiter。
class AQS { ... class ConditionObject { Node firstWaiter; // 指向第一個節點 Node lastWaiter; // 指向第二個節點 } class Node { static final int CONDITION = -2; static final int SIGNAL = -1; Thread thread; // 當前等待的線程 Node nextWaiter; // 指向下一個條件等待節點 Node prev; Node next; int waitStatus; // waitStatus = CONDITION } ... }
復制代碼
隊列轉移
當條件變量的 signal() 方法被調用時,條件等待隊列的頭節點線程會被喚醒,該節點從條件等待隊列中被摘走,然后被轉移到 AQS 的等待隊列中,准備排隊嘗試重新獲取鎖。這時節點的狀態從 CONDITION 轉為 SIGNAL,表示當前節點是被條件變量喚醒轉移過來的。
class AQS { ... boolean transferForSignal(Node node) { // 重置節點狀態 if (!node.compareAndSetWaitStatus(Node.CONDITION, 0)) return false Node p = enq(node); // 進入 AQS 等待隊列 int ws = p.waitStatus; // 再修改狀態為SIGNAL if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } ... }
復制代碼
被轉移的節點的 nextWaiter 字段的含義也發生了變更,在條件隊列里它是下一個節點的指針,在 AQS 等待隊列里它是共享鎖還是互斥鎖的標志。
Java 並發包常用類庫依賴結構
ReentrantLock 加鎖過程
下面我們精細分析加鎖過程,深入理解鎖邏輯控制。我必須肯定 Dough Lea 的代碼寫成下面這樣的極簡形式,閱讀起來還是挺難以理解的。
class ReentrantLock { ... public void lock() { sync.acquire(1); } ... } class Sync extends AQS { ... public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } ... }
復制代碼
acquire 的 if 判斷語句要分為三個部分,tryAcquire 方法表示當前的線程嘗試加鎖,如果加鎖不成功就需要排隊,這時候調用 addWaiter 方法,將當前線程入隊。然后再調用 acquireQueued 方法,開始了 park 、醒來重試加鎖、加鎖不成功繼續 park 的循環重試加鎖過程。直到加鎖成功 acquire 方法才會返回。
如果在循環重試加鎖過程中被其它線程打斷了,acquireQueued 方法就會返回 true。這時候線程就需要調用 selfInterrupt() 方法給當前線程設置一個被打斷的標識位。
// 打斷當前線程,其實就是設置一個標識位 static void selfInterrupt() { Thread.currentThread().interrupt(); }
復制代碼
線程如何知道自己被其它線程打斷了呢?在 park 醒來之后調用 Thread.interrupted() 就知道了,不過這個方法只能調用一次,因為它在調用之后就會立即 clear 打斷標志位。這也是為什么 acquire 方法里需要調用 selfInterrupt() ,為的就是重新設置打斷標志位。這樣上層的邏輯才可以通過 Thread.interrupted() 知道自己有沒有被打斷。
acquireQueued 和 addWaiter 方法由 AQS 類提供,tryAcquire 需要由子類自己實現。不同的鎖會有不同的實現。下面我們來看看 ReentrantLock 的公平鎖 tryAcquire 方法的實現
if(c == 0) 意味着當前鎖是自由態,計數值為零。這時就需要爭搶鎖,因為同一時間可能會有多個線程在調用 tryAcquire。爭搶的方式是用 CAS 操作 compareAndSetState,成功將鎖計數值從 0 改成 1 的線程將獲得這把鎖,將當前的線程記錄到 exclusiveOwnerThread 中。
代碼里還有一個 hasQueuedPredecessors() 判斷,這個判斷非常重要,它的意思是看看當前的 AQS 等待隊列里有沒有其它線程在排隊,公平鎖在加鎖之前需要 check 一下,如果有排隊的,自己就不能插隊。而非公平鎖就不需要 check,公平鎖和非公平鎖的全部的實現差異就在於此,就這一個 check 決定了鎖的公平與否。
下面我們再看看 addWaiter 方法的實現,參數 mode 表示是共享鎖還是排他鎖,它對應 Node.nextWaiter 屬性。
將新節點添加到隊尾也是需要考慮多線程並發的,所以代碼里再一次使用了 CAS 操作 compareAndSetTail 來競爭隊尾指針。沒有競爭到的線程就會繼續下一輪競爭 for(;;) 繼續使用 CAS 操作將新節點往隊尾添加。
下面我們再看看 acquireQueue 方法的代碼實現,它會重復 park、嘗試再次加鎖、加鎖失敗繼續 park 的循環過程。
private final boolean parkAndCheckInterrupt() { LockSupport.park(this); return Thread.interrupted(); }
復制代碼
線程在 park 返回醒來之后要立即檢測一下是否被其它線程中斷了。不過即使發生中斷了,它還會繼續嘗試獲取鎖,如果獲取不到,還會繼續睡眠,直到鎖獲取到了才將中斷狀態返回。這意味着打斷線程並不會導致死鎖狀態(拿不到鎖)退出。
同時我們還可以注意到鎖是可以取消的 cancelAcquire(),准確地說是取消處於等待加鎖的狀態,線程處於 AQS 的等待隊列中等待加鎖。那什么情況下才會拋出異常而導致取消加鎖呢,唯一的可能就是 tryAcquire 方法,這個方法是由子類實現的,子類的行為不受 AQS 控制。當子類的 tryAcquire 方法拋出了異常,那 AQS 最好的處理方法就是取消加鎖了。cancelAcquire 會將當前節點從等待隊列中移除。
ReentrantLock 解鎖過程
解鎖的過程要簡單一些,將鎖計數降為零后,喚醒等待隊列中的第一個有效節點。
public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; } protected final boolean tryRelease(int releases) { int c = getState() - releases; // 解鈴還須系鈴人 if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
復制代碼
考慮到可重入鎖,需要判斷鎖計數是否降為零才可以確定鎖是否徹底被釋放。只有鎖徹底被釋放了才能喚醒后繼等待節點。unparkSuccessor 會跳過無效節點(已取消的節點),找到第一個有效節點調用 unpark() 喚醒相應的線程。
讀寫鎖
讀寫鎖分為兩個鎖對象 ReadLock 和 WriteLock,這兩個鎖對象共享同一個 AQS。AQS 的鎖計數變量 state 將分為兩個部分,前 16bit 為共享鎖 ReadLock 計數,后 16bit 為互斥鎖 WriteLock 計數。互斥鎖記錄的是當前寫鎖重入的次數,共享鎖記錄的是所有當前持有共享讀鎖的線程重入總次數。
讀寫鎖同樣也需要考慮公平鎖和非公平鎖。共享鎖和互斥鎖的公平鎖策略和 ReentrantLock 一樣,就是看看當前還有沒有其它線程在排隊,自己會乖乖排到隊尾。非公平鎖策略不一樣,它會比較偏向於給寫鎖提供更多的機會。如果當前 AQS 隊列里有任何讀寫請求的線程在排隊,那么寫鎖可以直接去爭搶,但是如果隊頭是寫鎖請求,那么讀鎖需要將機會讓給寫鎖,去隊尾排隊。 畢竟讀寫鎖適合讀多寫少的場合,對於偶爾出現一個寫鎖請求就應該得到更高的優先級去處理。
寫鎖加鎖過程
讀寫鎖的寫鎖加鎖在整體邏輯上和 ReentrantLock 是一樣的,不同的是 tryAcquire() 方法
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } protected final boolean tryAcquire(int acquires) { Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); setState(c + acquires); return true; } if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) return false; setExclusiveOwnerThread(current); return true; }
復制代碼
寫鎖也需要考慮可重入,如果當前 AQS 互斥鎖的持有線程正好是當前要加鎖的線程,那么就是寫鎖在重入,重入只需要遞增鎖計數值即可。當 c!=0 也就是鎖計數不為零時,既可能是因為當前的 AQS 有讀鎖也可能是因為有寫鎖,判斷 w == 0 就是判斷當前的計數是不是讀鎖帶來的。
如果計數值為零,那就開始爭搶鎖。取決於鎖是否公平,在爭搶之前調用 writerShouldBlock() 方法看看自己是否需要排隊,如果不需要排隊,就可以使用 CAS 操作來爭搶,成功將計數值從 0 設置為 1 的線程將獨占寫鎖。
讀鎖加鎖過程
讀鎖加鎖過程比寫鎖要復雜很多,它在整體流程上和寫鎖一樣,但是細節差距很大。特別是它需要為每一個線程記錄讀鎖計數,這部分邏輯占據了不少代碼。
public final void acquireShared(int arg) { // 如果嘗試加鎖不成功, 就去排隊休眠,然后循環重試 if (tryAcquireShared(arg) < 0) // 排隊、循環重試 doAcquireShared(arg); }
復制代碼
如果當前線程已經持有寫鎖,它還可以繼續加讀鎖,這是為了達成鎖降級必須支持的邏輯。鎖降級是指在持有寫鎖的情況下,再加讀鎖,再解寫鎖。相比於先寫解鎖再加讀鎖而言,這樣可以省去加鎖二次排隊的過程。因為鎖降級的存在,鎖計數中讀寫計數可以同時不為零。
wlock.lock(); if(whatever) { // 降級 rlock.lock(); wlock.unlock(); doRead(); rlock.unlock(); } else { // 不降級 doWrite() wlock.unlock(); }
復制代碼
為了給每一個讀鎖線程進行鎖計數,它設置了一個 ThreadLocal 變量。
private transient ThreadLocalHoldCounter readHolds; static final class HoldCounter { int count; final long tid = LockSupport.getThreadId(Thread.currentThread()); } static final class ThreadLocalHoldCounter extends ThreadLocal<HoldCounter> { public HoldCounter initialValue() { return new HoldCounter(); } }
復制代碼
但是 ThreadLocal 變量訪問起來效率不夠高,所以又設置了緩存。它會存儲最近一次獲取讀鎖線程的鎖計數。在線程爭用不是特別頻繁的情況下,直接讀取緩存會比較高效。
private transient HoldCounter cachedHoldCounter;
復制代碼
Dough Lea 覺得使用 cachedHoldCounter 還是不夠高效,所以又加了一層緩存記錄 firstReader,記錄第一個將讀鎖計數從 0 變成 1 的線程以及鎖計數。當沒有線程爭用時,直接讀取這兩個字段會更加高效。
private transient Thread firstReader; private transient int firstReaderHoldCount; final int getReadHoldCount() { // 先訪問鎖全局計數的讀計數部分 if (getReadLockCount() == 0) return 0; // 再訪問 firstReader Thread current = Thread.currentThread(); if (firstReader == current) return firstReaderHoldCount; // 再訪問最近的讀線程鎖計數 HoldCounter rh = cachedHoldCounter; if (rh != null && rh.tid == LockSupport.getThreadId(current)) return rh.count; // 無奈讀 ThreadLocal 吧 int count = readHolds.get().count; if (count == 0) readHolds.remove(); return count; }
復制代碼
所以我們看到為了記錄這個讀鎖計數作者煞費苦心,那這個讀計數的作用是什么呢?那就是線程可以通過這個計數值知道自己有沒有持有這個讀寫鎖。
讀加鎖還有一個自旋的過程,所謂自旋就是第一次加鎖失敗,那就直接循環重試,不休眠,聽起來有點像死循環重試法。
final static int SHARED_UNIT = 65536 // 讀計數是高16位 final int fullTryAcquireShared(Thread current) { for(;;) { int c = getState(); // 如果有其它線程加了寫鎖,還是返回睡覺去吧 if (exclusiveCount(c) != 0) { if (getExclusiveOwnerThread() != current) return -1; ... // 超出計數上限 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { // 拿到讀鎖了 ... return 1 } ... // 循環重試 } }
復制代碼
因為讀鎖需要使用 CAS 操作來修改底層鎖的總讀計數值,成功的才可以獲得讀鎖,獲取讀鎖的 CAS 操作失敗只是意味着讀鎖之間存在 CAS 操作的競爭,並不意味着此刻鎖被別人占據了自己不能獲得。多試幾次肯定可以加鎖成功,這就是自旋的原因所在。同樣在釋放讀鎖的時候也有一個 CAS 操作的循環重試過程。
protected final boolean tryReleaseShared(int unused) { ... for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; if (compareAndSetState(c, nextc)) { return nextc == 0; } } ... }
