1 簡介
Semaphore
可翻譯為信號量,它維護一組許可證, 每次嘗試獲取許可證時都將阻塞等待直到可獲取,它才能獲取到並解除阻塞狀態。 Semaphore
可以控制一些物理或邏輯資源的訪問或使用,它常常用於限制線程數目。在實際開發中,可用作流量控制,特別對於一些公共資源有限的應用場景,如數據庫連接,或是一些其他限流的緩存池。(基於JDK1.8)
2 示例

public class SemaphoreDemo { static class Pool { private static final int MAX_AVAILABLE = 6; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } protected Object[] items; protected boolean[] used = new boolean[MAX_AVAILABLE]; Pool() { items = new Object[15]; for (int i = 0; i < items.length; i++) { items[i] = "item" + i; } } protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } } public static void main(String[] args) { final int THREAD_COUNT = 10; ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); Pool pool = new Pool(); for (int i = 0; i < THREAD_COUNT; i++) { int tmpI = i; threadPool.submit(() -> { try { Object item = pool.getItem(); System.out.printf("當前線程:%s,獲取到緩存池中的資源:%s\n", Thread.currentThread().getName(), item); Thread.sleep(7); pool.putItem(item); System.out.printf("當前線程:%s,已將緩存池中的資源%s放回池中\n", Thread.currentThread().getName(), item); } catch (InterruptedException e) { e.printStackTrace(); } }); } threadPool.shutdown(); } }
假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發地讀取,但是如果讀到內存后,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接.
class SemaphoreTest { private static final int THREAD_COUNT = 30; private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT); private static Semaphore s = new Semaphore(10); public static void main(String[] args) { for (int i = 0; i < THREAD_COUNT; i++) { threadPool.execute(new Runnable() { @Override public void run() { try { s.acquire(); System.out.println("save data"); s.release(); } catch (InterruptedException e) { } } }); } threadPool.shutdown(); } }
3 實現原理
Semaphore
內部主要有一個Sync
類型成員變量sync
, Sync
是繼承抽象類AbstractQueuedSynchronizer的靜態抽象內部類。
Semaphore
利用父類AQS
實現了一個共享鎖,Sync有兩個子類NonfairSync
和FairSync
,分另代表非公平鎖、公平鎖。共享鎖的關鍵在於實現重寫tryAcquireShared
和 tryReleaseShared
方法,這兩個方法分別會被父類的模板方法acquireShared
、releaseShared
所調用,具體細節請看AbstractQueuedSynchronizer實現原理分析。
Semaphore的默認構造方法使用非公平鎖,Semaphore的構造方法有一個布爾型可選參數fair,此參數指定鎖的公平鎖。
public Semaphore(int permits) { sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
(1) 靜態內部類Sync
構造方法Sync(int)
將父類AbstractQueuedSynchronizer
的實例變量state
設置為指定的許可證數permits
。
Sync(int permits) { setState(permits);// }
getPermits()`返回的許可證數即是父類AQS的state值.
final int getPermits() { return getState(); }
nonfairTryAcquireShared
是非公平鎖嘗試釋獲取鎖的方法,每成功獲取一次鎖,就從池中拿走一個許可證,而剩余的許可證就少1個。
其主要邏輯是: CAS自旋直到成功將state減1,並返回新的state,或當前已獲取到的許可證數超出了設定的許可證總數,方法返回。
此方法返回負數時,線程將被阻塞。
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); int remaining = available - acquires;//acquires一般是1 if (remaining < 0 ||//為負數,表示超出了設定的許可證總數,直接返回。不能再獲取許可證 compareAndSetState(available, remaining)) return remaining; } }
公平鎖與非公平鎖釋放鎖狀態的邏輯是一樣的,都會執行tryReleaseShared
方法。每釋放一把鎖,就將一個許可證放回池中,可用的許可證就多一個。
其主要邏輯是: CAS自旋直到成功將state加1,最終返回true.
protected final boolean tryReleaseShared(int releases) { for (;;) { int current = getState(); int next = current + releases;//acquires一般是1 if (next < current) // overflow next成為負數,next超出int類型的最大可表示范圍 throw new Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) return true; } }
reducePermits減少許可證數,邏輯與nonfairTryAcquireShared
類似,其邏輯是:CAS自旋嘗試將state
減少指定數目reductions
。
final void reducePermits(int reductions) { for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow throw new Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } }
drainPermits將所有許可證拿走,其邏輯是:CAS自旋嘗試將state
設為0,並返回當前所有可用的許可證。
final int drainPermits() { for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } }
非公平的Sync:NonfairSync
NonfairSync
代表一個公平鎖的實現,它並沒有自己的邏輯,其tryAcquireShared方法也是直接調用父類的nonfairTryAcquireShared
方法。
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
公平的Sync: FairSync
FairSync
代表一個公平鎖的實現,它的tryAcquireShared方法有自己的邏輯,與兄弟類NonfairSync的不同之處在於多了等待隊列上是否存在其前驅節點的判斷。
static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors())//有前驅,即存在比當前線程等待更長時間的線程,此時獲取鎖失敗 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
(2) 獲取許可證
acquireUninterruptibly
獲取一個許可證不響應中斷,若獲取失敗,將阻塞等待直到可獲取為止
acquire
獲取一個許可證會響應中斷,若獲取失敗,將阻塞等待直到可獲取為止
public void acquireUninterruptibly() { sync.acquireShared(1); } public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
acquire(int)
一性次獲取多個許可證,響應中斷。若獲取失敗,將阻塞等待直到可獲取為止
acquireUninterruptibly(int)
一性次獲取多個許可證,不響應中斷。,若獲取失敗,將阻塞等待直到可獲取為止
public void acquire(int permits) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } public void acquireUninterruptibly(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.acquireShared(permits); }
tryAcquire()
嘗試一個獲取許可證,獲取失敗直接返回false,不會阻塞等待。
tryAcquire(long,TimeUnit)
嘗試超時獲取一個許可證,在限定時間內獲取許可證失敗返回false,不會一直阻塞等待。
public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } public boolean tryAcquire(long timeout, TimeUnit unit) throws InterruptedException { return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout)); }
tryAcquire(int)
嘗試多個獲取許可證,獲取失敗直接返回false,不會阻塞等待。
tryAcquire(int,long,TimeUnit)
嘗試超時獲取多個許可證,在限定時間內獲取許可證失敗返回false,不會一直阻塞等待。
public boolean tryAcquire(int permits) { if (permits < 0) throw new IllegalArgumentException(); return sync.nonfairTryAcquireShared(permits) >= 0; } public boolean tryAcquire(int permits, long timeout, TimeUnit unit) throws InterruptedException { if (permits < 0) throw new IllegalArgumentException(); return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout)); }
drainPermits
獲取並返回當前所有可用的許可證
public int drainPermits() { return sync.drainPermits(); }
(3) 釋放許可證
release()
釋放一個許可證
release(int)
釋放多個許可證
public void release() { sync.releaseShared(1); } public void release(int permits) { if (permits < 0) throw new IllegalArgumentException(); sync.releaseShared(permits); }
(4) 減少許可證
reducePermits
是protected級別方法,外部不可見,主要提供給子類在定制化某些功能時調用。
reducePermits
與獲取許可證方法acquireXX
不同,sync.reducePermits
不是鎖的相關方法,不被父類AQS的模板方法調用。
reducePermits
只是單單減少許可證數,不會阻塞線程。
protected void reducePermits(int reduction) { if (reduction < 0) throw new IllegalArgumentException(); sync.reducePermits(reduction); }
(5) 狀態查詢
availablePermits
返回當前可獲取的許可證數。
isFair
返回當前鎖的公平性。
hasQueuedThreads
返回當前是否有線程因獲取許可證而阻塞等待。
getQueueLength
返回當前因獲取許可證而阻塞等待的線程數。
getQueuedThreads
返回當前因獲取許可證而阻塞等待的線程集合;此方法為protected級別,外界不可見,主要方便子類監控相關指標。
public int availablePermits() { return sync.getPermits(); } public boolean isFair() { return sync instanceof FairSync; } public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } public final int getQueueLength() { return sync.getQueueLength(); } protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); }
參考: 《 Java並發編程的藝術》方騰飛