FutureTask異步完成任務,獲取返回值


 

隨手記

問題背景:

使用FutureTask來做異步任務,但是當需要獲取返回值,futureTask.get()方法卻是非異步執行的,不符合需求,如果還想在任務完成后記錄日志什么的,更不用考慮了。

問題研究:

使用CompletableFuture代替FutureTask。

CompletableFuture簡單使用方式如下:

  public static void main(String[] args) {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
                System.out.println("future task is called");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future task is done");
                return "future task is done";
        });
        future.thenAccept((result) -> {
            System.out.println("執行完成!");
        });
        future.exceptionally(t -> {
            System.out.println("執行失敗!"+t.getMessage());
            return null;
        });
//        Thread.sleep(3000);
    }

 結果輸出:

是的,問題來了,當使用main方法測試時,發現主線程執行完畢后,程序就會結束,導致任務執行完畢回調的方法沒有觸發。當設置sleep3秒時,才可以打印出來。

public static void main(String[] args) throws InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{
                System.out.println("future task is called");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("future task is done");
                return "future task is done";
        });
        future.thenAccept((result) -> {
            System.out.println("執行完成!");
        });
        future.exceptionally(t -> {
            System.out.println("執行失敗!"+t.getMessage());
            return null;
        });
        Thread.sleep(3000);
    }

結果:

 

但是,實際應用中,誰也不知道任務要執行多久,這種方法好像有點問題......

問題解決:

但是后來又仔細一想,又好像有點不對,實際的應用場景不需要考慮主線程結束的問題。

因為這個這個任務是放到springboot網站中跑的,而springboot的主線程好像不會停!不然網站就掛了啊!

從main轉到網站中測試后,果然沒問題。

 public Object work(FutureUtils job, DownloadTask downloadTask, DownloadTaskService downloadTaskService) {
        CompletableFuture<Object> future = CompletableFuture.supplyAsync(job::run);
        future.thenAccept((result) -> {
            Date ed = new Date();
            downloadTask.setEndTime(ed);
            downloadTask.setTaskStatus("S");
            downloadTask.setTaskProgress(100.0);
            downloadTask.setUsingTime(ed.getTime() - downloadTask.getStartTime().getTime());
            downloadTaskService.downloadTaskUpdate(downloadTask);
            System.out.println("執行完成!");
        });
        future.exceptionally(e -> {
            String msg = e.toString();
            log.error("任務執行失敗!{}", msg);
            Date ed = new Date();
            downloadTask.setEndTime(ed);
            downloadTask.setTaskStatus("E");
            downloadTask.setUsingTime(ed.getTime() - downloadTask.getStartTime().getTime());
            downloadTask.setMsg(msg.substring(0, Math.min(msg.length(), 2000)));
            downloadTaskService.downloadTaskUpdate(downloadTask);
            return null;
        });
        return null;
    }

坑一:

如果定義的job有異常拋出,千萬不要使用catch去捕獲,否則外層的 future.exceptionally無法捕獲異常,無法統一記錄日志。

但是如果把job中的異常往外拋,則job方法的定義勢必會聲明有異常拋出,那問題來了,在CompletableFuture.supplyAsync中自定義的方法

new Supplier<Object>(){},它只有一個方法get,而這個get是override的,父類並沒有拋出異常,也就是說,這個異常拋不出了......而在里面catch了,就無法統一異常處理。

還好有個nb的注解@SneakyThrows,這樣就不需要拋異常,也不需要catch了。有異常時,future.exceptionally也可以正常捕獲!

     CompletableFuture<Object> future = CompletableFuture.supplyAsync(new Supplier<Object>() {
            @SneakyThrows
            @Override
            public Object get()  {
                return job.run();
            }
        });

多任務拓展:

多任務異步執行,一批任務都執行完畢后,執行某個方法:

@Data
public class User {
    private String name;
    private String age;
    User(){}
    User(String name,String age){
        this.name = name;
        this.age = age;
    }
}

public class CompleteFutureTest {

    private static List<User> shops = Arrays.asList(new User("shop1","1"),
            new User("shop2","1"),
            new User("shop3","1"),
            new User("shop4","1"),
            new User("shop5","1"),
            new User("shop6","1"),
            new User("shop7","1"),
            new User("shop8","1")
    );

    public static void main(String[] args) throws InterruptedException {
       //定義任務列表
        List<CompletableFuture<String>> priceFuture = shops.stream().map(shop -> CompletableFuture
                .supplyAsync(() -> String.format("%s price is %s ", shop.getName(), shop.getAge())))
                .collect(Collectors.toList());

        //定義每個任務完成后做的事情
        priceFuture.forEach(futureResult ->{
            futureResult.thenAccept(CompleteFutureTest::allDone);
        });

      // 主線程不要立刻結束,否則CompletableFuture默認使用的線程池會立刻關閉:
        Thread.sleep(30000);
    }

    public static final  int count = shops.size();

    public static AtomicInteger atomicInteger = new AtomicInteger(0);

    static void allDone(String s) {
        int result = atomicInteger.addAndGet(1);
        System.out.println(result);
        if(count == result){
            System.out.println("all done");
        }
    }
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM