最近在看一本書《Java並發編程 核心方法與框架》,打算一邊學習一邊把學習的經驗記下來,所粘貼的代碼都是我運行過的,大家一起學習,歡迎吐槽。
估計也沒多少人看我的博客,哈哈,那么我還是會記下來,天空不曾留下我的痕跡,但我已飛過,而在博客園留下了我的痕跡~
1、Semaphore的初步使用
Semaphore是什么,能做什么?
Semaphore 是 synchronized 的加強版,作用是控制線程的並發數量。就這一點而言,單純的synchronized 關鍵字是實現不了的。
直接看例子吧,這個例子包含3個類,一個是線程類,一個是 Semaphore 關鍵代碼類,一個類是主main方法類:
package com.cd.concurrent.semaphore; public class MyThread extends Thread { private SemaphoreService service; public MyThread(String name, SemaphoreService service) { super(); this.setName(name); this.service = service; } @Override public void run() { this.service.doSomething(); } }
package com.cd.concurrent.semaphore; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Semaphore; public class SemaphoreService { private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private Semaphore semaphore = new Semaphore(1);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼 public void doSomething() { try { /** * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只允許制定個數的線程進入, * 因為semaphore的構造方法是1,則同一時刻只允許一個線程進入,其他線程只能等待。 * */ semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public static String getFormatTimeStr() { return sf.format(new Date()); } }
package com.cd.concurrent.semaphore; public class SemaphoreTest { public static void main(String args[]) { SemaphoreService service = new SemaphoreService(); for (int i = 0; i < 10; i++) { MyThread t = new MyThread("thread" + (i + 1), service); t.start();// 這里使用 t.run() 也可以運行,但是不是並發執行了 } } }
運行結果:
實踐證明,確實是同一個時刻只有一個線程能訪問,那如果把 Semaphore 的構造方法入參改成 2 呢,修改 SemaphoreService.java 文件:
package com.cd.concurrent.semaphore; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Semaphore; public class SemaphoreService { private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼 public void doSomething() { try { /** * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只允許制定個數的線程進入, * 因為semaphore的構造方法是2,則同一時刻只允許2個線程進入,其他線程等待。 * */ semaphore.acquire(); System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } public static String getFormatTimeStr() { return sf.format(new Date()); } }
運行SemaphoreTest,結果如下:
驗證OK
2、方法 acquire( int permits ) 參數作用,及動態添加 permits 許可數量
acquire( int permits ) 中的參數是什么意思呢?可以這么理解, new Semaphore(6) 表示初始化了 6個通路, semaphore.acquire(2) 表示每次線程進入將會占用2個通路,semaphore.release(2) 運行時表示歸還2個通路。沒有通路,則線程就無法進入代碼塊。
而上面的代碼中,semaphore.acquire() + semaphore.release() 在運行的時候,其實和 semaphore.acquire(1) + semaphore.release(1) 效果是一樣的。
上代碼:
還是3個代碼,線程類沒有變,用的是上面的線程類,重新寫了另外兩個類:
package com.cd.concurrent.semaphore; import java.util.concurrent.Semaphore; public class SemaphoreService2 extends SemaphoreService { // 之所以繼承 SemaphoreService,僅僅是為了使用父類的打印時間的方法 0.0 private Semaphore semaphore = new Semaphore(6);// 6表示總共有6個通路 public void doSomething() { try { semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6個中扣除 System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); semaphore.release(2); // 釋放占用的 2 個通路 } catch (InterruptedException e) { e.printStackTrace(); } } public int availablePermits() { // 查看可用通路數 return semaphore.availablePermits(); } }
package com.cd.concurrent.semaphore; public class SemaphoreTest2 { public static void main(String args[]) { SemaphoreService2 service = new SemaphoreService2(); // 使用總 6 通路,每個線程占用2通路 for (int i = 0; i < 10; i++) { MyThread t = new MyThread("thread" + (i + 1), service); t.start();// 這里使用 t.run() 也可以運行,但是不是並發執行了 System.out.println("可用通路數:" + service.availablePermits()); } } }
運行結果:
如果 acquire 的數量大於 release 的數量,則 通路遲早會被使用完,如果線程比較多,得不到后續運行,出現線程堆積內存,最終java進程崩掉;如果 acquire 的數量小於 release 的數量,就會出現並發執行的線程越來越多(換句話說,處理越來越快),最終也有可能出現問題。
比如,象上面的代碼,SemaphoreService2.java 中 semaphore.release(2) 如果改成 semaphore.release(1) 則 就會出現有5個線程得不到運行堆積的情況,可以算一下:6-2-2-2+1+1+1=3,運行完一個回合后,還剩3個通路,3-2+1,第二回合,還剩2個通路,2-2+1=1,第3個回合,還剩一個通路,不足以運行任何一個線程。
把上面說的用代碼實現一下,修改 SemaphoreService2.java 如下:
package com.cd.concurrent.semaphore; import java.util.concurrent.Semaphore; public class SemaphoreService2 extends SemaphoreService { // 之所以繼承 SemaphoreService,僅僅是為了使用父類的打印時間的方法 0.0 private Semaphore semaphore = new Semaphore(6);// 6表示總共有6個通路 public void doSomething() { try { semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6個中扣除 System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); semaphore.release(1); // 釋放占用的 1 個通路 } catch (InterruptedException e) { e.printStackTrace(); } } public int availablePermits() { return semaphore.availablePermits(); } }
運行 SemaphoreTest2 結果:
3、acquire 的不可中斷實現
仔細看一下上面的代碼,semaphore.acquire() 和 semaphore.acquire(int permits) 是會拋出異常 InterruptedException 的,如果在 acquire 和 release 之間的代碼是一個比較慢和復制的運算,如內存占用過多,或者棧深度很深等,jvm會中斷這塊代碼。
如何才能不讓 jvm 中斷 代碼執行呢?
答案是:使用 acquireUninterruptibly() 替換acquire()、使用 acquireUninterruptibly(int permits) 替換 acquire(int permits) 。
acquireUninterruptibly 不會拋出 InterruptedException ,一個代碼塊一時執行不完,還會繼續等待執行。
個人覺得,不要隨便使用 acquireUninterruptibly ,因為 jvm 中斷執行,是自身的一種自我保護機制,保證 java 進程的正常,除了特殊情況必須用 acquireUninterruptibly 外,都應該 使用 acquire ,同時,改進一下 SemaphoreService2 的 doSomething 方法,將 release 放到 finally 塊 中,如下。
public void doSomething() { try { semaphore.acquire(2); // 2 表示進入此代碼,就會消耗2個通路,2個通路從6個中扣除 System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); } catch (InterruptedException e) { e.printStackTrace(); } finally { semaphore.release(2); // release 放到 finally 中 } }
4、其他一些常有工具方法
availablePermits() 方法在前面用過,表示返回 Semaphore 對象中的當前可用許可數,此方法通常用於調試,因為許可數量(通路)可能是實時在改變的。
drainPermits() 方法可獲取並返回立即可用的所有許可(通路)個數,並將可用許可置為0。
getQueueLength() 獲取等待許可的線程個數。
hasQueuedThreads() 判斷有沒有線程在等待這個許可。
getQueueLength() 和 hasQueuedThreads() 都是在判斷當前有沒有等待許可的線程信息時使用。
這里就不寫代碼校驗了,你們可以在 SemaphoreService 或者 SemaphoreService2 中加入這個信息試一下。
5、線程公平性
上面用的 Semaphore 構造方法是 Semaphore semaphore = new Semaphore(int permits)
其實,還有一個構造方法: Semaphore semaphore = new Semaphore(int permits , boolean isFair)
isFair 的意思就是,是否公平,獲得鎖的順序與線程啟動順序有關,就是公平,先啟動的線程,先獲得鎖。isFair 不能100% 保證公平,只能是大概率公平。
isFair 為 true,則表示公平,先啟動的線程先獲得鎖。
6、方法 tryAcquire() 、 tryAcquire(int permits)、 tryAcquire(int permits , long timeout , TimeUint unit) 的使用:
tryAcquire 方法,是 acquire 的擴展版,tryAcquire 作用是嘗試得獲取通路,如果未傳參數,就是嘗試獲取一個通路,如果傳了參數,就是嘗試獲取 permits 個 通路 、在指定時間 timeout 內 嘗試 獲取 permits 個通路。
上代碼試試看:
3個類,線程類未變,以下是修改了的兩個類:
package com.cd.concurrent.semaphore; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; public class SemaphoreService3 extends SemaphoreService { // 之所以繼承 SemaphoreService,僅僅是為了使用父類的打印時間的方法 0.0 private Semaphore semaphore = new Semaphore(6, true);// 6表示總共有6個通路,true 表示公平 public void doSomething() { try { if (semaphore.tryAcquire(2, 3, TimeUnit.SECONDS)) { // 在 3秒 內 嘗試獲取 2 個通路 System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr() + ",當前是否有進程等待:" + semaphore.hasQueuedThreads() + ",等待進程數:" + semaphore.getQueueLength()); semaphore.release(2); // 釋放占用的 2 個通路 } else { System.out.println(Thread.currentThread().getName() + ":doSomething 沒有獲取到鎖-准備退出-" + getFormatTimeStr()); } } catch (InterruptedException e) { e.printStackTrace(); } } public int availablePermits() { return semaphore.availablePermits(); } }
package com.cd.concurrent.semaphore; public class SemaphoreTest3 { public static void main(String args[]) { SemaphoreService3 service = new SemaphoreService3(); // 使用總 6 通路,每個線程占用2通路,嘗試獲取鎖 for (int i = 0; i < 10; i++) { MyThread t = new MyThread("thread" + (i + 1), service); t.start(); } } }
SemaphoreTest3 運行結果:
7、多進路-多處理 vs 多進路-單處理
在上面的代碼中,我們之所以可以實現單處理,是因為在上面的所有線程都共有了同一個 Semaphore 來進行進程處理,那么如果 Semaphore 本身就是進程的一部分呢,會怎么樣呢?
比如,修改 第一個例子中的 SemaphoreTest 如下:
package com.cd.concurrent.semaphore; public class SemaphoreTest { public static void main(String args[]) { for (int i = 0; i < 10; i++) { SemaphoreService service = new SemaphoreService(); MyThread t = new MyThread("thread" + (i + 1), service); t.start();// 這里使用 t.run() 也可以運行,但是不是並發執行了 } } }
運行 SemaphoreTest 結果:
所有線程同時執行了。
如果 SemaphoreTest 類不進行修改,如何實現第一個例子 中的 單處理呢?
也簡單,修改 SemaphoreService ,代碼如下:
package com.cd.concurrent.semaphore; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Semaphore; public class SemaphoreService { private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼 public void doSomething() { try { /** * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只允許制定個數的線程進入, * 因為semaphore的構造方法是1,則同一時刻只允許一個線程進入,其他線程只能等待。 * */ semaphore.acquire(); doSomethingMain(); // 將主要處理部分封裝成一個方法 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } private static synchronized void doSomethingMain() throws InterruptedException { System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); } public static String getFormatTimeStr() { return sf.format(new Date()); } }
注意:doSomethingMain() 方法必須是 static synchronized 的才行,因為 多線程調用的話,static 方法是類方法,這樣 synchronized 同步 才能針對整個類同步,否則 就只能針對單線程多個地方調用同步。
修改 SemaphoreService ,運行 SemaphoreTest 結果:
運行達到想要的效果。
這里,拋出一個問題,上面的代碼,不用 synchronized 實現,而使用 ReentrantLock 來實現,按理說會更好的,原因如下:
synchronized 是 jvm 層面的實現,ReentrantLock 是 jdk 層面的實現,synchronized 的缺點如下:
1)不能響應中斷;
2)同一時刻不管是讀還是寫都只能有一個線程對共享資源操作,其他線程只能等待
3)鎖的釋放由虛擬機來完成,不用人工干預,不過此即使缺點也是優點,優點是不用擔心會造成死鎖,缺點是由可能獲取到鎖的線程阻塞之后其他線程會一直等待,性能不高。
而lock接口的提出就是為了完善synchronized的不完美的,首先lock是基於jdk層面實現的接口,和虛擬機層面不是一個概念;其次對於lock對象中的多個方法的調用,可以靈活控制對共享資源變量的操作,不管是讀操作還是寫操作
那么上面的代碼如果使用 ReentrantLock 來實現,豈不是更好嗎?好,修改 SemaphoreService:
package com.cd.concurrent.semaphore; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.ReentrantLock; public class SemaphoreService { private static SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); private Semaphore semaphore = new Semaphore(2);// 同步關鍵類,構造方法傳入的數字是多少,則同一個時刻,只運行多少個進程同時運行制定代碼 private ReentrantLock lock = new ReentrantLock(); public void doSomething() { try { /** * 在 semaphore.acquire() 和 semaphore.release()之間的代碼,同一時刻只允許制定個數的線程進入, * 因為semaphore的構造方法是1,則同一時刻只允許一個線程進入,其他線程只能等待。 * */ semaphore.acquire(); lock.lock(); doSomethingMain(); // 將主要處理部分封裝成一個方法 semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } private void doSomethingMain() throws InterruptedException { System.out.println(Thread.currentThread().getName() + ":doSomething start-" + getFormatTimeStr()); Thread.sleep(2000); System.out.println(Thread.currentThread().getName() + ":doSomething end-" + getFormatTimeStr()); } public static String getFormatTimeStr() { return sf.format(new Date()); } }
運行 SemaphoreTest 結果:
和預期的不一樣呀,10個線程基本是同時執行了,那么問題出在哪里呢?
答案會公布到評論里哦~