Semaphore
前情提要:在學習本章前,需要先了解筆者先前講解過的ReentrantLock源碼解析,ReentrantLock源碼解析里介紹的方法有很多是本章的鋪墊。下面,我們進入本章正題Semaphore。
從概念上來講,信號量(Semaphore)會維護一組許可證用於限制線程對資源的訪問,當我們有一資源允許線程並發訪問,但我們希望能限制訪問量,就可以用信號量對訪問線程進行限制。當線程要訪問資源時,要先調用信號量的acquire方法獲取訪問許可證,當線程訪問完畢后,調用信號量的release歸還許可證。使用信號量我們可以服務做限流,尤其像淘寶天貓這樣平時訪問量就很大的電商大戶,在雙十一的時候更要評估其服務能承受的訪問量並對其做限流,避免因為訪問量過大導致服務宕機。然而,Semaphore內部實際上並沒有維護一組許可證對象,而是維護一個數字作為許可證數量,如果線程要獲取許可證,則會根據線程請求的許可證數量扣減內部的維護的數量,如果足夠扣除則線程獲取許可證成功,否則線程必須陷入阻塞,直到信號量內部的許可證數量足夠。
我們來看下面的代碼,假設OrderService是一個遠程服務,我們預估這個服務能承受的並發量是5000,訪問一次遠程服務需要獲取一個許可證,執行methodA()的業務只需要請求一次遠程服務,所以調用semaphore.acquire()默認獲取一個許可證。執行methodB()的業務需要向遠程服務並發發送兩次請求,所以這里acquire(int permits)的參數我們傳2,以保證不管是執行methodA()還是methodB(),遠程服務的並發量不超過5000。
當我們的業務不再對遠程服務的訪問,需要歸還許可證,methodA()原先只請求一個許可證,這里調用release()對信號量內部的許可證數量+1即可。methodB()請求兩個許可證,所以這里要調用release(int permits)歸還兩個。假設我們的服務里同時有4999個線程已經在執行methodA()方法,有一個線程要執行methodB()方法,可以知道許可證數量是不夠的,信號量維護的許可證數量為5000,但線程如果要同時執行需要5001個許可證,所以要執行methodB()的線程會陷入阻塞,直到信號量內部的許可證數量足夠扣除,才會獲取需要的許可證數量,然后訪問遠程服務。
public class OrderService { private Semaphore semaphore = new Semaphore(5000); public void methodA() { try { semaphore.acquire(); //methodA body } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(); } } public void methodB() { try { semaphore.acquire(2); //methodB body } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(2); } } }
如果是許可證為1的信號量可以把它當做互斥鎖,這時信號量只有兩個狀態:0或者1,我們把1代表鎖未被占用,0代表鎖被占用。如果是用這種方式將信號量當做互斥鎖我們可以用一個線程來獲取鎖,而另一個線程來釋放鎖,比如下面的<1>處和<2>處分別在不同的線程加鎖和釋放鎖。某種程度上來說這一做法可以避免死鎖,與傳統java.util.concurrent.locks.Lock的實現會有很大的不同,傳統的Lock實現,比如:ReentrantLock會要求解鎖的線程必須要是原先加鎖的線程,否則會拋出異常。
public static void main(String[] args) { Semaphore semaphore = new Semaphore(1); new Thread(() -> { try { semaphore.acquire();//<1> System.out.println(Thread.currentThread().getName() + "獲取獨占鎖"); } catch (InterruptedException e) { e.printStackTrace(); } }, "線程1").start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { semaphore.release();//<2> System.out.println(Thread.currentThread().getName() + "釋放獨占鎖"); }, "線程2").start(); try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } }
當信號量的許可證數量為0時,如果還有線程請求獲取許可證,信號量會將線程放入一個隊列,然后掛起線程,直到有許可證被歸還,信號量會嘗試喚醒隊列中等待許可證最長時間的線程。所以信號量就分為公平(FairSync)和非公平(NonfairSync)兩種模式。在公平模式下,如果有線程要獲取信號量的許可證時,會先判斷信號量維護的等待隊列中是否已經有線程,如果有的話則乖乖入隊,沒有才嘗試請求許可證;而非公平模式則是直接請求許可證,不管隊列中是否已有線程在等待信號量的許可證。
而下面的代碼也印證了筆者之前所說的,信號量本身並不會去維護一個許可證對象的集合,當我們把許可證數量傳給信號量的構造函數時,最終會由靜態內部類Sync調用其父類AQS的setState(permits)方法將許可證賦值給AQS內部的字段state,由這個字段決定信號量有多少個許可證,請求許可證的線程能否成功。
public class Semaphore implements java.io.Serializable { private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { //... Sync(int permits) { setState(permits); } //... } static final class NonfairSync extends Sync {//非公平 NonfairSync(int permits) { super(permits); } //... } static final class FairSync extends Sync {//公平 //... FairSync(int permits) { super(permits); } //... } public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); } //... }
從上面節選的代碼來看,官方更推薦使用非公平的信號量,因為根據許可證數量創建信號量默認使用的非公平信號量,而相比於公平信號量,非公平信號量有更高的吞吐量。因此筆者先介紹非公平信號量,再介紹公平信號量。
我們先來看看acquire()和acquire(int permits) 這兩個方法,可以看到不管我們是請求一個許可證,還是請求多個許可證,本質上都是調用Sync.
acquireSharedInterruptibly(int arg)方法。如果大家觀察靜態內部類Sync的代碼可以發現:Sync並沒有實現acquireSharedInterruptibly(int arg)方法,而是其父類AQS實現了此方法。
public class Semaphore implements java.io.Serializable { //... private final Sync sync; abstract static class Sync extends AbstractQueuedSynchronizer { //... } public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } //... public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } //... }
於是我們追溯到AQS實現的acquireSharedInterruptibly(int arg)方法,這個方法的實現其實並不難,先判斷當前線程是否有中斷標記,有的話則直接拋出中斷異常InterruptedException,之后調用tryAcquireShared(int arg)嘗試獲取許可證,AQS本身並沒有實現tryAcquireShared(int arg)方法,而是交由子類去實現的,才有了子類來決定是直接嘗試獲取許可證,還是先判斷信號量的等待隊列是否有線程正在等待許可證,有的話則排隊,沒有則嘗試請求。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0) doAcquireSharedInterruptibly(arg); } //... protected int tryAcquireShared(int arg) { throw new UnsupportedOperationException(); } //... }
所以我們來看看非公平鎖實現的tryAcquireShared(int arg)方法,在非公平鎖的ryAcquireShared(int arg)方法中會調用到Sync類實現的nonfairTryAcquireShared(int acquires)方法,這個方法會先獲取當前信號量剩余的許可證數量available,然后減去請求的數量(available - acquires)得到剩余許可證數量remaining,如果remaining大於0代表信號量現有的許可證數量是允許分配調用線程請求的許可證數量,是允許分配的,所以<1>處的條件為false,會進行<2>處的CAS扣減,如果能扣減成功,則返回剩余許可證數量,返回的remaining如果大於等於0,則代表扣減成功,如果小於0代表請求失敗,表示信號量現有的許可證數量不足調用線程所需。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... abstract static class Sync extends AbstractQueuedSynchronizer { final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 ||//<1> compareAndSetState(available, remaining))//<2> return remaining; } } } //... static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } } //... }
如果在<1>處執行tryAcquireShared(arg)嘗試獲取許可證失敗,則會調用<2>處的方法將當前線程掛起。
public final void acquireSharedInterruptibly(int arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (tryAcquireShared(arg) < 0)//<1> doAcquireSharedInterruptibly(arg);//<2> }
那么我們來看看如果調用tryAcquireShared(arg)請求許可證失敗后,doAcquireSharedInterruptibly(int arg)里面完成的邏輯。如果有看過筆者前一章ReentrantLock源碼解析的朋友在看到這個方法應該會覺非常熟悉,這里會先調用<1>處的addWaiter(Node mode)方法將當前請求許可證的線程封裝成一個Node節點並入隊,這里我們也首次看到使用Node.SHARED的地方,如果一個節點Node的nextWaiter指向的是靜態常量Node.SHARED,則代表這個節點是一個共享節點,換句話說這個節點的線程可以和其他同為共享節點的線程共享資源。
當線程作為節點入隊后,判斷節點的前驅節點是否是頭節點,如果是頭節點則話則進入<2>處的分支,這里會再次調用tryAcquireShared(arg)請求許可證,之前說過如果tryAcquireShared(arg)返回的結果大於等於0代表請求許可證成功,否則請求失敗。如果請求失敗的話,之后的流程大家想必都清楚了,會先執行shouldParkAfterFailedAcquire(p, node)判斷前驅節點p的等待狀態是否為SIGNAL(-1),如果為SIGNAL則直接返回true,調用parkAndCheckInterrupt()阻塞當前線程,如果前驅節點p的等待狀態為0,會先用CAS的方式修改為SIGNAL,然后再下一次循環中阻塞當前線程。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//<1> try { for (;;) { final Node p = node.predecessor(); if (p == head) {//<2> int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } //... private Node addWaiter(Node mode) { Node node = new Node(mode); for (;;) { Node oldTail = tail; if (oldTail != null) { node.setPrevRelaxed(oldTail); if (compareAndSetTail(oldTail, node)) { oldTail.next = node; return node; } } else { initializeSyncQueue(); } } } //... static final class Node { static final Node SHARED = new Node(); static final Node EXCLUSIVE = null; static final int CANCELLED = 1; //... static final int PROPAGATE = -3; volatile int waitStatus; volatile Node prev; volatile Node next; volatile Thread thread; Node nextWaiter; final boolean isShared() { return nextWaiter == SHARED; } //... Node(Node nextWaiter) { this.nextWaiter = nextWaiter; THREAD.set(this, Thread.currentThread()); } } //... }
上面的流程是當前線程沒有請求許可證成功而陷入阻塞的情況,那么如果是線程進入等待隊列后又獲取到許可證呢?即:執行完下面<1>處的代碼確定線程對應的節點入隊,在<2>處判斷節點的前驅節點是頭節點,進入<2>處的分支后執行<3>處的tryAcquireShared(arg)方法成功獲取到許可證此時返回的r>=0,進入<4>處的分支,那么在setHeadAndPropagate(Node node, int propagate)方法中又會做什么呢?
首先會保留一個原始頭節點head的引用,其次替換頭節點為當前節點。如果原先返回r(propagate)大於0,代表當前線程在請求完許可證后,信號量還有剩余許可證,於是<5>處的分值一定成立,因為propagate大於0,這里會判斷當前節點的下一個節點next是否是共享模式,是的話則調用doReleaseShared()方法喚醒當前節點的下一個節點。但如果傳入的propagate等於0,還有另外幾個條件可以嘗試通知當前節點的后繼節點,只要條件(h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)成立,且當前節點的下個節點仍為共享節點,則可以喚醒后繼節點申請許可證。那么怎么來理解條件(h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0)呢?
首先我們可以先忽略這4個條件里面其中的兩個條件,h == null和(h = head) == null都不可能成立,h是原始頭節點,只要有節點入隊,頭節點不可能為null,其次判斷head也不可能為null,因為頭節點已經是當前節點,就筆者看來這兩個判斷是防止空指針異常的標准寫法,只是預防空指針不代表會發生空指針異常。所以我們只要關注h.waitStatus < 0或者head.waitStatus < 0兩個條件,其中一個成立,就可以進入<5>處的分支。那么,又如何來理解h.waitStatus < 0和head.waitStatus < 0兩個條件呢?
我們先來回憶下shouldParkAfterFailedAcquire(Node pred, Node node)這個方法,這個方法接收一個前驅節點和當前節點,把前驅節點的等待狀態改為SIGNAL(-1),代表前驅節點pred的下一個節點node等待喚醒。於是我們能夠明白如果head.waitStatus < 0代表當前節點的下一個節點等待喚醒,如果下一個節點的模式是共享節點,就會嘗試調用doReleaseShared()方法喚醒下一個節點嘗試申請許可證,即便目前傳入的信號量剩余許可證數量propagate為0,因為可能存從喚醒到申請許可證的期間,已經有別的線程歸還了許可證,這樣做可以提高整體的吞吐量,即便下一個線程被喚醒后沒有可申請的許可證數量,要做的也無非是重新阻塞線程。需要注意的是:如果隊列中有n個節點,喚醒后繼節點這個操作不一定會從頭節點一直傳播到尾節點,即便前n-1個節點的等待狀態(waitStatus)都為SIGNAL,最后一個因為是尾節點,它沒有下一個等待喚醒的節點,所以等待狀態為0。要知道當前節點能喚醒下一個節點的前提條件,首先是前驅節點為頭節點,其次當前節點的線程申請到許可證,才有資格嘗試喚醒下一個節點,如果節點被喚醒后,雖然前驅節點是頭節點,卻沒有多余的許可證可以申請,無法將頭節點替換成當前節點,就會重新陷入阻塞,也就不會嘗試喚醒下一個節點。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... private void doAcquireSharedInterruptibly(int arg) throws InterruptedException { final Node node = addWaiter(Node.SHARED);//<1> try { for (;;) { final Node p = node.predecessor(); if (p == head) {//<2> int r = tryAcquireShared(arg);//<3> if (r >= 0) {//<4> setHeadAndPropagate(node, r); p.next = null; // help GC return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) throw new InterruptedException(); } } catch (Throwable t) { cancelAcquire(node); throw t; } } //... private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 || (h = head) == null || h.waitStatus < 0) {//<5> Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } //... private void setHead(Node node) { head = node; node.thread = null; node.prev = null; } //... }
下面該來講解當h.waitStatus < 0,事實上h.waitStatus < 0這一判斷並非必要,但可以提高吞吐量。
這里筆者預先介紹一個知識點,當線程調用release()或release(int permits)方法向信號量(Semaphore)歸還許可證時后,會再調用doReleaseShared()方法喚醒信號量等待隊列中被阻塞的線程起來申請許可證,這里如果判斷頭節點的等待狀態為SIGNAL,則表明頭節點的后繼節點陷入阻塞,如果能用CAS的方式修改頭節點的等待狀態成功,則調用unparkSuccessor(h)喚醒被阻塞的后繼節點起來申請許可證。被阻塞的線程喚醒后如果能申請到許可證,會先把頭節點替換成當前節點,並根據條件判斷是否要調用doReleaseShared()喚醒下一個后繼節點,如果申請許可證失敗則執行兩次shouldParkAfterFailedAcquire(Node pred, Node node)后重新掛起當前線程。
那么h.waitStatus < 0這一判斷是如何來提高吞吐量呢?舉個例子:有一信號量許可證為2,並已經分配給線程1和線程2,此時信號量的許可證數量為0。線程3和線程4想要請求許可證只能先入隊等待,線程3和線程4的對應節點是N3和N4,隊列中的節點排序為:head->N3->N4。假設N3在入隊之后,線程1就歸還了許可證,此時N3判斷它的前驅節點是頭節點,繼而申請到許可證,因此N3不會調用shouldParkAfterFailedAcquire(Node pred, Node node)改變原先頭節點的等待狀態。線程1在歸還許可證后,調用doReleaseShared(),假定N3入隊的時候隊列為空,head是調用initializeSyncQueue()方法初始化完成的。所以head的等待狀態為0,在<3>處會用CAS的方式修改head的等待狀態為PROPAGATE(-3)。於是線程3在執行setHeadAndPropagate(Node node, int propagate)的時候,將頭節點指向N3,假定此時線程4雖然入隊,但尚未修改前驅節點N3的等待狀態為SIGNAL,所以((h = head) == null || h.waitStatus < 0)為false,但原先頭節點的等待狀態小於0,這里還是會進入<1>處的分支,判斷N4是共享節點,調用doReleaseShared()喚醒線程4。
如果線程4正在執行,且有別的線程調用LockSupport.unpark(Thread thread)喚醒線程4,線程4在第一次執行LockSupport.park(Object blocker)並不會陷入阻塞,會退出parkAndCheckInterrupt()方法后又重新申請許可證,如果申請失敗,再次調用parkAndCheckInterrupt()執行LockSupport.park(Object blocker)才會被阻塞,相當於線程4多了一次申請許可證的機會。也許在線程4第一次執行LockSupport.park(Object blocker)卻沒陷入阻塞的時候,線程2就歸還了許可證,在新一輪的循環時線程4就直接申請到許可證。
如果線程4被阻塞,此時線程2歸還了許可證卻還沒來得及調用doReleaseShared(),線程3先進入<1>處的分支調用了doReleaseShared(),線程4會被喚醒起來申請許可證,相當於有兩個線程爭相喚醒線程4。由此可見,如果頭節點的等待狀態為0,修改其等待狀態為PROPAGATE,並在<1>處加上判斷原先頭節點的等待狀態,可以提高吞吐量。
當然,執行h.compareAndSetWaitStatus(0, Node.PROPAGATE)存在失敗的情況,比如原先判定頭節點的等待狀態為0,在執行<3>處代碼之前,頭節點的后繼節點修改前驅節點的等待狀態為SIGNAL,此時CAS修改頭節點的等待狀態為PROPAGATE失敗,會重新執行一次循環,此時會進入<2>處喚醒后繼節點,於是后繼節點就又多了一次申請許可證的機會。
public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable { //... static final class Node { //... static final int SIGNAL = -1; static final int PROPAGATE = -3; //... } //... private void setHeadAndPropagate(Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); if (propagate > 0 || h == null || h.waitStatus < 0 ||//<1> (h = head) == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } //... private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) {//<2> if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !h.compareAndSetWaitStatus(0, Node.PROPAGATE))//<3> continue; // loop on failed CAS } if (h == head) // loop if head changed break; } } //... }