信號量Semaphore深入解讀


 

1 簡介

Semaphore可翻譯為信號量,它維護一組許可證, 每次嘗試獲取許可證時都將阻塞等待直到可獲取,它才能獲取到並解除阻塞狀態。 Semaphore可以控制一些物理或邏輯資源的訪問或使用,它常常用於限制線程數目。在實際開發中,可用作流量控制,特別對於一些公共資源有限的應用場景,如數據庫連接,或是一些其他限流的緩存池。(基於JDK1.8)

2 示例

這是一個使用信號量控制對緩存池中items訪問的示例。

public class SemaphoreDemo {
    static class Pool {
        private static final int MAX_AVAILABLE = 6;
        private final Semaphore available = new Semaphore(MAX_AVAILABLE, true);
        public Object getItem() throws InterruptedException {
            available.acquire();
            return getNextAvailableItem();
        }
        public void putItem(Object x) {
            if (markAsUnused(x))
                available.release();
        }
        protected Object[] items;
        protected boolean[] used = new boolean[MAX_AVAILABLE];
        Pool() {
            items = new Object[15];
            for (int i = 0; i < items.length; i++) {
                items[i] = "item" + i;
            }
        }
        protected synchronized Object getNextAvailableItem() {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (!used[i]) {
                    used[i] = true;
                    return items[i];
                }
            }
            return null; // not reached
        }
        protected synchronized boolean markAsUnused(Object item) {
            for (int i = 0; i < MAX_AVAILABLE; ++i) {
                if (item == items[i]) {
                    if (used[i]) {
                        used[i] = false;
                        return true;
                    } else
                        return false;
                }
            }
            return false;
        }
    }
    public static void main(String[] args) {
        final int THREAD_COUNT = 10;
        ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
        Pool pool = new Pool();
        for (int i = 0; i < THREAD_COUNT; i++) {
            int tmpI = i;
            threadPool.submit(() -> {
                try {
                    Object item = pool.getItem();
                    System.out.printf("當前線程:%s,獲取到緩存池中的資源:%s\n", Thread.currentThread().getName(), item);
                   Thread.sleep(7);
                   pool.putItem(item);
                   System.out.printf("當前線程:%s,已將緩存池中的資源%s放回池中\n", Thread.currentThread().getName(), item);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        threadPool.shutdown();
    }
}
SemaphoreDemo

 

假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程並發地讀取,但是如果讀到內存后,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接.

class SemaphoreTest {
    private static final int THREAD_COUNT = 30;
    private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
    private static Semaphore s = new Semaphore(10);
​
    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        s.acquire();
                        System.out.println("save data");
                        s.release();
                    } catch (InterruptedException e) {
​
                    }
                }
            });
        }
        threadPool.shutdown();
    }
}

3 實現原理

Semaphore內部主要有一個Sync類型成員變量sync, Sync是繼承抽象類AbstractQueuedSynchronizer的靜態抽象內部類。

Semaphore利用父類AQS實現了一個共享鎖,Sync有兩個子類NonfairSyncFairSync ,分另代表非公平鎖、公平鎖。共享鎖的關鍵在於實現重寫tryAcquireSharedtryReleaseShared 方法,這兩個方法分別會被父類的模板方法acquireSharedreleaseShared 所調用,具體細節請看AbstractQueuedSynchronizer實現原理分析

Semaphore的默認構造方法使用非公平鎖,Semaphore的構造方法有一個布爾型可選參數fair,此參數指定鎖的公平鎖。

public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
   sync = fair ? new FairSync(permits) : new NonfairSync(permits);
 }

(1) 靜態內部類Sync

構造方法Sync(int)將父類AbstractQueuedSynchronizer的實例變量state設置為指定的許可證數permits

Sync(int permits) {
    setState(permits);//
}

getPermits()`返回的許可證數即是父類AQS的state值.

final int getPermits() {
    return getState();
}

 

nonfairTryAcquireShared是非公平鎖嘗試釋獲取鎖的方法,每成功獲取一次鎖,就從池中拿走一個許可證,而剩余的許可證就少1個。

其主要邏輯是: CAS自旋直到成功將state減1,並返回新的state,或當前已獲取到的許可證數超出了設定的許可證總數,方法返回。

此方法返回負數時,線程將被阻塞。

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;//acquires一般是1
        if (remaining < 0 ||//為負數,表示超出了設定的許可證總數,直接返回。不能再獲取許可證
            compareAndSetState(available, remaining))
            return remaining;
    }
}

 

公平鎖與非公平鎖釋放鎖狀態的邏輯是一樣的,都會執行tryReleaseShared方法。每釋放一把鎖,就將一個許可證放回池中,可用的許可證就多一個。

其主要邏輯是: CAS自旋直到成功將state加1,最終返回true.

protected final boolean tryReleaseShared(int releases) {
    for (;;) {
        int current = getState();
        int next = current + releases;//acquires一般是1
        if (next < current) // overflow   next成為負數,next超出int類型的最大可表示范圍
            throw new Error("Maximum permit count exceeded");
        if (compareAndSetState(current, next))
            return true;
    }
}

 

reducePermits減少許可證數,邏輯與nonfairTryAcquireShared類似,其邏輯是:CAS自旋嘗試將state減少指定數目reductions

final void reducePermits(int reductions) {
    for (;;) {
        int current = getState();
        int next = current - reductions;
        if (next > current) // underflow
            throw new Error("Permit count underflow");
        if (compareAndSetState(current, next))
            return;
    }
}

 

drainPermits將所有許可證拿走,其邏輯是:CAS自旋嘗試將state設為0,並返回當前所有可用的許可證。

final int drainPermits() {
    for (;;) {
        int current = getState();
        if (current == 0 || compareAndSetState(current, 0))
            return current;
    }
}

 

非公平的Sync:NonfairSync

NonfairSync代表一個公平鎖的實現,它並沒有自己的邏輯,其tryAcquireShared方法也是直接調用父類的nonfairTryAcquireShared方法。

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
​
    NonfairSync(int permits) {
        super(permits);
    }
​
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

 

公平的Sync: FairSync

FairSync代表一個公平鎖的實現,它的tryAcquireShared方法有自己的邏輯,與兄弟類NonfairSync的不同之處在於多了等待隊列上是否存在其前驅節點的判斷。

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
​
    FairSync(int permits) {
        super(permits);
    }
​
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())//有前驅,即存在比當前線程等待更長時間的線程,此時獲取鎖失敗
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

(2) 獲取許可證

acquireUninterruptibly獲取一個許可證不響應中斷,若獲取失敗,將阻塞等待直到可獲取為止

acquire獲取一個許可證會響應中斷,若獲取失敗,將阻塞等待直到可獲取為止

    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

 

acquire(int) 一性次獲取多個許可證,響應中斷。若獲取失敗,將阻塞等待直到可獲取為止

acquireUninterruptibly(int)一性次獲取多個許可證,不響應中斷。,若獲取失敗,將阻塞等待直到可獲取為止

public void acquire(int permits) throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireSharedInterruptibly(permits);
}
public void acquireUninterruptibly(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    sync.acquireShared(permits);
}

 

tryAcquire()嘗試一個獲取許可證,獲取失敗直接返回false,不會阻塞等待。

tryAcquire(long,TimeUnit)嘗試超時獲取一個許可證,在限定時間內獲取許可證失敗返回false,不會一直阻塞等待。

    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }
​
    public boolean tryAcquire(long timeout, TimeUnit unit)
            throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

 

tryAcquire(int)嘗試多個獲取許可證,獲取失敗直接返回false,不會阻塞等待。

tryAcquire(int,long,TimeUnit)嘗試超時獲取多個許可證,在限定時間內獲取許可證失敗返回false,不會一直阻塞等待。

public boolean tryAcquire(int permits) {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.nonfairTryAcquireShared(permits) >= 0;
}
​
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
    if (permits < 0) throw new IllegalArgumentException();
    return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}

 

drainPermits獲取並返回當前所有可用的許可證

public int drainPermits() {
    return sync.drainPermits();
}

(3) 釋放許可證

release()釋放一個許可證

release(int)釋放多個許可證

    public void release() {
        sync.releaseShared(1);
    }
​
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

(4) 減少許可證

reducePermits是protected級別方法,外部不可見,主要提供給子類在定制化某些功能時調用。

reducePermits與獲取許可證方法acquireXX不同,sync.reducePermits不是鎖的相關方法,不被父類AQS的模板方法調用。

reducePermits只是單單減少許可證數,不會阻塞線程。

protected void reducePermits(int reduction) {
    if (reduction < 0) throw new IllegalArgumentException();
    sync.reducePermits(reduction);
}

(5) 狀態查詢

availablePermits返回當前可獲取的許可證數。

isFair返回當前鎖的公平性。

hasQueuedThreads返回當前是否有線程因獲取許可證而阻塞等待。

getQueueLength返回當前因獲取許可證而阻塞等待的線程數。

getQueuedThreads返回當前因獲取許可證而阻塞等待的線程集合;此方法為protected級別,外界不可見,主要方便子類監控相關指標。

    public int availablePermits() {
        return sync.getPermits();
    }
    public boolean isFair() {
        return sync instanceof FairSync;
    }
    public final boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }
    public final int getQueueLength() {
        return sync.getQueueLength();
    }
    protected Collection<Thread> getQueuedThreads() {
        return sync.getQueuedThreads();
    }

 

 

參考: 《 Java並發編程的藝術》方騰飛

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM