【Java並發工具類】Semaphore


前言

1965年,荷蘭計算機科學家Dijkstra提出的信號量機制成為一種高效的進程同步機制。這之后的15年,信號量一直都是並發編程領域的終結者。1980年,管程被提出,成為繼信號量之后的在並發編程領域的第二個選擇。目前幾乎所有的語言都支持信號量機制,Java也不例外。Java中提供了Semaphore並發工具類來支持信號量機制。下面我們就來了解Java實現的信號量機制。
首先介紹信號量模型,然后介紹如何使用,最后使用信號量來實現一個限流器。

信號量模型

信號量模型圖(圖來自參考[1]):

image-20200215222026469

信號量模型總結為:一個計數器、一個等待隊列和三個對外調用的方法。
計數器和等待隊列時對外透明的,所有我們只能通過三個對外方法來訪問計數器和等待隊列。
init():設置計數器的初始值。
down():計數器的值減一。如果此時計數器的值小於0,則當前線程插入等待隊列並阻塞,否則當前線程可以繼續執行。
up():計數器的值加一。如果此時計數器的值小於或者等於0,則喚醒等待隊列中的一個線程,並將其從等待隊列中移除。

這三個方法都是原子性的,由實現信號量模型的方法保證。在Java SDK中,信號量模型是由java.util.concurrent.Semaphore實現。

信號量模型代碼化大致類似如下:

class Semaphore{
    int count; // 計數器
    Queue queue; // 等待隊列
    
    // 初始化操作
    Semaphore(int c){
        this.count=c;
    }
    
    void down(){
        this.count--; // 計數器值減一
        if(this.count < 0){
            // 將當前線程插入等待隊列
            // 阻塞當前線程
        }
    }
    
    void up(){
        this.count++; // 計數器值加一
        if(this.count <= 0) {
            // 移除等待隊列中的某個線程T
            // 喚醒線程T
        }
    }
}

在信號量模型中,down()up()這兩個操作也被成為P操作(荷蘭語proberen,測試)和V操作(荷荷蘭語verhogen,增加)。在我學的操作系統教材中(C語言實現),P操作對應wait(),V操作對應singal()。雖然叫法不同,但是語義都是相同的。在Java SDK並發包中,down()up()分別對應於Semaphore中的acquire()release()

如何使用信號量

信號量有時也被稱為紅綠燈,我們想想紅綠燈時怎么控制交通的,就知道該如何使用信號量。車輛路過十字路時,需要先檢查是否為綠燈,如果是則通行,否則就等待。想想和加鎖機制有點相似,都是一樣的操作,先檢查是否符合條件(“嘗試獲取”),符合(“獲取到”)則線程繼續運行,否則阻塞線程。

下面使用累加器的例子來說明如何使用信號量。

count+=1 操作是個臨界區,只允許一個線程執行,即要保證互斥。於是我們在進入臨界區之前,使用down()即Java中的acquire(),在退出之后使用up()即Java中的release()。

static int count;
//初始化信號量
static final Semaphore s = new Semaphore(1); // 構造函數參數為1,表示只允許一個線程進行臨界區。可實現一個互斥鎖的功能。
//用信號量保證互斥    
static void addOne() {
    s.acquire(); // 獲取一個許可(可看作加鎖機制中加鎖)
    try {
        count+=1;
    } finally {
        s.release(); // 歸還許可(可看做加鎖機制中解鎖)
    }
}

完整代碼如下:

package com.sakura.concrrent;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
    static int count;
    static final Semaphore s = new Semaphore(1);
    static void addOne() throws InterruptedException {
        //只會有一個線程將信號量中的計數器減為1,而另外一個線程只能將信號量中計數器減為-1,導致被阻塞
        s.acquire();  
        try {
            count +=1;
            System.out.println("Now thread is " + Thread.currentThread() + "   and count is " + count);
        }finally {
            //進入臨界區的線程在執行完臨界區代碼后將信號量中計數器的值加1然后,此時信號量中計數器的值為0,則從阻塞隊列中喚醒被阻塞的進程
            s.release();   
        }
    }

    public static void main(String[] args) {
        // 創建兩個線程運行
        MyThread thread1 = new MyThread();
        MyThread thread2 = new MyThread();

        thread1.start();
        thread2.start();
        System.out.println("main thread");

    }
}
class MyThread extends Thread{
    @Override
    public void run() {
        super.run();
        for(int i=0; i<10; i++) {                   
            try {
                SemaphoreTest.addOne();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

運行結果:

如果Semaphore的構造函數參數(許可數量,內置計數器的值)修改一下:

static final Semaphore s = new Semaphore(2);

計數器值的為2,那么就允許有兩個線程進入臨界區,我們的count值就會出現問題

快速實現一個限流器

當設置信號量的計數器為1時,可實現一個簡單的互斥鎖功能。但是,我們前面剛介紹過Java SDK中的Lock,Semaphore的用途顯然不會與Lock一致,不然就重復造輪子了。Semaphore最重要的一個功能便是:可以允許多個線程訪問一個臨界區。(上述例子我們就設置了計數器的值為2,可發現thread1和thread2都可進入臨界區。)

我們會在什么地方遇見這種需求呢?
各種池化資源,例如連接池、對象池、線程池等等。例如,數據庫連接池,在同一時刻,一定是允許多個線程同時使用連接池,當然,每個連接在被釋放之前,是不允許其他線程使用的。

我們設計如下可以允許N個線程使用的對象池,我們將信號量的計數器值設為N,就可以讓N個線程同時進行臨界區,多余的就會被阻塞。(代碼來自參考[1])

class ObjPool<T, R> {
    final List<T> pool;    //使用List保存實例對象
    // 用信號量實現限流器
    final Semaphore sem;
    
    // 構造函數
    ObjPool(int size, T t){
        pool = new Vector<T>(){}; 
        for(int i=0; i<size; i++){
            pool.add(t);
        }
        sem = new Semaphore(size);
    }
    
    // 獲取對象池的對象,調用 func
    R exec(Function<T,R> func) {
        T t = null;
        sem.acquire();    //允許N個進程同時進入臨界區
        try {
            //我們需要注意,因為多個進行可以進入臨界區,所以Vector的remove方法是線程安全的
            t = pool.remove(0);    
            return func.apply(t);    //獲取對象池匯中的一個對象后,調用func函數
        } finally {
            pool.add(t);    //離開臨界區之前,將之前獲取的對象放回到池中
            sem.release();    //使得計數器加1,如果信號量中計數器小於等於0,那么說明有線程在等待,此時就會自動喚醒等待線程
        }
    }
}
// 創建對象池
ObjPool<Long, String> pool = new ObjPool<Long, String>(10, 2);

// 通過對象池獲取 t,之后執行  
pool.exec(t -> {
    System.out.println(t);
    return t.toString();
});

小結

記得學習操作系統時,信號量類型分為了好幾種整型信號量、記錄型信號量、AND信號量以及“信號量集”(具體了解可戳參考[2])。我認為Java SDK中Semaphore應該是記錄型信號量的實現。不由想起,編程語言是對OS層面操作的一種抽象描述。這句話需要品需要細細品。

參考:
[1] 極客時間專欄王寶令《Java並發編程實戰》
[2] 靜水深流.操作系統之信號量機制總結.https://www.cnblogs.com/IamJiangXiaoKun/p/9464336.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM