Semaphore



JUC 高並發工具類(3文章)與高並發容器類(N文章) :

1 Semaphore是什么?

Semaphore是計數信號量。Semaphore管理一系列許可。每個acquire方法阻塞,直到有一個許可證可以獲得然后拿走一個許可證;每個release方法增加一個許可,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可這個對象,Semaphore只是維持了一個可獲得許可證的數量。

比如:停車場入口立着的那個顯示屏,每有一輛車進入停車場顯示屏就會顯示剩余車位減1,每有一輛車從停車場出去,顯示屏上顯示的剩余車輛就會加1,當顯示屏上的剩余車位為0時,停車場入口的欄桿就不會再打開,車輛就無法進入停車場了,直到有一輛車從停車場出去為止。

比如:在學生時代都去餐廳打過飯,假如有3個窗口可以打飯,同一時刻也只能有3名同學打飯。第四個人來了之后就必須在外面等着,只要有打飯的同學好了,就可以去相應的窗口了 。

1604135894267

2 怎么使用 Semaphore

2.1 構造方法

//創建具有給定的許可數和非公平的公平設置的 Semaphore。  
Semaphore(int permits)   

//創建具有給定的許可數和給定的公平設置的 Semaphore。  
Semaphore(int permits, boolean fair)   
       

2.2 重要方法

在上面我們使用最基本的acquire方法和release方法就可以實現Semaphore最常見的功能,不過其他方法還是需要我們去了解一下的。



1、acquire(int permits)

從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。就好比是一個學生占兩個窗口。這同時也對應了相應的release方法。

2、release(int permits)

釋放給定數目的許可,將其返回到信號量。這個是對應於上面的方法,一個學生占幾個窗口完事之后還要釋放多少

3、availablePermits()

返回此信號量中當前可用的許可數。也就是返回當前還有多少個窗口可用。

4、reducePermits(int reduction)

根據指定的縮減量減小可用許可的數目。

5、hasQueuedThreads()

查詢是否有線程正在等待獲取資源。

6、getQueueLength()

返回正在等待獲取的線程的估計數目。該值僅是估計的數字。

7、tryAcquire(int permits, long timeout, TimeUnit unit)

如果在給定的等待時間內此信號量有可用的所有許可,並且當前線程未被中斷,則從此信號量獲取給定數目的許可。

8、acquireUninterruptibly(int permits)

從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。

3 使用案例

這個案例使用的就是我們之前的小例子,也就是去餐廳打飯的案例。

我們先看Test類:

img

在這個代碼中我們看到,主要是new了一個Semaphore,然后賦給每一位同學Student,接下來我們就來好好看看Student線程是如何實現的。

img

在這個Student類中我們最主要看run方法的實現,首先我們通過acquire獲取了當前窗口的許可,然后休眠3秒代表打飯,最后在finally使用release方法釋放這個窗口許可證。代碼很簡單,原理很清楚,我們測試一波:

img

這個結果你也看到了,基本上同一時刻只能有三個學生在窗口旁邊。

在這里你可能有一個疑問了,Semaphore好像和synchronized關鍵字沒什么區別,都可以實現同步,如果是這樣那說明我們還沒有真正理解jdk的注釋,他只是限制了訪問某些資源的線程數,其實並沒有實現同步,我們可以看一下:

img

現在我們在獲取許可前增加了一條輸出語句,也就是能打印出有哪個線程進入了,再去測試一波

img

結果很清晰,所以對於Semaphore來說,我們需要記住的其實是資源的互斥而不是資源的同步,在同一時刻是無法保證同步的,但是卻可以保證資源的互斥。

4 Semaphore使用場景

用於那些資源有明確訪問數量限制的場景,常用於限流 。

  • 比如:數據庫連接池,同時進行連接的線程有數量限制,連接不能超過一定的數量,當連接達到了限制數量后,后面的線程只能排隊等前面的線程釋放了數據庫連接才能獲得數據庫連接。

  • 比如:停車場場景,車位數量有限,同時只能容納多少台車,車位滿了之后只有等里面的車離開停車場外面的車才可以進入。

5 Semaphore原理

(1)、Semaphore初始化。

Semaphore semaphore=new Semaphore(2);

1、當調用new Semaphore(2) 方法時,默認會創建一個非公平的鎖的同步阻塞隊列。

2、把初始許可數量賦值給同步隊列的state狀態,state的值就代表當前所剩余的許可數量。

初始化完成后同步隊列信息如下圖:

img

(2)獲取許可

semaphore.acquire();

1、當前線程會嘗試去同步隊列獲取一個許可,獲取許可的過程也就是使用原子的操作去修改同步隊列的state ,獲取一個許可則修改為state=state-1。

2、 當計算出來的state<0,則代表許可數量不足,此時會創建一個Node節點加入阻塞隊列,掛起當前線程。

3、當計算出來的state>=0,則代表獲取許可成功。

源碼:

/**
     *  獲取1個許可
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
/**
     * 共享模式下獲取許可,獲取成功則返回,失敗則加入阻塞隊列,掛起線程
     * @param arg
     * @throws InterruptedException
     */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //嘗試獲取許可,arg為獲取許可個數,當可用許可數減當前許可數結果小於0,則創建一個節點加入阻塞隊列,掛起當前線程。
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }
/**
     * 1、創建節點,加入阻塞隊列,
     * 2、重雙向鏈表的head,tail節點關系,清空無效節點
     * 3、掛起當前節點線程
     * @param arg
     * @throws InterruptedException
     */
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //創建節點加入阻塞隊列
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            for (;;) {
                //獲得當前節點pre節點
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);//返回鎖的state
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //重組雙向鏈表,清空無效節點,掛起當前線程
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

線程1、線程2、線程3、分別調用semaphore.acquire(),整個過程隊列信息變化如下圖:

img

(3)、釋放許可

 semaphore.release();

當調用semaphore.release() 方法時

1、線程會嘗試釋放一個許可,釋放許可的過程也就是把同步隊列的state修改為state=state+1的過程

2、釋放許可成功之后,同時會喚醒同步隊列的所有阻塞節共享節點線程

3、被喚醒的節點會重新嘗試去修改state=state-1 的操作,如果state>=0則獲取許可成功,否則重新進入阻塞隊列,掛起線程。

源碼:

 /**
     * 釋放許可
     */
    public void release() {
        sync.releaseShared(1);
    }
/**
     *釋放共享鎖,同時喚醒所有阻塞隊列共享節點線程
     * @param arg
     * @return
     */
    public final boolean releaseShared(int arg) {
        //釋放共享鎖
        if (tryReleaseShared(arg)) {
            //喚醒所有共享節點線程
            doReleaseShared();
            return true;
        }
        return false;
    }
 /**
     * 喚醒所有共享節點線程
     */
    private void doReleaseShared() {
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//是否需要喚醒后繼節點
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//修改狀態為初始0
                        continue;
                    unparkSuccessor(h);//喚醒h.nex節點線程
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE));
            }
            if (h == head)                   // loop if head changed
                break;
        }
    }

繼上面的圖,當我們線程1調用semaphore.release(); 時候整個流程如下圖:

img


回到◀瘋狂創客圈

瘋狂創客圈 - Java高並發研習社群,為大家開啟大廠之門


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM