import java.util.concurrent.*; /** * 線程池工具類 */ public class ThreadPoolUtils { private volatile static ThreadPoolExecutor threadPool; public static final int CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() + 1; public static final int MAX_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2; public static final int KEEP_ALIVE_TIME = 1000; public static final int BLOCK_QUEUE_SIZE = 1000; public static void executor(Runnable runnable) { getThreadPoolExecutor().execute(runnable); } public static <T> Future<T> submit(Callable<T> callable) { return getThreadPoolExecutor().submit(callable); } /** * 獲取線程池對象 * * @return */ public static ThreadPoolExecutor getThreadPoolExecutor() { if (threadPool != null) { return threadPool; } else { synchronized (ThreadPoolUtils.class) { if (threadPool == null) { threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(BLOCK_QUEUE_SIZE), new ThreadPoolExecutor.CallerRunsPolicy()); } } return threadPool; } } }
線程池參數:
public ThreadPoolExecutor(int corePoolSize, 核心線程數量
int maximumPoolSize, 最大線程數量/業余線程數量+核心線程數量
long keepAliveTime, 業余線程保持時長
TimeUnit unit, 時間單位
BlockingQueue<Runnable> workQueue, 任務隊列
ThreadFactory threadFactory, 線程工廠
RejectedExecutionHandler handler) 拒絕策略
線程池執行邏輯
1. 線程池開始執行 execute 方法 如上: 有四種情況
1)當前線程數量少於核心線程數 執行 addWork方法 執行new Worker .start 繼而執行 runWorker方法 顯性調用 task.run(); 隨后通過 processWorkerExit 回歸 addWorker 方法
2)如果不小於,那么就在workQueue中存取任務 (workQueue中任務通過getTack()方法拿出)
3)添加隊列失敗,再次執行addWork(xx,false) 判斷條件就是線程數量小於最大線程數量
4)再次失敗的話,就執行拒絕策略
線程池拒絕策略
new ThreadPoolExecutor.AbortPolicy()
public class Test { public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() ,new ThreadPoolExecutor.AbortPolicy()
); for (int i = 0; i < Integer.MAX_VALUE;i++) { int finalI = i; executorService.execute(() -> { System.out.println("測試數據-execute = " + finalI); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); } }
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.example.threadPool.Test$$Lambda$1/245565335@ee7d9f1 rejected from java.util.concurrent.ThreadPoolExecutor@15615099[Running, pool size = 5, active threads = 5, queued tasks = 0, completed tasks = 0]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at com.example.threadPool.Test.main(Test.java:15)
測試數據-execute = 0---pool-1-thread-1
測試數據-execute = 1---pool-1-thread-2
測試數據-execute = 2---pool-1-thread-3
測試數據-execute = 3---pool-1-thread-4
測試數據-execute = 4---pool-1-thread-5
線程池執行任務,遇到多余任務直接拒絕 拋出異常
new ThreadPoolExecutor.CallerRunsPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.CallerRunsPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("測試數據-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
測試數據-execute = 5---main
測試數據-execute = 6---main
測試數據-execute = 7---main
測試數據-execute = 8---main
測試數據-execute = 0---pool-1-thread-1
測試數據-execute = 9---main
測試數據-execute = 1---pool-1-thread-2
測試數據-execute = 2---pool-1-thread-3
測試數據-execute = 3---pool-1-thread-4
測試數據-execute = 4---pool-1-thread-5
測試數據-execute = 10---main
測試數據-execute = 12---pool-1-thread-5
測試數據-execute = 15---pool-1-thread-2
測試數據-execute = 14---pool-1-thread-1
........
說明主線程也執行任務,不進行異常拋出
new ThreadPoolExecutor.DiscardOldestPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardOldestPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("測試數據-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
測試數據-execute = 0---pool-1-thread-1
測試數據-execute = 1---pool-1-thread-2
測試數據-execute = 2---pool-1-thread-3
測試數據-execute = 3---pool-1-thread-4
測試數據-execute = 4---pool-1-thread-5
Exception in thread "main" java.lang.StackOverflowError
添加線程,添加隊列,不斷重試
new ThreadPoolExecutor.DiscardPolicy()
public static void main(String[] args) throws Exception { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new SynchronousQueue<>(), Executors.defaultThreadFactory() , new ThreadPoolExecutor.DiscardPolicy() ); for (int i = 0; i < 500; i++) { int finalI = i; executorService.execute(() -> { System.out.println("測試數據-execute = " + finalI + "---"+Thread.currentThread().getName()); try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } }); } executorService.shutdown(); }
測試數據-execute = 0---pool-1-thread-1
測試數據-execute = 1---pool-1-thread-2
測試數據-execute = 2---pool-1-thread-3
測試數據-execute = 3---pool-1-thread-4
測試數據-execute = 4---pool-1-thread-5
Process finished with exit code 0
添加失敗,放棄任務,不會拋出異常