死磕 java線程系列之自己動手寫一個線程池


 


問題

(1)自己動手寫一個線程池需要考慮哪些因素?

(2)自己動手寫的線程池如何測試?

簡介

線程池是Java並發編程中經常使用到的技術,那么自己如何動手寫一個線程池呢?本文彤哥將手把手帶你寫一個可用的線程池。

屬性分析

線程池,顧名思義它首先是一個“池”,這個池里面放的是線程,線程是用來執行任務的。

首先,線程池中的線程應該是有類別的,有的是核心線程,有的是非核心線程,所以我們需要兩個變量標識核心線程數量coreSize和最大線程數量maxSize。

為什么要區分是否為核心線程呢?這是為了控制系統中線程的數量。

當線程池中線程數未達到核心線程數coreSize時,來一個任務加一個線程是可以的,也可以提高任務執行的效率。

當線程池中線程數達到核心線程數后,得控制一下線程的數量,來任務了先進隊列,如果任務執行足夠快,這些核心線程很快就能把隊列中的任務執行完畢,完全沒有新增線程的必要。

當隊列中任務也滿了,這時候光靠核心線程就無法及時處理任務了,所以這時候就需要增加新的線程了,但是線程也不能無限制地增加,所以需要控制其最大線程數量maxSize。

其次,我們需要一個任務隊列來存放任務,這個隊列必須是線程安全的,我們一般使用BlockingQueue阻塞隊列來充當,當然使用ConcurrentLinkedQueue也是可以的(注意ConcurrentLinkedQueue不是阻塞隊列,不能運用在jdk的線程池中)。

最后,當任務越來越多而線程處理卻不及時,遲早會達到一種狀態,隊列滿了,線程數也達到最大線程數了,這時候怎么辦呢?這時候就需要走拒絕策略了,也就是這些無法及時處理的任務怎么辦的一種策略,常用的策略有丟棄當前任務、丟棄最老的任務、調用者自己處理、拋出異常等。

根據上面的描述,我們定義一個線程池一共需要這么四個變量:核心線程數coreSize、最大線程數maxSize、阻塞隊列BlockingQueue、拒絕策略RejectPolicy。

另外,為了便於給線程池一個名稱,我們再加一個變量:線程池的名稱name。

所以我們得出了線程池的屬性及構造方法大概如下:

public class MyThreadPoolExecutor implements Executor { /** * 線程池的名稱 */ private String name; /** * 核心線程數 */ private int coreSize; /** * 最大線程數 */ private int maxSize; /** * 任務隊列 */ private BlockingQueue<Runnable> taskQueue; /** * 拒絕策略 */ private RejectPolicy rejectPolicy; public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) { this.name = name; this.coreSize = coreSize; this.maxSize = maxSize; this.taskQueue = taskQueue; this.rejectPolicy = rejectPolicy; } } 

任務流向分析

根據上面的屬性分析,基本上我們已經得到了任務流向的完整邏輯:

首先,如果運行的線程數小於核心線程數,直接創建一個新的核心線程來運行新的任務。

其次,如果運行的線程數達到了核心線程數,則把新任務入隊列。

然后,如果隊列也滿了,則創建新的非核心線程來運行新的任務。

最后,如果非核心線程數也達到最大了,那就執行拒絕策略。

mythreadpool

