池化是我們在實際生產中經常用到的一種思想,通過一個 “池” 把資源統一的管理起來。可以達到對資源的合理管理、重復利用、減少資源創建/銷毀的開銷等目的。
常見的比如常量池、連接池、線程池,今天我們手擼一個線程池。
拋開語言特性,線程池無非是維護一堆線程阻塞等待任務的到來,並由主線程對任務線程的數量進行動態控制的組件。做到線程資源的復用及統一管理,同時避免大量的線程創建銷毀的開銷,並控制總的線程數量保證系統安全。
功能要點
1. 維護一個線程集合,執行外部提交的任務。
2. 管理線程的集合,沒有任務時維護一定數量的線程等待任務的到來。在運行過程中動態的根據任務的多少創建或銷毀池中的線程,合理的分配資源。
3. 提交成功的任務由線程池保證一定會被執行,同時對於提交失敗的任務需要向調用方進行反饋,比如拋出異常由調用方進行處理。
4. 對外提供安全的關閉方法,在保證任務隊列中的任務都會被處理的前提下可以正確的回收池內所有的線程。
實現思路要點
1. 采用阻塞隊列存儲任務,在沒有任務時任務線程阻塞在隊列的 getTask 方法,並在 putTask 時喚醒等待在該隊列的任務線程。
2. 存儲隊列對外提供多種拒絕策略,由調用方在創建線程池時選擇。調用方完全掌握任務被拒絕時的線程池的處理方式,從而做出正確的處理。
3. 任務線程有自己的關閉標識,同時應正確的響應 interrupt 信號,以便在阻塞在任務隊列上時可以被正確的關閉。
4. 主線程輪詢的檢查任務數與活動線程數,動態的增加或減少活動線程,增加或減少線程數的算法會極大的影響整個線程池的性能,應根據任務的提交情況合理設計。
5. 在使用同步鎖的情況下,阻塞隊列與主線程的同步鎖應合理設計。因為同步鎖為持有等待且不可搶占的,在我們不能完全掌握每個線程獲取鎖的順序時,容易發生死鎖的情況。
本次實現的不足
1. 鎖的粒度不夠細,為了避免死鎖,主線程與阻塞隊列共用一把同步鎖,影響性能。
2. 控制線程數的方式不夠細致:任務數超過活動線程數的兩倍時開啟所有線程;無任務時才會縮減線程。
3. 阻塞隊列由自己實現,簡單粗暴的使用 synchronized 控制互斥關系,效率不夠高。
4. 還可以在一些信號量上使用 CAS 或直接使用非阻塞隊列提升性能。
5. 沒有定義對任務在執行過程中發生異常的處理。實際情況下應該記錄沒有執行成功的任務並記錄下來,比如序列化任務對象記錄到數據庫,以便進行人工補償。但因為本次實現任務對象直接使用 Runnable 的簡單實現類,序列化沒有意義,所以沒有考慮這點。
測試結果
先上測試代碼:
public static void main(String[] args) {
BasicThreadPool threadPool = new BasicThreadPool(); for (int i = 0; i <= 100; i++) { final int num = i; threadPool.excute( () -> { System.out.println(Thread.currentThread().getName() + " : i am running to deal with task of " + num); try { Thread.sleep(1000); } catch (InterruptedException e) { return; } } ); } }
任務 sleep 一段時間是防止任務處理速度過快,無法測試動態增加或減少線程數的功能。測試結果:
首先在提交任務時,我設置了初始線程數為 1 ,由於任務會 sleep 一段時間,造成了大量任務積壓在了緩沖區,因此線程池馬力全開,將線程數增加到了最大(add部分):
積壓任務處理完后,線程池縮減線程數(remove部分),只留下了核心線程,我設置的核心線程數為 3 ,為 0,1,2 號線程:
然后我們改一下測試代碼,增加關閉線程池的操作:
public static void main(String[] args) { BasicThreadPool threadPool = new BasicThreadPool(); for (int i = 0; i <= 100; i++) { final int num = i; threadPool.excute( () -> { System.out.println(Thread.currentThread().getName() + " : i am running to deal with task of " + num); try { Thread.sleep(1000); } catch (InterruptedException e) { return; } } ); if (i == 90) { threadPool.shutdown(); } } }
在第 90 個任務提交時關閉線程池,之后的任務線程池拒絕接收,將拋出異常給調用者:
線程池處於關閉狀態時,依然會保證已經在隊列中的任務會被執行完畢:
任務隊列中擠壓的任務全部處理完畢后,會終止所有任務線程,包括阻塞狀態的線程:
剩下的全在代碼里了。
拒絕策略:
package theadPool; /** * @Author Nxy * @Date 2020/3/14 14:26 * @Description 拒絕策略 */ public interface DenyPolicy { public void reject(Runnable task, ThreadPool pool); /** * @Author Nxy * @Date 2020/3/14 14:27 * @Description 任務隊列溢出異常 */ class OutOfRunnableQueueException extends RuntimeException { OutOfRunnableQueueException(String msg) { super(msg); } } /** * @Author Nxy * @Date 2020/3/14 14:30 * @Description 直接丟棄任務 */ class DiscardDenyPolicy implements DenyPolicy { @Override public void reject(Runnable task, ThreadPool pool) { //do nothing } } /** * @Author Nxy * @Date 2020/3/14 14:42 * @Description 拋出 任務隊列溢出異常 */ class throwExceptionDenyPolicy implements DenyPolicy { @Override public void reject(Runnable task, ThreadPool pool) { throw new OutOfRunnableQueueException("task queue is full!"); } } /** * @Author Nxy * @Date 2020/3/14 14:42 * @Description 由提交線程直接執行任務 */ class RunnerDenyPolicy implements DenyPolicy { @Override public void reject(Runnable task, ThreadPool pool) { task.run(); } } }
任務隊列:
package theadPool; /** * @Author Nxy * @Date 2020/3/14 14:23 * @Description 任務隊列 */ public interface TaskQueue { //新任務追加到隊列結尾 void putTask(Runnable runnable); //獲取任務,該方法是阻塞的,應當向上拋出 InterruptException 使調用方做出阻塞期間對 interrupt 信號的響應 Runnable getTask() throws InterruptedException; //獲取當前任務數 int getSize(); }
package theadPool; import java.util.LinkedList; public class LinkedTaskQueue implements TaskQueue { //隊列最大長度 private final int maxSize; //任務達到最大數后的拒絕策略 private final DenyPolicy denyPolicy; //任務隊列,鏈表實現 private final LinkedList<Runnable> queue = new LinkedList<Runnable>(); private final ThreadPool threadPool; LinkedTaskQueue(ThreadPool threadPool, int maxSize, DenyPolicy denyPolicy) { this.maxSize = maxSize; this.denyPolicy = denyPolicy; this.threadPool = threadPool; } /** * @Author Nxy * @Date 2020/3/14 21:07 * @Description 以下互斥區域使用 threadPool 對象鎖,因為在 threadPool 中存在對這些方法的調用, * 在調用情況復雜,不好判斷調用次序的情況下(並且存在持有等待、不可搶占的特性)用兩把鎖容易造成死鎖 */ @Override public void putTask(Runnable runnable) { synchronized (threadPool) { //超出最大任務數或者線程池已關閉,采取拒絕策略 if (getSize() >= maxSize || threadPool.isShutDown()) { denyPolicy.reject(runnable, threadPool); return; } //任務追加到任務隊列結尾 queue.addLast(runnable); //喚醒等待在任務隊列的工作線程 threadPool.notify(); } } @Override public Runnable getTask() throws InterruptedException { Runnable returnRunnable; synchronized (threadPool) { if (queue.isEmpty()) { System.out.println(Thread.currentThread().getName() + " 等待在緩沖區"); threadPool.wait(); } //先進先出隊列 returnRunnable = queue.removeFirst(); } return returnRunnable; } @Override public int getSize() { synchronized (queue) { return queue.size(); } } }
任務線程:
package theadPool; /** * @Author Nxy * @Date 2020/3/14 14:47 * @Description 線程池中的線程 */ public class PoolTask extends Thread implements Runnable { //任務隊列 private final TaskQueue queue; //當前線程運行標志位 private volatile boolean isRunning = true; //傳入隊列 public PoolTask(TaskQueue queue) { this.queue = queue; } /** * @Author Nxy * @Date 2020/3/14 14:52 * @Description 除通過 isRunning 標志位可關閉該線程外,interrupt 信號也可關閉該線程 */ @Override public void run() { while (isRunning && !(this.isInterrupted())) { try { //獲取任務並執行 Runnable runnable = queue.getTask(); runnable.run(); } catch (InterruptedException e0) { System.out.println(Thread.currentThread().getName() + " 接收到 interrupt 信號,終止執行"); return; } catch (Exception e1) { // e1.printStackTrace(); return; } } System.out.println(Thread.currentThread().getName() + " 終止執行"); } //安全的關閉該線程 public void shutdown() { this.isRunning = false; this.interrupt(); } }
主類:
package theadPool; import java.util.LinkedList; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * @Author Nxy * @Date 2020/3/14 16:55 * @Description 線程池 */ public class BasicThreadPool extends Thread implements ThreadPool { //初始線程數 private final int initSize; //最大線程數 private final int maxSize; //最大任務數 private final int maxTaskSzie; //核心線程數 private final int coreSize; //當前活躍線程數 private int activeCount; //任務隊列 private final TaskQueue taskQueue; //關閉控制位 private volatile boolean isShutDown = false; //主線程掃描線程池狀態時間間隔 private final long keepAliveTime; //睡眠工具類 private final TimeUnit timeUtil; //線程隊列 private final LinkedList<TaskThread> taskPool = new LinkedList<TaskThread>(); //任務拒絕策略 private final DenyPolicy denyPolicy; //默認參數 private static final int defaultInitSize = 1; private static final int defaultMaxSize = 8; private static final int defaultCoreSize = 3; private static final DenyPolicy defaultDenyPolicy = new DenyPolicy.throwExceptionDenyPolicy(); private static final int defaultMaxTaskSize = 1000; private static final TimeUnit defaultTimeUtil = TimeUnit.MILLISECONDS; private static final int defaultKeepAlive = 1; //默認參數構造 public BasicThreadPool() { this(defaultInitSize, defaultMaxSize, defaultCoreSize, defaultKeepAlive, defaultDenyPolicy, defaultMaxTaskSize, defaultTimeUtil); } //自定義拒絕策略構造 public BasicThreadPool(DenyPolicy denyPolicy) { this(defaultInitSize, defaultMaxSize, defaultCoreSize, defaultKeepAlive, denyPolicy, defaultMaxTaskSize, defaultTimeUtil); } //自定義參數構造 public BasicThreadPool(int initSize, int maxSize, int coreSize, long keepAliveTime, DenyPolicy denyPolicy, int maxTaskSzie, TimeUnit timeUtil) { this.initSize = initSize; this.maxSize = maxSize; this.coreSize = coreSize; this.denyPolicy = denyPolicy; this.keepAliveTime = keepAliveTime; this.maxTaskSzie = maxTaskSzie; this.timeUtil = timeUtil; taskQueue = new LinkedTaskQueue(this, maxTaskSzie, denyPolicy); this.init(); } /** * @Author Nxy * @Date 2020/3/14 16:03 * @Description 靜態線程工廠 */ static class ThreadFactory { private static final AtomicInteger groupCounter = new AtomicInteger(1); private static final AtomicInteger counter = new AtomicInteger(0); private static final ThreadGroup group = new ThreadGroup("BasicThreadPool group : " + groupCounter.getAndIncrement()); public static Thread createThread(Runnable runnable) { return new Thread(group, runnable, "BasicThreadPool group " + groupCounter.get() + " : " + counter.getAndIncrement()); } } /** * @Author Nxy * @Date 2020/3/14 15:48 * @Description 線程池內線程,poolTask 與 thread 的結合,poolTask 攜帶run,thread 攜帶池中參數 */ class TaskThread { final PoolTask poolTask; final Thread thread; TaskThread(PoolTask poolTask, Thread thread) { this.thread = thread; this.poolTask = poolTask; } public void shutdown() { poolTask.shutdown(); /** * @Author Nxy * @Date 2020/3/14 22:04 * @Description 很重要,僅僅給 poolTask 發送interrupt,所在線程並不會收到信號 * 線程與 Thread 或 runnable 對象是綁定的,但我們用 runnable 提交任務到 Thread 時,線程綁定的 * 是 Thread 對象,因此要通過 Thread 對象發送 interrupt 信號 */ thread.interrupt(); } } /** * @Author Nxy * @Date 2020/3/14 16:17 * @Description 新增一個活動線程 */ private void addThread() { PoolTask poolTask = new PoolTask(taskQueue); Thread thread = BasicThreadPool.ThreadFactory.createThread(poolTask); TaskThread taskThread = new TaskThread(poolTask, thread); synchronized (this) { activeCount++; taskPool.addLast(taskThread); } thread.start(); } /** * @Author Nxy * @Date 2020/3/14 16:35 * @Description 關閉一個活動線程 */ private void removeThread() { synchronized (this) { TaskThread taskThread = taskPool.removeLast(); taskThread.shutdown(); activeCount--; } } /** * @Author Nxy * @Date 2020/3/14 16:22 * @Description 不斷檢查線程數,動態調整活動線程數量 */ @Override public void run() { while (!this.isShutDown && !Thread.currentThread().isInterrupted()) { try { timeUtil.sleep(10); } catch (InterruptedException e) { //休眠時間響應 interrupt 信號 this.isShutDown = true; break; } //有任務且線程數小於核心線程數 synchronized (this) { if (this.isShutDown) { //DCL 檢查 break; } //任務數超過當前線程數的兩倍,線程池馬力全開 if (taskQueue.getSize() >= activeCount * 2 && activeCount < maxSize) { int beforeActive = activeCount; for (int i = activeCount; i < maxSize; i++) { addThread(); } System.out.println("add : actice->" + beforeActive + " ---> " + activeCount); continue; } //任務數不為0且活動線程數小於核心線程數,新增線程數過核心數線 if (taskQueue.getSize() > 0 && activeCount < coreSize) { for (int i = activeCount; i < coreSize; i++) { addThread(); } System.out.println("add : actice->" + activeCount); continue; } //無任務且線程數大於核心線程數,關閉線程,僅留下核心線程數的線程 if (taskQueue.getSize() == 0 && activeCount > coreSize) { for (int i = coreSize; i < activeCount; i++) { removeThread(); System.out.println("remove : actice->" + activeCount); } } } } System.out.println("******線程池主線程關閉******"); } private void init() { this.start(); for (int i = 0; i < initSize; i++) { addThread(); } } /** * @Author Nxy * @Date 2020/3/14 16:53 * @Description 向線程池中提交新任務 */ @Override public void excute(Runnable runnable) { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } taskQueue.putTask(runnable); } /** * @Author Nxy * @Date 2020/3/14 16:53 * @Description 關閉線程池 */ @Override public void shutdown() { synchronized (this) { this.isShutDown = true; } System.out.println("正在等待處理任務隊列剩余任務"); while (taskQueue.getSize() != 0) { //等待任務隊列中的任務全部被處理完 } System.out.println("開始關閉線程池中線程"); taskPool.forEach(threadTask -> { threadTask.shutdown(); } ); this.interrupt(); System.out.println("******線程池中所有任務線程已關閉******"); } @Override public int getInitSize() { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } return this.initSize; } @Override public int getMaxSize() { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } return this.maxSize; } @Override public int getQueSize() { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } return this.taskQueue.getSize(); } @Override public int getActiveCount() { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } synchronized (this) { return this.activeCount; } } @Override public boolean isShutDown() { if (this.isShutDown || Thread.currentThread().isInterrupted()) { throw new RuntimeException("threadPool is closed!"); } synchronized (this) { return this.isShutDown; } } }