1.JDK5引入了Future進行異步任務的處理,Future 的接口主要方法有以下幾個:
(1)boolean cancel (boolean mayInterruptIfRunning) 取消任務的執行。參數指定是否立即中斷任務執行,或者等等任務結束
(2)boolean isCancelled () 任務是否已經取消,任務正常完成前將其取消,則返回 true
(3)boolean isDone () 任務是否已經完成。需要注意的是如果任務正常終止、異常或取消,都將返回true
(4)V get () throws InterruptedException, ExecutionException 等待任務執行結束,然后獲得V類型的結果。InterruptedException 線程被中斷異常, ExecutionException任務執行異常,如果任務被取消,還會拋出CancellationException
(5) get (long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException 同上面的get功能一樣,多了設置超時時間。參數timeout指定超時時間,uint指定時間的單位,在枚舉類TimeUnit中有相關的定義。如果計 算超時,將拋出TimeoutException
一般情況下Future 配合Callable 使用,獲取異步任務執行的結果,一般使用get()方法設置超時時間,但是在任務執行結束前的這段時間內線程是阻塞的,也就不說異步的了。同時為了獲取一般只能采取輪詢isDone()方法,這樣就顯得使用方法很單一,無法適應復雜情況下的異步任務編排。
- RunAsync 執行異步任務,無返回值
package completablefuture; import java.util.concurrent.CompletableFuture; /** * @Author lizhilong * @create 2019/11/18 18:07 * @desc * runAsyn 無返回結果,執行get()方法時,任務被觸發。 */ public class RunAsync { public static void main(String [] args) { CompletableFuture<Void> future = CompletableFuture.runAsync(()->{ System.out.println("Hello"); }); System.out.println("--------------"); try { future.get(); }catch (Exception e){ e.printStackTrace(); } } }
- SupplyAsync 執行異步任務,有返回值
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/18 18:17 * @desc supplyAsync 方法有返回值,在get()方法后被觸發 */ public class SupplyAsync { public static void main(String[] args){ CompletableFuture<String> future1 = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "Hello"; } }); System.out.println("-------------"); try { String s = future1.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); } } }
- thenApply 在上個方法執行結束后將返回值作為入參執行下個方法。
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/18 18:41 * @desc * 以Async結尾的方法都會異步執行 * thenApply/thenApplyAsync 會在上個方法執行完之后然后繼續執行 */ public class ThenApply { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } return "Hello"; } }).thenApplyAsync(s1 -> { return s1+"=="+"World"; }); try { String s = future.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); } } }
運行結果
Hello==World
- ThenAccept 消耗上個任務執行的結果,無返回值。
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/18 19:20 * @desc * thenAccept 對上個任務產生的結果進行消耗,與ThenApply 不同的是無返回結果 * 所以第二個thenAccept 返回 null */ public class ThenAccept { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "hello"; } }).thenAccept(s1 -> System.out.println(s1+" world")) .thenAccept(s2-> System.out.println("---"+s2)); try { future.get(); }catch (Exception e){ e.printStackTrace(); } } }
執行結果:
hello world ---null
- ThenRun 不關心上一步的執行結果,在上一步任務執行結束后執行下一步任務
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 11:44 * @desc thenRun 不關心上一步執行的結果,上一步執行結束后執行下一步 * thenRunAsync 異步執行 */ public class ThenRun { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); }catch (Exception e){ e.printStackTrace(); } return "Hello"; } }).thenRunAsync(new Runnable() { @Override public void run() { System.out.println("World"); } }); try { future.get(); }catch (Exception e ){ e.printStackTrace(); } } }
運行結果:
World
- ThenApplyWithExecutor 在自定義的線程池執行異步任務
package completablefuture; import java.util.concurrent.*; /** * @Author lizhilong * @create 2019/11/18 18:59 * @desc * 自定義線程池的方式來處理有先后順序的任務 */ public class ThenApplyWithExecutor { public static void main(String[] args) { ExecutorService service = Executors.newCachedThreadPool(); CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ return "Hello"; },service).thenApplyAsync(s1->{ return s1 + " World"; },service).thenApplyAsync(s2 ->{ return s2 +" China"; },service); try { String s = future.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); }finally { service.shutdownNow(); } } }
運行結果:
Hello World China
- runAfterBoth/runAfterBothAsync 在前面任務執行結束后執行新的任務
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 15:33 * @desc runAfterBothAsync 忽略前面任務的執行結果,在前面任務執行結束之后在執行后面的runable任務 */ public class RunAfterBothAsync { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st1 end"); return "Hello"; } }).runAfterBothAsync(CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(7000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st2 end"); return "World"; } }), new Runnable() { @Override public void run() { System.out.println("I LOVE CHINA"); } }); try { future.get(); }catch (Exception e){ e.printStackTrace(); } } }
運行結果:
st1 end
st2 end
I LOVE CHINA
(2)任務結果消費/組合
- thenCombine/thenCombineAsync 任務的運行結果進行組合后輸出
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 13:40 * @desc thenCombineAsync 將任務的執行結果進行合並后輸出 * 最后的合並必操作須等兩個任務都執行結束后才可以進行 */ public class CompletionStage { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st1 end"); return "Hello"; } }).thenCombineAsync( CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(7000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st2 end"); return "World"; } } ), (r1, r2) -> r1 + " " + r2); try { String s = future.get(); System.out.println(s); } catch (Exception e) { e.printStackTrace(); } } }
運行結果:
st1 end
st2 end
Hello World
- ThenAcceptBothAsync 對異步任務的執行結果進行消耗,無返回值
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 14:59 * @desc thenAcceptBothAsync 消費任務的執行結果,無返回值 * 消費動作的執行發生在任務均完成的情況下 */ public class ThenAcceptBothAsync { public static void main(String[] args) { CompletableFuture<Void> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st1 end"); return "Hello"; } }).thenAcceptBothAsync(CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(7000); }catch (Exception e){ e.printStackTrace(); } System.out.println("st2 end"); return "World"; } }) ,(r1, r2) -> System.out.println(r1 +" "+r2)); try { future.get(); }catch (Exception e){ e.printStackTrace(); } } }
運行結果:
st1 end
st2 end
Hello World
(3)根據任務執行完成的先后順訊進行后續操作
- applyToEither/applyToEitherAsync 獲取最先執行完成的任務的結果
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 15:43 * @desc ApplyToEitherAsync 獲取多個任務執行最快的任務結果 */ public class ApplyToEitherAsync { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st1 end"); return "CHINA"; } }).applyToEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(7000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st2 end"); return "AUS"; } }), (r) -> { return r; }).applyToEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st3 end"); return "UK"; } }), r1->{ return r1; }); try { String s = future.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); } } }
運行結果:
st3 end
UK
- acceptEither/acceptEitherAsync 消耗最先執行完的任務的返回結果
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 16:13 * @desc AcceptEitherAsync 消費最先完成的任務返回的結果 */ public class AcceptEitherAsync { public static void main(String[] args) throws Exception{ CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } return "Hello"; } }).acceptEitherAsync( CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } return "World"; } }), System.out::println).get(); } }
運行結果:
Hello
- RunAfterEither/RunAfterEitherAsync 在前面的任務有任何一個完成后運行
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 18:28 * @desc RunAfterEither 是在前面任務有一個完成以后再去執行的,即最先完成的任務后運行 * */ public class RunAfterEither { public static void main(String[] args) throws Exception{ CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(3000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st1 end"); return "Hello"; } }).runAfterEitherAsync(CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { try { Thread.sleep(5000); } catch (Exception e) { e.printStackTrace(); } System.out.println("st2 end"); return "World"; } }), new Runnable() { @Override public void run() { System.out.println("I LOVE CHINA"); } }).get(); } }
運行結果:
st1 end
I LOVE CHINA
(4)任務完成時
- complete 任務完成后執行后續操作
package completablefuture; import java.util.concurrent.CompletableFuture; /** * @Author lizhilong * @create 2019/11/18 18:21 * @desc 任務完成以后 執行后續操作 */ public class Complete { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(()->{ return "Hello"; }); future.complete("world"); System.out.println("----------------"); try { String s = future.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); } } }
運行結果:
----------------
world
- whenCompleteAsync/whenComplete 在前面任務執行完成后執行后續操作,可以獲取前面任務的執行結果和異常
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 18:46 * @desc WhenComplete 任務完成后執行相應操作,可以獲取上步任務執行的結果或者異常 */ public class WhenComplete { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { if (1==2) { throw new RuntimeException("測試異常"); } return "Hello"; } }).whenCompleteAsync((s, e) -> { System.out.println(s); System.out.println(e.getMessage()); }); try { String s = future.get(); System.out.println(s); }catch (Exception e){ System.out.println(e.getMessage()); } } }
運行結果:
Hello
java.lang.NullPointerException
(5)任務執行異常
- completeExceptionally 任務完成后拋異常
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/18 18:30 * @desc 任務完成以后拋異常 */ public class CompleteExceptionally { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "Hello"; } }); future.completeExceptionally( new Exception()); try { String s = future.get(); System.out.println(s); }catch (Exception e){ e.printStackTrace(); } } }
運行結果:
java.util.concurrent.ExecutionException: java.lang.Exception at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) at completablefuture.CompleteExceptionally.main(CompleteExceptionally.java:22) Caused by: java.lang.Exception at completablefuture.CompleteExceptionally.main(CompleteExceptionally.java:20)
- exceptionally 執行任務過程中產生異常
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 18:35 * @desc 任務產生異常時進行相應操作 */ public class Exceptionally { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { int x = 10 / 0; return "Hello"; } }).exceptionally(e -> { System.out.println(e.getMessage()); return "World"; }); try { String s = future.get(); System.out.println(s); }catch (Exception e){ } } }
運行結果:
java.lang.ArithmeticException: / by zero
World
- handleAsync/handle 在使用 exceptionally 可以獲取異常時的異常,但是無法獲取正常執行時的結果
異常:
public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { if(true){ throw new RuntimeException("測試異常"); } return "Hello"; } }).handleAsync((r, e) -> { if (e != null) { return e.getMessage(); } return "World"; }); try { System.out.println(future.get()); }catch (Exception e){ e.printStackTrace(); } }
執行結果:
java.lang.RuntimeException: 測試異常
正常:
package completablefuture; import java.util.concurrent.CompletableFuture; import java.util.function.Supplier; /** * @Author lizhilong * @create 2019/11/19 18:58 * @desc */ public class HandleNormal { public static void main(String[] args) { CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { return "Hello"; } }).handleAsync((r, e) -> { if (e != null) { return e.getMessage(); } return "World"; }); try { System.out.println(future.get()); }catch (Exception e){ e.printStackTrace(); } } }
執行結果:
World