官方解釋:
- 一個計數信號量。在概念上,信號量維持一組許可證。如果有必要,每個acquire()都會阻塞,直到許可證可用,然后才能使用它。每個release()添加許可證,潛在地釋放阻塞獲取方。但是,沒有使用實際的許可證對象; Semaphore只保留可用數量的計數,並相應地執行。信號量通常用於限制線程數,而不是訪問某些(物理或邏輯)資源
我記得考科目一的時候有一個大教室,這個教室只能同時允許兩百人考試,當有一個考完之后,下一個才能進去進行考試。門口會有安檢人員進行安檢,這個Semaphore就相當於這個安檢員。
也可以理解為停車場,停車場內的停車位是固定的,只有當一輛或多輛車開走之后外面等待的車才能進去停車。
用法:
1、定義三個資格
Semaphore semaphore = new Semaphore(3);
ThreadPoolExecutor poolExecutor =
new ThreadPoolExecutor(10, 20,
5000, TimeUnit.MILLISECONDS,
new LinkedBlockingDeque<>(100));
for (int i = 0; i < 10; i++) {
int finalI = i;
poolExecutor.execute(new Thread() {
@Override
public void run() {
try {
//獲取執行資格
semaphore.acquire(1);
System.out.println(finalI+"=========");
//模擬每個線程運行的時間
Thread.sleep(1000);
//釋放執行資格
semaphore.release(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
poolExecutor.shutdown();
運行結果如下:(同一時刻只能運行三個線程。有點模糊,湊合看)
解析:
一、定義:
public Semaphore(int permits) { sync = new NonfairSync(permits);}
Semaphroe底層也是用Sync類,默認是非公平的,也有公平的構造方法。
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
定義的資格數其實是設置鎖的狀態值的(AQS之前已說過,維護鎖狀態值和線程等待隊列)
abstract static class Sync extends AbstractQueuedSynchronizer {
Sync(int permits) {
setState(permits);
}
}
二、為什么能限制同時執行的線程數量呢?
這就是acquire方法的用處了
public void acquire(int permits) { sync.acquireSharedInterruptibly(permits);}
點進acquireSharedInterruptibly這個方法看看:
public final void acquireSharedInterruptibly(int arg)
{
1、嘗試獲取鎖,返回值小於0就是獲取鎖失敗
if (tryAcquireShared(arg) < 0)
2、如果獲取失敗,則進入隊列進行等待,之前已經解析過
doAcquireSharedInterruptibly(arg);
}
可以看到,跟之前CountDownLatch的await方法是一樣的。
tryAcquireShared方法最終執行的如下方法:
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
1、獲取當前鎖狀態,鎖狀態值一開始是自定義的
int available = getState();
2、當前申請后剩余的鎖狀態值
int remaining = available - acquires;
if (3、如小於0,則申請失敗,進入等待隊列中
remaining < 0 ||
4、CAS替換鎖狀態值
compareAndSetState(available, remaining))
return remaining;
}
}
上述是非公平的,公平的只加了一個判斷線程等待隊列前是否有其它線程。排隊一個一個來。
static final class FairSync extends Sync {
protected int tryAcquireShared(int acquires) {
for (;;) {
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
}
這個就是為什么Semaphore能控制當前並發線程的數量的原因。
三、釋放鎖
線程獲取執行資格之后需要釋放鎖。這就是release方法的用處。不釋放的話鎖會一直被占用,其他線程就無法運行。
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
點進releaseShared看看
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
跟之前的CountDownLatch是一樣的,只是實現不一樣。Semaphore實現如下:
protected final boolean tryReleaseShared(int releases) {
for (;;) {
1、獲取鎖當前狀態
int current = getState();
2、釋放鎖,直接相加
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
3、用CAS更新鎖狀態
if (compareAndSetState(current, next))
return true;
}
}
=======================================================
我是Liusy,一個喜歡健身的程序員。
歡迎關注微信公眾號【Liusy01】,一起交流Java技術及健身,獲取更多干貨,最新更新【K8S】。
