1.控制並發線程數的Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,保證合理的使用公共資源。
線程可以通過acquire()方法來獲取信號量的許可,當信號量中沒有可用的許可的時候,線程阻塞,直到有可用的許可為止。線程可以通過release()方法釋放它持有
的信號量的許可。
2.Semaphore的方法列表:
// 創建具有給定的許可數和非公平的公平設置的 Semaphore。 Semaphore(int permits) // 創建具有給定的許可數和給定的公平設置的 Semaphore。 Semaphore(int permits, boolean fair) // 從此信號量獲取一個許可,在提供一個許可前一直將線程阻塞,否則線程被中斷。 void acquire() // 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞,或者線程已被中斷。 void acquire(int permits) // 從此信號量中獲取許可,在有可用的許可前將其阻塞。 void acquireUninterruptibly() // 從此信號量獲取給定數目的許可,在提供這些許可前一直將線程阻塞。 void acquireUninterruptibly(int permits) // 返回此信號量中當前可用的許可數。 int availablePermits() // 獲取並返回立即可用的所有許可。 int drainPermits() // 返回一個 collection,包含可能等待獲取的線程。 protected Collection<Thread> getQueuedThreads() // 返回正在等待獲取的線程的估計數目。 int getQueueLength() // 查詢是否有線程正在等待獲取。 boolean hasQueuedThreads() // 如果此信號量的公平設置為 true,則返回 true。 boolean isFair() // 根據指定的縮減量減小可用許可的數目。 protected void reducePermits(int reduction) // 釋放一個許可,將其返回給信號量。 void release() // 釋放給定數目的許可,將其返回到信號量。 void release(int permits) // 返回標識此信號量的字符串,以及信號量的狀態。 String toString() // 僅在調用時此信號量存在一個可用許可,才從信號量獲取許可。 boolean tryAcquire() // 僅在調用時此信號量中有給定數目的許可時,才從此信號量中獲取這些許可。 boolean tryAcquire(int permits) // 如果在給定的等待時間內此信號量有可用的所有許可,並且當前線程未被中斷,則從此信號量獲取給定數目的許可。 boolean tryAcquire(int permits, long timeout, TimeUnit unit) // 如果在給定的等待時間內,此信號量有可用的許可並且當前線程未被中斷,則從此信號量獲取一個許可。 boolean tryAcquire(long timeout, TimeUnit unit)
3.Semaphore的內部結構
4.Semaphore的源碼:
"公平信號量"和"非公平信號量"的區別
"公平信號量"和"非公平信號量"的釋放信號量的機制是一樣的!不同的是它們獲取信號量的機制:線程在嘗試獲取信號量許可時,對於公平信號量而言,如果當前線程不在CLH隊列的頭部,則排隊等候;而對於非公平信號量而言,無論當前線程是不是在CLH隊列的頭部,它都會直接獲取信號量。該差異具體的體現在,它們的tryAcquireShared()函數的實現不同。
公平信號量tryAcquireShared源碼如下:
/** * Fair version */ static final class FairSync extends Sync { private static final long serialVersionUID = 2014338818796000944L; FairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { for (;;) { if (hasQueuedPredecessors()) return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } } }
非公平信號量tryAcquireShared源碼如下:
/** * NonFair version */ static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); } protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
實例:
public class SemaphoreTest { private static final int THREAD_COUNT = 10;
private static ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT); // 創建5個許可,允許5個並發執行 private static Semaphore s = new Semaphore(5); public static void main(String[] args) {
//創建10個線程執行任務 for (int i = 0; i < THREAD_COUNT; i++) { executorService.execute(new Runnable() { @Override public void run() { try {
//同時只能有5個線程並發執行保存數據的任務 s.acquire(); System.out.println("線程" + Thread.currentThread().getName() + " 保存數據"); Thread.sleep(2000);
//5個線程保存完數據,釋放1個許可,其他的線程才能獲取許可,繼續執行保存數據的任務 s.release(); System.out.println("線程" + Thread.currentThread().getName() + " 釋放許可"); } catch (InterruptedException e) { e.printStackTrace(); } } }); } executorService.shutdown(); } }
結果:10個線程保存數據,但是只允許5個線程並發的執行,當5個線程都保存完數據以后,釋放許可,其他線程才能拿到許可繼續保存數據,直到10個線程都保存完數據釋放許可為止。
線程pool-1-thread-2 保存數據 線程pool-1-thread-3 保存數據 線程pool-1-thread-1 保存數據 線程pool-1-thread-4 保存數據 線程pool-1-thread-5 保存數據 線程pool-1-thread-2 釋放許可 線程pool-1-thread-6 保存數據 線程pool-1-thread-1 釋放許可 線程pool-1-thread-3 釋放許可 線程pool-1-thread-4 釋放許可 線程pool-1-thread-9 保存數據 線程pool-1-thread-10 保存數據 線程pool-1-thread-5 釋放許可 線程pool-1-thread-8 保存數據 線程pool-1-thread-7 保存數據 線程pool-1-thread-10 釋放許可 線程pool-1-thread-9 釋放許可 線程pool-1-thread-6 釋放許可 線程pool-1-thread-8 釋放許可 線程pool-1-thread-7 釋放許可