Semaphore最詳細解析


官方解釋:

  • 一個計數信號量。在概念上,信號量維持一組許可證。如果有必要,每個acquire()都會阻塞,直到許可證可用,然后才能使用它。每個release()添加許可證,潛在地釋放阻塞獲取方。但是,沒有使用實際的許可證對象; Semaphore只保留可用數量的計數,並相應地執行。信號量通常用於限制線程數,而不是訪問某些(物理或邏輯)資源

我記得考科目一的時候有一個大教室,這個教室只能同時允許兩百人考試,當有一個考完之后,下一個才能進去進行考試。門口會有安檢人員進行安檢,這個Semaphore就相當於這個安檢員。

也可以理解為停車場,停車場內的停車位是固定的,只有當一輛或多輛車開走之后外面等待的車才能進去停車。

用法:

1、定義三個資格 
Semaphore semaphore = new Semaphore(3);

ThreadPoolExecutor poolExecutor = 
        new ThreadPoolExecutor(10, 20, 
                5000, TimeUnit.MILLISECONDS,
                new LinkedBlockingDeque<>(100));

for (int i = 0; i < 10; i++) {
    int finalI = i;
    poolExecutor.execute(new Thread() {
        @Override
        public void run() {
            try {
                //獲取執行資格
                semaphore.acquire(1);
                System.out.println(finalI+"=========");
                //模擬每個線程運行的時間
                Thread.sleep(1000);
                //釋放執行資格
                semaphore.release(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });
}


poolExecutor.shutdown();

  

運行結果如下:(同一時刻只能運行三個線程。有點模糊,湊合看)

 

解析:

一、定義:

public Semaphore(int permits) {    sync = new NonfairSync(permits);}

  

Semaphroe底層也是用Sync類,默認是非公平的,也有公平的構造方法。

public Semaphore(int permits, boolean fair) {
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

  

定義的資格數其實是設置鎖的狀態值的(AQS之前已說過,維護鎖狀態值和線程等待隊列)

abstract static class Sync extends AbstractQueuedSynchronizer {
    Sync(int permits) {
        setState(permits);
    }
}

  

二、為什么能限制同時執行的線程數量呢?

這就是acquire方法的用處了

public void acquire(int permits) {    sync.acquireSharedInterruptibly(permits);}

  

點進acquireSharedInterruptibly這個方法看看:

public final void acquireSharedInterruptibly(int arg)
{
    1、嘗試獲取鎖,返回值小於0就是獲取鎖失敗
    if (tryAcquireShared(arg) < 0)
        2、如果獲取失敗,則進入隊列進行等待,之前已經解析過
        doAcquireSharedInterruptibly(arg);
}

  

可以看到,跟之前CountDownLatch的await方法是一樣的。

tryAcquireShared方法最終執行的如下方法:

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        1、獲取當前鎖狀態,鎖狀態值一開始是自定義的 
        int available = getState();
        2、當前申請后剩余的鎖狀態值
        int remaining = available - acquires;
        if (3、如小於0,則申請失敗,進入等待隊列中
            remaining < 0 ||
            4、CAS替換鎖狀態值
            compareAndSetState(available, remaining))
            return remaining;
    }
}

  

上述是非公平的,公平的只加了一個判斷線程等待隊列前是否有其它線程。排隊一個一個來。

static final class FairSync extends Sync {

        protected int tryAcquireShared(int acquires) {
            for (;;) {
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
}

  

這個就是為什么Semaphore能控制當前並發線程的數量的原因。

三、釋放鎖

線程獲取執行資格之后需要釋放鎖。這就是release方法的用處。不釋放的話鎖會一直被占用,其他線程就無法運行。

public void release(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.releaseShared(permits);
}

  

點進releaseShared看看

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}

  

跟之前的CountDownLatch是一樣的,只是實現不一樣。Semaphore實現如下:

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        1、獲取鎖當前狀態
        int current = getState();
        2、釋放鎖,直接相加
        int next = current + releases;
        if (next < current) // overflow
            throw new Error("Maximum permit count exceeded");
        3、用CAS更新鎖狀態
        if (compareAndSetState(current, next))
            return true;
    }
}

  

=======================================================

我是Liusy,一個喜歡健身的程序員。

歡迎關注微信公眾號【Liusy01】,一起交流Java技術及健身,獲取更多干貨,最新更新【K8S】。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM