前言
1965年,荷蘭計算機科學家Dijkstra提出的信號量機制成為一種高效的進程同步機制。這之后的15年,信號量一直都是並發編程領域的終結者。1980年,管程被提出,成為繼信號量之后的在並發編程領域的第二個選擇。目前幾乎所有的語言都支持信號量機制,Java也不例外。Java中提供了Semaphore
並發工具類來支持信號量機制。下面我們就來了解Java實現的信號量機制。
首先介紹信號量模型,然后介紹如何使用,最后使用信號量來實現一個限流器。
信號量模型
信號量模型圖(圖來自參考[1]):

信號量模型總結為:一個計數器、一個等待隊列和三個對外調用的方法。
計數器和等待隊列時對外透明的,所有我們只能通過三個對外方法來訪問計數器和等待隊列。
init()
:設置計數器的初始值。
down()
:計數器的值減一。如果此時計數器的值小於0,則當前線程插入等待隊列並阻塞,否則當前線程可以繼續執行。
up()
:計數器的值加一。如果此時計數器的值小於或者等於0,則喚醒等待隊列中的一個線程,並將其從等待隊列中移除。
這三個方法都是原子性的,由實現信號量模型的方法保證。在Java SDK中,信號量模型是由java.util.concurrent.Semaphore
實現。
信號量模型代碼化大致類似如下:
class Semaphore{
int count; // 計數器
Queue queue; // 等待隊列
// 初始化操作
Semaphore(int c){
this.count=c;
}
void down(){
this.count--; // 計數器值減一
if(this.count < 0){
// 將當前線程插入等待隊列
// 阻塞當前線程
}
}
void up(){
this.count++; // 計數器值加一
if(this.count <= 0) {
// 移除等待隊列中的某個線程T
// 喚醒線程T
}
}
}
在信號量模型中,down()
和up()
這兩個操作也被成為P操作(荷蘭語proberen,測試)和V操作(荷荷蘭語verhogen,增加)。在我學的操作系統教材中(C語言實現),P操作對應wait(),V操作對應singal()。雖然叫法不同,但是語義都是相同的。在Java SDK並發包中,down()
和up()
分別對應於Semaphore中的acquire()
和release()
。
如何使用信號量
信號量有時也被稱為紅綠燈,我們想想紅綠燈時怎么控制交通的,就知道該如何使用信號量。車輛路過十字路時,需要先檢查是否為綠燈,如果是則通行,否則就等待。想想和加鎖機制有點相似,都是一樣的操作,先檢查是否符合條件(“嘗試獲取”),符合(“獲取到”)則線程繼續運行,否則阻塞線程。
下面使用累加器的例子來說明如何使用信號量。
count+=1
操作是個臨界區,只允許一個線程執行,即要保證互斥。於是我們在進入臨界區之前,使用down()即Java中的acquire(),在退出之后使用up()即Java中的release()。
static int count;
//初始化信號量
static final Semaphore s = new Semaphore(1); // 構造函數參數為1,表示只允許一個線程進行臨界區。可實現一個互斥鎖的功能。
//用信號量保證互斥
static void addOne() {
s.acquire(); // 獲取一個許可(可看作加鎖機制中加鎖)
try {
count+=1;
} finally {
s.release(); // 歸還許可(可看做加鎖機制中解鎖)
}
}
完整代碼如下:
package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
static int count;
static final Semaphore s = new Semaphore(1);
static void addOne() throws InterruptedException {
//只會有一個線程將信號量中的計數器減為1,而另外一個線程只能將信號量中計數器減為-1,導致被阻塞
s.acquire();
try {
count +=1;
System.out.println("Now thread is " + Thread.currentThread() + " and count is " + count);
}finally {
//進入臨界區的線程在執行完臨界區代碼后將信號量中計數器的值加1然后,此時信號量中計數器的值為0,則從阻塞隊列中喚醒被阻塞的進程
s.release();
}
}
public static void main(String[] args) {
// 創建兩個線程運行
MyThread thread1 = new MyThread();
MyThread thread2 = new MyThread();
thread1.start();
thread2.start();
System.out.println("main thread");
}
}
class MyThread extends Thread{
@Override
public void run() {
super.run();
for(int i=0; i<10; i++) {
try {
SemaphoreTest.addOne();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
運行結果:
如果Semaphore的構造函數參數(許可數量,內置計數器的值)修改一下:
static final Semaphore s = new Semaphore(2);
則計數器值的為2,那么就允許有兩個線程進入臨界區,我們的count值就會出現問題
快速實現一個限流器
當設置信號量的計數器為1時,可實現一個簡單的互斥鎖功能。但是,我們前面剛介紹過Java SDK中的Lock
,Semaphore的用途顯然不會與Lock一致,不然就重復造輪子了。Semaphore最重要的一個功能便是:可以允許多個線程訪問一個臨界區。(上述例子我們就設置了計數器的值為2,可發現thread1和thread2都可進入臨界區。)
我們會在什么地方遇見這種需求呢?
各種池化資源,例如連接池、對象池、線程池等等。例如,數據庫連接池,在同一時刻,一定是允許多個線程同時使用連接池,當然,每個連接在被釋放之前,是不允許其他線程使用的。
我們設計如下可以允許N個線程使用的對象池,我們將信號量的計數器值設為N,就可以讓N個線程同時進行臨界區,多余的就會被阻塞。(代碼來自參考[1])
class ObjPool<T, R> {
final List<T> pool; //使用List保存實例對象
// 用信號量實現限流器
final Semaphore sem;
// 構造函數
ObjPool(int size, T t){
pool = new Vector<T>(){};
for(int i=0; i<size; i++){
pool.add(t);
}
sem = new Semaphore(size);
}
// 獲取對象池的對象,調用 func
R exec(Function<T,R> func) {
T t = null;
sem.acquire(); //允許N個進程同時進入臨界區
try {
//我們需要注意,因為多個進行可以進入臨界區,所以Vector的remove方法是線程安全的
t = pool.remove(0);
return func.apply(t); //獲取對象池匯中的一個對象后,調用func函數
} finally {
pool.add(t); //離開臨界區之前,將之前獲取的對象放回到池中
sem.release(); //使得計數器加1,如果信號量中計數器小於等於0,那么說明有線程在等待,此時就會自動喚醒等待線程
}
}
}
// 創建對象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);
// 通過對象池獲取 t,之后執行
pool.exec(t -> {
System.out.println(t);
return t.toString();
});
小結
記得學習操作系統時,信號量類型分為了好幾種整型信號量、記錄型信號量、AND信號量以及“信號量集”(具體了解可戳參考[2])。我認為Java SDK中Semaphore應該是記錄型信號量的實現。不由想起,編程語言是對OS層面操作的一種抽象描述。這句話需要品需要細細品。
參考:
[1] 極客時間專欄王寶令《Java並發編程實戰》
[2] 靜水深流.操作系統之信號量機制總結.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html