一. 線程池簡介
1. 線程池的概念:
線程池就是首先創建一些線程,它們的集合稱為線程池。使用線程池可以很好地提高性能,線程池在系統啟動時即創建大量空閑的線程,程序將一個任務傳給線程池,線程池就會啟動一條線程來執行這個任務,執行結束以后,該線程並不會死亡,而是再次返回線程池中成為空閑狀態,等待執行下一個任務。
2. 線程池的工作機制
2.1 在線程池的編程模式下,任務是提交給整個線程池,而不是直接提交給某個線程,線程池在拿到任務后,就在內部尋找是否有空閑的線程,如果有,則將任務交給某個空閑的線程。
2.1 一個線程同時只能執行一個任務,但可以同時向一個線程池提交多個任務。
3. 使用線程池的好處
Java中的線程池是運用場景最多的並發框架,幾乎所有需要異步或並發執行任務的程序都可以使用線程池。在開發過程中,合理地使用線程池能夠帶來3個好處:
第一:降低資源消耗。通過重復利用已創建的線程降低線程創建和銷毀造成的消耗。
第二:提高響應速度。當任務到達時,任務可以不需要等到線程創建就能立即執行。
第三:提高線程的可管理性。線程是稀缺資源,如果無限制地創建,不僅會消耗系統資源,還會降低系統的穩定性,使用線程池可以進行統一分配、調優和監控。但是,要做到合理利用線程池,必須對其實現原理了如指掌。
二:JDK對線程池的支持
JDK提供的Executor框架
JDK提供了Executor框架,可以讓我們有效的管理和控制我們的線程,其實質也就是一個線程池。Executor下的接口和類繼承關系如下:
Executors:提供了一系列靜態工廠方法用於創建各種線程池
其中常用幾類如下:
public static ExecutorService newFixedThreadPool() public static ExecutorService newSingleThreadExecutor() public static ExecutorService newCachedThreadPool() public static ScheduledExecutorService newSingleThreadScheduledExecutor() public static ScheduledExecutorService newScheduledThreadPool()
1、newFixedThreadPool:該方法返回一個固定線程數量的線程池;
2、newSingleThreadExecutor:該方法返回一個只有一個現成的線程池;
3、newCachedThreadPool:返回一個可以根據實際情況調整線程數量的線程池;
4、newSingleThreadScheduledExecutor:該方法和newSingleThreadExecutor的區別是給定了時間執行某任務的功能,可以進行定時執行等;
5、newScheduledThreadPool:在4的基礎上可以指定線程數量。
創建線程池實質調用的還是ThreadPoolExecutor
在Executors類中,我們拿出來一個方法簡單分析一下:
可以看出,類似的其他方法一樣,在Executors內部創建線程池的時候,實際創建的都是一個ThreadPoolExecutor對象,只是對ThreadPoolExecutor構造方法,進行了默認值的設定。ThreadPoolExecutor的構造方法如下:
參數含義如下:
1、corePoolSize 核心線程池大小;
2、maximumPoolSize 線程池最大容量大小;
3、keepAliveTime 線程池空閑時,線程存活的時間;
4、TimeUnit 時間單位;
5、ThreadFactory 線程工廠;
6、BlockingQueue任務隊列;
7、RejectedExecutionHandler 線程拒絕策略;
Executor框架實例
1、實例一:
public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 10; i++) { int index = i; executorService.submit(() -> System.out.println("i:" + index + " executorService")); } executorService.shutdown(); } }
submit(Runnable task)方法提交一個線程。
但是使用最新的“阿里巴巴編碼規范插件”檢測一下會發現:
線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,
這樣的處理方式讓寫的同學更加明確線程池的運行規則,規避資源耗盡的風險。
說明:Executors各個方法的弊端:
1)newFixedThreadPool和newSingleThreadExecutor:
主要問題是堆積的請求處理隊列可能會耗費非常大的內存,甚至OOM。
2)newCachedThreadPool和newScheduledThreadPool:
主要問題是線程數最大數是Integer.MAX_VALUE,可能會創建數量非常多的線程,甚至OOM。
2、實例二:
遵循阿里巴巴編碼規范的提示,示例如下:
public class ThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10; i++) { int index = i; executorService.submit(() -> System.out.println("i:" + index + " executorService")); } executorService.shutdown(); } }
或者這樣:
public class ThreadPoolDemo { public static void main(String[] args) { ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 10; i++) { int index = i; pool.submit(() -> System.out.println("i:" + index + " executorService")); } pool.shutdown(); } }
3、實例三:
自定義ThreadFactory、自定義線程拒絕策略
public static void main(String[] args) { ExecutorService executorService = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(10), new ThreadFactory() { //自定義ThreadFactory @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName(r.getClass().getName()); return thread; } }, new ThreadPoolExecutor.AbortPolicy()); //自定義線程拒絕策略 for (int i = 0; i < 10; i++) { int index = i; executorService.submit(() -> System.out.println("i:" + index)); } executorService.shutdown(); } }
使用submit的坑
首先看一下實例:
public class ThreadPoolDemo3 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 5; i++) { int index = i; executorService.submit(() -> divTask(100, index)); } executorService.shutdown(); }
private static void divTask(int a, int b) { double result = a / b; System.out.println(result); } }
運行結果:
上述代碼,可以看出運行結果為4個,因該是有5個的,但是當i=0
的時候,100/0
是會報錯的,但是日志信息中沒有任何信息,是為什么那?如果使用了submit(Runnable task)
就會出現這種情況,任何的錯誤信息都出現不了!
這是因為使用submit(Runnable task)
的時候,錯誤的堆棧信息跑出來的時候會被內部捕獲到,所以打印不出來具體的信息讓我們查看,解決的方法有如下兩種:
1、使用execute()代替submit();
public class ThreadPoolDemo3 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 5; i++) { int index = i; executorService.execute(() -> divTask(100, index)); } executorService.shutdown(); } private static void divTask(int a, int b) { double result = a / b; System.out.println(result); } }
運行結果:
2、使用Future
public class ThreadPoolDemo3 { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(4); for (int i = 0; i < 5; i++) { int index = i; Future future = executorService.submit(() -> divTask(200, index)); try { future.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } executorService.shutdown(); } private static void divTask(int a, int b) { double result = a / b; System.out.println(result); } }
運行結果:
3、execute和submit的區別
(1)execute()
方法用於提交不需要返回值的任務,所以無法判斷任務是否被線程池執行成功。通過以下代碼可知execute()
方法輸入的任務是一個Runnable類的實例。
(2)submit()
方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個future對象可以判斷任務是否執行成功,並且可以通過future的get()
方法來獲取返回值,get()
方法會阻塞當前線程直到任務完成,而使用get(long timeout,TimeUnit unit)
方法則會阻塞當前線程一段時間后立即返回,這時候有可能任務沒有執行完。
思考:線程池中的工作線程是如何實現線程復用的?
一個線程一般在執行完任務后就結束了,怎么再讓他執行下一個任務呢?
則要達到復用的目的,則必須從Runnable接口的run()方法上入手,可以這樣設計這個Runnable.run()方法(就叫外面的run()方法):
它 本質上是個無限循環,跑的過程中不斷檢查我們是否有新加入的子Runnable對象(就叫內部的runnable:run()吧,它就是用來實現我們自己的任務),有就調一下我們的run(),其實就一個大run()把其它小run()#1,run()#2,...給串聯起來了,基本原理就這么簡單
不停地處理我們提交的Runnable任務。
public void run() { while(true) { if(tasks available) { Runnable task = taskqueue.dequeue(); task.run(); } else { // wait or whatever } } }
下面舉個代碼實例來模擬實現線程池復用線程
生產了兩個 線程作為工人
生產了10個同樣的任務,讓他們執行
利用復用讓 2個線程完成10個任務
import java.util.ArrayList; import java.util.LinkedList; import java.util.concurrent.TimeUnit; public class Mythreadpool { LinkedList<Task> taskList = new LinkedList<Task>(); class Task { //任務類 int id; Task(int id){ this.id=id; System.out.println("第"+id+"個任務產生"); } public void run() {//具體的工作 System.out.println("第"+id+"個任務正在執行"); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("第"+id+"個任務執行完畢"); } } class Worker extends Thread { //工人實體 String name; Worker(String name) { this.name = name; } public void run() { while(true) { if(taskList.size() == 0) { try { synchronized (taskList) { System.out.println("Worker " + name+" 沒有任務"); taskList.wait(); //沒得到任務,進入tasklist的等待隊列 } } catch (InterruptedException e) { e.printStackTrace(); } } synchronized (taskList) { System.out.println("Worker " + name+" 得到任務"); taskList.removeFirst().run(); } } } } void pool() { //工人。只生產了兩個工人 ArrayList<Worker> wokerlist=new ArrayList<Worker>(); for(int i=0;i<2;i++) { Worker k = new Worker("第"+(i+1)+"個工人"); k.start(); wokerlist.add(k);// } } class Factory extends Thread{ //生產任務的線程,總共會生產10個任務 public void run() { for(int i=0;i<10;i++) { synchronized(taskList) { taskList.addLast(new Task(i+1)); taskList.notify(); } try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } } } public static void main(String[] args) { Mythreadpool mythreadpool = new Mythreadpool(); mythreadpool.pool(); //初始化工人 Mythreadpool.Factory m= mythreadpool.new Factory(); m.start(); } }
執行效果:
分析jdk中是如何實現線程復用的
線程復用
即,如何將放入線程中的諸多任務,在N個線程中執行的。
ThreadPoolExecutor.execute()
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
分析:可以看出:ThreadPoolExecutor.execute()的功能就是:
1、將任務添加至阻塞隊列workQueue,workQueue.offer(command)
2、根據core和maxPool,選擇是否創建Worker,addWorker()
因此,線程復用的實現應該在worker中,打開addWorker()方法觀察
addWorker
private boolean addWorker(Runnable firstTask, boolean core) { //創建worker retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } //啟動worker boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { //ThreadExecutor的全局鎖,在創建\銷毀worker工作池的時候,才會用到 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
分析:addworker分為兩部分:1、創建worker,2、啟動worker
規則校驗:
與core和maxPool數量的規則相同
創建worker:
獲取ThreadLocal的全局鎖。 安全的創建Worker。
t.start();
因此:重點又回到了Worker的run方法上
Worker.run()
public void run() { runWorker(this); } final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
分析:這里就比較清晰了:
1、通過getTask()方法,獲取待執行的任務。
2、通過task.run();執行具體的任務。
3、正常情況,只有當所有任務執行完畢才會停止運行。
因此:
1、進一步分析getTask()
2、執行task.run()方法。-->>這里可以看出,事實上線程在執行任務的時候,本質上是調用了任務自身的run/call方法。
==》》有點像是thread.get(threadlocal) 本質上是調用了 threadlocalMap.get(thread) 的感覺
getTask()
private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { decrementWorkerCount(); return null; } int wc = workerCountOf(c); // Are workers subject to culling? boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { if (compareAndDecrementWorkerCount(c)) return null; continue; } try { Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
分析:也不用把代碼完全細節完全深究,可以發現方法是從workQueue中獲取task的,所以最終的問題就是看這個變量workQueue是誰的成員變量。
public class ThreadPoolExecutor extends AbstractExecutorService { private final BlockingQueue<Runnable> workQueue; 。。。 }
分析,getTask是從線程池中,獲取的任務。即所有的任務都放在ThreadPoolExecutor中,線程池啟動多個Worker去執行任務,每個worker不停的從ThreadPoolExector的workQueue中取出任務,比你高執行task.run()方法,直至所有的任務執行完畢。
至此分析完畢。
資料出處:
https://blog.csdn.net/yinni11/article/details/81348210
https://www.jianshu.com/p/93c26498a3c5
https://blog.csdn.net/qq_38966984/article/details/80415736