1.線程池源碼分析:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){};
核心線程數量(corePoolSize)、最大線程數量(maximumPoolSize)、超出核心線程數量的存活時間(keepAliveTime)、
超出核心線程數量的存活時間單位(unit)、存放任務隊列(workQueue)、執行程序創建新線程時使用的工廠(threadFactory)、當線程邊界和隊列容量達到時拒絕策略(handler)
正常線程池工作流程
1:當提交的任務小於核心線程池數量的時候,使用線程池中的核心線程。
2:當提交的任務大於線程池中核心線程數量的時候,會將新任務存放到隊列中。
3:當隊列存滿后,會開啟新線程直到達到設置的最大線程池數量。
4:當隊列存滿后,且線程池中的最大線程數量達到最大的時候,這時候在提交過來任務,直接采用線程池設置的拒絕策略。
2.場景分析
由上面可得,如果隊列在沒有存滿的情況下我們的最大線程數量是沒有開啟的,這時候並沒有達到我們想要的多線程的效果。所以我們需要改寫一下邏輯
1:自定義線程池繼承ThreadPoolExecutor類,改寫核心的邏輯。
2:自定義隊列繼承LinkedBlockingQueue,改寫 offer 方法。
自定義隊列方法:
package com.example.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; /** * @Description: 線程池工作隊列重寫 * @Author: mingtian * @CreateDate: 2021/4/9 13:22 * @Version: 1.0 */ public class TaskQueue<Runnable> extends LinkedBlockingQueue<Runnable> { /** * 打印日志 */ private static Logger logger = LoggerFactory.getLogger(TaskQueue.class); /** * 自定義的線程池類,繼承自ThreadPoolExecutor */ private CustomThreadPoolExecutor threadPoolExecutor; public TaskQueue(int capacity) { super(capacity); } /** * 對象賦值 * * @param customThreadPoolExecutor */ public void setExecutor(CustomThreadPoolExecutor customThreadPoolExecutor) { threadPoolExecutor = customThreadPoolExecutor; } /** * offer方法的含義是:將任務提交到隊列中,返回值為true/false,分別代表提交成功/提交失敗。 * 作用:TaskQueue的offer返回值來決定是否創建更多的線程,達到先判斷maximumPoolSize再判斷隊列的目的 * * @param runnable * @return */ @Override public boolean offer(Runnable runnable) { if (threadPoolExecutor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } // 線程池的當前線程數 int currentPoolThreadSize = threadPoolExecutor.getPoolSize(); if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) { // 已提交的任務數量小於當前線程數,意味着線程池中有空閑線程,直接扔進隊列里,讓線程去處理 return super.offer(runnable); } // return false to let executor create new worker. if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) { // 重點: 當前線程數小於 最大線程數 ,返回false,暗含入隊失敗,讓線程池去創建新的線程 return false; } // 重點: 代碼運行到此處,說明當前線程數 >= 最大線程數,需要真正的提交到隊列中 return super.offer(runnable); } /** * 重試 在線程池沒有關閉的狀態時 將任務存放到隊列中 * * @param o * @param timeout * @param unit * @return * @throws InterruptedException */ public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException { if (threadPoolExecutor.isShutdown()) { logger.error("threadPoolExecutor is shutdown!!!"); throw new RejectedExecutionException("Executor is shutdown!"); } return super.offer(o, timeout, unit); } }
自定義線程池類:
package com.example.util; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; /** * @Description: 自定義線程池 重寫線程池執行順序 * @Author: mingtian * @CreateDate: 2021/4/9 13:21 * @Version: 1.0 */ public class CustomThreadPoolExecutor extends ThreadPoolExecutor { /** * 打印日志 */ private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutor.class); /** * 定義一個成員變量,用於記錄當前線程池中已提交的任務數量 */ private final AtomicInteger submittedTaskCount = new AtomicInteger(0); /** * 自定義線程池 * * @param corePoolSize 核心線程池數量 * @param maximumPoolSize 最大線程池數量 * @param keepAliveTime 超過核心線程池數量存活時間 * @param unit 超過核心線程池數量存活時間單位 * @param workQueue 存放任務的隊列 * @param threadFactory 線程工廠 可以定義線程池名稱 * @param handler 當隊列滿時執行拒絕策略 */ public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, TaskQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); } /*** * 獲取線程池中的任務數量 * @return */ public int getSubmittedTaskCount() { return submittedTaskCount.get(); } /** * 獲取線程池對象 * * @return */ public CustomThreadPoolExecutor getThreadPoolExecutor() { return CustomThreadPoolExecutorUtil.getCustomThreadPoolExecutor(); } /** * 方法執行完畢之后執行 * * @param r * @param t */ @Override protected void afterExecute(Runnable r, Throwable t) { // ThreadPoolExecutor的勾子方法,在task執行完后需要將池中已提交的任務數 - 1 submittedTaskCount.decrementAndGet(); } /** * 重寫execute 方法 * * @param command */ @Override public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! // 將池中已提交的任務數 + 1 submittedTaskCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { // retry to offer the task into queue. final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { logger.warn("submittedTaskCount:{},maximumPoolSize:{},queueSize:{},completedTaskCount:{}", getSubmittedTaskCount(), getThreadPoolExecutor().getMaximumPoolSize(), getThreadPoolExecutor().getQueue().size(), getThreadPoolExecutor().getCompletedTaskCount()); submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full.", rx); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); throw t; } } }
測試類:
package com.example.util; import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.SneakyThrows; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** * @Description: 自定義線程池隊列 * @Author: mingtian * @CreateDate: 2021/4/9 13:28 * @Version: 1.0 */ public class CustomThreadPoolExecutorUtil { /** * 打印日志 */ private static Logger logger = LoggerFactory.getLogger(CustomThreadPoolExecutorUtil.class); /** * 默認 CPU 核心數 */ private static int threadPoolSize = 0; static { // 獲取服務器 CPU 核心數 threadPoolSize = Runtime.getRuntime().availableProcessors(); logger.info("服務器 CPU 核心數量:{}", threadPoolSize); } public static int getThreadPoolSize() { return threadPoolSize; } /** * 線程工廠,用來創建線程 */ private static ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("port-pool-%d").build(); private static TaskQueue taskQueue = new TaskQueue<>(10); /** * 自定義線程池 */ private static CustomThreadPoolExecutor CustomThreadPoolExecutor = new CustomThreadPoolExecutor(2, 2 * 2, 60L, TimeUnit.SECONDS, taskQueue, threadFactory, new ThreadPoolExecutor.AbortPolicy()); /** * 獲取線程池對象方法 * * @return */ public static CustomThreadPoolExecutor getCustomThreadPoolExecutor() { return CustomThreadPoolExecutor; } /** * 模擬發送消息方法 */ public static class SendMessage implements Runnable { private int i; public SendMessage(int i) { this.i = i; } @SneakyThrows @Override public void run() { logger.info("我是第{}條消息,poolSize:{},queueSize:{},activeCount:{},completedTaskCount:{}", i, CustomThreadPoolExecutor.getPoolSize(), CustomThreadPoolExecutor.getQueue().size(), CustomThreadPoolExecutor.getActiveCount(), CustomThreadPoolExecutor.getCompletedTaskCount()); } } public static void main(String[] args) throws InterruptedException { logger.info("-------------------------------開始測試--------------------------------------"); taskQueue.setExecutor(CustomThreadPoolExecutor); for (int i = 1; i <= 16; i++) { CustomThreadPoolExecutorUtil.SendMessage sendMessage = new CustomThreadPoolExecutorUtil.SendMessage(i); CustomThreadPoolExecutor.execute(sendMessage); } Thread.sleep(10000); CustomThreadPoolExecutor.shutdown(); logger.info("-------------------------------測試結束--------------------------------------"); } }
自定義線程池核心邏輯:
當提交任務到CustomThreadPoolExecutor的時候,執行 submittedTaskCount.incrementAndGet(); 將線程池中數量+1處理,然后調用父類 super.execute(command); 執行。
1 // 代碼運行到此處,說明線程數 >= corePoolSize, 此時workQueue為自定義的TaskQueue 2 if (isRunning(c) && workQueue.offer(command)) { 3 int recheck = ctl.get(); 4 if (! isRunning(recheck) && remove(command)) 5 reject(command); 6 else if (workerCountOf(recheck) == 0) 7 addWorker(null, false); 8 }
自定義隊列核心邏輯:
當執行到 workQueue.offer(command) 方法的時候走的我們自定義隊列TaskQueue的offer方法,而offer方法的返回值決定着是否創建更多的線程:返回true,代表入隊成功,不創建線程;返回false,代表入隊失敗,需要創建線程。
1 public boolean offer(Runnable runnable) { 2 if (threadPoolExecutor == null) { 3 throw new RejectedExecutionException("The task queue does not have executor!"); 4 } 5 // 線程池的當前線程數 6 int currentPoolThreadSize = threadPoolExecutor.getPoolSize(); 7 if (threadPoolExecutor.getSubmittedTaskCount() < currentPoolThreadSize) { 8 // 已提交的任務數量小於當前線程數,意味着線程池中有空閑線程,直接扔進隊列里,讓線程去處理 9 return super.offer(runnable); 10 } 11 12 // return false to let executor create new worker. 13 if (currentPoolThreadSize < threadPoolExecutor.getMaximumPoolSize()) { 14 // 重點: 當前線程數小於 最大線程數 ,返回false,暗含入隊失敗,讓線程池去創建新的線程 15 return false; 16 } 17 // 重點: 代碼運行到此處,說明當前線程數 >= 最大線程數,需要真正的提交到隊列中 18 return super.offer(runnable); 19 }
核心邏輯:當前線程數小於最大線程數就返回false,代表入隊失敗,需要創建線程。
因此,總結起來就是:自定義的CustomThreadPoolExecutor依賴自定義的TaskQueue的offer返回值來決定是否創建更多的線程,達到先判斷maximumPoolSize再判斷隊列的目的。
3.參考文獻
tomcat 源碼中的線程池也是使用的這樣的思想,該例子來源於tomcat源碼思想。
tomcat 線程池 源碼:
private final AtomicInteger submittedCount = new AtomicInteger(0); public void execute(Runnable command, long timeout, TimeUnit unit) { submittedCount.incrementAndGet(); try { super.execute(command); } catch (RejectedExecutionException rx) { if (super.getQueue() instanceof TaskQueue) { final TaskQueue queue = (TaskQueue)super.getQueue(); try { if (!queue.force(command, timeout, unit)) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull")); } } catch (InterruptedException x) { submittedCount.decrementAndGet(); throw new RejectedExecutionException(x); } } else { submittedCount.decrementAndGet(); throw rx; } } }
tomcat 源碼中 TaskQueue 源碼:
public boolean offer(Runnable o) { //we can't do any checks if (parent==null) return super.offer(o); //we are maxed out on threads, simply queue the object if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o); //we have idle threads, just add it to the queue if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o); //if we have less threads than maximum force creation of a new thread if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false; //if we reached here, we need to add it to the queue return super.offer(o); }