futureTask用法
深入學習FutureTask 主要講解了如何去使用futureTask來創建多線程任務,並獲取任務的結果。
Callable接口:實現這個接口的類,可以在這個類中定義需要執行的方法和返回結果類型。
MyTask.java類

public class MyTask implements Callable<Object>{ private String args1; private String args2; //構造函數,用來向task中傳遞任務的參數
public MyTask(String args1,String args2) { this.args1=args1; this.args2=args2; } //任務執行的動作
@Override public Object call() throws Exception { for(int i=0;i<100;i++){ System.out.println(args1+args2+i); } return true; } }
FutureTask使用方法

public static void main(String[] args) { MyTask myTask = new MyTask("11", "22");//實例化任務,傳遞參數
FutureTask<Object> futureTask = new FutureTask<>(myTask);//將任務放進FutureTask里 //采用thread來開啟多線程,futuretask繼承了Runnable,可以放在線程池中來啟動執行
Thread thread = new Thread(futureTask); thread.start(); try { //get():獲取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成。如果任務被取消則會拋出CancellationException異常, //如果任務執行過程發生異常則會拋出ExecutionException異常,如果阻塞等待過程中被中斷則會拋出InterruptedException異常。
boolean result = (boolean) futureTask.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } }
另外一種方式來開啟線程
ExecutorService executorService=Executors.newCachedThreadPool(); executorService.submit(futureTask); executorService.shutdown();
多個任務,開啟多線程去執行,並依次獲取返回的執行結果

public static void main(String[] args) { //創建一個FutureTask list來放置所有的任務
List<FutureTask<Object>> futureTasks=new ArrayList<>(); for(Integer i=0;i<10;i++){ MyTask myTask=new MyTask(i.toString(), i.toString()); futureTasks.add(new FutureTask<>(myTask)); } //創建線程池后,依次的提交任務,執行
ExecutorService executorService=Executors.newCachedThreadPool(); for(FutureTask<Object> futureTask:futureTasks){ executorService.submit(futureTask); } executorService.shutdown(); //根據任務數,依次的去獲取任務返回的結果,這里獲取結果時會依次返回,若前一個沒返回,則會等待,阻塞
for(Integer i=0;i<10;i++){ try { String flag=(String)futureTasks.get(i).get(); System.out.println(flag); } catch (Exception e) { e.printStackTrace(); } } }
FutureTask原理,源碼分析
Furure與Callable
1)Callable接口的call()方法可以有返回值,而Runnable接口的run()方法沒有返回值。
2)Callable接口的call()方法可以聲明拋出異常,而Runnable接口的run()方法不可以聲明拋出異常。執行完Callable接口中的任務后,返回值是通過Future接口進行獲得的。
方法get()結合ExecutorService中的submit(Callable<T>)的使用

package futureTest; import java.util.concurrent.Callable; public class Mycallable implements Callable { private int age; public Mycallable (int age){ super(); this.age=age; } public Object call() throws Exception { Thread.sleep(8000); return "this age is "+age; } } package futureTest; import java.util.concurrent.*; public class future_callable_1 { public static void main(String[] args) { try { //方法submit(Callable<T>)可以執行參數為Callable的任務。 //方法get()用於獲得返回值。
Mycallable mycallable=new Mycallable(11); ThreadPoolExecutor executor=new ThreadPoolExecutor(2,3,5L, TimeUnit.SECONDS, new LinkedBlockingDeque()); Future<String> future= executor.submit(mycallable); System.out.println(future.get());//從控制台打印的結果來看,方法get()具有阻塞特性
executor.shutdown(); System.out.println("main end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
方法get()結合ExecutorService中的submit(Runnable)和isDone()的使用

package futureTest; import java.util.concurrent.*; public class future_callable_1 { public static void main(String[] args) { try { //方法submit()不僅可以傳入Callable對象,也可以傳入Runnable對象, // 說明submit()方法支持有返回值和無返回值的功能。
Runnable runnable = new Runnable() { @Override public void run() { System.out.println("running...."); } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 5L, TimeUnit.SECONDS, new LinkedBlockingDeque()); Future future= executor.submit(runnable); System.out.println(future.get()); System.out.println(future.isDone()); //如果submit()方法傳入Callable接口則可以有返回值,如果傳入Runnable則無返回值,打印的結果就是null。 // 方法get()具有阻塞特性,而isDone()方法無阻塞特性。
executor.shutdown(); System.out.println("main end"); }catch (Exception e){ e.printStackTrace(); } } }
使用ExecutorService接口中的方法submit(Runnable, T result)

package future_callable_3; import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class UserInfo { private String userName; private String password; } package future_callable_3; public class MyRunnable implements Runnable { private UserInfo userInfo; public MyRunnable(UserInfo userInfo){ super(); this.userInfo=userInfo; } @Override public void run() { userInfo.setUserName("111"); userInfo.setPassword("2232"); } } package future_callable_3; import java.util.concurrent.*; public class Main { public static void main(String[] args) { try { //方法submit(Runnable, T result)的第2個參數result可以作為執行結果的返回值, // 而不需要使用get()方法來進行獲得。
UserInfo userInfo=new UserInfo("1","2"); MyRunnable myRunnable=new MyRunnable(userInfo); ThreadPoolExecutor executor=new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<UserInfo> userInfoFuture=executor.submit(myRunnable,userInfo); userInfo=(UserInfo) userInfoFuture.get(); System.out.println(userInfo.getUserName()+" "+userInfo.getPassword()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
方法cancel(boolean mayInterruptIfRunning)和isCancelled()的使用
方法cancel(boolean mayInterruptIfRunning)的參數mayInterruptIfRunning的作用是:
如果線程正在運行則是否中斷正在運行的線程,在代碼中需要使用if (Thread.currentThread().isInterrupted())進行配合。
方法cancel()的返回值代表發送取消任務的命令是否成功完成。

package future_callable_4; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(2000); return "My age is 12"; } } package future_callable_4; import lombok.SneakyThrows; import java.util.concurrent.*; public class Test { @SneakyThrows public static void main(String[] args) { //方法cancel(boolean mayInterruptIfRunning)的參數mayInterruptIfRunning的作用是: // 如果線程正在運行則是否中斷正在運行的線程, // 在代碼中需要使用if (Thread.currentThread().isInterrupted())進行配合。 // 方法cancel()的返回值代表發送取消任務的命令是否成功完成。
MyCallable myCallable=new MyCallable(); ExecutorService executorService=new ThreadPoolExecutor(50,Integer.MAX_VALUE,5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<String> future=executorService.submit(myCallable); System.out.println(future.get()); System.out.println(future.cancel(true)); System.out.println(future.isDone()); System.out.println(future.isCancelled()); //從打印的結果來看,線程任務已經運行完畢, // 線程對象已經銷毀,所以方法cancel()的返回值是false,代表發送取消的命令並沒有成功。
} }

package future_callable_4; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception { int i=1; while (i==1){ if(Thread.currentThread().isInterrupted()){ throw new InterruptedException(); } } return "111"; } } package future_callable_4; import lombok.SneakyThrows; import java.util.concurrent.*; public class Test { @SneakyThrows public static void main(String[] args) { //任務在沒有運行完成之前執行了cancel()方法返回為true,代表成功發送取消的命令。 // 前面介紹過參數mayInterruptIfRunning具有中斷線程的作用, // 並且需要結合代碼if(Thread.currentThread().isInterrupted())來進行實現
MyCallable myCallable=new MyCallable(); ExecutorService executorService=new ThreadPoolExecutor(50,Integer.MAX_VALUE,5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<String> future=executorService.submit(myCallable); System.out.println(future.cancel(true)); System.out.println(future.isDone()); System.out.println(future.isCancelled()); //cancel()方法返回true代表發送中斷線程的命令發送成功。
} }
如果不結合if (Thread.currentThread().isInterrupted())代碼會是什么效果呢?
從打印的結果來看,線程並未中斷運行,返回true代表發送中斷線程的命令是成功的,但是否中斷要取決於有沒有if (Thread. currentThread().isInterrupted())代碼。
如果方法cacnel()傳入的參數是false有什么效果呢?
從打印的結果來看,輸出了一個true,則代表成功發送取消的命令,但由於cancel()方法的參數值是false,所以線程並沒有中斷一直在運行。
方法get(long timeout, TimeUnit unit)的使用
方法get(long timeout, TimeUnit unit)的作用是在指定的最大時間內等待獲得返回值。
異常的處理
接口Callable任務在執行時有可能會出現異常,那在Callable中異常是如何處理的呢?
如果出現異常,則進入catch語句,不再繼續執行get()方法了,這與通過for循環調用get()方法時的效果是一樣的,不再繼續執行for循環,直接進入catch語句塊
自定義拒絕策略RejectedExecutionHandler接口的使用
接口RejectedExecutionHandler的主要作用是當線程池關閉后依然有任務要執行時,可以實現一些處理。

package RejectedExecutionHandlerTest; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(((FutureTask)r).toString()+" is rejected"); } } package RejectedExecutionHandlerTest; public class MyRunnable implements Runnable { private String userName; public MyRunnable(String userName){ super(); this.userName=userName; } @Override public void run() { System.out.println(userName+" is running"); } } package RejectedExecutionHandlerTest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class Main { public static void main(String[] args) { ExecutorService service= Executors.newCachedThreadPool(); ThreadPoolExecutor executor=(ThreadPoolExecutor)service; executor.setRejectedExecutionHandler(new MyRejectHandler()); MyRunnable myRunnable1=new MyRunnable("1"); executor.submit(myRunnable1); MyRunnable myRunnable2=new MyRunnable("2"); executor.submit(myRunnable2); executor.shutdown(); MyRunnable myRunnable3=new MyRunnable("3"); executor.submit(myRunnable3); } }
方法execute()與submit()的區別
方法execute()沒有返回值,而submit()方法可以有返回值。
方法execute()在默認的情況下異常直接拋出,不能捕獲,但可以通過自定義Thread-Factory的方式進行捕獲,而submit()方法在默認的情況下,可以catch Execution-Exception捕獲異常。
(1)有/無返回值的測試
從運行結果來看,execute()沒有返回值,而submit()方法具有返回值的功能。
(2)execute()出現異常后直接打印堆棧信息,而submit()方法可以捕獲
(3)execute()方法異常也可以捕獲
驗證Future的缺點
接口Future的實現類是FutureTask.java,而且在使用線程池時,默認的情況下也是使用FutureTask. java類作為接口Future的實現類,
也就是說,如果在使用Future與Callable的情況下,使用Future接口也就是在使用FutureTask.java類。
Callable接口與Runnable接口在對比時主要的優點是,
Callable接口可以通過Future取得返回值。
但需要注意的是,Future接口調用get()方法取得處理的結果值時是阻塞性的,
也就是如果調用Future對象的get()方法時,任務尚未執行完成,則調用get()方法時一直阻塞到此任務完成時為止。
如果是這樣的效果,則前面先執行的任務一旦耗時很多,則后面的任務調用get()方法就呈阻塞狀態,也就是排隊進行等待,大大影響運行效率。
也就是主線程並不能保證首先獲得的是最先完成任務的返回值,這就是Future的缺點,影響效率。
根據這個特性,JDK1.5提供了CompletionService接口可以解決這個問題