代碼邏輯大致如下:

    @Override public void execute(Runnable task) { // 正在運行的線程數 int count = runningCount.get(); // 如果正在運行的線程數小於核心線程數,直接加一個線程 if (count < coreSize) { // 注意,這里不一定添加成功,addWorker()方法里面還要判斷一次是不是確實小 if (addWorker(task, true)) { return; } // 如果添加核心線程失敗,進入下面的邏輯 } // 如果達到了核心線程數,先嘗試讓任務入隊 // 這里之所以使用offer(),是因為如果隊列滿了offer()會立即返回false if (taskQueue.offer(task)) { // do nothing,為了邏輯清晰這里留個空if // 【本篇文章由公眾號“彤哥讀源碼”原創】 } else { // 如果入隊失敗,說明隊列滿了,那就添加一個非核心線程 if (!addWorker(task, false)) { // 如果添加非核心線程失敗了,那就執行拒絕策略 rejectPolicy.reject(task, this); } } } 

創建線程邏輯分析

首先,創建線程的依據是正在運行的線程數量有沒有達到核心線程數或者最大線程數,所以我們還需要一個變量runningCount用來記錄正在運行的線程數。

其次,這個變量runningCount需要在並發環境下加加減減,所以這里需要使用到Unsafe的CAS指令來控制其值的修改,用了CAS就要給這個變量加上volatile修飾,為了方便我們這里直接使用AtomicInteger來作為這個變量的類型。

然后,因為是並發環境中,所以需要判斷runningCount < coreSize(或maxSize)(條件一)的同時修改runningCount CAS加一(條件二)成功了才表示可以增加一個線程,如果條件一失敗則表示不能再增加線程了直接返回false,如果條件二失敗則表示其它線程先修改了runningCount的值,則重試。

最后,創建一個線程並運行新任務,且不斷從隊列中拿任務來運行【本篇文章由公眾號“彤哥讀源碼”原創】。

mythreadpool

代碼邏輯如下:

    private boolean addWorker(Runnable newTask, boolean core) { // 自旋判斷是不是真的可以創建一個線程 for (; ; ) { // 正在運行的線程數 int count = runningCount.get(); // 核心線程還是非核心線程 int max = core ? coreSize : maxSize; // 不滿足創建線程的條件,直接返回false if (count >= max) { return false; } // 修改runningCount成功,可以創建線程 if (runningCount.compareAndSet(count, count + 1)) { // 線程的名字 String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet(); // 創建線程並啟動 new Thread(() -> { System.out.println("thread name: " + Thread.currentThread().getName()); // 運行的任務 Runnable task = newTask; // 不斷從任務隊列中取任務執行,如果取出來的任務為null,則跳出循環,線程也就結束了 while (task != null || (task = getTask()) != null) { try { // 執行任務 task.run(); } finally { // 任務執行完成,置為空 task = null; } } }, threadName).start(); break; } } return true; } 

取任務邏輯分析

從隊列中取任務應該使用take()方法,這個方法會一直阻塞直至取到任務或者中斷,如果中斷了就返回null,這樣當前線程也就可以安靜地結束了,另外還要注意中斷了記得把runningCount減一。

    private Runnable getTask() { try { // take()方法會一直阻塞直到取到任務為止 return taskQueue.take(); } catch (InterruptedException e) { // 線程中斷了,返回null可以結束當前線程 // 當前線程都要結束了,理應要把runningCount的數量減一 runningCount.decrementAndGet(); return null; } } 

好了,到這里我們自己的線程池就寫完了,下面我們一起來想想怎么測試呢?

測試邏輯分析

我們再來回顧下自己的寫的線程池的構造方法:

    public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) { this.name = name; this.coreSize = coreSize; this.maxSize = maxSize; this.taskQueue = taskQueue; this.rejectPolicy = rejectPolicy; } 

name,這個隨便傳;

coreSize,我們假設為5;

maxSize,我們假設為10;

taskQueue,任務隊列,既然我們設置的是有邊界的,我們就用最簡單的ArrayBlockingQueue好吧,容量設置為15,這樣里面最多可以存儲15條任務;

rejectPolicy,拒絕策略,我們假設使用丟棄當前任務的策略,OK,我們來實現一個。

/** * 丟棄當前任務 */ public class DiscardRejectPolicy implements RejectPolicy { @Override public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) { // do nothing System.out.println("discard one task"); } } 

OK,這樣一個線程池就創建完成了,下面就是執行任務了,我們假設通過for循環連續不斷地添加100個任務好不好。

public class MyThreadPoolExecutorTest { public static void main(String[] args) { Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy()); AtomicInteger num = new AtomicInteger(0); for (int i = 0; i < 100; i++) { threadPool.execute(()->{ try { Thread.sleep(1000); System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet()); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } 

我們分析下這段程序:

(1)先連續創建了5個核心線程,並執行了新任務;

(2)后面的15個任務進了隊列;

(3)隊列滿了,又連續創建了5個線程,並執行了新任務;

(4)后面的任務就沒得執行了,全部走了丟棄策略;

(5)所以真正執行成功的任務應該是 5 + 15 + 5 = 25 條任務;

運行之:

thread name: core_test2 thread name: core_test5 thread name: core_test3 thread name: core_test4 thread name: core_test1 thread name: test6 thread name: test7 thread name: test8 thread name: test9 discard one task thread name: test10 discard one task ...省略被拒絕的任務 【本篇文章由公眾號“彤哥讀源碼”原創】 discard one task running: 1570546871851: 2 running: 1570546871851: 8 running: 1570546871851: 7 running: 1570546871851: 6 running: 1570546871851: 5 running: 1570546871851: 3 running: 1570546871851: 4 running: 1570546871851: 1 running: 1570546871851: 10 running: 1570546871851: 9 running: 1570546872852: 14 running: 1570546872852: 20 running: 1570546872852: 19 running: 1570546872852: 17 running: 1570546872852: 18 running: 1570546872852: 16 running: 1570546872852: 15 running: 1570546872852: 12 running: 1570546872852: 13 running: 1570546872852: 11 running: 1570546873852: 21 running: 1570546873852: 24 running: 1570546873852: 23 running: 1570546873852: 25 running: 1570546873852: 22 

可以看到,創建了5個核心線程、5個非核心線程,成功執行了25條任務,完成沒問題,完美^^。

總結

(1)自己動手寫一個線程池需要考慮的因素主要有:核心線程數、最大線程數、任務隊列、拒絕策略。

(2)創建線程的時候要時刻警惕並發的陷阱;

彩蛋

我們知道,jdk自帶的線程池還有兩個參數:keepAliveTime、unit,它們是干什么的呢?

答:它們是用來控制何時銷毀非核心線程的,當然也可以銷毀核心線程,具體的分析請期待下一章吧。

完整源碼

Executor接口

public interface Executor { void execute(Runnable command); } 

MyThreadPoolExecutor線程池實現類

/** * 自動動手寫一個線程池 */ public class MyThreadPoolExecutor implements Executor { /** * 線程池的名稱 */ private String name; /** * 線程序列號 */ private AtomicInteger sequence = new AtomicInteger(0); /** * 核心線程數 */ private int coreSize; /** * 最大線程數 */ private int maxSize; /** * 任務隊列 */ private BlockingQueue<Runnable> taskQueue; /** * 拒絕策略 */ private RejectPolicy rejectPolicy; /** * 當前正在運行的線程數【本篇文章由公眾號“彤哥讀源碼”原創】 * 需要修改時線程間立即感知,所以使用AtomicInteger * 或者也可以使用volatile並結合Unsafe做CAS操作(參考Unsafe篇章講解) */ private AtomicInteger runningCount = new AtomicInteger(0); public MyThreadPoolExecutor(String name, int coreSize, int maxSize, BlockingQueue<Runnable> taskQueue, RejectPolicy rejectPolicy) { this.name = name; this.coreSize = coreSize; this.maxSize = maxSize; this.taskQueue = taskQueue; this.rejectPolicy = rejectPolicy; } @Override public void execute(Runnable task) { // 正在運行的線程數 int count = runningCount.get(); // 如果正在運行的線程數小於核心線程數,直接加一個線程 if (count < coreSize) { // 注意,這里不一定添加成功,addWorker()方法里面還要判斷一次是不是確實小 if (addWorker(task, true)) { return; } // 如果添加核心線程失敗,進入下面的邏輯 } // 如果達到了核心線程數,先嘗試讓任務入隊 // 這里之所以使用offer(),是因為如果隊列滿了offer()會立即返回false if (taskQueue.offer(task)) { // do nothing,為了邏輯清晰這里留個空if } else { // 如果入隊失敗,說明隊列滿了,那就添加一個非核心線程 if (!addWorker(task, false)) { // 如果添加非核心線程失敗了,那就執行拒絕策略 rejectPolicy.reject(task, this); } } } private boolean addWorker(Runnable newTask, boolean core) { // 自旋判斷是不是真的可以創建一個線程 for (; ; ) { // 正在運行的線程數 int count = runningCount.get(); // 核心線程還是非核心線程 int max = core ? coreSize : maxSize; // 不滿足創建線程的條件,直接返回false if (count >= max) { return false; } // 修改runningCount成功,可以創建線程 if (runningCount.compareAndSet(count, count + 1)) { // 線程的名字 String threadName = (core ? "core_" : "") + name + sequence.incrementAndGet(); // 創建線程並啟動 new Thread(() -> { System.out.println("thread name: " + Thread.currentThread().getName()); // 運行的任務【本篇文章由公眾號“彤哥讀源碼”原創】 Runnable task = newTask; // 不斷從任務隊列中取任務執行,如果取出來的任務為null,則跳出循環,線程也就結束了 while (task != null || (task = getTask()) != null) { try { // 執行任務 task.run(); } finally { // 任務執行完成,置為空 task = null; } } }, threadName).start(); break; } } return true; } private Runnable getTask() { try { // take()方法會一直阻塞直到取到任務為止 return taskQueue.take(); } catch (InterruptedException e) { // 線程中斷了,返回null可以結束當前線程 // 當前線程都要結束了,理應要把runningCount的數量減一 runningCount.decrementAndGet(); return null; } } } 

RejectPolicy拒絕策略接口

public interface RejectPolicy { void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor); } 

DiscardRejectPolicy丟棄策略實現類

/** * 丟棄當前任務 */ public class DiscardRejectPolicy implements RejectPolicy { @Override public void reject(Runnable task, MyThreadPoolExecutor myThreadPoolExecutor) { // do nothing System.out.println("discard one task"); } } 

測試類

public class MyThreadPoolExecutorTest { public static void main(String[] args) { Executor threadPool = new MyThreadPoolExecutor("test", 5, 10, new ArrayBlockingQueue<>(15), new DiscardRejectPolicy()); AtomicInteger num = new AtomicInteger(0); for (int i = 0; i < 100; i++) { threadPool.execute(()->{ try { Thread.sleep(1000); System.out.println("running: " + System.currentTimeMillis() + ": " + num.incrementAndGet()); } catch (InterruptedException e) { e.printStackTrace(); } }); } } } 

歡迎關注我的公眾號“彤哥讀源碼”,查看更多源碼系列文章, 與彤哥一起暢游源碼的海洋
























免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM