線程池ThreadPoolExecutor最全實戰案例


線程池ThreadPoolExecutor

1、創建線程池 ThreadPoolExecutor()

  • corePoolSize:核心線程數[一直存在]。除非設置allowCoreThreadTimeOut,線程池創建以后准備就緒的線程數量。
  • maximumPoolSize:最大線程數量,控制資源。
  • keepAliveTime:存活時間。如果當前的線程數量大於核心線程數,只要線程空閑時間大於指定的keepAliveTime時間,就釋放空閑的線程(maximumPoolSize-corePoolSize)。
  • TimeUnit:存活時間單位。
  • BlockingQueue:阻塞隊列。如果任務很多,就會將多的任務放在隊列中,只要有線程空閑,就會去隊列中取出新的任務執行。
  • ThreadFactory:線程的創建工廠。
  • RejectedExecutionHandler:如果隊列滿了,按照指定的拒絕策略執行任務。

2、工作順序

  • 1)、線程池創建,准備核心線程,准備接受任務;
  • 2)、新的任務進來,用空閑的核心線程執行任務;
  • 3)、核心線程滿了,將再進來的任務放入阻塞隊列中,空閑的核心線程會去阻塞隊列中獲取任務執行;
  • 4)、阻塞隊列滿了,就直接開啟新線程執行,最大只能開到max設置的數量;
  • 5)、任務執行完成,空閑的線程(最大線程數-核心線程數)會在keepAliveTime指定的時間后自動銷毀,最終保持到核心線程數量;
  • 6)、如果線程開到了最大線程數,還有新的任務進來,就會使用指定的拒絕策略進行處理。

3、拒絕策略

  • DiscardOldestPolicy:丟棄最老的任務;
  • CallerRunsPolicy:同步調用;
  • AbortPolicy:丟棄新任務並拋出異常;
  • DiscardPolicy:丟棄新任務;

4、CompletableFuture異步編排

CompletableFuture提供了四個靜態方法創建異步任務:

CompletableFuture.runAsync(Runnable runnable);

CompletableFuture.runAsync(Runnable runnable,Executor executor);

CompletableFuture.supplyAsync(Supplier<U> supplier);

CompletableFuture.supplyAsync(Supplier<U> supplier,Executor executor);

其中runXXX沒有返回結果,supplyXXX可以獲取返回結果;

都可以傳入自定義的線程池,否則使用默認的線程池;

1)、whenComplete可以處理正常和異常的計算結果,exceptionally處理異常情況。

whenComplete獲取上任務的結果:

	public static void main(String[] args) {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).whenComplete((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
		});
	}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null

whenComplete獲取上任務拋出的異常信息:

	public static void main(String[] args) {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).whenComplete((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
		});
	}
// 執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero

exceptionally捕獲異常信息,返回默認值

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).whenComplete((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
		}).exceptionally((exception)->{
			System.out.println("捕獲異步任務異常:"+exception);
			return 10;
		});
		System.out.println("任務結果:"+future.get());
	}
// 執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
任務結果:10

exceptionally可以捕獲到whenComplete的異常

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).whenComplete((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
			throw new RuntimeException("whenComplete拋出異常");
		}).exceptionally((exception)->{
			System.out.println("捕獲異步任務異常:"+exception);
			return 10;
		});
		System.out.println("任務結果:"+future.get());
	}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.RuntimeException: whenComplete拋出異常
任務結果:10

exceptionally拋出異常,任務結束

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).whenComplete((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
			throw new RuntimeException("whenComplete拋出異常");
		}).exceptionally((exception)->{
			System.out.println("捕獲異步任務異常:"+exception);
			throw new RuntimeException("任務執行失敗");
		});
		System.out.println("任務結果:"+future.get());
	}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
異步任務完成了,結果:5
異步任務異常:null
捕獲異步任務異常:java.util.concurrent.CompletionException: java.lang.RuntimeException: whenComplete拋出異常
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: 任務執行失敗

whenComplete 和 whenCompleteAsync 區別:

  • whenComplete:使用當前任務的線程繼續執行;
  • whenCompleteAsync:把whenCompleteAsync任務繼續提交給線程池來進行執行;
  • whenComplete:可以獲取異常信息,但不能處理異常;
  • exceptionally:可以獲取異常信息,進行異常處理;

2)、handle方法執行完成后的處理,與complete一樣,可處理異常,也可返回默認值

handle捕獲異常,返回默認值

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).handle((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
			return 20;
		});
		System.out.println("任務結果:"+future.get());
	}
//執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
任務結果:20

handle拋出異常,任務結束

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).handle((result,exception)->{
			System.out.println("異步任務完成了,結果:"+result);
			System.out.println("異步任務異常:"+exception);
			throw new RuntimeException("handle拋出異常");
		});
		System.out.println("任務結果:"+future.get());
	}
//執行結果:
-----main start--------1
異步任務完成了,結果:null
異步任務異常:java.util.concurrent.CompletionException: java.lang.ArithmeticException: / by zero
Exception in thread "main" java.util.concurrent.ExecutionException: java.lang.RuntimeException: handle拋出異常

3)、線程串行化方法

線程串行化thenRun():不能獲取到上一步任務的執行結果,不能捕獲上一步異常,上一步異常任務結束

	public static void main(String[] args) throws Exception {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).thenRun(()->{
			System.out.println("任務2啟動了");
		});
		System.out.println("任務結果:");
	}
// 執行結果:
-----main start--------1
任務結果:

線程串行化thenRun():不能獲取到上一步任務的執行結果,無返回值

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).thenRun(()->{
			System.out.println("任務2啟動了");
		});
		System.out.println("任務結果:");
	}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
任務2啟動了
任務結果:

線程串行化thenRun():不能獲取到上一步任務的執行結果,無返回值,發生異常時不會拋出異常

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).thenRun(()->{
			System.out.println("任務2啟動了");
			throw new RuntimeException("任務2失敗了");
		});
		System.out.println("任務結果:");
	}
// 執行結果:
-----main start--------1
當前線程12------當前結果5
任務2啟動了
任務結果:

線程串行化thenAccept():可以獲取上一步結果,異常與thenRun()方法一致

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).thenAccept((result)->{
			System.out.println("任務1執行結果"+result);
			System.out.println("任務2啟動了");
			throw new RuntimeException("任務2失敗了");
		});
		System.out.println("任務結果:");
	}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:

線程串行化thenAccept():可以獲取上一步結果,異常與thenRun()方法一致

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程"+Thread.currentThread().getId()+"------當前結果" + a);
			return a;
		}).thenAccept((result)->{
			System.out.println("任務1執行結果"+result);
			System.out.println("任務2啟動了");
			throw new RuntimeException("任務2失敗了");
		});
		System.out.println("任務結果:");
	}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:

線程串行化thenApply():可以獲取上一步結果,有返回值

public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------當前結果" + a);
			return a;
		}).thenApply((result) -> {
			System.out.println("任務1執行結果" + result);
			System.out.println("任務2啟動了");
			return 100;
		});
		Integer result=null;
		try {
			result = future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		System.out.println("任務結果:"+result);
	}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:100

線程串行化thenApply():可以獲取上一步結果,有返回值,可拋出異常,上一步發生異常任務結束

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------當前結果" + a);
			return a;
		}).thenApply((result) -> {
			System.out.println("任務1執行結果" + result);
			System.out.println("任務2啟動了");
			throw new RuntimeException("任務2發生異常了");
		});
		Integer result=null;
		try {
			result = future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		System.out.println("任務結果:"+result);
	}
//執行結果:
-----main start--------1
當前線程12------當前結果5
任務1執行結果5
任務2啟動了
任務結果:null
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 任務2發生異常了

thenApply 、thenAccept、thenRun 區別:

  • thenApply:當一個線程依賴另一個線程時,獲取上一個任務返回的結果,並返回當前任務的返回值;
  • thenAccept:消費處理結果。接收任務處理的結果,並消息處理,無返回結果;
  • thenRun:只要上面的任務執行完成,就開始執行thenRun,只是處理完任務后,執行thenRun的后續操作;

4)、兩任務組合,都要完成

runAfterBoth()無返回結果:任務1和任務2執行完成,執行當前任務;任務1和任務2任一拋出異常,任務3不執行;任一任務不會拋出異常;

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
			return a;
		});

		CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 0;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
			return a;
		});

		future1.runAfterBoth(future2,()->{
			int a=1/0;
			System.out.println("任務三開始了"+a);
		});

		System.out.println("任務結果:");
	}
//執行結果:
-----main start--------1
當前線程12------任務一結果:5
任務結果:

runAfterBoth()無返回結果,可得到任務1和任務2結果:任務1和任務2任一拋出異常,任務3不執行;任一任務不會拋出異常;

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
			return a;
		});

		CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
			return a;
		});

		future1.thenAcceptBothAsync(future2,(f1,f2)->{
			System.out.println("任務一結果:"+f1+"---任務二結果:"+f2);
			int a=1/0;
			System.out.println("任務三開始了"+a);
		});

		System.out.println("任務結果:");
	}
-----main start--------1
當前線程12------任務一結果:5
當前線程12------任務二結果:5
任務結果:
任務一結果:5---任務二結果:5

runAfterBoth()有返回結果,可得到任務1和任務2結果:任務1和任務2任一拋出異常,任務3不執行;可捕獲任務3的異常;

	public static void main(String[] args)  {
		System.out.println("-----main start--------"+Thread.currentThread().getId());
		CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務一結果:" + a);
			return a;
		});

		CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
			int a = 10 / 2;
			System.out.println("當前線程" + Thread.currentThread().getId() + "------任務二結果:" + a);
			return a;
		});

		CompletableFuture<Integer> future = future1.thenCombineAsync(future2, (f1, f2) -> {
			System.out.println("任務一結果:" + f1 + "---任務二結果:" + f2);
			int a = 1 / 0;
			System.out.println("任務三開始了" + a);
			return a;
		});

		Integer integer = null;
		try {
			integer = future.get();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		System.out.println("任務結果:"+integer);
	}
-----main start--------1
當前線程12------任務一結果:5
當前線程12------任務二結果:5
任務一結果:5---任務二結果:5
任務結果:null
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero

thenCombine 、thenAcceptBoth、runAfterBoth 區別:

  • thenCombine:組合兩個future,獲取兩個future的返回結果,並返回當前任務的返回值;
  • thenAcceptBoth:組合兩個future,獲取兩個future任務的返回結果,然后處理任務,沒有返回值;
  • runAfterBoth:組合兩個future,不需要獲取future的結果,只需要兩個future處理完任務后,處理該任務;

5)、兩任務組合,一個完成

applyToEither 、acceptEither、runAfterEither 區別:

  • applyToEither:兩個任務有一個執行完成,獲取它的返回值,處理任務並有新的返回值;
  • acceptEither:兩個任務有一個執行完成,獲取它的返回值,處理任務,沒有新的返回值;
  • runAfterEither:兩個任務有一個執行完成,不需要獲取future的結果,處理任務,也沒有返回值;

6)、多任務組合

allOf 、anyOf 區別:

  • allOf:等待所有任務完成
  • anyOf:只要有一個任務完成


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM