java並發編程框架 Executor ExecutorService invokeall


首先介紹兩個重要的接口,Executor和ExecutorService,定義如下: 

Java代碼   收藏代碼
  1. public interface Executor {  
  2.     void execute(Runnable command);  
  3. }  

 

Java代碼   收藏代碼
  1. public interface ExecutorService extends Executor {  
  2.     //不再接受新任務,待所有任務執行完畢后關閉ExecutorService  
  3.     void shutdown();  
  4.     //不再接受新任務,直接關閉ExecutorService,返回沒有執行的任務列表  
  5.     List<Runnable> shutdownNow();  
  6.     //判斷ExecutorService是否關閉  
  7.     boolean isShutdown();  
  8.     //判斷ExecutorService是否終止  
  9.     boolean isTerminated();  
  10.     //等待ExecutorService到達終止狀態  
  11.     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  
  12.     <T> Future<T> submit(Callable<T> task);  
  13.     //當task執行成功的時候future.get()返回result  
  14.     <T> Future<T> submit(Runnable task, T result);  
  15.     //當task執行成功的時候future.get()返回null  
  16.     Future<?> submit(Runnable task);  
  17.     //批量提交任務並獲得他們的future,Task列表與Future列表一一對應  
  18.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)  
  19.         throws InterruptedException;  
  20.     //批量提交任務並獲得他們的future,並限定處理所有任務的時間  
  21.     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,  
  22. long timeout, TimeUnit unit) throws InterruptedException;  
  23.     //批量提交任務並獲得一個已經成功執行的任務的結果  
  24.     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException;  
  25.   
  26.     <T> T invokeAny(Collection<? extends Callable<T>> tasks,  
  27.                     long timeout, TimeUnit unit)  
  28.         throws InterruptedException, ExecutionException, TimeoutException;  
  29. }  



為了配合使用上面的並發編程接口,有一個Executors工廠類,負責創建各類滿足ExecutorService接口的線程池,具體如下: 
newFixedThreadPool:創建一個固定長度的線程池,線程池中線程的數量從1增加到最大值后保持不變。如果某個線程壞死掉,將會補充一個新的線程。 
newCachedThreadPool:創建長度不固定的線程池,線程池的規模不受限制,不常用。 
newSingleThreadExecutor:創建一個單線程的Executor,他其中有一個線程來處理任務,如果這個線程壞死掉,將補充一個新線程。 
newScheduledThreadPool:創建固定長度的線程池,以延時或定時的方式來執行任務。 

下面是Executor和ExecutorService中常用方法的示例: 

Java代碼   收藏代碼
  1. import java.util.ArrayList;  
  2. import java.util.Collection;  
  3. import java.util.Iterator;  
  4. import java.util.List;  
  5. import java.util.concurrent.Callable;  
  6. import java.util.concurrent.Executor;  
  7. import java.util.concurrent.ExecutorService;  
  8. import java.util.concurrent.Executors;  
  9. import java.util.concurrent.Future;  
  10. import java.util.concurrent.TimeUnit;  
  11.   
  12. public class Demo{  
  13.     public static void main(String [] args){  
  14.         //--------Executor示例------------//  
  15.         Executor s=Executors.newSingleThreadExecutor();  
  16.         s.execute(new MyRunnableTask("1"));  
  17.           
  18.         //--------ExecutorService示例------------//  
  19.         ExecutorService es=Executors.newFixedThreadPool(2);  
  20.           
  21.         //--------get()示例------------//  
  22.         Future<String> future=es.submit(new MyCallableTask("10"));  
  23.         try{  
  24.             System.out.println(future.get());             
  25.         }catch(Exception e){}  
  26.           
  27.         //--------get(timeout, timeunit)示例------------//  
  28.         future=es.submit(new MyCallableTask("11"));  
  29.         try{  
  30.             System.out.println(future.get(500,TimeUnit.MILLISECONDS));  
  31.         }catch(Exception e){  
  32.             System.out.println("cancle because timeout");  
  33.         }  
  34.           
  35.         //--------invokeAll(tasks)示例------------//  
  36.         List<MyCallableTask> myCallableTasks=new ArrayList<MyCallableTask>();  
  37.         for(int i=0;i<6;i++){  
  38.             myCallableTasks.add(new MyCallableTask(i+""));  
  39.         }  
  40.         try {  
  41.             List<Future<String>> results = es.invokeAll(myCallableTasks);  
  42.             Iterator<Future<String>> iterator=results.iterator();  
  43.             while(iterator.hasNext()){  
  44.                 future=iterator.next();  
  45.                 System.out.println(future.get());  
  46.             }  
  47.         } catch (Exception e) {}  
  48.   
  49.         //--------invokeAll(tasks,timeout,timeunit))示例------------//  
  50.         try {  
  51.             //限定執行時間為2100ms,每個任務需要1000ms,線程池的長度為2,因此最多只能處理4個任務。一共6個任務,有2個任務會被取消。  
  52.             List<Future<String>> results = es.invokeAll(myCallableTasks,2100,TimeUnit.MILLISECONDS);  
  53.             Iterator<Future<String>> iterator=results.iterator();  
  54.             while(iterator.hasNext()){  
  55.                 future=iterator.next();  
  56.                 if(!future.isCancelled())  
  57.                     System.out.println(future.get());  
  58.                 else  
  59.                     System.out.println("cancle because timeout");  
  60.             }  
  61.         } catch (Exception e) {}  
  62.         es.shutdown();  
  63.     }  
  64. }  
  65.   
  66. class MyRunnableTask implements Runnable{  
  67.     private String name;  
  68.     public MyRunnableTask(String name) {  
  69.         this.name=name;  
  70.     }  
  71.     @Override  
  72.     public void run() {  
  73.         try {  
  74.             Thread.sleep(1000);  
  75.         } catch (InterruptedException e) {  
  76.             e.printStackTrace();  
  77.         }  
  78.         System.out.println("runnable task--"+name);  
  79.     }  
  80. }  
  81. class MyCallableTask implements Callable<String>{  
  82.     private String name;  
  83.     public MyCallableTask(String name) {  
  84.         this.name=name;  
  85.     }  
  86.     @Override  
  87.     public String call() throws Exception {  
  88.         try {  
  89.             Thread.sleep(1000);  
  90.         } catch (InterruptedException e) {}  
  91.         StringBuilder sb=new StringBuilder("callable task--");  
  92.         return sb.append(name).toString();  
  93.     }  
  94. }  



上面的ExecutorSerivce接口中的invokeAll(tasks)方法用於批量執行任務,並且將結果按照task列表中的順序返回。此外,還存在一個批量執行任務的接口CompletionTask。ExecutorCompletionService是實現CompletionService接口的一個類,該類的實現原理很簡單: 

用Executor類來執行任務,同時把在執行任務的Future放到BlockingQueue<Future<V>>隊列中。該類實現的關鍵就是重寫FutureTask類的done()方法,FutureTask類的done()方法是一個鈎子函數(關於鈎子函數,請讀者自行查詢),done()方法在FutureTask任務被執行的時候被調用。 

ExecutorCompletionService類的核心代碼如下: 

Java代碼   收藏代碼
  1. public Future<V> submit(Runnable task, V result) {  
  2.     if (task == null) throw new NullPointerException();  
  3.     RunnableFuture<V> f = newTaskFor(task, result);  
  4.     executor.execute(new QueueingFuture(f));  
  5.     return f;  
  6. }  
  7. private class QueueingFuture extends FutureTask<Void> {  
  8.     QueueingFuture(RunnableFuture<V> task) {  
  9.         super(task, null);  
  10.         this.task = task;  
  11.     }  
  12.     protected void done() { completionQueue.add(task); }  
  13.     private final Future<V> task;  
  14. }  


其中的done()方法定義如下: 

Java代碼   收藏代碼
  1. /** 
  2.     * Protected method invoked when this task transitions to state 
  3.     * <tt>isDone</tt> (whether normally or via cancellation). The 
  4.     * default implementation does nothing.  Subclasses may override 
  5.     * this method to invoke completion callbacks or perform 
  6.     * bookkeeping. Note that you can query status inside the 
  7.     * implementation of this method to determine whether this task 
  8.     * has been cancelled. 
  9.     */  
  10.    protected void done() { }  



ExecutorCompletionService的使用示例如下: 

Java代碼   收藏代碼
    1. import java.util.concurrent.Callable;  
    2. import java.util.concurrent.CompletionService;  
    3. import java.util.concurrent.ExecutionException;  
    4. import java.util.concurrent.ExecutorCompletionService;  
    5. import java.util.concurrent.Executors;  
    6. import java.util.concurrent.Future;  
    7.   
    8. public class Demo{  
    9.     public static void main(String [] args) throws InterruptedException, ExecutionException{  
    10.         CompletionService<String> cs=new ExecutorCompletionService<String>(  
    11.                 Executors.newFixedThreadPool(2));  
    12.         for(int i=0;i<6;i++){  
    13.             cs.submit(new MyCallableTask(i+""));  
    14.         }  
    15.         for(int i=0;i<6;i++){  
    16.             Future<String> future=cs.take();  
    17.             //Retrieves and removes the Future representing the next completed task,   
    18.             //waiting if none are yet present.  
    19.             System.out.println(future.get());  
    20.         }  
    21.     }  
    22. }  
    23.   
    24. class MyCallableTask implements Callable<String>{  
    25.     private String name;  
    26.     public MyCallableTask(String name) {  
    27.         this.name=name;  
    28.     }  
    29.     @Override  
    30.     public String call() throws Exception {  
    31.         try {  
    32.             Thread.sleep(1000);  
    33.         } catch (InterruptedException e) {}  
    34.         StringBuilder sb=new StringBuilder("callable task--");  
    35.         return sb.append(name).toString();  
    36.     }  
    37. }  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM