線程池ThreadPoolExecutor
1、創建線程池 ThreadPoolExecutor()
- corePoolSize:核心線程數[一直存在]。除非設置allowCoreThreadTimeOut,線程池創建以后准備就緒的線程數量。
- maximumPoolSize:最大線程數量,控制資源。
- keepAliveTime:存活時間。如果當前的線程數量大於核心線程數,只要線程空閑時間大於指定的keepAliveTime時間,就釋放空閑的線程(maximumPoolSize-corePoolSize)。
- TimeUnit:存活時間單位。
- BlockingQueue:阻塞隊列。如果任務很多,就會將多的任務放在隊列中,只要有線程空閑,就會去隊列中取出新的任務執行。
- ThreadFactory:線程的創建工廠。
- RejectedExecutionHandler:如果隊列滿了,按照指定的拒絕策略執行任務。
2、工作順序
- 1)、線程池創建,准備核心線程,准備接受任務;
- 2)、新的任務進來,用空閑的核心線程執行任務;
- 3)、核心線程滿了,將再進來的任務放入阻塞隊列中,空閑的核心線程會去阻塞隊列中獲取任務執行;
- 4)、阻塞隊列滿了,就直接開啟新線程執行,最大只能開到max設置的數量;
- 5)、任務執行完成,空閑的線程(最大線程數-核心線程數)會在keepAliveTime指定的時間后自動銷毀,最終保持到核心線程數量;
- 6)、如果線程開到了最大線程數,還有新的任務進來,就會使用指定的拒絕策略進行處理。
3、拒絕策略
- DiscardOldestPolicy:丟棄最老的任務;
- CallerRunsPolicy:同步調用;
- AbortPolicy:丟棄新任務並拋出異常;
- DiscardPolicy:丟棄新任務;
4、CompletableFuture異步編排
CompletableFuture提供了四個靜態方法創建異步任務:
CompletableFuture.runAsync(Runnable runnable);
CompletableFuture.runAsync(Runnable runnable,Executor executor);
CompletableFuture.supplyAsync(Supplier<U> supplier);
CompletableFuture.supplyAsync(Supplier<U> supplier,Executor executor);
其中runXXX沒有返回結果,supplyXXX可以獲取返回結果;
都可以傳入自定義的線程池,否則使用默認的線程池;
1)、whenComplete可以處理正常和異常的計算結果,exceptionally處理異常情況。
whenComplete獲取上任務的結果:
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).whenComplete((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
});
}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null
whenComplete獲取上任務拋出的異常信息:
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).whenComplete((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
});
}
// 執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
exceptionally捕獲異常信息,返回默認值
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).whenComplete((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
}).exceptionally((exception)->{
System.out.println("捕獲異步任務異常:"+exception);
return 10;
});
System.out.println("任務結果:"+future.get());
}
// 執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
任務結果:10
exceptionally可以捕獲到whenComplete的異常
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).whenComplete((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
throw new RuntimeException("whenComplete拋出異常");
}).exceptionally((exception)->{
System.out.println("捕獲異步任務異常:"+exception);
return 10;
});
System.out.println("任務結果:"+future.get());
}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.RuntimeException: whenComplete拋出異常
任務結果:10
exceptionally拋出異常,任務結束
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).whenComplete((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
throw new RuntimeException("whenComplete拋出異常");
}).exceptionally((exception)->{
System.out.println("捕獲異步任務異常:"+exception);
throw new RuntimeException("任務執行失敗");
});
System.out.println("任務結果:"+future.get());
}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.RuntimeException: whenComplete拋出異常
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 任務執行失敗
whenComplete 和 whenCompleteAsync 區別:
- whenComplete:使用當前任務的線程繼續執行;
- whenCompleteAsync:把whenCompleteAsync任務繼續提交給線程池來進行執行;
- whenComplete:可以獲取異常信息,但不能處理異常;
- exceptionally:可以獲取異常信息,進行異常處理;
2)、handle方法執行完成后的處理,與complete一樣,可處理異常,也可返回默認值
handle捕獲異常,返回默認值
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).handle((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
return 20;
});
System.out.println("任務結果:"+future.get());
}
//執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
任務結果:20
handle拋出異常,任務結束
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).handle((result,exception)->{
System.out.println("異步任務完成了,結果:"+result);
System.out.println("異步任務異常:"+exception);
throw new RuntimeException("handle拋出異常");
});
System.out.println("任務結果:"+future.get());
}
//執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: handle拋出異常
3)、線程串行化方法
線程串行化thenRun():不能獲取到上一步任務的執行結果,不能捕獲上一步異常,上一步異常任務結束
public static void main(String[] args) throws Exception {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).thenRun(()->{
System.out.println("任務2啟動了");
});
System.out.println("任務結果:");
}
// 執行結果:
-----main start--------1
任務結果:
線程串行化thenRun():不能獲取到上一步任務的執行結果,無返回值
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).thenRun(()->{
System.out.println("任務2啟動了");
});
System.out.println("任務結果:");
}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
任務2啟動了
任務結果:
線程串行化thenRun():不能獲取到上一步任務的執行結果,無返回值,發生異常時不會拋出異常
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).thenRun(()->{
System.out.println("任務2啟動了");
throw new RuntimeException("任務2失敗了");
});
System.out.println("任務結果:");
}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
任務2啟動了
任務結果:
線程串行化thenAccept():可以獲取上一步結果,異常與thenRun()方法一致
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).thenAccept((result)->{
System.out.println("任務1執行結果"+result);
System.out.println("任務2啟動了");
throw new RuntimeException("任務2失敗了");
});
System.out.println("任務結果:");
}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:
線程串行化thenAccept():可以獲取上一步結果,異常與thenRun()方法一致
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
return a;
}).thenAccept((result)->{
System.out.println("任務1執行結果"+result);
System.out.println("任務2啟動了");
throw new RuntimeException("任務2失敗了");
});
System.out.println("任務結果:");
}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:
線程串行化thenApply():可以獲取上一步結果,有返回值
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------當前結果" + a);
return a;
}).thenApply((result) -> {
System.out.println("任務1執行結果" + result);
System.out.println("任務2啟動了");
return 100;
});
Integer result=null;
try {
result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("任務結果:"+result);
}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:100
線程串行化thenApply():可以獲取上一步結果,有返回值,可拋出異常,上一步發生異常任務結束
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------當前結果" + a);
return a;
}).thenApply((result) -> {
System.out.println("任務1執行結果" + result);
System.out.println("任務2啟動了");
throw new RuntimeException("任務2發生異常了");
});
Integer result=null;
try {
result = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("任務結果:"+result);
}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:null
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 任務2發生異常了
thenApply 、thenAccept、thenRun 區別:
- thenApply:當一個線程依賴另一個線程時,獲取上一個任務返回的結果,並返回當前任務的返回值;
- thenAccept:消費處理結果。接收任務處理的結果,並消息處理,無返回結果;
- thenRun:只要上面的任務執行完成,就開始執行thenRun,只是處理完任務后,執行thenRun的后續操作;
4)、兩任務組合,都要完成
runAfterBoth()無返回結果:任務1和任務2執行完成,執行當前任務;任務1和任務2任一拋出異常,任務3不執行;任一任務不會拋出異常;
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
return a;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 0;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
return a;
});
future1.runAfterBoth(future2,()->{
int a=1/0;
System.out.println("任務三開始了"+a);
});
System.out.println("任務結果:");
}
//執行結果:
-----main start--------1
當前線程12------任務一結果:5
任務結果:
runAfterBoth()無返回結果,可得到任務1和任務2結果:任務1和任務2任一拋出異常,任務3不執行;任一任務不會拋出異常;
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
return a;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
return a;
});
future1.thenAcceptBothAsync(future2,(f1,f2)->{
System.out.println("任務一結果:"+f1+"---任務二結果:"+f2);
int a=1/0;
System.out.println("任務三開始了"+a);
});
System.out.println("任務結果:");
}
-----main start--------1
當前線程12------任務一結果:5
當前線程12------任務二結果:5
任務結果:
任務一結果:5---任務二結果:5
runAfterBoth()有返回結果,可得到任務1和任務2結果:任務1和任務2任一拋出異常,任務3不執行;可捕獲任務3的異常;
public static void main(String[] args) {
System.out.println("-----main start--------"+Thread.currentThread().getId());
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
return a;
});
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
int a = 10 / 2;
System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
return a;
});
CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (f1, f2) -> {
System.out.println("任務一結果:" + f1 + "---任務二結果:" + f2);
int a = 1 / 0;
System.out.println("任務三開始了" + a);
return a;
});
Integer integer = null;
try {
integer = future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("任務結果:"+integer);
}
-----main start--------1
當前線程12------任務一結果:5
當前線程12------任務二結果:5
任務一結果:5---任務二結果:5
任務結果:null
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
thenCombine 、thenAcceptBoth、runAfterBoth 區別:
- thenCombine:組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值;
- thenAcceptBoth:組合兩個future,獲取兩個future任務的返回結果,然后處理任務,沒有返回值;
- runAfterBoth:組合兩個future,不需要獲取future的結果,只需要兩個future處理完任務后,處理該任務;
5)、兩任務組合,一個完成
applyToEither 、acceptEither、runAfterEither 區別:
- applyToEither:兩個任務有一個執行完成,獲取它的返回值,處理任務並有新的返回值;
- acceptEither:兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有新的返回值;
- runAfterEither:兩個任務有一個執行完成,不需要獲取future的結果,處理任務,也沒有返回值;
6)、多任務組合
allOf 、anyOf 區別:
- allOf:等待所有任務完成
- anyOf:只要有一個任務完成