並發編程 05—— Callable和Future


第1部分 Callable

第2部分 Future

第3部分 示例和源碼分析

  3.1 submit()

  3.2 FutureTask的構造函數

  3.3 FutureTask的run()方法

第4部分 實例——JAVA並行異步編程線程池+FutureTask

第5部分 Callable和Future 區別

參考

 

Callable 和 Future 是比較有趣的一對組合。當我們需要獲取線程的執行結果時,就需要用到它們。Callable用於產生結果,Future用於獲取結果。

第1部分 Callable

  Callable 是一個接口,它只包含一個call()方法。Callable是一個返回結果並且可能拋出異常的任務。

為了便於理解,我們可以將Callable比作一個Runnable接口,而Callable的call()方法則類似於Runnable的run()方法。

Callable的源碼如下:

public interface Callable<V> {
    V call() throws Exception;
}

  Callable 接口類似於 Runnable,兩者都是為那些其實例可能被另一個線程執行的類設計的。但是 Runnable 不會返回結果,並且無法拋出經過檢查的異常。

Executors 類包含一些從其他普通形式轉換成 Callable 類的實用方法。

 

第2部分 Future

  Future 是一個接口。它用於表示異步計算的結果。提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。

Future的源碼如下:

 boolean cancel(boolean mayInterruptIfRunning) 
          試圖取消對此任務的執行。 
 V get() 
          如有必要,等待計算完成,然后獲取其結果。 
 V get(long timeout, TimeUnit unit) 
          如有必要,最多等待為使計算完成所給定的時間之后,獲取其結果(如果結果可用)。 
 boolean isCancelled() 
          如果在任務正常完成前將其取消,則返回 trueboolean isDone() 
          如果任務已完成,則返回 true

 

在講解FutureTask之前,先看看Callable, Future, FutureTask它們之間的關系圖,如下:

 

  Future表示一個任務的生命周期,並提供了相應的方法來判斷是否已經完成或取消,以及獲取任務的結果和取消任務等。在Future規范中包含的隱含意義是,任務的生命周期只能前進,不能后退,就像ExecutorService的生命周期一樣。當某個任務完成以后,它就永遠停留在“完成”狀態上。

  get方法的行為取決於任務的狀態(尚未開始、正在運行、已完成)。如果任務已經完成,那么get會立即返回或者拋出一個Exception,如果任務沒有完成,那么get將阻塞並直到任務完成。如果任務拋出了異常,那么get將該異常封裝為ExecutionException並重新拋出。如果任務被取消,那么get將拋出CancellationException。如果get拋出了ExecutionException,那么可以通過getCause來獲得封裝的初始異常。

異常拋出: 
CancellationException - 如果計算被取消 ExecutionException - 如果計算拋出異常 InterruptedException - 如果當前的線程在等待時被中斷 TimeoutException - 如果等待超時

 

  可以通過很多種方法創建一個Future來描述任務。ExecutorService中的所有submit方法都將返回一個Future,從而將一個Runnable 或 Callable 提交給Executor ,並得到一個Future 用來獲得任務的執行結果或者取消任務。 還可以顯示地為某個任務指定的Runnable或Callable 實例化一個FutureTask。

 

說明
(01) RunnableFuture是一個接口,它繼承了Runnable和Future這兩個接口。RunnableFuture的源碼如下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

 

(02) FutureTask實現了RunnableFuture接口。所以,也說它實現了Future接口。

 

第3部分 示例和源碼分析

先通過一個示例看看Callable和Future的基本用法,然后再分析示例的實現原理。

 

 1 package com.concurrency.TaskExecution_6;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutionException;
 5 import java.util.concurrent.ExecutorService;
 6 import java.util.concurrent.Executors;
 7 import java.util.concurrent.Future;
 8 
 9 /**
10  * Callable 和 Future實現線程等待
11  * @ClassName: CallableFutureTest
12  * @author Xingle
13  * @date 2014-9-15 下午3:23:30
14  */
15 public class CallableFutureTest {
16     
17     public static void main(String[] args) throws InterruptedException, ExecutionException{
18         System.out.println("start main thread ");
19         ExecutorService exec = Executors.newFixedThreadPool(5);
20         
21         //新建一個Callable 任務,並將其提交到一個ExecutorService. 將返回一個描述任務情況的Future.
22         Callable<String> call = new Callable<String>() {
23 
24             @Override
25             public String call() throws Exception {
26                 System.out.println("start new thread ");
27                 Thread.sleep(5000);
28                 System.out.println("end new thread ");
29                 return "返回內容";
30             }
31         };
32         
33         Future<String> task = exec.submit(call);
34         Thread.sleep(1000);
35         String retn = task.get();
36         //關閉線程池
37         exec.shutdown();
38         System.out.println(retn+"--end main thread");
39     }
40 
41 }

 

執行結果:

 

3.1 submit()

submit()在java/util/concurrent/AbstractExecutorService.java中實現,它的源碼如下:

1 public <T> Future<T> submit(Callable<T> task) {
2     if (task == null) throw new NullPointerException();
3     // 創建一個RunnableFuture對象
4     RunnableFuture<T> ftask = newTaskFor(task);
5     // 執行“任務ftask”
6     execute(ftask);
7     // 返回“ftask”
8     return ftask;
9 }

 

說明:submit()通過newTaskFor(task)創建了RunnableFuture對象ftask。它的源碼如下:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

 

3.2 FutureTask的構造函數

FutureTask的構造函數如下:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    // callable是一個Callable對象
    this.callable = callable;
    // state記錄FutureTask的狀態
    this.state = NEW;       // ensure visibility of callable
}

 

3.3 FutureTask的run()方法

繼續回到submit()的源碼中。
在newTaskFor()新建一個ftask對象之后,會通過execute(ftask)執行該任務。此時ftask被當作一個Runnable對象進行執行,最終會調用到它的run()方法;ftask的run()方法在java/util/concurrent/FutureTask.java中實現,源碼如下:

 1 public void run() {
 2     if (state != NEW ||
 3         !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                      null, Thread.currentThread()))
 5         return;
 6     try {
 7         // 將callable對象賦值給c。
 8         Callable<V> c = callable;
 9         if (c != null && state == NEW) {
10             V result;
11             boolean ran;
12             try {
13                 // 執行Callable的call()方法,並保存結果到result中。
14                 result = c.call();
15                 ran = true;
16             } catch (Throwable ex) {
17                 result = null;
18                 ran = false;
19                 setException(ex);
20             }
21             // 如果運行成功,則將result保存
22             if (ran)
23                 set(result);
24         }
25     } finally {
26         runner = null;
27         // 設置“state狀態標記”
28         int s = state;
29         if (s >= INTERRUPTING)
30             handlePossibleCancellationInterrupt(s);
31     }
32 }

 

說明:run()中會執行Callable對象的call()方法,並且最終將結果保存到result中,並通過set(result)將result保存。
      之后調用FutureTask的get()方法,返回的就是通過set(result)保存的值。

 

第4部分 實例——JAVA並行異步編程線程池+FutureTask

  通過上面的介紹,Callable 和 Future組合使用,可以獲取線程的執行結果,實際項目中遇到一個問題,查詢數據庫的時候,有個執行任務返回的數據非常大,幾乎是全部用戶群的數據,所以非常耗時,后續處理邏輯要獲取到所有返回的數據。這里考慮在查詢數據的時候,加上行號,利用線程池多個任務同時查從起始行到結束行的數據,類似分頁處理,最后將返回的數據合在一起。

 

 1 @Override
 2     public List<BuyerInfoVo> testQuery(final String eticketActId) {
 3         
 4         int count = grantEticketActDao.testQueryCount(eticketActId);
 5         final int pageSize = 2000;
 6         final int totalPage = count / pageSize+1;
 7 
 8         long start = System.currentTimeMillis();
 9         // 進行異步任務列表
10         List<FutureTask<List<BuyerInfoVo>>> futureTasks = new ArrayList<FutureTask<List<BuyerInfoVo>>>();
11         // 線程池 初始化30個線程 
12         ExecutorService executorService = Executors.newFixedThreadPool(30);
13 
14         Callable<List<BuyerInfoVo>> callable = new Callable<List<BuyerInfoVo>>() {
15             @Override
16             public List<BuyerInfoVo> call() throws Exception {
17                 // 執行任務
18                 List<BuyerInfoVo> ls =  grantEticketActDao.testQueryBySize(eticketActId,pageSize,totalPage);
19                 return ls;
20             }
21         };
22 
23         List<BuyerInfoVo> reLs = new ArrayList<BuyerInfoVo>();
24         for (int i = 0; i < totalPage; i++) {
25             // 創建一個異步任務
26             FutureTask<List<BuyerInfoVo>> futureTask = new FutureTask<List<BuyerInfoVo>>(
27                     callable);
28             futureTasks.add(futureTask);
29             executorService.submit(futureTask);
30         }
31 
32         for (FutureTask<List<BuyerInfoVo>> futureTask : futureTasks) {
33             try {
34                 List<BuyerInfoVo> ls = futureTask.get();
35                 if(null!=ls)
36                     reLs.addAll(ls);
37             } catch (InterruptedException | ExecutionException e) {
38                  throw new RuntimeException(e);
39             }
40         }
41 
42         // 清理線程池
43         executorService.shutdown();
44 
45         long end = System.currentTimeMillis();        
46         
47         System.out.println("數據庫查詢記錄總數:"+count);
48         System.out.println("實際返回數據條數:  " + reLs.size());
49         System.out.println("一共使用時間:"+(end-start)/1000+"s");    
50         if(reLs.size()!=count){
51             throw new RuntimeException();
52         }
53         GrantEticketActServiceImpl.queryId = 0;
54         return reLs;
55     }

 

 

其中,執行任務查詢的代碼:

 1 public List<BuyerInfoVo> testQueryBySize(String eticketActId, int pageSize,
 2             int totalPage) {
 3         int start ;
 4         int end;
 5         synchronized (eticketActId) {
 6             start = (GrantEticketActServiceImpl.queryId)*pageSize+1;
 7             end = (GrantEticketActServiceImpl.queryId+1)*pageSize;
 8             GrantEticketActServiceImpl.queryId++;
 9         }
10         StringBuffer sb = new StringBuffer();
11         //查詢語句省略
12         String querysql=" ";
13         sb.append(" SELECT * FROM (SELECT ROWNUM row_, t.* FROM (");
14         sb.append(querysql);
15         sb.append(") t ) WHERE row_ <=");
16         sb.append(end);
17         sb.append(" AND row_ >=");
18         sb.append(start);
19         String sql = sb.toString();
20         
21         Object[]  args = new Object[]{eticketActId };
22         List<BuyerInfoVo> list = jdbcTemplate.query(sql, args, new RowMapper<BuyerInfoVo>(){
23             @Override
24             public BuyerInfoVo mapRow(ResultSet rs, int i)
25                     throws SQLException {
26                 BuyerInfoVo vo = new BuyerInfoVo();
27                 AutoInjection.Rs2Vo(rs, vo, null);
28                 return vo;
29             }
30         });
31         
32         return list;
33     }

 

執行結果說明,原先優化之前,單次執行返回時間需要4min,優化后返回只需15s,效果非常明顯,涉及的參數,單次查詢條數,以及線程池的配置大小,還有待繼續深入。

 

 

第5部分 Callable和Future 區別

Callable定義的方法是call,而Runnable定義的方法是run。
Callable的call方法可以有返回值,而Runnable的run方法不能有返回值。
Callable的call方法可拋出異常,而Runnable的run方法不能拋出異常。 

 


參考:

1.《java 並發編程實戰》 第六章 任務執行

2.java並發編程-Executor框架

3.Java線程(七):Callable和Future

4.JAVA並行異步編程線程池+FutureTask

5.Callable vs Runnable - Runners間的“爭論”

 

項目中代碼片段:

/**
     * @Description:獲取所有有效產品列表
     * @return
     * @Return:List<SupplyAreaProVo>
     * @Author:
     * @Date:2016年5月9日 下午12:33:49
     */
    private List<SupplyAreaProVo> getTMallProductIndexAll(){
        Util.i = 0;
        final int total = 10;
        List<FutureTask<List<SupplyAreaProVo>>> futureTasks = new ArrayList<FutureTask<List<SupplyAreaProVo>>>();
        // 線程池 初始化20個線程
        ExecutorService executorService = Executors.newFixedThreadPool(20);
        List<SupplyAreaProVo> list=new ArrayList<SupplyAreaProVo>();
        for (int index = 0; index < total; index++) {
            final int cnt = index;
            Callable<List<SupplyAreaProVo>> callable = new Callable<List<SupplyAreaProVo>>() {
                @Override
                public List<SupplyAreaProVo> call() throws Exception {
                    // 執行任務
                    List<SupplyAreaProVo> list= searchTLMallDao.getTMallProductList(total,cnt);
                    return list;
                }
            };
            // 創建一個異步任務
            FutureTask<List<SupplyAreaProVo>> futureTask = new FutureTask<List<SupplyAreaProVo>>(callable);
            futureTasks.add(futureTask);
            executorService.submit(futureTask);
        }
        
        for (FutureTask<List<SupplyAreaProVo>> futureTask : futureTasks) {          
                try {
                    
                    List<SupplyAreaProVo> ls = futureTask.get();
                    if(null!=ls){
                        list.addAll(ls);
                    }     
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            
        }
        executorService.shutdown();
        return list;
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM