線程池的問題


面試-線程池的成長之路      

 

 

背景

相信大家在面試過程中遇到面試官問線程的很多,線程過后就是線程池了。從易到難,都是這么個過程,還有就是確實很多人在工作中接觸線程池比較少,最多的也就是創建一個然后往里面提交線程,對於一些經驗很豐富的面試官來說,一下就可以問出很多線程池相關的問題,與其被問的暈頭轉向,還不如好好學習。此時不努力更待何時。

什么是線程池?

線程池是一種多線程處理形式,處理過程中將任務提交到線程池,任務的執行交由線程池來管理。

如果每個請求都創建一個線程去處理,那么服務器的資源很快就會被耗盡,使用線程池可以減少創建和銷毀線程的次數,每個工作線程都可以被重復利用,可執行多個任務。

如果用生活中的列子來說明,我們可以把線程池當做一個客服團隊,如果同時有1000個人打電話進行咨詢,按照正常的邏輯那就是需要1000個客服接聽電話,服務客戶。現實往往需要考慮到很多層面的東西,比如:資源夠不夠,招這么多人需要費用比較多。正常的做法就是招100個人成立一個客服中心,當有電話進來后分配沒有接聽的客服進行服務,如果超出了100個人同時咨詢的話,提示客戶等待,稍后處理,等有客服空出來就可以繼續服務下一個客戶,這樣才能達到一個資源的合理利用,實現效益的最大化。

Java中的線程池種類

1. newSingleThreadExecutor

創建方式:

  1. ExecutorService pool = Executors.newSingleThreadExecutor();

一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

使用方式:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPool {
  4. public static void main(String[] args) {
  5. ExecutorService pool = Executors.newSingleThreadExecutor();
  6. for (int i = 0; i < 10; i++) {
  7. pool.execute(() -> {
  8. System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
  9. });
  10. }
  11. }
  12. }

輸出結果如下:

  1. pool-1-thread-1 開始發車啦....
  2. pool-1-thread-1 開始發車啦....
  3. pool-1-thread-1 開始發車啦....
  4. pool-1-thread-1 開始發車啦....
  5. pool-1-thread-1 開始發車啦....
  6. pool-1-thread-1 開始發車啦....
  7. pool-1-thread-1 開始發車啦....
  8. pool-1-thread-1 開始發車啦....
  9. pool-1-thread-1 開始發車啦....
  10. pool-1-thread-1 開始發車啦....

從輸出的結果我們可以看出,一直只有一個線程在運行。

2.newFixedThreadPool

創建方式:

  1. ExecutorService pool = Executors.newFixedThreadPool(10);

創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

使用方式:

  1. import java.util.concurrent.ExecutorService;
  2. import java.util.concurrent.Executors;
  3. public class ThreadPool {
  4. public static void main(String[] args) {
  5. ExecutorService pool = Executors.newFixedThreadPool(10);
  6. for (int i = 0; i < 10; i++) {
  7. pool.execute(() -> {
  8. System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
  9. });
  10. }
  11. }
  12. }

輸出結果如下:

  1. pool-1-thread-1 開始發車啦....
  2. pool-1-thread-4 開始發車啦....
  3. pool-1-thread-3 開始發車啦....
  4. pool-1-thread-2 開始發車啦....
  5. pool-1-thread-6 開始發車啦....
  6. pool-1-thread-7 開始發車啦....
  7. pool-1-thread-5 開始發車啦....
  8. pool-1-thread-8 開始發車啦....
  9. pool-1-thread-9 開始發車啦....
  10. pool-1-thread-10 開始發車啦....

3. newCachedThreadPool

創建方式:

  1. ExecutorService pool = Executors.newCachedThreadPool();

創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑的線程,當任務數增加時,此線程池又添加新線程來處理任務。

使用方式如上2所示。

4.newScheduledThreadPool

創建方式:

  1. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

此線程池支持定時以及周期性執行任務的需求。

使用方式:

  1. import java.util.concurrent.Executors;
  2. import java.util.concurrent.ScheduledExecutorService;
  3. import java.util.concurrent.TimeUnit;
  4. public class ThreadPool {
  5. public static void main(String[] args) {
  6. ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
  7. for (int i = 0; i < 10; i++) {
  8. pool.schedule(() -> {
  9. System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
  10. }, 10, TimeUnit.SECONDS);
  11. }
  12. }
  13. }

上面演示的是延遲10秒執行任務,如果想要執行周期性的任務可以用下面的方式,每秒執行一次

  1. //pool.scheduleWithFixedDelay也可以
  2. pool.scheduleAtFixedRate(() -> {
  3. System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
  4. }, 1, 1, TimeUnit.SECONDS);

5.newWorkStealingPool
newWorkStealingPool是jdk1.8才有的,會根據所需的並行層次來動態創建和關閉線程,通過使用多個隊列減少競爭,底層用的ForkJoinPool來實現的。ForkJoinPool的優勢在於,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之后,再將這些執行結果合並起來即可。

說說線程池的拒絕策略

當請求任務不斷的過來,而系統此時又處理不過來的時候,我們需要采取的策略是拒絕服務。RejectedExecutionHandler接口提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。

  • AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
  1. public static class AbortPolicy implements RejectedExecutionHandler {
  2. public AbortPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. throw new RejectedExecutionException("Task " + r.toString() +
  5. " rejected from " +
  6. e.toString());
  7. }
  8. }
  • CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
  1. public static class CallerRunsPolicy implements RejectedExecutionHandler {
  2. public CallerRunsPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. r.run();
  6. }
  7. }
  8. }
  • DiscardOleddestPolicy策略: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
  1. public static class DiscardOldestPolicy implements RejectedExecutionHandler {
  2. public DiscardOldestPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. if (!e.isShutdown()) {
  5. e.getQueue().poll();
  6. e.execute(r);
  7. }
  8. }
  9. }
  • DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。
  1. public static class DiscardPolicy implements RejectedExecutionHandler {
  2. public DiscardPolicy() { }
  3. public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
  4. }
  5. }

除了JDK默認為什么提供的四種拒絕策略,我們可以根據自己的業務需求去自定義拒絕策略,自定義的方式很簡單,直接實現RejectedExecutionHandler接口即可

比如Spring integration中就有一個自定義的拒絕策略CallerBlocksPolicy,將任務插入到隊列中,直到隊列中有空閑並插入成功的時候,否則將根據最大等待時間一直阻塞,直到超時

  1. package org.springframework.integration.util;
  2. import java.util.concurrent.BlockingQueue;
  3. import java.util.concurrent.RejectedExecutionException;
  4. import java.util.concurrent.RejectedExecutionHandler;
  5. import java.util.concurrent.ThreadPoolExecutor;
  6. import java.util.concurrent.TimeUnit;
  7. import org.apache.commons.logging.Log;
  8. import org.apache.commons.logging.LogFactory;
  9. public class CallerBlocksPolicy implements RejectedExecutionHandler {
  10. private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
  11. private final long maxWait;
  12. /**
  13. * @param maxWait The maximum time to wait for a queue slot to be
  14. * available, in milliseconds.
  15. */
  16. public CallerBlocksPolicy(long maxWait) {
  17. this.maxWait = maxWait;
  18. }
  19. @Override
  20. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  21. if (!executor.isShutdown()) {
  22. try {
  23. BlockingQueue<Runnable> queue = executor.getQueue();
  24. if (logger.isDebugEnabled()) {
  25. logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
  26. }
  27. if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
  28. throw new RejectedExecutionException("Max wait time expired to queue task");
  29. }
  30. if (logger.isDebugEnabled()) {
  31. logger.debug("Task execution queued");
  32. }
  33. }
  34. catch (InterruptedException e) {
  35. Thread.currentThread().interrupt();
  36. throw new RejectedExecutionException("Interrupted", e);
  37. }
  38. }
  39. else {
  40. throw new RejectedExecutionException("Executor has been shut down");
  41. }
  42. }
  43. }

定義好之后如何使用呢?光定義沒用的呀,一定要用到線程池中呀,可以通過下面的方式自定義線程池,指定拒絕策略。

  1. BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
  2. ThreadPoolExecutor executor = new ThreadPoolExecutor(
  3. 10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());

execute和submit的區別?

在前面的講解中,我們執行任務是用的execute方法,除了execute方法,還有一個submit方法也可以執行我們提交的任務。

這兩個方法有什么區別呢?分別適用於在什么場景下呢?我們來做一個簡單的分析。

execute適用於不需要關注返回值的場景,只需要將線程丟到線程池中去執行就可以了

  1. public class ThreadPool {
  2. public static void main(String[] args) {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. pool.execute(() -> {
  5. System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
  6. });
  7. }
  8. }

submit方法適用於需要關注返回值的場景,submit方法的定義如下:

  1. public interface ExecutorService extends Executor {
  2.   ...
  3.   <T> Future<T> submit(Callable<T> task);
  4.   <T> Future<T> submit(Runnable task, T result);
  5.   Future<?> submit(Runnable task);
  6.   ...
  7. }

其子類AbstractExecutorService實現了submit方法,可以看到無論參數是Callable還是Runnable,最終都會被封裝成RunnableFuture,然后再調用execute執行。

  1. /**
  2. * @throws RejectedExecutionException {@inheritDoc}
  3. * @throws NullPointerException {@inheritDoc}
  4. */
  5. public Future<?> submit(Runnable task) {
  6. if (task == null) throw new NullPointerException();
  7. RunnableFuture<Void> ftask = newTaskFor(task, null);
  8. execute(ftask);
  9. return ftask;
  10. }
  11. /**
  12. * @throws RejectedExecutionException {@inheritDoc}
  13. * @throws NullPointerException {@inheritDoc}
  14. */
  15. public <T> Future<T> submit(Runnable task, T result) {
  16. if (task == null) throw new NullPointerException();
  17. RunnableFuture<T> ftask = newTaskFor(task, result);
  18. execute(ftask);
  19. return ftask;
  20. }
  21. /**
  22. * @throws RejectedExecutionException {@inheritDoc}
  23. * @throws NullPointerException {@inheritDoc}
  24. */
  25. public <T> Future<T> submit(Callable<T> task) {
  26. if (task == null) throw new NullPointerException();
  27. RunnableFuture<T> ftask = newTaskFor(task);
  28. execute(ftask);
  29. return ftask;
  30. }

下面我們來看看這三個方法分別如何去使用:

submit(Callable task);

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. Future<String> future = pool.submit(new Callable<String>() {
  5. @Override
  6. public String call() throws Exception {
  7. return "Hello";
  8. }
  9. });
  10. String result = future.get();
  11. System.out.println(result);
  12. }
  13. }

submit(Runnable task, T result);

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(10);
  4. Data data = new Data();
  5. Future<Data> future = pool.submit(new MyRunnable(data), data);
  6. String result = future.get().getName();
  7. System.out.println(result);
  8. }
  9. }
  10. class Data {
  11. String name;
  12. public String getName() {
  13. return name;
  14. }
  15. public void setName(String name) {
  16. this.name = name;
  17. }
  18. }
  19. class MyRunnable implements Runnable {
  20. private Data data;
  21. public MyRunnable(Data data) {
  22. this.data = data;
  23. }
  24. @Override
  25. public void run() {
  26. data.setName("yinjihuan");
  27. }
  28. }

Future<?> submit(Runnable task);
直接submit一個Runnable是拿不到返回值的,返回值就是null.

五種線程池的使用場景

  • newSingleThreadExecutor:一個單線程的線程池,可以用於需要保證順序執行的場景,並且只有一個線程在執行。

  • newFixedThreadPool:一個固定大小的線程池,可以用於已知並發壓力的情況下,對線程數做限制。

  • newCachedThreadPool:一個可以無限擴大的線程池,比較適合處理執行時間比較小的任務。

  • newScheduledThreadPool:可以延時啟動,定時啟動的線程池,適用於需要多個后台線程執行周期任務的場景。

  • newWorkStealingPool:一個擁有多個任務隊列的線程池,可以減少連接數,創建當前可用cpu數量的線程來並行執行。

線程池的關閉

關閉線程池可以調用shutdownNow和shutdown兩個方法來實現

shutdownNow:對正在執行的任務全部發出interrupt(),停止執行,對還未開始執行的任務全部取消,並且返回還沒開始的任務列表

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(30000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. List<Runnable> runs = pool.shutdownNow();
  17. }
  18. }

上面的代碼模擬了立即取消的場景,往線程池里添加5個線程任務,然后sleep一段時間,線程池只有一個線程,如果此時調用shutdownNow后應該需要中斷一個正在執行的任務和返回4個還未執行的任務,控制台輸出下面的內容:

  1. 0
  2. 1
  3. 2
  4. 3
  5. 4
  6. [fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  7. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  8. fs.ThreadPool$$Lambda$1/990368553@682a0b20,
  9. fs.ThreadPool$$Lambda$1/990368553@682a0b20]
  10. java.lang.InterruptedException: sleep interrupted
  11. at java.lang.Thread.sleep(Native Method)
  12. at fs.ThreadPool.lambda$0(ThreadPool.java:15)
  13. at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
  14. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
  15. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
  16. at java.lang.Thread.run(Thread.java:745)

shutdown:當我們調用shutdown后,線程池將不再接受新的任務,但也不會去強制終止已經提交或者正在執行中的任務

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(30000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. pool.shutdown();
  17. pool.execute(() -> {
  18. try {
  19. Thread.sleep(30000);
  20. System.out.println("--");
  21. } catch (Exception e) {
  22. e.printStackTrace();
  23. }
  24. });
  25. }
  26. }

上面的代碼模擬了正在運行的狀態,然后調用shutdown,接着再往里面添加任務,肯定是拒絕添加的,請看輸出結果:

  1. 0
  2. 1
  3. 2
  4. 3
  5. 4
  6. Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/1747585824@3d075dc0 rejected from java.util.concurrent.ThreadPoolExecutor@214c265e[Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
  7. at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
  8. at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
  9. at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
  10. at fs.ThreadPool.main(ThreadPool.java:24)

還有一些業務場景下需要知道線程池中的任務是否全部執行完成,當我們關閉線程池之后,可以用isTerminated來判斷所有的線程是否執行完成,千萬不要用isShutdown,isShutdown只是返回你是否調用過shutdown的結果。

  1. public class ThreadPool {
  2. public static void main(String[] args) throws Exception {
  3. ExecutorService pool = Executors.newFixedThreadPool(1);
  4. for (int i = 0; i < 5; i++) {
  5. System.err.println(i);
  6. pool.execute(() -> {
  7. try {
  8. Thread.sleep(3000);
  9. System.out.println("--");
  10. } catch (Exception e) {
  11. e.printStackTrace();
  12. }
  13. });
  14. }
  15. Thread.sleep(1000);
  16. pool.shutdown();
  17. while(true){
  18. if(pool.isTerminated()){
  19. System.out.println("所有的子線程都結束了!");
  20. break;
  21. }
  22. Thread.sleep(1000);
  23. }
  24. }
  25. }

自定義線程池

在實際的使用過程中,大部分我們都是用Executors去創建線程池直接使用,如果有一些其他的需求,比如指定線程池的拒絕策略,阻塞隊列的類型,線程名稱的前綴等等,我們可以采用自定義線程池的方式來解決。

如果只是簡單的想要改變線程名稱的前綴的話可以自定義ThreadFactory來實現,在Executors.new…中有一個ThreadFactory的參數,如果沒有指定則用的是DefaultThreadFactory。

自定義線程池核心在於創建一個ThreadPoolExecutor對象,指定參數

下面我們看下ThreadPoolExecutor構造函數的定義:

  1. public ThreadPoolExecutor(int corePoolSize,
  2. int maximumPoolSize,
  3. long keepAliveTime,
  4. TimeUnit unit,
  5. BlockingQueue<Runnable> workQueue,
  6. ThreadFactory threadFactory,
  7. RejectedExecutionHandler handler) ;
  • corePoolSize
    線程池大小,決定着新提交的任務是新開線程去執行還是放到任務隊列中,也是線程池的最最核心的參數。一般線程池開始時是沒有線程的,只有當任務來了並且線程數量小於corePoolSize才會創建線程。
  • maximumPoolSize
    最大線程數,線程池能創建的最大線程數量。
  • keepAliveTime
    在線程數量超過corePoolSize后,多余空閑線程的最大存活時間。
  • unit
    時間單位
  • workQueue
    存放來不及處理的任務的隊列,是一個BlockingQueue。
  • threadFactory
    生產線程的工廠類,可以定義線程名,優先級等。
  • handler
    拒絕策略,當任務來不及處理的時候,如何處理, 前面有講解。

了解上面的參數信息后我們就可以定義自己的線程池了,我這邊用ArrayBlockingQueue替換了LinkedBlockingQueue,指定了隊列的大小,當任務超出隊列大小之后使用CallerRunsPolicy拒絕策略處理。

這樣做的好處是嚴格控制了隊列的大小,不會出現一直往里面添加任務的情況,有的時候任務處理的比較慢,任務數量過多會占用大量內存,導致內存溢出。

當然你也可以在提交到線程池的入口進行控制,比如用CountDownLatch, Semaphore等。

  1. /**
  2. * 自定義線程池<br>
  3. * 默認的newFixedThreadPool里的LinkedBlockingQueue是一個無邊界隊列,如果不斷的往里加任務,最終會導致內存的不可控<br>
  4. * 增加了有邊界的隊列,使用了CallerRunsPolicy拒絕策略
  5. * @author yinjihuan
  6. *
  7. */
  8. public class FangjiaThreadPoolExecutor {
  9. private static ExecutorService executorService = newFixedThreadPool(50);
  10. private static ExecutorService newFixedThreadPool(int nThreads) {
  11. return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
  12. new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());
  13. }
  14. public static void execute(Runnable command) {
  15. executorService.execute(command);
  16. }
  17. public static void shutdown() {
  18. executorService.shutdown();
  19. }
  20. static class DefaultThreadFactory implements ThreadFactory {
  21. private static final AtomicInteger poolNumber = new AtomicInteger(1);
  22. private final ThreadGroup group;
  23. private final AtomicInteger threadNumber = new AtomicInteger(1);
  24. private final String namePrefix;
  25. DefaultThreadFactory() {
  26. SecurityManager s = System.getSecurityManager();
  27. group = (s != null) ? s.getThreadGroup() :
  28. Thread.currentThread().getThreadGroup();
  29. namePrefix = "FSH-pool-" +
  30. poolNumber.getAndIncrement() +
  31. "-thread-";
  32. }
  33. public Thread newThread(Runnable r) {
  34. Thread t = new Thread(group, r,
  35. namePrefix + threadNumber.getAndIncrement(),
  36. 0);
  37. if (t.isDaemon())
  38. t.setDaemon(false);
  39. if (t.getPriority() != Thread.NORM_PRIORITY)
  40. t.setPriority(Thread.NORM_PRIORITY);
  41. return t;
  42. }
  43. }
  44. }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM