Java並發編程實踐 目錄
並發編程 04—— 閉鎖CountDownLatch 與 柵欄CyclicBarrier
並發編程 06—— CompletionService : Executor 和 BlockingQueue
並發編程 10—— 任務取消 之 關閉 ExecutorService
並發編程 12—— 任務取消與關閉 之 shutdownNow 的局限性
並發編程 13—— 線程池的使用 之 配置ThreadPoolExecutor 和 飽和策略
並發編程 20—— AbstractQueuedSynchronizer 深入分析
概述
第4部分 實例——JAVA並行異步編程線程池+FutureTask
Callable 和 Future 是比較有趣的一對組合。當我們需要獲取線程的執行結果時,就需要用到它們。Callable用於產生結果,Future用於獲取結果。
第1部分 Callable
Callable 是一個接口,它只包含一個call()方法。Callable是一個返回結果並且可能拋出異常的任務。
為了便於理解,我們可以將Callable比作一個Runnable接口,而Callable的call()方法則類似於Runnable的run()方法。
Callable的源碼如下:
public interface Callable<V> { V call() throws Exception; }
Callable 接口類似於 Runnable
,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常。
Executors
類包含一些從其他普通形式轉換成 Callable 類的實用方法。
第2部分 Future
Future 是一個接口。它用於表示異步計算的結果。提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。
Future的源碼如下:
boolean cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。 V get() 如有必要,等待計算完成,然后獲取其結果。 V get(long timeout, TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。 boolean isCancelled() 如果在任務正常完成前將其取消,則返回 true。 boolean isDone() 如果任務已完成,則返回 true。
在講解FutureTask之前,先看看Callable, Future, FutureTask它們之間的關系圖,如下:
Future表示一個任務的生命周期,並提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。在Future規范中包含的隱含意義是,任務的生命周期只能前進,不能后退,就像ExecutorService的生命周期一樣。當某個任務完成以后,它就永遠停留在“完成”狀態上。
get方法的行為取決於任務的狀態(尚未開始、正在運行、已完成)。如果任務已經完成,那么get會立即返回或者拋出一個Exception,如果任務沒有完成,那么get將阻塞並直到任務完成。如果任務拋出了異常,那么get將該異常封裝為ExecutionException並重新拋出。如果任務被取消,那么get將拋出CancellationException。如果get拋出了ExecutionException,那么可以通過getCause來獲得封裝的初始異常。
異常拋出:
CancellationException - 如果計算被取消 ExecutionException - 如果計算拋出異常 InterruptedException - 如果當前的線程在等待時被中斷 TimeoutException - 如果等待超時
可以通過很多種方法創建一個Future來描述任務。ExecutorService中的所有submit方法都將返回一個Future,從而將一個Runnable 或 Callable 提交給Executor ,並得到一個Future 用來獲得任務的執行結果或者取消任務。 還可以顯示地為某個任務指定的Runnable或Callable 實例化一個FutureTask。
說明:
(01) RunnableFuture是一個接口,它繼承了Runnable和Future這兩個接口。RunnableFuture的源碼如下:
public interface RunnableFuture<V> extends Runnable, Future<V> { void run(); }
(02) FutureTask實現了RunnableFuture接口。所以,也說它實現了Future接口。
第3部分 示例和源碼分析
先通過一個示例看看Callable和Future的基本用法,然后再分析示例的實現原理。
1 package com.concurrency.TaskExecution_6; 2 3 import java.util.concurrent.Callable; 4 import java.util.concurrent.ExecutionException; 5 import java.util.concurrent.ExecutorService; 6 import java.util.concurrent.Executors; 7 import java.util.concurrent.Future; 8 9 /** 10 * Callable 和 Future實現線程等待 11 * @ClassName: CallableFutureTest 12 * @author Xingle 13 * @date 2014-9-15 下午3:23:30 14 */ 15 public class CallableFutureTest { 16 17 public static void main(String[] args) throws InterruptedException, ExecutionException{ 18 System.out.println("start main thread "); 19 ExecutorService exec = Executors.newFixedThreadPool(5); 20 21 //新建一個Callable 任務,並將其提交到一個ExecutorService. 將返回一個描述任務情況的Future. 22 Callable<String> call = new Callable<String>() { 23 24 @Override 25 public String call() throws Exception { 26 System.out.println("start new thread "); 27 Thread.sleep(5000); 28 System.out.println("end new thread "); 29 return "返回內容"; 30 } 31 }; 32 33 Future<String> task = exec.submit(call); 34 Thread.sleep(1000); 35 String retn = task.get(); 36 //關閉線程池 37 exec.shutdown(); 38 System.out.println(retn+"--end main thread"); 39 } 40 41 }
執行結果:
3.1 submit()
submit()在java/util/concurrent/AbstractExecutorService.java中實現,它的源碼如下:
1 public <T> Future<T> submit(Callable<T> task) { 2 if (task == null) throw new NullPointerException(); 3 // 創建一個RunnableFuture對象 4 RunnableFuture<T> ftask = newTaskFor(task); 5 // 執行“任務ftask” 6 execute(ftask); 7 // 返回“ftask” 8 return ftask; 9 }
說明:submit()通過newTaskFor(task)創建了RunnableFuture對象ftask。它的源碼如下:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }
3.2 FutureTask的構造函數
FutureTask的構造函數如下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); // callable是一個Callable對象 this.callable = callable; // state記錄FutureTask的狀態 this.state = NEW; // ensure visibility of callable }
3.3 FutureTask的run()方法
繼續回到submit()的源碼中。
在newTaskFor()新建一個ftask對象之后,會通過execute(ftask)執行該任務。此時ftask被當作一個Runnable對象進行執行,最終會調用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實現,源碼如下:
1 public void run() { 2 if (state != NEW || 3 !UNSAFE.compareAndSwapObject(this, runnerOffset, 4 null, Thread.currentThread())) 5 return; 6 try { 7 // 將callable對象賦值給c。 8 Callable<V> c = callable; 9 if (c != null && state == NEW) { 10 V result; 11 boolean ran; 12 try { 13 // 執行Callable的call()方法,並保存結果到result中。 14 result = c.call(); 15 ran = true; 16 } catch (Throwable ex) { 17 result = null; 18 ran = false; 19 setException(ex); 20 } 21 // 如果運行成功,則將result保存 22 if (ran) 23 set(result); 24 } 25 } finally { 26 runner = null; 27 // 設置“state狀態標記” 28 int s = state; 29 if (s >= INTERRUPTING) 30 handlePossibleCancellationInterrupt(s); 31 } 32 }
說明:run()中會執行Callable對象的call()方法,並且最終將結果保存到result中,並通過set(result)將result保存。
之后調用FutureTask的get()方法,返回的就是通過set(result)保存的值。
第4部分 實例——JAVA並行異步編程線程池+FutureTask
通過上面的介紹,Callable 和 Future組合使用,可以獲取線程的執行結果,實際項目中遇到一個問題,查詢數據庫的時候,有個執行任務返回的數據非常大,幾乎是全部用戶群的數據,所以非常耗時,后續處理邏輯要獲取到所有返回的數據。這里考慮在查詢數據的時候,加上行號,利用線程池多個任務同時查從起始行到結束行的數據,類似分頁處理,最后將返回的數據合在一起。
1 @Override 2 public List<BuyerInfoVo> testQuery(final String eticketActId) { 3 4 int count = grantEticketActDao.testQueryCount(eticketActId); 5 final int pageSize = 2000; 6 final int totalPage = count / pageSize+1; 7 8 long start = System.currentTimeMillis(); 9 // 進行異步任務列表 10 List<FutureTask<List<BuyerInfoVo>>> futureTasks = new ArrayList<FutureTask<List<BuyerInfoVo>>>(); 11 // 線程池 初始化30個線程 12 ExecutorService executorService = Executors.newFixedThreadPool(30); 13 14 Callable<List<BuyerInfoVo>> callable = new Callable<List<BuyerInfoVo>>() { 15 @Override 16 public List<BuyerInfoVo> call() throws Exception { 17 // 執行任務 18 List<BuyerInfoVo> ls = grantEticketActDao.testQueryBySize(eticketActId,pageSize,totalPage); 19 return ls; 20 } 21 }; 22 23 List<BuyerInfoVo> reLs = new ArrayList<BuyerInfoVo>(); 24 for (int i = 0; i < totalPage; i++) { 25 // 創建一個異步任務 26 FutureTask<List<BuyerInfoVo>> futureTask = new FutureTask<List<BuyerInfoVo>>( 27 callable); 28 futureTasks.add(futureTask); 29 executorService.submit(futureTask); 30 } 31 32 for (FutureTask<List<BuyerInfoVo>> futureTask : futureTasks) { 33 try { 34 List<BuyerInfoVo> ls = futureTask.get(); 35 if(null!=ls) 36 reLs.addAll(ls); 37 } catch (InterruptedException | ExecutionException e) { 38 throw new RuntimeException(e); 39 } 40 } 41 42 // 清理線程池 43 executorService.shutdown(); 44 45 long end = System.currentTimeMillis(); 46 47 System.out.println("數據庫查詢記錄總數:"+count); 48 System.out.println("實際返回數據條數: " + reLs.size()); 49 System.out.println("一共使用時間:"+(end-start)/1000+"s"); 50 if(reLs.size()!=count){ 51 throw new RuntimeException(); 52 } 53 GrantEticketActServiceImpl.queryId = 0; 54 return reLs; 55 }
其中,執行任務查詢的代碼:
1 public List<BuyerInfoVo> testQueryBySize(String eticketActId, int pageSize, 2 int totalPage) { 3 int start ; 4 int end; 5 synchronized (eticketActId) { 6 start = (GrantEticketActServiceImpl.queryId)*pageSize+1; 7 end = (GrantEticketActServiceImpl.queryId+1)*pageSize; 8 GrantEticketActServiceImpl.queryId++; 9 } 10 StringBuffer sb = new StringBuffer(); 11 //查詢語句省略 12 String querysql=" "; 13 sb.append(" SELECT * FROM (SELECT ROWNUM row_, t.* FROM ("); 14 sb.append(querysql); 15 sb.append(") t ) WHERE row_ <="); 16 sb.append(end); 17 sb.append(" AND row_ >="); 18 sb.append(start); 19 String sql = sb.toString(); 20 21 Object[] args = new Object[]{eticketActId }; 22 List<BuyerInfoVo> list = jdbcTemplate.query(sql, args, new RowMapper<BuyerInfoVo>(){ 23 @Override 24 public BuyerInfoVo mapRow(ResultSet rs, int i) 25 throws SQLException { 26 BuyerInfoVo vo = new BuyerInfoVo(); 27 AutoInjection.Rs2Vo(rs, vo, null); 28 return vo; 29 } 30 }); 31 32 return list; 33 }
執行結果說明,原先優化之前,單次執行返回時間需要4min,優化后返回只需15s,效果非常明顯,涉及的參數,單次查詢條數,以及線程池的配置大小,還有待繼續深入。
第5部分 Callable和Future 區別
Callable定義的方法是call,而Runnable定義的方法是run。
Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
Callable的call方法可拋出異常,而Runnable的run方法不能拋出異常。
1.《java 並發編程實戰》 第六章 任務執行
5.Callable vs Runnable - Runners間的“爭論”
項目中代碼片段:
/** * @Description:獲取所有有效產品列表 * @return * @Return:List<SupplyAreaProVo> * @Author: * @Date:2016年5月9日 下午12:33:49 */ private List<SupplyAreaProVo> getTMallProductIndexAll(){ Util.i = 0; final int total = 10; List<FutureTask<List<SupplyAreaProVo>>> futureTasks = new ArrayList<FutureTask<List<SupplyAreaProVo>>>(); // 線程池 初始化20個線程 ExecutorService executorService = Executors.newFixedThreadPool(20); List<SupplyAreaProVo> list=new ArrayList<SupplyAreaProVo>(); for (int index = 0; index < total; index++) { final int cnt = index; Callable<List<SupplyAreaProVo>> callable = new Callable<List<SupplyAreaProVo>>() { @Override public List<SupplyAreaProVo> call() throws Exception { // 執行任務 List<SupplyAreaProVo> list= searchTLMallDao.getTMallProductList(total,cnt); return list; } }; // 創建一個異步任務 FutureTask<List<SupplyAreaProVo>> futureTask = new FutureTask<List<SupplyAreaProVo>>(callable); futureTasks.add(futureTask); executorService.submit(futureTask); } for (FutureTask<List<SupplyAreaProVo>> futureTask : futureTasks) { try { List<SupplyAreaProVo> ls = futureTask.get(); if(null!=ls){ list.addAll(ls); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); return list; }