Java並發編程實踐 目錄
並發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier
並發編程 06—— CompletionService : Executor 和 BlockingQueue
並發編程 10—— 任務取消 之 關閉 ExecutorService
並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性
並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
概述
第1部分 問題引入
上一篇 並發編程—— 任務取消 中,PrimeGenerator 的取消機制最終會使得搜索素數的任務退出,但在退出過程中需要花費一定的時間。如果使用這種方法的任務調用了一個阻塞方法,例如BlockingQueue.put,那么可能會產生一個更嚴重的問題——任務可能永遠不會檢查取消標志,因此永遠不會結束。
在下面的程序中,BrokenPrimeProducer 就說明了這個問題。生產者線程生產素數,並將它們放入一個阻塞隊列。如果生產者的速度超過了消費者的處理速度,隊列將被填滿,put 方法也會阻塞。當生產者在put 方法中阻塞時,如果消費者希望取消生產者任務,那么將發生什么情況呢?它可以調用cancel 方法來設置cancelled標志,但此時生產者卻永遠不能檢查這個標志,因為它無法從阻塞的put 方法中恢復過來(因為消費者此時已經停止從隊列中取出素數,所以put方法將一直保持阻塞狀態)。
1 /** 2 * 7.3 不可靠的取消操作將把生產者置於阻塞的操作中 3 * @ClassName: BrokenPrimeProducer 4 * TODO 5 * @author Xingle 6 * @date 2014-9-30 上午9:55:56 7 */ 8 public class BrokenPrimeProducer extends Thread{ 9 10 private final BlockingQueue<BigInteger> queue; 11 private volatile boolean cancelled = false; 12 13 public BrokenPrimeProducer(BlockingQueue<BigInteger> queue){ 14 this.queue = queue; 15 } 16 17 public void run(){ 18 BigInteger p = BigInteger.ONE; 19 while(!cancelled){ 20 try { 21 queue.put(p= p.nextProbablePrime()); 22 System.out.println(Thread.currentThread().getName()+"生產數字:"+p); 23 } catch (InterruptedException e) { 24 System.out.println(Thread.currentThread().getName()+"線程中斷"); 25 } 26 } 27 } 28 29 public void cancel(){ 30 this.cancelled = true; 31 } 32 }
以上的測試程序:
1 public class BrokenPrimeProducer_Main { 2 3 public static void main(String[] args){ 4 BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(3); 5 BrokenPrimeProducer producer = new BrokenPrimeProducer(queue); 6 producer.start(); 7 while(true){ 8 try { 9 System.out.println(Thread.currentThread().getName() 10 +"消費數據"+queue.take());// 從隊列取出一個數 11 TimeUnit.SECONDS.sleep(1);// 停止1s,顯示出消費速度慢於生產速度 12 producer.cancel();// 消費者請求停止生產 13 14 } catch (InterruptedException e) { 15 System.out.println("被中斷了"); 16 } 17 } 18 } 19 }
執行結果:
線程將不會停止,而是一直阻塞到這個地方
第2部分 中斷
2.1 中斷
下面是一個改進的例子,用中斷來進行線程的停止。
線程中斷是一種協作機制,線程可以通過這種機制來通知另一個線程,告訴它在合適的或者可能的情況下停止當前工作,並轉而執行其他的工作。
在java 的API 或語言規范中,並沒有將中斷與任何取消語義關聯起來,但實際上,如果在取消之外的其他操作中使用中斷,那么都是不合適的,並且很難支撐起更大的應用。
每個線程都有一個Boolean 類型的中斷狀態。當中斷線程時,這個線程的中斷狀態將被設置為true。在Thread 中包含了中斷線程以及查詢線程中斷狀態的方法。interrupt方法能中斷目標線程,而isInterrupt方法能返回目標線程的中斷狀態。靜態的intertupt 方法將清除當前線程的中斷狀態,並返回它之前的值,這也是清除中斷狀態的唯一方法。
public void interrupt() 中斷線程。 public boolean isInterrupted() 測試線程是否已經中斷。 public static boolean interrupted() 測試當前線程是否已經中斷。
調用interrupt並不意味着立即停止目標線程正在進行的工作,而只是傳遞了請求中斷的消息。
對中斷的正確理解是:它並不會真正地中斷一個正在運行的線程,而只是發出中斷請求,然后由線程在下一個合適的時刻中斷自己。(這些時刻也被稱為取消點)。在使用靜態的interruptd時應該小心,因為它會清除當前線程的中斷狀態。如果在調用interruptd時返回了true,那么除非你想屏蔽這個中斷,否則必須對它進行處理——可以拋出 InterruptedException,或者通過再次調用 interrupt 來恢復中斷狀態。
通常,中斷是實現取消的最合理方式。
BrokenPrimeProducer 中的問題很容易解決:使用中斷而不是boolean標志累請求取消,如下面的程序所示。
1 /** 2 * 7.5 通過中斷來取消 3 * 4 * @ClassName: PrimeProducer 5 * @author Xingle 6 * @date 2014-9-26 下午6:48:22 7 */ 8 public class PrimeProducer extends Thread { 9 10 private final BlockingQueue<BigInteger> queue; 11 12 PrimeProducer(BlockingQueue<BigInteger> queue) { 13 this.queue = queue; 14 } 15 16 public void run() { 17 try { 18 BigInteger p = BigInteger.ONE; 19 while (!Thread.currentThread().interrupted()) { 20 queue.put(p = p.nextProbablePrime()); 21 System.out.println(Thread.currentThread().getName() + " 生產數字 " + p); 22 } 23 } catch (InterruptedException e) { 24 /*允許線程退出*/ 25 System.out.println(Thread.currentThread().getName() + " 線程中斷了"); 26 System.out.println(Thread.currentThread().isInterrupted()); 27 } 28 } 29 30 public void cancel() { 31 interrupt(); 32 } 33 }
測試程序:
1 public class PrimeGeneratorMain { 2 3 public static void main(String[] args){ 4 PrimeGenerator generator = new PrimeGenerator(); 5 new Thread(generator).start(); 6 try { 7 Thread.sleep(100); 8 Thread.currentThread().interrupt(); 9 } catch (InterruptedException e) { 10 e.printStackTrace(); 11 } finally { 12 generator.cancel(); 13 } 14 List<BigInteger> ls = generator.get(); 15 for(int i= 0;i<ls.size();i++){ 16 System.out.println(ls.get(i)); 17 } 18 } 19 }
執行結果:
2.2 中斷策略
中斷策略規定線程如何解釋某個中斷請求——當發現中斷請求時,應該做哪些工作(如果需要的話),哪些工作單元對於中斷來說是原子操作,以及以多快的速度來響應中斷。
最合理的中斷策略是某種形式的線程級取消操作或服務級取消操作;盡快退出,在必要時進行清理,通知某個所有者該線程已經退出。
任務不會在其自己擁有的線程中執行,而是在某個服務(例如線程池)擁有的線程中執行。對於非線程所有者的代碼來說(例如,對於線程池而言,任何在線程池實現以外的代碼),應該小心地保存中斷狀態,這樣擁有線程的代碼才能對中斷做出響應,即使“非所有者”代碼也可以做出響應。
這就是為什么大多數可阻塞的庫函數都只是拋出 InterruptedException作出中斷響應。它們永遠不會在某個自己擁有的線程中運行,因此它們為任務或庫代碼實現了最合理的取消策略:盡快退出流程,並把中斷信息傳遞給調用者,從而使調用棧中的上層代碼可以采取進一步的操作。
任務不應該對執行該任務的線程的中斷策略做出任何假設,除非該任務被專門設計為在服務中運行,並且在這些服務中包含特定的中斷策略。無論任務把中斷視為取消,還是其他某個中斷響應操作,都應該小心地保存執行線程的中斷狀態。如果除了將 InterruptedException 傳遞給調用者外還需要執行其他操作,那么應該在捕獲 InterruptedException 之后恢復中斷狀態:
Thread.currentThread().interrupt();
正如任務代碼不應該對其執行所在的線程的中斷策略做出假設,執行取消操作的代碼也不應該對線程的中斷策略做出假設。線程應該只能由其所有者中斷,所有者可以將線程的中斷策略信心封裝到某個合適的取消機制中,例如關閉(shutdown)方法。
2.3 響應中斷
有兩種實用策略可用於處理 InterruptedException:
- 傳遞異常(可能在執行某個特定於任務的清除操作之后),從而使你的方法也成為可中斷的阻塞方法。
- 恢復中斷狀態,從而使調用棧中的上層代碼能夠對其進行處理。
傳遞 InterruptedException 與將 InterruptedException 添加到throws 子句中一樣容易,如下所示:
//將 InterruptedException 傳遞給調用者 BlockingQueue<Task> queue; public Task getNextTask() throws InterruptedException{ return queue.take(); }
如果不想或無法傳遞 InterruptedException (或許通過Runnable來定義任務),那么需要尋找另一種方式來保存中斷請求。一種標准的方法就是通過再次調用 interrupt 來恢復中斷狀態。
只有實現了線程中斷策略的代碼才可以屏蔽中斷請求,在常規的任務和庫代碼中都不應該屏蔽中斷請求。
對於一些不支持取消但仍可以調用可中斷阻塞方法的操作,它們必須在循環中調用這些方法,並在發現中斷后重新嘗試。在這種情況下,它們應該在本地保存中斷狀態,並在返回前恢復狀態而不是捕獲 InterruptException 時恢復狀態,如下所示:
1 /** 2 * 不可取消的任務在退出前恢復中斷 3 * 4 */ 5 public class NoncancelableTask { 6 public Task getNextTask(BlockingQueue<Task> queue) { 7 boolean interrupted = false; 8 try { 9 while (true) { 10 try { 11 return queue.take(); 12 } catch (InterruptedException e) { 13 interrupted = true; 14 // 重新嘗試 15 } 16 } 17 } finally { 18 if (interrupted) 19 Thread.currentThread().interrupt(); 20 } 21 } 22 23 interface Task { 24 } 25 }
2.4 實例——計時任務的取消
在上一節並發編程—— 任務取消 中 PrimeGeneratorMain 方法將啟動一個 PrimeGenerator ,並在100ms 后中斷。盡管PrimeGenerator 可能需要超過100ms 的時間才能停止,但它最終會發現中斷,然后停止,並使線程結束。在執行任務時的另一個方面是,你希望知道在任務執行過程中是否會拋出異常,如果 PrimeGenerator 在指定時限內拋出了一個未檢查的異常,那么這個異常可能會被忽略,因為素數生成器在另一個獨立的線程中運行,而這個線程並不會顯示地處理異常。
在下面的程序中,給出了在指定時間內運行一個任意的Runnable 的示例。它在調用線程中運行任務,並安排了一個取消任務,在運行指定的時間間隔后中斷它。這解決了從任務中拋出未檢查異常的問題,因為該異常會被 timedRun 的調用者捕獲。
1 /** 2 * 7.8 在外部線程中安排中斷(不要這樣做) 3 * @ClassName: TimedRun1 4 * @author Administrator 5 * @date 2014-10-20 下午2:50:29 6 */ 7 public class TimedRun1 { 8 9 private static final ScheduledExecutorService cancelExec = Executors.newScheduledThreadPool(1); 10 11 public static void timedRun(Runnable r,long timeout, TimeUnit unit) { 12 final Thread taskThread = Thread.currentThread(); 13 cancelExec.schedule(new Runnable() { 14 public void run() { 15 taskThread.interrupt(); 16 System.out.println("1--"+taskThread.isInterrupted()); 17 } 18 }, timeout, unit); 19 r.run(); 20 System.out.println("2--"+taskThread.isInterrupted()); 21 } 22 }
這是一種非常簡單的方法,但卻破壞了以下規則:在中斷線程之前,應該了解它的中斷策略。由於 timedRun 可以從任意一個線程中調用,因此它無法知道這個調用線程的中斷策略。如果任務在超時之前完成,那么中斷timedRun 所在線程的取消任務將在 timedRun 返回到調用者之后啟動。我們不知道在這種情況下將運行什么代碼,但結果一定是不好的。
測試程序:
1 public class TimedRun_Main { 2 public static void main(String[] args) { 3 TimedRun1 timeRun1 = new TimedRun1(); 4 Runnable run = new Runnable() { 5 6 @Override 7 public void run() { 8 int i = 0; 9 for (int j = 0; j < 100000000; j++) { 10 i++; 11 if (i % 10000000 == 0) { 12 System.out.println(i + " "+ Thread.currentThread().getName()); 13 } 14 } 15 } 16 }; 17 timeRun1.timedRun(run, 1, TimeUnit.MILLISECONDS); 18 } 19 }
執行結果:
而且,如果任務不響應中斷,那么 timedRun 會在任務結束時才返回,此時可能已經超過了指定的時限(或者還沒有超過時限)。如果某個限時運行的服務沒有在指定的時間內返回,那么將對調用者帶來負面影響。
在下面的程序中解決了最開始的異常處理問題以及上面解決方案中的問題。執行任務的線程擁有自己的執行策略,即使任務不響應中斷,限時運行的方法仍能返回到它的調用者。在啟動任務線程之后,timedRun 將執行一個限時的 join 方法。在join返回后,它將檢查任務中是否有異常拋出,如果有的話,則會在調用timedRun 的線程中再次拋出該異常。由於 Throwable 將在兩個線程之間共享,因此該變量被聲明為 volatile類型,從而確保安全地將其從任務線程發布到timedRun線程。
1 /** 2 * 7.9 在專門的線程中中斷任務 3 * @ClassName: TimedRun2 4 * @author Administrator 5 * @date 2014-10-17 下午7:41:19 6 */ 7 public class TimedRun2 { 8 private static final ScheduledExecutorService cancelExec = Executors 9 .newScheduledThreadPool(1); 10 11 public static void timedRun(final Runnable r, long timeout, TimeUnit unit) 12 throws InterruptedException { 13 class RethrowableTask implements Runnable { 14 private volatile Throwable t; 15 16 public void run() { 17 try { 18 r.run(); 19 } catch (Throwable t) { 20 this.t = t; 21 } 22 } 23 24 void rethrow() { 25 if (t != null) 26 try { 27 throw launderThrowable(t); 28 } catch (Exception e) { 29 e.printStackTrace(); 30 } 31 } 32 } 33 34 RethrowableTask task = new RethrowableTask(); 35 final Thread taskThread = new Thread(task); 36 taskThread.start(); 37 cancelExec.schedule(new Runnable() { 38 public void run() { 39 taskThread.interrupt(); 40 System.out.println("1--" + taskThread.isInterrupted()); 41 } 42 }, timeout, unit); 43 taskThread.join(unit.toMillis(timeout)); 44 task.rethrow(); 45 System.out.println("2--" + taskThread.isInterrupted()); 46 } 47 48 public static Exception launderThrowable(Throwable t) { 49 if (t instanceof RuntimeException) 50 return (RuntimeException) t; 51 else if (t instanceof Error) 52 throw (Error) t; 53 else 54 throw new IllegalStateException("Not unchecked", t); 55 } 56 }
執行同樣的程序:
1 public class TimedRun_Main { 2 public static void main(String[] args) { 3 TimedRun2 timeRun = new TimedRun2(); 4 Runnable run = new Runnable() { 5 6 @Override 7 public void run() { 8 int i = 0; 9 for (int j = 0; j < 100000000; j++) { 10 i++; 11 if (i % 10000000 == 0) { 12 System.out.println(i + " "+ Thread.currentThread().getName()); 13 } 14 } 15 } 16 }; 17 try { 18 timeRun.timedRun(run, 1, TimeUnit.MILLISECONDS); 19 } catch (InterruptedException e) { 20 e.printStackTrace(); 21 } 22 } 23 }
結果:
在這個示例的代碼中解決了前面示例中的問題,但由於它依賴一個限時的 join ,因此存在着join的不足,無法知道執行控制是因為線程正常退出而返回還是因為 join 超時而返回。
2.5 通過 Future 來實現取消
前面的例子都是直接使用runnable來執行本身,所以如果要取消任務的話只能使用wait join sleep與Interrupt來組合取消任務。
其實 Future 早已經提供這樣的功能 ,ExecutorService.submit 將返回一個 Future 來描述任務。Future 擁有一個cancel 方法,該方法帶有一個 boolean 類型的參數 mayinterruptIfRunning,表示取消操作是否成功。(這只是表示任務是否能接受中斷,而不是表示任務是否能檢測並處理中斷。)如果 mayinterruptIfRunning 為 true 並且任務當前正在某個線程中運行,那么這個線程能被中斷。如果這個參數為 false,那么意味着“若任務還沒有啟動,就不要運行它”,這種方式應該用於那些不處理中斷的任務中。
下面程序給出了另一個版本的 timedRun:將任務提交給一個 ExecutorService ,並通過一個定時的 Future.get 來獲取結果。如果 get 在返回時拋出一個 TimeoutException,那么任務將通過它的 Future 來取消。如果任務在被取消前就拋出一個異常,那么該異常將被重新拋出以便由調用者來處理異常。
/** * 7.10 通過Future 來取消任務 * */ public class TimedRun { private static final ExecutorService taskExec = Executors.newCachedThreadPool(); public static void timedRun(Runnable r, long timeout, TimeUnit unit) throws InterruptedException { Future<?> task = taskExec.submit(r); try { task.get(timeout, unit); } catch (TimeoutException e) { // task will be cancelled below } catch (ExecutionException e) { // exception thrown in task; rethrow throw launderThrowable(e.getCause()); } finally { // Harmless if task already completed task.cancel(true); // interrupt if running } } }
當Future.get 拋出InterruptedException 或 TimeoutException 時,如果你知道不再需要結果,那么就可以調用 Future.cancel 來取消。
實例:
1 /** 2 * 3 * @ClassName: Task 4 * TODO 5 * @author xingle 6 * @date 2014-10-22 下午12:10:49 7 */ 8 public class Task implements Callable<String>{ 9 10 //創建Task類,指定實現Callable接口,並參數化為String類型。 11 //實現call()方法,寫入一條信息到控制台,並使這個線程在循環中睡眠100毫秒。 12 @Override 13 public String call() throws Exception { 14 while (true) { 15 System.out.println("我在執行任務: Test 來自"+Thread.currentThread().getName()+"\n"); 16 Thread.sleep(100); 17 } 18 } 19 }
測試程序:
1 /** 2 * 通過 Future 來取消任務 3 * @ClassName: Task_Main 4 * TODO 5 * @author xingle 6 * @date 2014-10-22 下午12:11:53 7 */ 8 public class Task_Main { 9 public static final ScheduledExecutorService executor = Executors 10 .newScheduledThreadPool(1); 11 12 public static void main(String[] args) { 13 Task task = new Task(); 14 System.out.printf("Main: 開始\n"); 15 Future<String> future = executor.submit(task); 16 try { 17 future.get(300, TimeUnit.MILLISECONDS);//設置超時執行時間 18 } catch (InterruptedException e) { 19 e.printStackTrace(); 20 } catch (ExecutionException e) { 21 //如果在任務中拋出了異常,那么重新拋出該異常 22 throw launderThrowable(e.getCause()); 23 } catch (TimeoutException e) { 24 e.printStackTrace(); 25 //接下來任務將被取消 26 } finally { 27 System.out.printf("執行取消任務 \n"); 28 future.cancel(true);//如果任務正在運行,那么將被中斷 29 } 30 31 //將isCancelled()方法和isDone()的調用結果寫入控制台,驗證任務已取消,因此,已完成。 32 System.out.printf("Canceled: "+ future.isCancelled()+"\n"); 33 System.out.printf("Done: "+ future.isDone()+"\n"); 34 // 35 executor.shutdown(); 36 System.out.printf("The executor has finished\n"); 37 38 } 39 public static RuntimeException launderThrowable(Throwable t) { 40 41 if (t instanceof RuntimeException) 42 return (RuntimeException) t; 43 else if (t instanceof Error) 44 throw (Error) t; 45 else 46 throw new IllegalStateException("Not unchecked", t); 47 } 48 }
執行結果:
2.6 處理不可中斷的阻塞
在java庫中,許多可阻塞的方法都是通過提前返回或者拋出 InterruptedException 來響應中斷請求的,從而使開發人員更容易構建出能響應取消請求的任務。然而,並非所有的可阻塞方法或者阻塞機制都能響應中斷:
- 造成線程阻塞的原因:
1. java.io包中的同步Socket I/O。如套接字中進行讀寫操作read, write方法。
2. java.io包中的同步I/O。如當中斷或關閉正在InterruptibleChannel上等待的線程時,會對應拋出ClosedByInterruptException或 AsynchronousCloseException。
3. Selector的異步I/O。如果一個線程在調用Selector.select時阻塞了,那么調用close, wakeup會使線程拋出ClosedSelectorException。
4. 獲取某個鎖。當一個線程等待某個鎖而阻塞時,不會響應中斷。但Lock類的lockInterruptibly允許在等待鎖時響應中斷。
1 /** 2 * 7.11 通過改寫 interrupt 方法將非標准的取消操作封裝在 Thread 中 3 * @ClassName: ReaderThread 4 * @author xingle 5 * @date 2014-10-24 上午9:05:56 6 */ 7 public class ReaderThread extends Thread{ 8 9 private static final int BUFSZ = 512; 10 private final Socket socket; 11 private final InputStream in; 12 13 public ReaderThread(Socket socket) throws IOException{ 14 this.socket = socket; 15 this.in = socket.getInputStream(); 16 } 17 18 public void interrupt(){ 19 try { 20 socket.close(); 21 } catch (IOException e) { 22 e.printStackTrace(); 23 } finally{ 24 super.interrupt(); 25 } 26 } 27 28 public void run(){ 29 byte[] buf = new byte[BUFSZ]; 30 while(true){ 31 try { 32 int count = in.read(buf); 33 if (count < 0){ 34 break; 35 }else if(count >0 ){ 36 processBuffer(buf, count); 37 } 38 39 } catch (IOException e) { 40 //允許線程退出 41 } 42 43 } 44 } 45 46 47 private void processBuffer(byte[] buf, int count) { 48 // TODO Auto-generated method stub 49 50 } 51 }
2.7 采用 newTaskFor 來封裝非標准的取消
我們可以通過 newTaskFor 方法來進一步優化 ReaderThead 中封裝非標准取消的技術,這是 Java 6 在 ThreadPoolExecutor 中的新增功能。當把一個 Callable 提交給 ExecutorService 時,submit 方法會返回一個 Future ,我們可以通過這個 Future 來取消任務。newTaskFor 是一個工廠方法,它將創建 Future 來代表任務。
通過定制表示任務的 Future 可以改變Future.cancel 的行為。例如,定制的取消代碼可以實現日志記錄或者收集取消操作的統計信息,以及取消一些不響應中斷的操作。通過改寫 interrupt 方法,ReaderThead 可以取消基於套接字的線程。同樣,通過改寫任務的 Future.cancel 方法也可以實現類似的功能。
在下面的程序中,定義了一個CancellableTask 接口,該接口擴展了 Callable,並增加了一個 cancel 方法和一個 newTask 工廠方法來構造RunnableFuture 。CancellingExecutor 擴展了 ThreadPoolExecutor ,並通過改寫 newTaskFor 使得 CancellableTask 可以創建自己的 Future.
1 /** 2 * 7.12 通過 newTaskFor 將非標准的取消操作封裝在一個任務中 3 * 4 * @ClassName: SocketUsingTask 5 * @author xingle 6 * @date 2014-10-24 下午2:27:07 7 */ 8 public class SocketUsingTask<T> implements CancellableTask<T> { 9 10 @GuardedBy("this") 11 private Socket socket; 12 13 protected synchronized void setSocket(Socket socket) { 14 this.socket = socket; 15 } 16 17 @Override 18 public T call() throws Exception { 19 //do working 20 return null; 21 } 22 23 @Override 24 public void cancel() { 25 try { 26 if (socket != null) 27 socket.close(); 28 } catch (IOException e) { 29 e.printStackTrace(); 30 } 31 } 32 33 @Override 34 public RunnableFuture<T> newTask() { 35 return new FutureTask<T>(this) { 36 public boolean cancel(boolean mayInterruptIfRunning) { 37 try { 38 SocketUsingTask.this.cancel(); 39 } finally { 40 return super.cancel(mayInterruptIfRunning); 41 } 42 } 43 }; 44 } 45 46 /** 47 * 通過newTaskFor將非標准的取消操作封裝在任務中 48 */ 49 public class CancellingExecutor extends ThreadPoolExecutor { 50 51 /** 52 * @param corePoolSize 53 * @param maximumPoolSize 54 * @param keepAliveTime 55 * @param unit 56 * @param workQueue 57 */ 58 public CancellingExecutor(int corePoolSize, int maximumPoolSize, 59 long keepAliveTime, TimeUnit unit, 60 BlockingQueue<Runnable> workQueue) { 61 super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue); 62 // TODO Auto-generated constructor stub 63 } 64 65 @Override 66 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { 67 if (callable instanceof CancellableTask) { // 若是我們定制的可取消任務 68 return ((CancellableTask<T>) callable).newTask(); 69 } 70 return super.newTaskFor(callable); 71 } 72 } 73 74 } 75 76 /** 77 * 可取消的任務接口 78 */ 79 interface CancellableTask<T> extends Callable<T> { 80 void cancel(); 81 82 RunnableFuture<T> newTask(); 83 }
SocketUsingTask 實現了 CancellableTask,並定義了Future.cancel 來關閉套接字和調用 super.cancel。如果 SocketUsingTask 使用自己的 Future 來取消,那么底層的套接字將被關閉並且線程將被中斷。
1. 《並發編程實戰》 第7章