建議126:適時選擇不同的線程池來實現
Java的線程池實現從根本上來說只有兩個:ThreadPoolExecutor類和ScheduledThreadPoolExecutor類,這兩個類還是父子關系,但是Java為了簡化並行計算,還提供了一個Exceutors的靜態類,它可以直接生成多種不同的線程池執行器,比如單線程執行器、帶緩沖功能的執行器等,但歸根結底還是使用ThreadPoolExecutor類或ScheduledThreadPoolExecutor類的封裝類。
為了理解這些執行器,我們首先來看看ThreadPoolExecutor類,其中它復雜的構造函數可以很好的理解線程池的作用,代碼如下:
public class ThreadPoolExecutor extends AbstractExecutorService { // 最完整的構造函數 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 檢驗輸入條件 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 檢驗運行環境 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } }
這是ThreadPoolExecutor最完整的構造函數,其他的構造函數都是引用該構造函數實現的,我們逐步來解釋這些參數的含義。
- corePoolSize:最小線程數。線程啟動后,在池中保持線程的最小數量。需要說明的是線程數量是逐步到達corePoolSize值的,例如corePoolSize被設置為10,而任務數量為5,則線程池中最多會啟動5個線程,而不是一次性的啟動10個線程。
- maximumPoolSize:最大線程數量。這是池中最大能容納的最大線程數量,如果超出,則使用RejectedExecutionHandler 拒絕策略處理。
- keepAliveTime:線程最大生命周期。這里的生命周期有兩個約束條件,一是該參數針對的是超過corePoolSize數量的線程。二是處於非運行狀態的線程。這么說吧,如果corePoolSize為10,maximumPoolSize為20,此時線程池中有15個線程正在運行,一段時間后,其中有3個線程處於等待狀態的時間超過了keepAliveTime指定的時間,則結束這3個線程,此時線程池中還有12個線程正在運行。
- unit:時間單位。這是keepAliveTime的時間單位,可以是納秒、毫秒、秒、分等選項。
- workQuene:任務隊列。當線程池中的線程都處於運行狀態,而此時任務數量繼續增加,則需要一個容器來容納這些任務,這就是任務隊列。
- threadFactory:線程工廠。定義如何啟動一個線程,可以設置線程名稱,並且可以確認是否是后台線程等。
- handler:拒絕任務處理器。由於超出線程數量和隊列容量而對繼續增加的任務進行處理的程序。
線程池的管理是這樣一個過程:首先創建線程池,然后根據任務的數量逐步將線程增大到corePoolSize數量,如果此時仍有任務增加,則放置到workQuene中,直到workQuene爆滿為止,然后繼續增加池中的數量(增強處理能力),最終達到maximumPoolSize,那如果此時還有任務增加進來呢?這就需要handler處理了,或者丟棄任務,或者拒絕新任務,或者擠占已有任務等。
在任務隊列和線程池都飽和的情況下,一但有線程處於等待(任務處理完畢,沒有新任務增加)狀態的時間超過keepAliveTime,則該線程終止,也就說池中的線程數量會逐漸降低,直至為corePoolSize數量為止。
我們可以把線程池想象為這樣一個場景:在一個生產線上,車間規定是可以有corePoolSize數量的工人,但是生產線剛建立時,工作不多,不需要那么多的人。隨着工作數量的增加,工人數量也逐漸增加,直至增加到corePoolSize數量為止。此時還有任務增加怎么辦呢?
好辦,任務排隊,corePoolSize數量的工人不停歇的處理任務,新增加的任務按照一定的規則存放在倉庫中(也就是我們的workQuene中),一旦任務增加的速度超過了工人處理的能力,也就是說倉庫爆滿時,車間就會繼續招聘工人(也就是擴大線程數),直至工人數量到達maximumPoolSize為止,那如果所有的maximumPoolSize工人都在處理任務時,而且倉庫也是飽和狀態,新增任務該怎么處理呢?這就會扔一個叫handler的專門機構去處理了,它要么丟棄這些新增的任務,要么無視,要么替換掉別的任務。
過了一段時間后,任務的數量逐漸減少,導致一部分工人處於待工狀態,為了減少開支(Java是為了減少系統的資源消耗),於是開始辭退工人,直至保持corePoolSize數量的工人為止,此時即使沒有工作,也不再辭退工人(池中的線程數量不再減少),這也是保證以后再有任務時能夠快速的處理。
明白了線程池的概念,我們再來看看Executors提供的幾個線程創建線程池的便捷方法:
- newSingleThreadExecutor:單線程池。顧名思義就是一個池中只有一個線程在運行,該線程永不超時,而且由於是一個線程,當有多個任務需要處理時,會將它們放置到一個無界阻塞隊列中逐個處理,它的實現代碼如下:
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
它的使用方法也很簡單,下面是簡單的示例:
public static void main(String[] args) throws ExecutionException, InterruptedException { // 創建單線程執行器 ExecutorService es = Executors.newSingleThreadExecutor(); // 執行一個任務 Future<String> future = es.submit(new Callable<String>() { @Override public String call() throws Exception { return ""; } }); // 獲得任務執行后的返回值 System.out.println("返回值:" + future.get()); // 關閉執行器 es.shutdown(); }
- newCachedThreadPool:緩沖功能的線程。建立了一個線程池,而且線程數量是沒有限制的(當然,不能超過Integer的最大值),新增一個任務即有一個線程處理,或者復用之前空閑的線程,或者重親啟動一個線程,但是一旦一個線程在60秒內一直處於等待狀態時(也就是一分鍾無事可做),則會被終止,其源碼如下:
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
這里需要說明的是,任務隊列使用了同步阻塞隊列,這意味着向隊列中加入一個元素,即可喚醒一個線程(新創建的線程或復用空閑線程來處理),這種隊列已經沒有隊列深度的概念了.
- newFixedThreadPool:固定線程數量的線程池。 在初始化時已經決定了線程的最大數量,若任務添加的能力超出了線程的處理能力,則建立阻塞隊列容納多余的任務,其源碼如下:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
上面返回的是一個ThreadPoolExecutor,它的corePoolSize和maximumPoolSize是相等的,也就是說,最大線程數量為nThreads。如果任務增長的速度非常快,超過了LinkedBlockingQuene的最大容量(Integer的最大值),那此時會如何處理呢?會按照ThreadPoolExecutor默認的拒絕策略(默認是DiscardPolicy,直接丟棄)來處理。
以上三種線程池執行器都是ThreadPoolExecutor的簡化版,目的是幫助開發人員屏蔽過得線程細節,簡化多線程開發。當需要運行異步任務時,可以直接通過Executors獲得一個線程池,然后運行任務,不需要關注ThreadPoolExecutor的一系列參數是什么含義。當然,有時候這三個線程不能滿足要求,此時則可以直接操作ThreadPoolExecutor來實現復雜的多線程計算。可以這樣比喻,newSingleThreadExecutor、newCachedThreadPool、newFixedThreadPool是線程池的簡化版,而ThreadPoolExecutor則是旗艦版___簡化版容易操作,需要了解的知識相對少些,方便使用,而旗艦版功能齊全,適用面廣,難以駕馭。
建議127:Lock與synchronized是不一樣的
很多編碼者都會說,Lock類和synchronized關鍵字用在代碼塊的並發性和內存上時語義是一樣的,都是保持代碼塊同時只有一個線程執行權。這樣的說法只說對了一半,我們以一個任務提交給多個線程為例,來看看使用顯示鎖(Lock類)和內部鎖(synchronized關鍵字)有什么不同,首先定義一個任務:
class Task { public void doSomething() { try { // 每個線程等待2秒鍾,注意此時線程的狀態轉變為Warning狀態 Thread.sleep(2000); } catch (Exception e) { // 異常處理 } StringBuffer sb = new StringBuffer(); // 線程名稱 sb.append("線程名稱:" + Thread.currentThread().getName()); // 運行時間戳 sb.append(",執行時間: " + Calendar.getInstance().get(Calendar.SECOND) + "s"); System.out.println(sb); } }
該類模擬了一個執行時間比較長的計算,注意這里是模擬方式,在使用sleep方法時線程的狀態會從運行狀態轉變為等待狀態。該任務具備多線程能力時必須實現Runnable接口,我們分別建立兩種不同的實現機制,先看顯示鎖實現:
class TaskWithLock extends Task implements Runnable { // 聲明顯示鎖 private final Lock lock = new ReentrantLock(); @Override public void run() { try { // 開始鎖定 lock.lock(); doSomething(); } finally { // 釋放鎖 lock.unlock(); } } }
這里有一點需要說明,顯示鎖的鎖定和釋放必須放在一個try......finally塊中,這是為了確保即使出現異常也能正常釋放鎖,保證其它線程能順利執行。
內部鎖的處理也非常簡單,代碼如下:
//內部鎖任務 class TaskWithSync extends Task implements Runnable{ @Override public void run() { //內部鎖 synchronized("A"){ doSomething(); } } }
這兩個任務看着非常相似,應該能夠產生相同的結果吧?我們建立一個模擬場景,保證同時有三個線程在運行,代碼如下:
public class Client127 { public static void main(String[] args) throws Exception { // 運行顯示任務 runTasks(TaskWithLock.class); // 運行內部鎖任務 runTasks(TaskWithSync.class); } public static void runTasks(Class<? extends Runnable> clz) throws Exception { ExecutorService es = Executors.newCachedThreadPool(); System.out.println("***開始執行 " + clz.getSimpleName() + " 任務***"); // 啟動3個線程 for (int i = 0; i < 3; i++) { es.submit(clz.newInstance()); } // 等待足夠長的時間,然后關閉執行器 TimeUnit.SECONDS.sleep(10); System.out.println("---" + clz.getSimpleName() + " 任務執行完畢---\n"); // 關閉執行器 es.shutdown(); } }
按照一般的理解,Lock和synchronized的處理方式是相同的,輸出應該沒有差別,但是很遺憾的是,輸出差別其實很大。輸出如下:
***開始執行 TaskWithLock 任務***
線程名稱:pool-1-thread-2,執行時間: 55s
線程名稱:pool-1-thread-1,執行時間: 55s
線程名稱:pool-1-thread-3,執行時間: 55s
---TaskWithLock 任務執行完畢---
***開始執行 TaskWithSync 任務***
線程名稱:pool-2-thread-1,執行時間: 5s
線程名稱:pool-2-thread-3,執行時間: 7s
線程名稱:pool-2-thread-2,執行時間: 9s
---TaskWithSync 任務執行完畢---
注意看運行的時間戳,顯示鎖是同時運行的,很顯然pool-1-thread-1線程執行到sleep時,其它兩個線程也會運行到這里,一起等待,然后一起輸出,這還具有線程互斥的概念嗎?
而內部鎖的輸出則是我們預期的結果,pool-2-thread-1線程在運行時其它線程處於等待狀態,pool-2-threda-1執行完畢后,JVM從等待線程池中隨機獲的一個線程pool-2-thread-3執行,最后執行pool-2-thread-2,這正是我們希望的。
現在問題來了:Lock鎖為什么不出現互斥情況呢?
這是因為對於同步資源來說(示例中的代碼塊)顯示鎖是對象級別的鎖,而內部鎖是類級別的鎖,也就說說Lock鎖是跟隨對象的,synchronized鎖是跟隨類的,更簡單的說把Lock定義為多線程類的私有屬性是起不到資源互斥作用的,除非是把Lock定義為所有線程的共享變量。都說代碼是最好的解釋語言,我們來看一個Lock鎖資源的代碼:
public static void main(String[] args) { // 多個線程共享鎖 final Lock lock = new ReentrantLock(); // 啟動三個線程 for (int i = 0; i < 3; i++) { new Thread(new Runnable() { @Override public void run() { try { lock.lock(); // 休眠2秒鍾 Thread.sleep(2000); System.out.println(Thread.currentThread().getName()); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } }).start(); } }
執行時,會發現線程名稱Thread-0、Thread-1、Thread-2會逐漸輸出,也就是一個線程在執行時,其它線程就處於等待狀態。注意,這里三個線程運行的實例對象是同一個類。
除了這一點不同之外,顯示鎖和內部鎖還有什么區別呢?還有以下4點不同:
- Lock支持更細精度的鎖控制:假設讀寫鎖分離,寫操作時不允許有讀寫操作存在,而讀操作時讀寫可以並發執行,這一點內部鎖就很難實現。顯示鎖的示例代碼如下:
class Foo { // 可重入的讀寫鎖 private final ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); // 讀鎖 private final Lock r = rwl.readLock(); // 寫鎖 private final Lock w = rwl.writeLock(); // 多操作,可並發執行 public void read() { try { r.lock(); Thread.sleep(1000); System.out.println("read......"); } catch (InterruptedException e) { e.printStackTrace(); } finally { r.unlock(); } } // 寫操作,同時只允許一個寫操作 public void write() { try { w.lock(); Thread.sleep(1000); System.out.println("write....."); } catch (InterruptedException e) { e.printStackTrace(); } finally { w.unlock(); } } }
可以編寫一個Runnable實現類,把Foo類作為資源進行調用(注意多線程是共享這個資源的),然后就會發現這樣的現象:讀寫鎖允許同時有多個讀操作但只允許一個寫操作,也就是當有一個寫線程在執行時,所有的讀線程都會阻塞,直到寫線程釋放鎖資源為止,而讀鎖則可以有多個線程同時執行。
2.Lock鎖是無阻塞鎖,synchronized是阻塞鎖
當線程A持有鎖時,線程B也期望獲得鎖,此時,如果程序中使用的顯示鎖,則B線程為等待狀態(在通常的描述中,也認為此線程被阻塞了),若使用的是內部鎖則為阻塞狀態。
3.Lock可實現公平鎖,synchronized只能是非公平鎖
什么叫非公平鎖呢?當一個線程A持有鎖,而線程B、C處於阻塞(或等待)狀態時,若線程A釋放鎖,JVM將從線程B、C中隨機選擇一個持有鎖並使其獲得執行權,這叫非公平鎖(因為它拋棄了先來后到的順序);若JVM選擇了等待時間最長的一個線程持有鎖,則為公平鎖(保證每個線程的等待時間均衡)。需要注意的是,即使是公平鎖,JVM也無法准確做到" 公平 ",在程序中不能以此作為精確計算。
顯示鎖默認是非公平鎖,但可以在構造函數中加入參數為true來聲明出公平鎖,而synchronized實現的是非公平鎖,他不能實現公平鎖。
4.Lock是代碼級的,synchronized是JVM級的
Lock是通過編碼實現的,synchronized是在運行期由JVM釋放的,相對來說synchronized的優化可能性高,畢竟是在最核心的部分支持的,Lock的優化需要用戶自行考慮。
顯示鎖和內部鎖的功能各不相同,在性能上也稍有差別,但隨着JDK的不斷推進,相對來說,顯示鎖使用起來更加便利和強大,在實際開發中選擇哪種類型的鎖就需要根據實際情況考慮了:靈活、強大選擇lock,快捷、安全選擇synchronized.
建議128:預防線程死鎖
線程死鎖(DeadLock)是多線程編碼中最頭疼的問題,也是最難重現的問題,因為Java是單進程的多線程語言,一旦線程死鎖,則很難通過外科手術的方法使其起死回生,很多時候只有借助外部進程重啟應用才能解決問題,我們看看下面的多線程代碼是否會產生死鎖:
class Foo implements Runnable { @Override public void run() { fun(10); } // 遞歸方法 public synchronized void fun(int i) { if (--i > 0) { for (int j = 0; j < i; j++) { System.out.print("*"); } System.out.println(i); fun(i); } } }
注意fun方法是一個遞歸函數,而且還加上了synchronized關鍵字,它保證同時只有一個線程能夠執行,想想synchronized關鍵字的作用:當一個帶有synchronized關鍵字的方法在執行時,其他synchronized方法會被阻塞,因為線程持有該對象的鎖,比如有這樣的代碼:
class Foo1 { public synchronized void m1() { try { Thread.sleep(1000); } catch (InterruptedException e) { // 異常處理 } System.out.println("m1方法執行完畢"); } public synchronized void m2() { System.out.println("m2方法執行完畢"); } }
相信大家都明白,先輸出"m1執行完畢",然后再輸出"m2"執行完畢,因為m1方法在執行時,線程t持有foo對象的鎖,要想主線程獲得m2方法的執行權限就必須等待m1方法執行完畢,也就是釋放當前鎖。明白了這個問題,我們思考一下上例中帶有synchronized的遞歸方法是否能執行?會不會產生死鎖?運行結果如下:
*********9
********8
*******7
******6
*****5
****4
***3
**2
*1
一個倒三角形,沒有產生死鎖,正常執行,這是為何呢?很奇怪,是嗎?那是因為在運行時當前線程(Thread-0)獲得了Foo對象的鎖(synchronized雖然是標注在方法上的,但實際作用是整個對象),也就是該線程持有了foo對象的鎖,所以它可以多次重如fun方法,也就是遞歸了。可以這樣來思考該問題,一個包廂有N把鑰匙,分別由N個海盜持有 (也就是我們Java的線程了),但是同一時間只能由一把鑰匙打開寶箱,獲取寶物,只有在上一個海盜關閉了包廂(釋放鎖)后,其它海盜才能繼續打開獲取寶物,這里還有一個規則:一旦一個海盜打開了寶箱,則該寶箱內的所有寶物對他來說都是開放的,即使是“ 寶箱中的寶箱”(即內箱)對他也是開放的。可以用如下代碼來表示:
class Foo2 implements Runnable{ @Override public void run() { method1(); } public synchronized void method1(){ method2(); } public synchronized void method2(){ //doSomething } }
方法method1是synchronized修飾的,方法method2也是synchronized修飾的,method1和method2方法重入完全是可行的,此種情況下會不會產生死鎖。
那什么情況下回產生死鎖呢?看如下代碼:
class A { public synchronized void a1(B b) { String name = Thread.currentThread().getName(); System.out.println(name + " 進入A.a1()"); try { // 休眠一秒 仍持有鎖 Thread.sleep(1000); } catch (Exception e) { // 異常處理 } System.out.println(name + " 試圖訪問B.b2()"); b.b2(); } public synchronized void a2() { System.out.println("進入a.a2()"); } } class B { public synchronized void b1(A a) { String name = Thread.currentThread().getName(); System.out.println(name + " 進入B.b1()"); try { // 休眠一秒 仍持有鎖 Thread.sleep(1000); } catch (Exception e) { // 異常處理 } System.out.println(name + " 試圖訪問A.a2()"); a.a2(); } public synchronized void b2() { System.out.println("進入B.b2()"); } }
public static void main(String[] args) throws InterruptedException { final A a = new A(); final B b = new B(); // 線程A new Thread(new Runnable() { @Override public void run() { a.a1(b); } }, "線程A").start(); // 線程B new Thread(new Runnable() { @Override public void run() { b.b1(a); } }, "線程B").start(); }
此段程序定義了兩個資源A和B,然后在兩個線程A、B中使用了該資源,由於兩個資源之間交互操作,並且都是同步方法,因此在線程A休眠一秒鍾后,它會試圖訪問資源B的b2方法。但是B線程持有該類的鎖,並同時在等待A線程釋放其鎖資源,所以此時就出現了兩個線程在互相等待釋放資源的情況,也就是死鎖了,運行結果如下:
線程A 進入A.a1()
線程B 進入B.b1()
線程A 試圖訪問B.b2()
線程B 試圖訪問A.a2()
此種情況下,線程A和線程B會一直等下去,直到有外界干擾為止,比如終止一個線程,或者某一線程自行放棄資源的爭搶,否則這兩個線程就始終處於死鎖狀態了。我們知道達到線程死鎖需要四個條件:
- 互斥條件:一個資源每次只能被一個線程使用
- 資源獨占條件:一個線程因請求資源在未使用完之前,不能強行剝奪
- 不剝奪條件:線程已經獲得的資源在未使用完之前,不能強行剝奪
- 循環等待條件:若干線程之間形成一種頭尾相接的循環等待資源關系
只有滿足了這些條件才能產生線程死鎖,這也同時告誡我們如果要解決線程死鎖問題,就必須從這四個條件入手,一般情況下可以按照以下兩種方案解決:
(1)、避免或減少資源共享
一個資源被多個線程共享,若采用了同步機制,則產生死鎖的可能性大,特別是在項目比較龐大的情況下,很難杜絕死鎖,對此最好的解決辦法就是減少資源共享。
例如一個B/S結構的辦公系統可以完全忽略資源共享,這是因為此類系統有三個特征:一是並發訪問不會太高,二是讀操作多於寫操作,三是數據質量要求比較低,因此即使出現數據資源不同步的情況也不可能產生太大影響,完全可以不使用同步技術。但是如果是一個支付清算系統就必須慎重考慮資源同步問題了,因為此系統一是數據質量要求非常高(如果產生數據不同步的情況那可是重大生產事故),二是並發量大,不設置數據同步則會產生非常多的運算邏輯失效的情況,這會導致交易失敗,產生大量的"臟數據",系統可靠性大大降低。
(2)、使用自旋鎖
回到前面的例子,線程A在等待線程B釋放資源,而線程B又在等待線程A釋放資源,僵持不下,那如果線程B設置了超時時間是不是就可以解決該死鎖問題了呢?比如線程B在等待2秒后還是無法獲得資源,則自行終結該任務,代碼如下:
public void b2() { try { // 立刻獲得鎖,或者2秒等待鎖資源 if (lock.tryLock(2, TimeUnit.SECONDS)) { System.out.println("進入B.b2()"); } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }
上面的代碼中使用tryLock實現了自旋鎖(Spin Lock),它跟互斥鎖一樣,如果一個執行單元要想訪問被自旋鎖保護的共享資源,則必須先得到鎖,在訪問完共享資源后,也必須釋放鎖。如果在獲取自旋鎖時,沒有任何執行單元保持該鎖,那么將立即得到鎖;如果在獲取自旋鎖時已經有保持者,那么獲取鎖操作將"自旋" 在哪里,直到該自旋鎖的保持者釋放了鎖為止,在我們的例子中就是線程A等待線程B釋放鎖,在2秒內 不斷嘗試是否能夠獲得鎖,達到2秒后還未獲得鎖資源,線程A則結束運行,線程B將獲得資源繼續執行,死鎖解除。
對於死鎖的描述最經典的案例是哲學家進餐(五位哲學家圍坐在圓形餐桌旁,人手一根筷子,做一下兩件事情:吃飯和思考。要求吃東西的時候停止思考,思考的時候停止吃東西,而且必須使用兩根筷子才能吃東西),解決此問題的方法很多,比如引入服務生(資源地調度)、資源分級等方法都可以很好的解決此類死鎖問題。在我們Java多線程並發編程中,死鎖很難避免,也不容易預防,對付它的最好方法就是測試:提高測試覆蓋率,建立有效的邊界測試,加強資源監控,這些方法能使得死鎖無可遁形,即使發生了死鎖現象也能迅速查到原因,提高系統性能。