futureTask用法
深入学习FutureTask 主要讲解了如何去使用futureTask来创建多线程任务,并获取任务的结果。
Callable接口:实现这个接口的类,可以在这个类中定义需要执行的方法和返回结果类型。
MyTask.java类

public class MyTask implements Callable<Object>{ private String args1; private String args2; //构造函数,用来向task中传递任务的参数
public MyTask(String args1,String args2) { this.args1=args1; this.args2=args2; } //任务执行的动作
@Override public Object call() throws Exception { for(int i=0;i<100;i++){ System.out.println(args1+args2+i); } return true; } }
FutureTask使用方法

public static void main(String[] args) { MyTask myTask = new MyTask("11", "22");//实例化任务,传递参数
FutureTask<Object> futureTask = new FutureTask<>(myTask);//将任务放进FutureTask里 //采用thread来开启多线程,futuretask继承了Runnable,可以放在线程池中来启动执行
Thread thread = new Thread(futureTask); thread.start(); try { //get():获取任务执行结果,如果任务还没完成则会阻塞等待直到任务执行完成。如果任务被取消则会抛出CancellationException异常, //如果任务执行过程发生异常则会抛出ExecutionException异常,如果阻塞等待过程中被中断则会抛出InterruptedException异常。
boolean result = (boolean) futureTask.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); } }
另外一种方式来开启线程
ExecutorService executorService=Executors.newCachedThreadPool(); executorService.submit(futureTask); executorService.shutdown();
多个任务,开启多线程去执行,并依次获取返回的执行结果

public static void main(String[] args) { //创建一个FutureTask list来放置所有的任务
List<FutureTask<Object>> futureTasks=new ArrayList<>(); for(Integer i=0;i<10;i++){ MyTask myTask=new MyTask(i.toString(), i.toString()); futureTasks.add(new FutureTask<>(myTask)); } //创建线程池后,依次的提交任务,执行
ExecutorService executorService=Executors.newCachedThreadPool(); for(FutureTask<Object> futureTask:futureTasks){ executorService.submit(futureTask); } executorService.shutdown(); //根据任务数,依次的去获取任务返回的结果,这里获取结果时会依次返回,若前一个没返回,则会等待,阻塞
for(Integer i=0;i<10;i++){ try { String flag=(String)futureTasks.get(i).get(); System.out.println(flag); } catch (Exception e) { e.printStackTrace(); } } }
FutureTask原理,源码分析
Furure与Callable
1)Callable接口的call()方法可以有返回值,而Runnable接口的run()方法没有返回值。
2)Callable接口的call()方法可以声明抛出异常,而Runnable接口的run()方法不可以声明抛出异常。执行完Callable接口中的任务后,返回值是通过Future接口进行获得的。
方法get()结合ExecutorService中的submit(Callable<T>)的使用

package futureTest; import java.util.concurrent.Callable; public class Mycallable implements Callable { private int age; public Mycallable (int age){ super(); this.age=age; } public Object call() throws Exception { Thread.sleep(8000); return "this age is "+age; } } package futureTest; import java.util.concurrent.*; public class future_callable_1 { public static void main(String[] args) { try { //方法submit(Callable<T>)可以执行参数为Callable的任务。 //方法get()用于获得返回值。
Mycallable mycallable=new Mycallable(11); ThreadPoolExecutor executor=new ThreadPoolExecutor(2,3,5L, TimeUnit.SECONDS, new LinkedBlockingDeque()); Future<String> future= executor.submit(mycallable); System.out.println(future.get());//从控制台打印的结果来看,方法get()具有阻塞特性
executor.shutdown(); System.out.println("main end"); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
方法get()结合ExecutorService中的submit(Runnable)和isDone()的使用

package futureTest; import java.util.concurrent.*; public class future_callable_1 { public static void main(String[] args) { try { //方法submit()不仅可以传入Callable对象,也可以传入Runnable对象, // 说明submit()方法支持有返回值和无返回值的功能。
Runnable runnable = new Runnable() { @Override public void run() { System.out.println("running...."); } }; ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 3, 5L, TimeUnit.SECONDS, new LinkedBlockingDeque()); Future future= executor.submit(runnable); System.out.println(future.get()); System.out.println(future.isDone()); //如果submit()方法传入Callable接口则可以有返回值,如果传入Runnable则无返回值,打印的结果就是null。 // 方法get()具有阻塞特性,而isDone()方法无阻塞特性。
executor.shutdown(); System.out.println("main end"); }catch (Exception e){ e.printStackTrace(); } } }
使用ExecutorService接口中的方法submit(Runnable, T result)

package future_callable_3; import lombok.AllArgsConstructor; import lombok.Data; @Data @AllArgsConstructor public class UserInfo { private String userName; private String password; } package future_callable_3; public class MyRunnable implements Runnable { private UserInfo userInfo; public MyRunnable(UserInfo userInfo){ super(); this.userInfo=userInfo; } @Override public void run() { userInfo.setUserName("111"); userInfo.setPassword("2232"); } } package future_callable_3; import java.util.concurrent.*; public class Main { public static void main(String[] args) { try { //方法submit(Runnable, T result)的第2个参数result可以作为执行结果的返回值, // 而不需要使用get()方法来进行获得。
UserInfo userInfo=new UserInfo("1","2"); MyRunnable myRunnable=new MyRunnable(userInfo); ThreadPoolExecutor executor=new ThreadPoolExecutor(10,10,10, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<UserInfo> userInfoFuture=executor.submit(myRunnable,userInfo); userInfo=(UserInfo) userInfoFuture.get(); System.out.println(userInfo.getUserName()+" "+userInfo.getPassword()); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } } }
方法cancel(boolean mayInterruptIfRunning)和isCancelled()的使用
方法cancel(boolean mayInterruptIfRunning)的参数mayInterruptIfRunning的作用是:
如果线程正在运行则是否中断正在运行的线程,在代码中需要使用if (Thread.currentThread().isInterrupted())进行配合。
方法cancel()的返回值代表发送取消任务的命令是否成功完成。

package future_callable_4; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception { Thread.sleep(2000); return "My age is 12"; } } package future_callable_4; import lombok.SneakyThrows; import java.util.concurrent.*; public class Test { @SneakyThrows public static void main(String[] args) { //方法cancel(boolean mayInterruptIfRunning)的参数mayInterruptIfRunning的作用是: // 如果线程正在运行则是否中断正在运行的线程, // 在代码中需要使用if (Thread.currentThread().isInterrupted())进行配合。 // 方法cancel()的返回值代表发送取消任务的命令是否成功完成。
MyCallable myCallable=new MyCallable(); ExecutorService executorService=new ThreadPoolExecutor(50,Integer.MAX_VALUE,5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<String> future=executorService.submit(myCallable); System.out.println(future.get()); System.out.println(future.cancel(true)); System.out.println(future.isDone()); System.out.println(future.isCancelled()); //从打印的结果来看,线程任务已经运行完毕, // 线程对象已经销毁,所以方法cancel()的返回值是false,代表发送取消的命令并没有成功。
} }

package future_callable_4; import java.util.concurrent.Callable; public class MyCallable implements Callable<String> { @Override public String call() throws Exception { int i=1; while (i==1){ if(Thread.currentThread().isInterrupted()){ throw new InterruptedException(); } } return "111"; } } package future_callable_4; import lombok.SneakyThrows; import java.util.concurrent.*; public class Test { @SneakyThrows public static void main(String[] args) { //任务在没有运行完成之前执行了cancel()方法返回为true,代表成功发送取消的命令。 // 前面介绍过参数mayInterruptIfRunning具有中断线程的作用, // 并且需要结合代码if(Thread.currentThread().isInterrupted())来进行实现
MyCallable myCallable=new MyCallable(); ExecutorService executorService=new ThreadPoolExecutor(50,Integer.MAX_VALUE,5, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>()); Future<String> future=executorService.submit(myCallable); System.out.println(future.cancel(true)); System.out.println(future.isDone()); System.out.println(future.isCancelled()); //cancel()方法返回true代表发送中断线程的命令发送成功。
} }
如果不结合if (Thread.currentThread().isInterrupted())代码会是什么效果呢?
从打印的结果来看,线程并未中断运行,返回true代表发送中断线程的命令是成功的,但是否中断要取决于有没有if (Thread. currentThread().isInterrupted())代码。
如果方法cacnel()传入的参数是false有什么效果呢?
从打印的结果来看,输出了一个true,则代表成功发送取消的命令,但由于cancel()方法的参数值是false,所以线程并没有中断一直在运行。
方法get(long timeout, TimeUnit unit)的使用
方法get(long timeout, TimeUnit unit)的作用是在指定的最大时间内等待获得返回值。
异常的处理
接口Callable任务在执行时有可能会出现异常,那在Callable中异常是如何处理的呢?
如果出现异常,则进入catch语句,不再继续执行get()方法了,这与通过for循环调用get()方法时的效果是一样的,不再继续执行for循环,直接进入catch语句块
自定义拒绝策略RejectedExecutionHandler接口的使用
接口RejectedExecutionHandler的主要作用是当线程池关闭后依然有任务要执行时,可以实现一些处理。

package RejectedExecutionHandlerTest; import java.util.concurrent.FutureTask; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadPoolExecutor; public class MyRejectHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { System.out.println(((FutureTask)r).toString()+" is rejected"); } } package RejectedExecutionHandlerTest; public class MyRunnable implements Runnable { private String userName; public MyRunnable(String userName){ super(); this.userName=userName; } @Override public void run() { System.out.println(userName+" is running"); } } package RejectedExecutionHandlerTest; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; public class Main { public static void main(String[] args) { ExecutorService service= Executors.newCachedThreadPool(); ThreadPoolExecutor executor=(ThreadPoolExecutor)service; executor.setRejectedExecutionHandler(new MyRejectHandler()); MyRunnable myRunnable1=new MyRunnable("1"); executor.submit(myRunnable1); MyRunnable myRunnable2=new MyRunnable("2"); executor.submit(myRunnable2); executor.shutdown(); MyRunnable myRunnable3=new MyRunnable("3"); executor.submit(myRunnable3); } }
方法execute()与submit()的区别
方法execute()没有返回值,而submit()方法可以有返回值。
方法execute()在默认的情况下异常直接抛出,不能捕获,但可以通过自定义Thread-Factory的方式进行捕获,而submit()方法在默认的情况下,可以catch Execution-Exception捕获异常。
(1)有/无返回值的测试
从运行结果来看,execute()没有返回值,而submit()方法具有返回值的功能。
(2)execute()出现异常后直接打印堆栈信息,而submit()方法可以捕获
(3)execute()方法异常也可以捕获
验证Future的缺点
接口Future的实现类是FutureTask.java,而且在使用线程池时,默认的情况下也是使用FutureTask. java类作为接口Future的实现类,
也就是说,如果在使用Future与Callable的情况下,使用Future接口也就是在使用FutureTask.java类。
Callable接口与Runnable接口在对比时主要的优点是,
Callable接口可以通过Future取得返回值。
但需要注意的是,Future接口调用get()方法取得处理的结果值时是阻塞性的,
也就是如果调用Future对象的get()方法时,任务尚未执行完成,则调用get()方法时一直阻塞到此任务完成时为止。
如果是这样的效果,则前面先执行的任务一旦耗时很多,则后面的任务调用get()方法就呈阻塞状态,也就是排队进行等待,大大影响运行效率。
也就是主线程并不能保证首先获得的是最先完成任务的返回值,这就是Future的缺点,影响效率。
根据这个特性,JDK1.5提供了CompletionService接口可以解决这个问题