隨手記
問題背景:
使用FutureTask來做異步任務,但是當需要獲取返回值,futureTask.get()方法卻是非異步執行的,不符合需求,如果還想在任務完成后記錄日志什么的,更不用考慮了。
問題研究:
使用CompletableFuture代替FutureTask。
CompletableFuture簡單使用方式如下:
public static void main(String[] args) {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("future task is called");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future task is done");
return "future task is done";
});
future.thenAccept((result) -> {
System.out.println("執行完成!");
});
future.exceptionally(t -> {
System.out.println("執行失敗!"+t.getMessage());
return null;
});
// Thread.sleep(3000);
}
結果輸出:

是的,問題來了,當使用main方法測試時,發現主線程執行完畢后,程序就會結束,導致任務執行完畢回調的方法沒有觸發。當設置sleep3秒時,才可以打印出來。
public static void main(String[] args) throws InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
System.out.println("future task is called");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("future task is done");
return "future task is done";
});
future.thenAccept((result) -> {
System.out.println("執行完成!");
});
future.exceptionally(t -> {
System.out.println("執行失敗!"+t.getMessage());
return null;
});
Thread.sleep(3000);
}
結果:

但是,實際應用中,誰也不知道任務要執行多久,這種方法好像有點問題......
問題解決:
但是后來又仔細一想,又好像有點不對,實際的應用場景不需要考慮主線程結束的問題。
因為這個這個任務是放到springboot網站中跑的,而springboot的主線程好像不會停!不然網站就掛了啊!
從main轉到網站中測試后,果然沒問題。
public Object work(FutureUtils job, DownloadTask downloadTask, DownloadTaskService downloadTaskService) {
CompletableFuture<Object> future = CompletableFuture.supplyAsync(job::run);
future.thenAccept((result) -> {
Date ed = new Date();
downloadTask.setEndTime(ed);
downloadTask.setTaskStatus("S");
downloadTask.setTaskProgress(100.0);
downloadTask.setUsingTime(ed.getTime() - downloadTask.getStartTime().getTime());
downloadTaskService.downloadTaskUpdate(downloadTask);
System.out.println("執行完成!");
});
future.exceptionally(e -> {
String msg = e.toString();
log.error("任務執行失敗!{}", msg);
Date ed = new Date();
downloadTask.setEndTime(ed);
downloadTask.setTaskStatus("E");
downloadTask.setUsingTime(ed.getTime() - downloadTask.getStartTime().getTime());
downloadTask.setMsg(msg.substring(0, Math.min(msg.length(), 2000)));
downloadTaskService.downloadTaskUpdate(downloadTask);
return null;
});
return null;
}
坑一:
如果定義的job有異常拋出,千萬不要使用catch去捕獲,否則外層的 future.exceptionally無法捕獲異常,無法統一記錄日志。
但是如果把job中的異常往外拋,則job方法的定義勢必會聲明有異常拋出,那問題來了,在CompletableFuture.supplyAsync中自定義的方法
new Supplier<Object>(){},它只有一個方法get,而這個get是override的,父類並沒有拋出異常,也就是說,這個異常拋不出了......而在里面catch了,就無法統一異常處理。
還好有個nb的注解@SneakyThrows,這樣就不需要拋異常,也不需要catch了。有異常時,future.exceptionally也可以正常捕獲!
CompletableFuture<Object> future = CompletableFuture.supplyAsync(new Supplier<Object>() {
@SneakyThrows
@Override
public Object get() {
return job.run();
}
});
多任務拓展:
多任務異步執行,一批任務都執行完畢后,執行某個方法:
@Data
public class User {
private String name;
private String age;
User(){}
User(String name,String age){
this.name = name;
this.age = age;
}
}
public class CompleteFutureTest {
private static List<User> shops = Arrays.asList(new User("shop1","1"),
new User("shop2","1"),
new User("shop3","1"),
new User("shop4","1"),
new User("shop5","1"),
new User("shop6","1"),
new User("shop7","1"),
new User("shop8","1")
);
public static void main(String[] args) throws InterruptedException {
//定義任務列表
List<CompletableFuture<String>> priceFuture = shops.stream().map(shop -> CompletableFuture
.supplyAsync(() -> String.format("%s price is %s ", shop.getName(), shop.getAge())))
.collect(Collectors.toList());
//定義每個任務完成后做的事情
priceFuture.forEach(futureResult ->{
futureResult.thenAccept(CompleteFutureTest::allDone);
});
// 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
Thread.sleep(30000);
}
public static final int count = shops.size();
public static AtomicInteger atomicInteger = new AtomicInteger(0);
static void allDone(String s) {
int result = atomicInteger.addAndGet(1);
System.out.println(result);
if(count == result){
System.out.println("all done");
}
}
}
