面試-線程池的成長之路
版權聲明:轉載請先聯系作者並標記出處。
背景
相信大家在面試過程中遇到面試官問線程的很多,線程過后就是線程池了。從易到難,都是這么個過程,還有就是確實很多人在工作中接觸線程池比較少,最多的也就是創建一個然后往里面提交線程,對於一些經驗很豐富的面試官來說,一下就可以問出很多線程池相關的問題,與其被問的暈頭轉向,還不如好好學習。此時不努力更待何時。
什么是線程池?
線程池是一種多線程處理形式,處理過程中將任務提交到線程池,任務的執行交由線程池來管理。
如果每個請求都創建一個線程去處理,那么服務器的資源很快就會被耗盡,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以被重復利用,可執行多個任務。
如果用生活中的列子來說明,我們可以把線程池當做一個客服團隊,如果同時有1000個人打電話進行咨詢,按照正常的邏輯那就是需要1000個客服接聽電話,服務客戶。現實往往需要考慮到很多層面的東西,比如:資源夠不夠,招這么多人需要費用比較多。正常的做法就是招100個人成立一個客服中心,當有電話進來后分配沒有接聽的客服進行服務,如果超出了100個人同時咨詢的話,提示客戶等待,稍后處理,等有客服空出來就可以繼續服務下一個客戶,這樣才能達到一個資源的合理利用,實現效益的最大化。
Java中的線程池種類
1. newSingleThreadExecutor
創建方式:
ExecutorService pool = Executors.newSingleThreadExecutor();
一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。
使用方式:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {ExecutorService pool = Executors.newSingleThreadExecutor();for (int i = 0; i < 10; i++) {pool.execute(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");});}}}
輸出結果如下:
pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....pool-1-thread-1 開始發車啦....
從輸出的結果我們可以看出,一直只有一個線程在運行。
2.newFixedThreadPool
創建方式:
ExecutorService pool = Executors.newFixedThreadPool(10);
創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。
使用方式:
import java.util.concurrent.ExecutorService;import java.util.concurrent.Executors;public class ThreadPool {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(10);for (int i = 0; i < 10; i++) {pool.execute(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");});}}}
輸出結果如下:
pool-1-thread-1 開始發車啦....pool-1-thread-4 開始發車啦....pool-1-thread-3 開始發車啦....pool-1-thread-2 開始發車啦....pool-1-thread-6 開始發車啦....pool-1-thread-7 開始發車啦....pool-1-thread-5 開始發車啦....pool-1-thread-8 開始發車啦....pool-1-thread-9 開始發車啦....pool-1-thread-10 開始發車啦....
3. newCachedThreadPool
創建方式:
ExecutorService pool = Executors.newCachedThreadPool();
創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑的線程,當任務數增加時,此線程池又添加新線程來處理任務。
使用方式如上2所示。
4.newScheduledThreadPool
創建方式:
ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
此線程池支持定時以及周期性執行任務的需求。
使用方式:
import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;public class ThreadPool {public static void main(String[] args) {ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);for (int i = 0; i < 10; i++) {pool.schedule(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");}, 10, TimeUnit.SECONDS);}}}
上面演示的是延遲10秒執行任務,如果想要執行周期性的任務可以用下面的方式,每秒執行一次
//pool.scheduleWithFixedDelay也可以pool.scheduleAtFixedRate(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");}, 1, 1, TimeUnit.SECONDS);
5.newWorkStealingPool
newWorkStealingPool是jdk1.8才有的,會根據所需的並行層次來動態創建和關閉線程,通過使用多個隊列減少競爭,底層用的ForkJoinPool來實現的。ForkJoinPool的優勢在於,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之后,再將這些執行結果合並起來即可。
說說線程池的拒絕策略
當請求任務不斷的過來,而系統此時又處理不過來的時候,我們需要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。
- AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString() +" rejected from " +e.toString());}}
- CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}}
- DiscardOleddestPolicy策略: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {e.getQueue().poll();e.execute(r);}}}
- DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。
public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}}
除了JDK默認為什么提供的四種拒絕策略,我們可以根據自己的業務需求去自定義拒絕策略,自定義的方式很簡單,直接實現RejectedExecutionHandler接口即可
比如Spring integration中就有一個自定義的拒絕策略CallerBlocksPolicy,將任務插入到隊列中,直到隊列中有空閑並插入成功的時候,否則將根據最大等待時間一直阻塞,直到超時
package org.springframework.integration.util;import java.util.concurrent.BlockingQueue;import java.util.concurrent.RejectedExecutionException;import java.util.concurrent.RejectedExecutionHandler;import java.util.concurrent.ThreadPoolExecutor;import java.util.concurrent.TimeUnit;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;public class CallerBlocksPolicy implements RejectedExecutionHandler {private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);private final long maxWait;/*** @param maxWait The maximum time to wait for a queue slot to be* available, in milliseconds.*/public CallerBlocksPolicy(long maxWait) {this.maxWait = maxWait;}@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {if (!executor.isShutdown()) {try {BlockingQueue<Runnable> queue = executor.getQueue();if (logger.isDebugEnabled()) {logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");}if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {throw new RejectedExecutionException("Max wait time expired to queue task");}if (logger.isDebugEnabled()) {logger.debug("Task execution queued");}}catch (InterruptedException e) {Thread.currentThread().interrupt();throw new RejectedExecutionException("Interrupted", e);}}else {throw new RejectedExecutionException("Executor has been shut down");}}}
定義好之后如何使用呢?光定義沒用的呀,一定要用到線程池中呀,可以通過下面的方式自定義線程池,指定拒絕策略。
BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());
execute和submit的區別?
在前面的講解中,我們執行任務是用的execute方法,除了execute方法,還有一個submit方法也可以執行我們提交的任務。
這兩個方法有什么區別呢?分別適用於在什么場景下呢?我們來做一個簡單的分析。
execute適用於不需要關注返回值的場景,只需要將線程丟到線程池中去執行就可以了
public class ThreadPool {public static void main(String[] args) {ExecutorService pool = Executors.newFixedThreadPool(10);pool.execute(() -> {System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");});}}
submit方法適用於需要關注返回值的場景,submit方法的定義如下:
public interface ExecutorService extends Executor {...<T> Future<T> submit(Callable<T> task);<T> Future<T> submit(Runnable task, T result);Future<?> submit(Runnable task);...}
其子類AbstractExecutorService實現了submit方法,可以看到無論參數是Callable還是Runnable,最終都會被封裝成RunnableFuture,然后再調用execute執行。
/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public Future<?> submit(Runnable task) {if (task == null) throw new NullPointerException();RunnableFuture<Void> ftask = newTaskFor(task, null);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public <T> Future<T> submit(Runnable task, T result) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task, result);execute(ftask);return ftask;}/*** @throws RejectedExecutionException {@inheritDoc}* @throws NullPointerException {@inheritDoc}*/public <T> Future<T> submit(Callable<T> task) {if (task == null) throw new NullPointerException();RunnableFuture<T> ftask = newTaskFor(task);execute(ftask);return ftask;}
下面我們來看看這三個方法分別如何去使用:
submit(Callable task);
public class ThreadPool {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(10);Future<String> future = pool.submit(new Callable<String>() {@Overridepublic String call() throws Exception {return "Hello";}});String result = future.get();System.out.println(result);}}
submit(Runnable task, T result);
public class ThreadPool {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(10);Data data = new Data();Future<Data> future = pool.submit(new MyRunnable(data), data);String result = future.get().getName();System.out.println(result);}}class Data {String name;public String getName() {return name;}public void setName(String name) {this.name = name;}}class MyRunnable implements Runnable {private Data data;public MyRunnable(Data data) {this.data = data;}@Overridepublic void run() {data.setName("yinjihuan");}}
Future<?> submit(Runnable task);
直接submit一個Runnable是拿不到返回值的,返回值就是null.
五種線程池的使用場景
-
newSingleThreadExecutor:一個單線程的線程池,可以用於需要保證順序執行的場景,並且只有一個線程在執行。
-
newFixedThreadPool:一個固定大小的線程池,可以用於已知並發壓力的情況下,對線程數做限制。
-
newCachedThreadPool:一個可以無限擴大的線程池,比較適合處理執行時間比較小的任務。
-
newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用於需要多個后台線程執行周期任務的場景。
-
newWorkStealingPool:一個擁有多個任務隊列的線程池,可以減少連接數,創建當前可用cpu數量的線程來並行執行。
線程池的關閉
關閉線程池可以調用shutdownNow和shutdown兩個方法來實現
shutdownNow:對正在執行的任務全部發出interrupt(),停止執行,對還未開始執行的任務全部取消,並且返回還沒開始的任務列表
public class ThreadPool {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(1);for (int i = 0; i < 5; i++) {System.err.println(i);pool.execute(() -> {try {Thread.sleep(30000);System.out.println("--");} catch (Exception e) {e.printStackTrace();}});}Thread.sleep(1000);List<Runnable> runs = pool.shutdownNow();}}
上面的代碼模擬了立即取消的場景,往線程池里添加5個線程任務,然后sleep一段時間,線程池只有一個線程,如果此時調用shutdownNow后應該需要中斷一個正在執行的任務和返回4個還未執行的任務,控制台輸出下面的內容:
01234[fs.ThreadPool$$Lambda$1/990368553@682a0b20,fs.ThreadPool$$Lambda$1/990368553@682a0b20,fs.ThreadPool$$Lambda$1/990368553@682a0b20,fs.ThreadPool$$Lambda$1/990368553@682a0b20]java.lang.InterruptedException: sleep interruptedat java.lang.Thread.sleep(Native Method)at fs.ThreadPool.lambda$0(ThreadPool.java:15)at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)at java.lang.Thread.run(Thread.java:745)
shutdown:當我們調用shutdown后,線程池將不再接受新的任務,但也不會去強制終止已經提交或者正在執行中的任務
public class ThreadPool {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(1);for (int i = 0; i < 5; i++) {System.err.println(i);pool.execute(() -> {try {Thread.sleep(30000);System.out.println("--");} catch (Exception e) {e.printStackTrace();}});}Thread.sleep(1000);pool.shutdown();pool.execute(() -> {try {Thread.sleep(30000);System.out.println("--");} catch (Exception e) {e.printStackTrace();}});}}
上面的代碼模擬了正在運行的狀態,然后調用shutdown,接着再往里面添加任務,肯定是拒絕添加的,請看輸出結果:
01234Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)at fs.ThreadPool.main(ThreadPool.java:24)
還有一些業務場景下需要知道線程池中的任務是否全部執行完成,當我們關閉線程池之后,可以用isTerminated來判斷所有的線程是否執行完成,千萬不要用isShutdown,isShutdown只是返回你是否調用過shutdown的結果。
public class ThreadPool {public static void main(String[] args) throws Exception {ExecutorService pool = Executors.newFixedThreadPool(1);for (int i = 0; i < 5; i++) {System.err.println(i);pool.execute(() -> {try {Thread.sleep(3000);System.out.println("--");} catch (Exception e) {e.printStackTrace();}});}Thread.sleep(1000);pool.shutdown();while(true){if(pool.isTerminated()){System.out.println("所有的子線程都結束了!");break;}Thread.sleep(1000);}}}
自定義線程池
在實際的使用過程中,大部分我們都是用Executors去創建線程池直接使用,如果有一些其他的需求,比如指定線程池的拒絕策略,阻塞隊列的類型,線程名稱的前綴等等,我們可以采用自定義線程池的方式來解決。
如果只是簡單的想要改變線程名稱的前綴的話可以自定義ThreadFactory來實現,在Executors.new…中有一個ThreadFactory的參數,如果沒有指定則用的是DefaultThreadFactory。
自定義線程池核心在於創建一個ThreadPoolExecutor對象,指定參數
下面我們看下ThreadPoolExecutor構造函數的定義:
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) ;
- corePoolSize
線程池大小,決定着新提交的任務是新開線程去執行還是放到任務隊列中,也是線程池的最最核心的參數。一般線程池開始時是沒有線程的,只有當任務來了並且線程數量小於corePoolSize才會創建線程。 - maximumPoolSize
最大線程數,線程池能創建的最大線程數量。 - keepAliveTime
在線程數量超過corePoolSize后,多余空閑線程的最大存活時間。 - unit
時間單位 - workQueue
存放來不及處理的任務的隊列,是一個BlockingQueue。 - threadFactory
生產線程的工廠類,可以定義線程名,優先級等。 - handler
拒絕策略,當任務來不及處理的時候,如何處理, 前面有講解。
了解上面的參數信息后我們就可以定義自己的線程池了,我這邊用ArrayBlockingQueue替換了LinkedBlockingQueue,指定了隊列的大小,當任務超出隊列大小之后使用CallerRunsPolicy拒絕策略處理。
這樣做的好處是嚴格控制了隊列的大小,不會出現一直往里面添加任務的情況,有的時候任務處理的比較慢,任務數量過多會占用大量內存,導致內存溢出。
當然你也可以在提交到線程池的入口進行控制,比如用CountDownLatch, Semaphore等。
/*** 自定義線程池<br>* 默認的newFixedThreadPool里的LinkedBlockingQueue是一個無邊界隊列,如果不斷的往里加任務,最終會導致內存的不可控<br>* 增加了有邊界的隊列,使用了CallerRunsPolicy拒絕策略* @author yinjihuan**/public class FangjiaThreadPoolExecutor {private static ExecutorService executorService = newFixedThreadPool(50);private static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());}public static void execute(Runnable command) {executorService.execute(command);}public static void shutdown() {executorService.shutdown();}static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory() {SecurityManager s = System.getSecurityManager();group = (s != null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "FSH-pool-" +poolNumber.getAndIncrement() +"-thread-";}public Thread newThread(Runnable r) {Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon())t.setDaemon(false);if (t.getPriority() != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}}}
