java並發之線程執行器(Executor)


線程執行器和不使用線程執行器的對比(優缺點)

1.線程執行器分離了任務的創建和執行,通過使用執行器,只需要實現Runnable接口的對象,然后把這些對象發送給執行器即可。

2.使用線程池來提高程序的性能。當發送一個任務給執行器時,執行器會嘗試使用線程池中的線程來執行這個任務。避免了不斷創建和銷毀線程導致的性能開銷。

3.執行器可以處理實現了Callable接口的任務。Callable接口類似於Runnable接口,卻提供了兩方面的增強:

  a.Callable主方法名稱為call(),可以返回結果

  b.當發送一個Callable對象給執行器時,將獲得一個實現了Future接口的對象。可以使用這個對象來控制Callable對象的狀態和結果。

4.提供了一些操作線程任務的功能

使用線程執行器的例子
  • 執行繼承了Runnable接口的任務類
聲明任務類Task
 1 public class Task implements Runnable {
 2     private String name;
 3     
 4     public Task(String name){
 5         this.name=name;
 6     }
 7     @Override
 8     public void run() {
 9         }
10 }
使用執行器調用任務類
 1 public class Server {
 2     private ThreadPoolExecutor executor;
 3     
 4     public Server(){
 5         executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
 6     }
 7     public void executeTask(Task task){
 8         System.out.printf("Server: A new task has arrived\n");
 9         executor.execute(task);
10         System.out.printf("Server: Active Count: %d\n",executor.getActiveCount());
11         System.out.printf("Server: Completed Tasks: %d\n",executor.getCompletedTaskCount());
12     }
13     public void endServer() {
14         executor.shutdown();
15   }
16 }
需要注意的地方:
 1、ThreadPoolExecutor提供了好幾種構造函數,由於這些構造函數的參數比較多,難於記憶,所以這里使用Executors類對其構造函數進行了封裝,封裝后的靜        態函數可以通過函數名稱更加直觀的表述其含義。
2、執行實現Runnable接口的任務類使用的方式是:executor.execute(task);后面可以看到它和調用實現Callable接口的任務類還是有區別的。
3、使用執行器時要顯示結束執行器。如果不關閉,那么執行器會一直執行而程序不會結束。如果執行器沒有任務執行了,它將繼續等待新任務的到來,而不會          結束執行。結束執行器這里使用的方式是shutdown();
  • 執行實現了Callable<T>接口的任務
 1 public class FactorialCalculator implements Callable<Integer> {
 2     private Integer number;
 3     public FactorialCalculator(Integer number){
 4         this.number=number;
 5     }
 6     
 7     @Override
 8     public Integer call() throws Exception {
 9         int num, result;
10         
11         num=number.intValue();
12         result=1;
13         
14         // If the number is 0 or 1, return the 1 value
15         if ((num==0)||(num==1)) {
16             result=1;
17         } else {
18             // Else, calculate the factorial
19             for (int i=2; i<=number; i++) {
20                 result*=i;
21                 Thread.sleep(20);
22             }
23         }
24         System.out.printf("%s: %d\n",Thread.currentThread().getName(),result);
25         // Return the value
26         return result;
27     }
28 }
交給執行器去執行:
1 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);//實例化執行器
2 FactorialCalculator calculator = new FactorialCalculator(number);//實例化任務
3 Future<Integer> result = executor.submit(calculator);//執行任務,並返回Future<T>實例
需要注意的地方:
1、Callable接口是一個泛型接口,該接口聲明了call()方法。
1 public interface Callable<V> {
2     V call() throws Exception;
3 }
2、執行器調用submit()方法執行任務之后,返回一個Future<T>類型對象。Future是一個異步任務的結果。意思就是任務交給執行器后,執行器就會立刻返回一個Future對象,而此時任務正在執行中。Future對象聲明了一些方法來獲取由Callable對象產生的結果,並管理他們的狀態。Future包含的方法如下:
 

 
線程執行器的四種實例方式
前面提到由於ThreadPoolExecutor類的構造函數比較難記憶(參數多,形式也差不多),Java提供了一個工廠類Executors來實現執行器對象的創建。具體函數如下:

 
這些函數以new開頭。
1、newCachedThreadPool():緩存線程池
1   public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }
需要注意的地方:
      如果需要執行新任務,緩存線程池就會創建新線程;如果線程所運行的任務執行完成后並且這個線程可用,那么緩存線程池將會重用這些線程。
      優點:減少了創建新線程所花費的時間
      缺點:如果任務過多,系統的負荷會過載
      使用條件:線程數量合理(不太多)或者線程運行只會運行很短的時間 
2、newFixedThreadPool():固定線程池,(fixed:固定)
1 public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }
需要注定的地方:
        創建了具有線程最大數量值(即線程數量 <= nThreads)的執行器。如果發送超過數量的任務給執行器,剩余的任務將被阻塞知道線程池中有可空閑的線程來處理它們。
3、newSingleThreadExecutor():單線程執行器
1 public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6 }
4、newScheduledThreadPool(int corePoolSize):定時執行器
1 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
2         return new ScheduledThreadPoolExecutor(corePoolSize);
3 }
需要注意的地方:
        使用方式如下:
1 ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1);
2 executor.schedule(task,i+1 , TimeUnit.SECONDS);
其中:task是實現了Callable接口的任務。schedule的參數含義:
1 public <V> ScheduledFuture<V> schedule(Callable<V> callable,//即將執行的任務
2                                            long delay,//任務執行前需要等待的時間
3                                            TimeUnit unit)//時間單位

 
線程執行器提供的功能
線程任務交給執行器去執行,執行器封裝了一些方法來操作執行的線程。其中涉及到的類和接口的類圖如下:
執行器的類圖:
Executor是一個頂層接口,提供了唯一的一個方法execute(Runnable r)。ExecutorService繼承Excutor接口,是比較核心的接口。提供了執行器具有的基本方法,包括執行器的提交(submit)和終止(shutdown)以及控制任務運行的invokeAll()和invokeAny()等方法。經常用到的執行器類一般是ThreadPoolExecutor和ScheduledThreadPoolExecutor。區別就是ScheduledThreadPoolExecutor一般和線程調度有關,也就是與一些周期性操作,時間間隔、定時執行任務的操作有關。
通過Future接口可以對執行器的線程進行一些操作,例如獲取線程執行完成后的結果,取消線程的執行等,涉及Future的類圖如下:
接下來具體學習上面這些類的用法以及他們提供的函數的使用場景。
 
延時執行任務和 周期性執行任務
涉及到這種調度的一般使用ScheduledThreadPoolExecutor類。ScheduledThreadPoolExecutor類涉及到的和調度有關的函數如下:
延時執行任務:
1 ScheduledExecutorService executor=(ScheduledExecutorService)Executors.newScheduledThreadPool(1);
2 
3         for (int i=0; i<5; i++) {
4             Task task=new Task("Task "+i);
5             executor.schedule(task,i+1 , TimeUnit.SECONDS);
6         }
7         
8         executor.shutdown();
這里聲明一個定時執行器,返回ScheduleExecutorService接口。然后調用schedule()方法。schedule的參數含義:
1 public <V> ScheduledFuture<V> schedule(Callable<V> callable,//即將執行的任務
2                                            long delay,      //任務執行前需要等待的時間
3                                            TimeUnit unit)   //時間單位
周期性執行任務:
周期性執行任務和延時執行任務相似,只不過調用的方法是scheduleAtFixedRate()。
1 ScheduledExecutorService executor=Executors.newScheduledThreadPool(1);
2 
3         Task task=new Task("Task");
4         ScheduledFuture<?> result=executor.scheduleAtFixedRate(task, 1, 2, TimeUnit.SECONDS);
其中scheduleAtFixedRate函數的參數含義:
1 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,     //將被周期性執行的任務
2                                                   long initialDelay,//任務第一次執行后的延后時間
3                                                   long period,      //兩次執行的時間周期
4                                                   TimeUnit unit) {  //第二和第三個參數的時間單位
需要注意的地方:
兩次執行之間的周期(即period)是指任務在兩次執行開始時的時間間隔。如果有一個周期性的任務需要執行5秒鍾,但是卻讓他沒三秒執行一次,那么在任務的執行過程中會將有兩個任務實例同時存在。
 
對線程任務的控制
invokeAny()和invokeAll()
這兩個方法在ExecutorService中聲明,ExecutorService是比較核心也是比較基礎的接口。所以這兩個方法應該算是執行器提供的比較寬范圍(下面的子類都可以用到)的方法。
編程中比較常見的問題是,當采用多個並發任務來解決一個問題時,往往只關心這些任務中的第一個結果。比如,對一個數組排序有很多種算法,可以並發啟動所有算法,對於給定的數組,第一個得到排序結果的算法就是最快的算法。這種場景可以使用invokeAny()函數實現,即: 運行多個任務並返回第一個結果。  
 1     UserValidator ldapValidator=new UserValidator("LDAP");
 2         UserValidator dbValidator=new UserValidator("DataBase");
 3         
 4         // Create two tasks for the user validation objects
 5         TaskValidator ldapTask=new TaskValidator(ldapValidator, username, password);
 6         TaskValidator dbTask=new TaskValidator(dbValidator,username,password);
 7         
 8         // Add the two tasks to a list of tasks
 9         List<TaskValidator> taskList=new ArrayList<>();
10         taskList.add(ldapTask);
11         taskList.add(dbTask);
12         
13         // Create a new Executor
14         ExecutorService executor=(ExecutorService)Executors.newCachedThreadPool();
15         String result;
16         try {
17             // Send the list of tasks to the executor and waits for the result of the first task 
18             // that finish without throw and Exception. If all the tasks throw and Exception, the
19             // method throws and ExecutionException.
20             result = executor.invokeAny(taskList);
21             System.out.printf("Main: Result: %s\n",result);
22         } catch (InterruptedException e) {
23             e.printStackTrace();
24         } catch (ExecutionException e) {
25             e.printStackTrace();
26         }
27         
28         // Shutdown the Executor
29         executor.shutdown();
其中UserValidator類睡眠一個隨機模擬校驗任務
 1 public boolean validate(String name, String password) {
 2         Random random=new Random();
 3         
 4         try {
 5             Long duration=(long)(Math.random()*10);
 6             System.out.printf("Validator %s: Validating a user during %d seconds\n",this.name,duration);
 7             TimeUnit.SECONDS.sleep(duration);
 8         } catch (InterruptedException e) {
 9             return false;
10         }
11         
12         return random.nextBoolean();
13     }
接下來是invokeAll(),invokeAll()方法接收一個任務列表,然后返回任務列表的所有任務的執行結果。
 1 List<Task> taskList = new ArrayList<>();
 2         for (int i = 0; i < 3; i++) {
 3             Task task = new Task("Task-" + i);
 4             taskList.add(task);
 5         }
 6         // Call the invokeAll() method
 7         List<Future<Result>> resultList = null;
 8         try {
 9             resultList = executor.invokeAll(taskList);
10         } catch (InterruptedException e) {
11             e.printStackTrace();
12         }
13         // Finish the executor
14         executor.shutdown();
Future和FutureTask
Future接口用來對接收任務執行完成后的結果以及對交給執行器執行的任務進行控制。接口提供的函數如下:
cancel()、isCancelled()
cancel()方法用來取消交給執行器的任務。isCancelled()方法用來判斷是否取消成功。其中cancel(boolean)接收一個boolean類型的參數,用來表示是否要取消任務。具體用法:
 1 Task task=new Task();
 2         
 3         Future<String> result=executor.submit(task);
 4         
 5         try {
 6             TimeUnit.SECONDS.sleep(2);
 7         } catch (InterruptedException e) {
 8             e.printStackTrace();
 9         }
10         
11         System.out.printf("Main: Cancelling the Task\n");
12         result.cancel(true);
線程交給執行器執行后會立即返回一個FutureTask<T>對象,(例如:java.util.concurrent.FutureTask@610455d6),通過調用cancel(true)方法顯示來取消執行器中正在運行的任務。
注意的地方:
1、如果任務已經完成,或者之前已被取消,或者由於某種原因不能取消,則方法將返回false。
2、如果任務在執行器中等待分配Thread對象來執行它,那么任務被取消,並且不會開始執行。
3、如果任務已經在運行,那么依賴於調用cancel()方法時傳遞的參數。如果傳遞的參數為true,並且任務正在執行,任務將會取消。如果傳遞的參數為false並且任務正在執行,任務不會被取消。
4、如果Future對象所控制已經被取消,那么使用Future對象的get()方法將拋出CalcellationException異常控制任務的完成
isDone()
任務完成,返回值為boolean類型
get()、get(long,TimeUnit)
get()方法一直等待直到Callable對象的call()方法執行完成並返回結果。如果get()方法在等待結果時線程中斷了,則將拋出一個InterruptException異常。如果call()方法拋出異常那么get()方法也將隨之拋出ExecutionException異常。
get(long timeout,TimeUnit unit):如果調用這個方法時,任務的結果並未准備好,則方法等待所指定的timeout時間。如果等待超過了時間而任務的結果還沒准備好,那么這個方法返回null。
思考:get()方法用來接收call()函數的返回值,因為call()函數是交由線程執行的,所以會等到所有線程執行完畢后才能得到正確的執行結果。所以在線程沒有執行完成時,get()方法將一直阻塞。
FutureTask中的get()方法實現:可以看到,如果狀態為非完成,則調用函數awaitDone()等待完成。
1 public V get() throws InterruptedException, ExecutionException {
2         int s = state;
3         if (s <= COMPLETING)
4             s = awaitDone(false, 0L);
5         return report(s);
6     }
FutureTask:done()
FutureTask是Future的實現類,除了實現Future的功能外,有一個done()方法需要注意: 用來控制執行器中任務的完成
done()方法允許在執行器中的任務執行結束后,還可以執行一些后續操作。可以用來產生報表,通過郵件發送結果或者釋放一些系統資源。當任務執行完成是受FutureTask類控制時,這個方法在內部被FutureTask類調用。在任務結果設置后以及任務的狀態已改為isDone()之后,無論任務是否被取消或者正常結束,done()方法才被調用。
默認情況下,done()方法的實現為空,我們可以覆蓋FutureTask類並實現done()方法來改變這種行為。
 1 public class ResultTask extends FutureTask<String> {
 2 @Override
 3     protected void done() {
 4         if (isCancelled()) {
 5             System.out.printf("%s: Has been cancelled\n",name);
 6         } else {
 7             System.out.printf("%s: Has finished\n",name);
 8         }
 9     }
10 }
CompletionService和ExecutorCompletionService
CompletionService:完成服務
當向Executor提交批處理任務時,並且希望在它們完成后獲得結果,如果用FutureTask,你可以循環獲取task,並用future.get()去獲取結果,但是如果這個task沒有完成,你就得阻塞在這里,這個實效性不高,其實在很多場合,其實你拿第一個任務結果時,此時結果並沒有生成並阻塞,其實在阻塞在第一個任務時,第二個task的任務已經早就完成了,顯然這種情況用future task不合適的,效率也不高。
自己維護list和CompletionService的區別:

1.從list中遍歷的每個Future對象並不一定處於完成狀態,這時調用get()方法就會被阻塞住,如果系統是設計成每個線程完成后就能根據其結果繼續做后面的事,這樣對於處於list后面的但是先完成的線程就會增加了額外的等待時間。

2.而CompletionService的實現是維護一個保存Future對象的BlockingQueue。只有當這個Future對象狀態是結束的時候,才會加入到這個Queue中,take()方法其實就是Producer-Consumer中的Consumer。它會從Queue中取出Future對象,如果Queue是空的,就會阻塞在那里,直到有完成的Future對象加入到Queue中。

CompletionService采取的是BlockingQueue<Future<V>>無界隊列來管理Future。則有一個線程執行完畢把返回結果放到BlockingQueue<Future<V>>里面。就可以通過completionServcie.take().get()取出結果。
類圖如下:
對於批處理任務,完成服務一方面負責去執行(submit),一方面通過take()或者poll()方法可以獲取已完成的任務,任務列表中有任務完成,結果就會返回。
 
處理被執行器拒絕的任務(RejectExecutionHandler)
當我們想結束執行器的執行時,調用shutdown()方法來表示執行器應該結束。但是,執行器只有等待正在運行的任務或者等待執行的任務結束后,才能真正的結束。
如果在shutdown()方法與執行器結束之間發送了一個任務給執行器,這個任務會被拒絕,因為這個時間段執行器已經不再接受任務了。ThreadPoolExecutor類提供了一套機制,當任務被拒絕時調用這套機制來處理它們。
 1 public class RejectedTaskController implements RejectedExecutionHandler {
 2 
 3     @Override
 4     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
 5         System.out.printf("RejectedTaskController: The task %s has been rejected\n",r.toString());
 6         System.out.printf("RejectedTaskController: %s\n",executor.toString());
 7         System.out.printf("RejectedTaskController: Terminating: %s\n",executor.isTerminating());
 8         System.out.printf("RejectedTaksController: Terminated: %s\n",executor.isTerminated());
 9     }
10 }
先提交一個任務,然后shutdown(),接着提交另外一個任務
 1 public static void main(String[] args) {
 2         // Create the controller for the Rejected tasks
 3         RejectedTaskController controller=new RejectedTaskController();
 4         // Create the executor and establish the controller for the Rejected tasks
 5         ThreadPoolExecutor executor=(ThreadPoolExecutor)Executors.newCachedThreadPool();
 6         executor.setRejectedExecutionHandler(controller);
 7         
 8         // Lauch three tasks
 9         System.out.printf("Main: Starting.\n");
10         for (int i=0; i<3; i++) {
11             Task task=new Task("Task"+i);
12             executor.submit(task);
13         }
14         
15         // Shutdown the executor
16         System.out.printf("Main: Shuting down the Executor.\n");
17         executor.shutdown();
18         // Send another task
19         System.out.printf("Main: Sending another Task.\n");
20         Task task=new Task("RejectedTask");
21         executor.submit(task);
22         
23         // The program ends
24         System.out.printf("Main: End.\n");
25         
26     }
執行結果如下:
Main: Starting.
Main: Shuting down the Executor.
Main: Sending another Task.
RejectedTaskController: The task java.util.concurrent.FutureTask@60e53b93 has been rejected
RejectedTaskController: java.util.concurrent.ThreadPoolExecutor@5e2de80c[Shutting down, pool size = 3, active threads = 3, queued tasks = 0, completed tasks = 0]
RejectedTaskController: Terminating: true
RejectedTaksController: Terminated: false
Main: End.
Task Task1: Starting
Task Task0: Starting
Task Task2: Starting
Task Task1: ReportGenerator: Generating a report during 4 seconds
Task Task0: ReportGenerator: Generating a report during 7 seconds
Task Task2: ReportGenerator: Generating a report during 6 seconds
Task Task1: Ending
Task Task2: Ending
Task Task0: Ending
如果執行器調用了shutdown()方法后,原本執行的任務會執行完畢。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM