1.Semaphore簡介
Semaphore,是JDK1.5的java.util.concurrent並發包中提供的一個並發工具類。
所謂Semaphore即 信號量 的意思。
這個叫法並不能很好地表示它的作用,更形象的說法應該是許可證管理器。
其作用在JDK注釋中是這樣描述的:
A counting semaphore.
Conceptually, a semaphore maintains a set of permits.
Each {@link #acquire} blocks if necessary until a permit is available, and then takes it.
Each {@link #release} adds a permit, potentially releasing a blocking acquirer.
However, no actual permit objects are used; the {@code Semaphore} just keeps a count of the number available and acts accordingly.
翻譯過來,就是:
- Semaphore是一個計數信號量。
- 從概念上將,Semaphore包含一組許可證。
- 如果有需要的話,每個acquire()方法都會阻塞,直到獲取一個可用的許可證。
- 每個release()方法都會釋放持有許可證的線程,並且歸還Semaphore一個可用的許可證。
- 然而,實際上並沒有真實的許可證對象供線程使用,Semaphore只是對可用的數量進行管理維護。
2.Semaphore方法說明
Semaphore的方法如下:
——Semaphore(permits)
初始化許可證數量的構造函數
——Semaphore(permits,fair)
初始化許可證數量和是否公平模式的構造函數
——isFair()
是否公平模式FIFO
——availablePermits()
獲取當前可用的許可證數量
——acquire()
當前線程嘗試去阻塞的獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了1個可用的許可證,則會停止等待,繼續執行。
- 當前線程被中斷,則會拋出InterruptedException異常,並停止等待,繼續執行。
——acquire(permits)
當前線程嘗試去阻塞的獲取permits個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了n個可用的許可證,則會停止等待,繼續執行。
- 當前線程被中斷,則會拋出InterruptedException異常,並停止等待,繼續執行。
——acquierUninterruptibly()
當前線程嘗試去阻塞的獲取1個許可證(不可中斷的)。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了1個可用的許可證,則會停止等待,繼續執行。
——acquireUninterruptibly(permits)
當前線程嘗試去阻塞的獲取permits個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了n個可用的許可證,則會停止等待,繼續執行。
——tryAcquire()
當前線程嘗試去獲取1個許可證。
此過程是非阻塞的,它只是在方法調用時進行一次嘗試。
如果當前線程獲取了1個可用的許可證,則會停止等待,繼續執行,並返回true。
如果當前線程沒有獲得這個許可證,也會停止等待,繼續執行,並返回false。
——tryAcquire(permits)
當前線程嘗試去獲取permits個許可證。
此過程是非阻塞的,它只是在方法調用時進行一次嘗試。
如果當前線程獲取了permits個可用的許可證,則會停止等待,繼續執行,並返回true。
如果當前線程沒有獲得permits個許可證,也會停止等待,繼續執行,並返回false。
——tryAcquire(timeout,TimeUnit)
當前線程在限定時間內,阻塞的嘗試去獲取1個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了可用的許可證,則會停止等待,繼續執行,並返回true。
- 當前線程等待時間timeout超時,則會停止等待,繼續執行,並返回false。
- 當前線程在timeout時間內被中斷,則會拋出InterruptedException一次,並停止等待,繼續執行。
——tryAcquire(permits,timeout,TimeUnit)
當前線程在限定時間內,阻塞的嘗試去獲取permits個許可證。
此過程是阻塞的,它會一直等待許可證,直到發生以下任意一件事:
- 當前線程獲取了可用的permits個許可證,則會停止等待,繼續執行,並返回true。
- 當前線程等待時間timeout超時,則會停止等待,繼續執行,並返回false。
- 當前線程在timeout時間內被中斷,則會拋出InterruptedException一次,並停止等待,繼續執行。
——release()
當前線程釋放1個可用的許可證。
——release(permits)
當前線程釋放permits個可用的許可證。
——drainPermits()
當前線程獲得剩余的所有可用許可證。
——hasQueuedThreads()
判斷當前Semaphore對象上是否存在正在等待許可證的線程。
——getQueueLength()
獲取當前Semaphore對象上是正在等待許可證的線程數量。
3.Semaphore方法練習
練習目的:熟悉Semaphore的各類方法的用法。
實例代碼:
//new Semaphore(permits):初始化許可證數量的構造函數
Semaphore semaphore = new Semaphore(5); //new Semaphore(permits,fair):初始化許可證數量和是否公平模式的構造函數
semaphore = new Semaphore(5, true); //isFair():是否公平模式FIFO
System.out.println("是否公平FIFO:" + semaphore.isFair()); //availablePermits():獲取當前可用的許可證數量
System.out.println("獲取當前可用的許可證數量:開始---" + semaphore.availablePermits()); //acquire():獲取1個許可證 //---此線程會一直阻塞,直到獲取這個許可證,或者被中斷(拋出InterruptedException異常)。
semaphore.acquire(); System.out.println("獲取當前可用的許可證數量:acquire 1 個---" + semaphore.availablePermits()); //release():釋放1個許可證
semaphore.release(); System.out.println("獲取當前可用的許可證數量:release 1 個---" + semaphore.availablePermits()); //acquire(permits):獲取n個許可證 //---此線程會一直阻塞,直到獲取全部n個許可證,或者被中斷(拋出InterruptedException異常)。
semaphore.acquire(2); System.out.println("獲取當前可用的許可證數量:acquire 2 個---" + semaphore.availablePermits()); //release(permits):釋放n個許可證
semaphore.release(2); System.out.println("獲取當前可用的許可證數量:release 1 個---" + semaphore.availablePermits()); //hasQueuedThreads():是否有正在等待許可證的線程
System.out.println("是否有正在等待許可證的線程:" + semaphore.hasQueuedThreads()); //getQueueLength():正在等待許可證的隊列長度(線程數量)
System.out.println("正在等待許可證的隊列長度(線程數量):" + semaphore.getQueueLength()); Thread.sleep(10); System.out.println(); //定義final的信號量
Semaphore finalSemaphore = semaphore; new Thread(() -> { //drainPermits():獲取剩余的所有的許可證
int permits = finalSemaphore.drainPermits(); System.out.println(Thread.currentThread().getName() + "獲取了剩余的全部" + permits + "個許可證."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //釋放所有的許可證
finalSemaphore.release(permits); System.out.println(Thread.currentThread().getName() + "釋放了" + permits + "個許可證."); }).start(); Thread.sleep(10); new Thread(() -> { try { //有一個線程正在等待獲取1個許可證
finalSemaphore.acquire(); System.out.println(Thread.currentThread().getName() + "獲取了1個許可證."); } catch (InterruptedException e) { e.printStackTrace(); } //釋放1個許可證
finalSemaphore.release(); System.out.println(Thread.currentThread().getName() + "釋放了1個許可證."); }).start(); Thread.sleep(10); System.out.println(); System.out.println("獲取當前可用的許可證數量:drain 剩余的---" + finalSemaphore.availablePermits()); System.out.println("是否有正在等待許可證的線程:" + finalSemaphore.hasQueuedThreads()); System.out.println("正在等待許可證的隊列長度(線程數量):" + finalSemaphore.getQueueLength()); System.out.println(); Thread.sleep(10); new Thread(() -> { try { //有一個線程正在等待獲取2個許可證
finalSemaphore.acquire(2); System.out.println(Thread.currentThread().getName() + "獲取了2個許可證."); } catch (InterruptedException e) { e.printStackTrace(); } //釋放兩個許可證
finalSemaphore.release(2); System.out.println(Thread.currentThread().getName() + "釋放了2個許可證."); }).start(); Thread.sleep(10); System.out.println(); System.out.println("獲取當前可用的許可證數量:drain 剩余的---" + finalSemaphore.availablePermits()); System.out.println("是否有正在等待許可證的線程:" + finalSemaphore.hasQueuedThreads()); System.out.println("正在等待許可證的隊列長度(線程數量):" + finalSemaphore.getQueueLength()); System.out.println(); Thread.sleep(5000); System.out.println(); System.out.println("獲取當前可用的許可證數量:---" + finalSemaphore.availablePermits()); System.out.println("是否有正在等待許可證的線程:" + finalSemaphore.hasQueuedThreads()); System.out.println("正在等待許可證的隊列長度(線程數量):" + finalSemaphore.getQueueLength());
運行結果:
是否公平FIFO:true 獲取當前可用的許可證數量:開始---5 獲取當前可用的許可證數量:acquire 1 個---4 獲取當前可用的許可證數量:release 1 個---5 獲取當前可用的許可證數量:acquire 2 個---3 獲取當前可用的許可證數量:release 1 個---5 是否有正在等待許可證的線程:false 正在等待許可證的隊列長度(線程數量):0 Thread-0獲取了剩余的全部5個許可證. 獲取當前可用的許可證數量:drain 剩余的---0 是否有正在等待許可證的線程:true 正在等待許可證的隊列長度(線程數量):1 獲取當前可用的許可證數量:drain 剩余的---0 是否有正在等待許可證的線程:true 正在等待許可證的隊列長度(線程數量):2 Thread-0釋放了5個許可證. Thread-2獲取了2個許可證. Thread-1獲取了1個許可證. Thread-1釋放了1個許可證. Thread-2釋放了2個許可證. 獲取當前可用的許可證數量:---5 是否有正在等待許可證的線程:false 正在等待許可證的隊列長度(線程數量):0
4.Semaphore應用場景-實例
Semaphore經常用於限制獲取某種資源的線程數量。
場景說明:
- 模擬學校食堂的窗口打飯過程
- 學校食堂有2個打飯窗口
- 學校中午有20個學生 按次序 排隊打飯
- 每個人打飯時耗費時間不一樣
- 有的學生耐心很好,他們會一直等待直到打到飯
- 有的學生耐心不好,他們等待時間超過了心里預期,就不再排隊,而是回宿舍吃泡面了
- 有的學生耐心很好,但是突然接到通知,說是全班聚餐,所以也不用再排隊,而是去吃大餐了
重點分析
- 食堂有2個打飯窗口:需要定義一個permits=2的Semaphore對象。
- 學生 按次序 排隊打飯:此Semaphore對象是公平的。
- 有20個學生:定義20個學生線程。
- 打到飯的學生:調用了acquireUninterruptibly()方法,無法被中斷
- 吃泡面的學生:調用了tryAcquire(timeout,TimeUnit)方法,並且等待時間超時了
- 吃大餐的學生:調用了acquire()方法,並且被中斷了
實例代碼:
定義2個窗口的食堂
/** * 打飯窗口 * 2: 2個打飯窗口 * true:公平隊列-FIFO */
static Semaphore semaphore = new Semaphore(2, true);
定義打飯學生
/** * <p>打飯學生</p> * * @author hanchao 2018/3/31 19:45 **/
static class Student implements Runnable { private static final Logger LOGGER = Logger.getLogger(Student.class); //學生姓名
private String name; //打飯許可
private Semaphore semaphore; /** * 打飯方式 * 0 一直等待直到打到飯 * 1 等了一會不耐煩了,回宿舍吃泡面了 * 2 打飯中途被其他同學叫走了,不再等待 */
private int type; public Student(String name, Semaphore semaphore, int type) { this.name = name; this.semaphore = semaphore; this.type = type; } /** * <p>打飯</p> * * @author hanchao 2018/3/31 19:49 **/ @Override public void run() { //根據打飯情形分別進行不同的處理
switch (type) { //打飯時間 //這個學生很有耐心,它會一直排隊直到打到飯
case 0: //排隊
semaphore.acquireUninterruptibly(); //進行打飯
try { Thread.sleep(RandomUtils.nextLong(1000, 3000)); } catch (InterruptedException e) { e.printStackTrace(); } //將打飯機會讓后后面的同學
semaphore.release(); //打到了飯
LOGGER.info(name + " 終於打到了飯."); break; //這個學生沒有耐心,等了1000毫秒沒打到飯,就回宿舍泡面了
case 1: //排隊
try { //如果等待超時,則不再等待,回宿舍吃泡面
if (semaphore.tryAcquire(RandomUtils.nextInt(6000, 16000), TimeUnit.MILLISECONDS)) { //進行打飯
try { Thread.sleep(RandomUtils.nextLong(1000, 3000)); } catch (InterruptedException e) { e.printStackTrace(); } //將打飯機會讓后后面的同學
semaphore.release(); //打到了飯
LOGGER.info(name + " 終於打到了飯."); } else { //回宿舍吃泡面
LOGGER.info(name + " 回宿舍吃泡面."); } } catch (InterruptedException e) { //e.printStackTrace();
} break; //這個學生也很有耐心,但是他們班突然宣布聚餐,它只能放棄打飯了
case 2: //排隊
try { semaphore.acquire(); //進行打飯
try { Thread.sleep(RandomUtils.nextLong(1000, 3000)); } catch (InterruptedException e) { //e.printStackTrace();
} //將打飯機會讓后后面的同學
semaphore.release(); //打到了飯
LOGGER.info(name + " 終於打到了飯."); } catch (InterruptedException e) { //e.printStackTrace(); //被叫去聚餐,不再打飯
LOGGER.info(name + " 全部聚餐,不再打飯."); } break; default: break; } } }
編寫食堂打飯過程:
/** * <p>食堂打飯</p> * * @author hanchao 2018/3/31 21:13 **/
public static void main(String[] args) throws InterruptedException { //101班的學生
Thread[] students101 = new Thread[5]; for (int i = 0; i < 20; i++) { //前10個同學都在耐心的等待打飯
if (i < 10) { new Thread(new Student("打飯學生" + i, SemaphoreDemo.semaphore, 0)).start(); } else if (i >= 10 && i < 15) {//這5個學生沒有耐心打飯,只會等1000毫秒
new Thread(new Student("泡面學生" + i, SemaphoreDemo.semaphore, 1)).start(); } else {//這5個學生沒有耐心打飯
students101[i - 15] = new Thread(new Student("聚餐學生" + i, SemaphoreDemo.semaphore, 2)); students101[i - 15].start(); } } // Thread.sleep(5000); for (int i = 0; i < 5; i++) { students101[i].interrupt(); } }
運行結果:
2018-04-01 21:13:16 INFO - 打飯學生1 終於打到了飯. 2018-04-01 21:13:16 INFO - 打飯學生0 終於打到了飯. 2018-04-01 21:13:18 INFO - 打飯學生2 終於打到了飯. 2018-04-01 21:13:18 INFO - 打飯學生3 終於打到了飯. 2018-04-01 21:13:19 INFO - 聚餐學生15 全部聚餐,不再打飯. 2018-04-01 21:13:19 INFO - 聚餐學生19 全部聚餐,不再打飯. 2018-04-01 21:13:19 INFO - 聚餐學生17 全部聚餐,不再打飯. 2018-04-01 21:13:19 INFO - 聚餐學生18 全部聚餐,不再打飯. 2018-04-01 21:13:19 INFO - 聚餐學生16 全部聚餐,不再打飯. 2018-04-01 21:13:19 INFO - 打飯學生4 終於打到了飯. 2018-04-01 21:13:20 INFO - 打飯學生5 終於打到了飯. 2018-04-01 21:13:21 INFO - 泡面學生13 回宿舍吃泡面. 2018-04-01 21:13:21 INFO - 泡面學生11 回宿舍吃泡面. 2018-04-01 21:13:21 INFO - 打飯學生7 終於打到了飯. 2018-04-01 21:13:22 INFO - 打飯學生6 終於打到了飯. 2018-04-01 21:13:23 INFO - 打飯學生9 終於打到了飯. 2018-04-01 21:13:24 INFO - 打飯學生8 終於打到了飯. 2018-04-01 21:13:24 INFO - 泡面學生10 終於打到了飯. 2018-04-01 21:13:26 INFO - 泡面學生14 終於打到了飯. 2018-04-01 21:13:26 INFO - 泡面學生12 終於打到了飯.
