面渣逆襲:線程池奪命連環十八問


大家好,我是老三,很高興又和大家見面。線程池是面試必問的知識點,這節我們來對線面試官,搞透線程池。

1. 什么是線程池?

線程池: 簡單理解,它就是一個管理線程的池子。

  • 它幫我們管理線程,避免增加創建線程和銷毀線程的資源損耗。因為線程其實也是一個對象,創建一個對象,需要經過類加載過程,銷毀一個對象,需要走GC垃圾回收流程,都是需要資源開銷的。
  • 提高響應速度。 如果任務到達了,相對於從線程池拿線程,重新去創建一條線程執行,速度肯定慢很多。
  • 重復利用。 線程用完,再放回池子,可以達到重復利用的效果,節省資源。

2. 能說說工作中線程池的應用嗎?

之前我們有一個和第三方對接的需求,需要向第三方推送數據,引入了多線程來提升數據推送的效率,其中用到了線程池來管理線程。

業務示例

主要代碼如下:

主要代碼

完整可運行代碼地址:https://gitee.com/fighter3/thread-demo.git

線程池的參數如下:

  • corePoolSize:線程核心參數選擇了CPU數×2

  • maximumPoolSize:最大線程數選擇了和核心線程數相同

  • keepAliveTime:非核心閑置線程存活時間直接置為0

  • unit:非核心線程保持存活的時間選擇了 TimeUnit.SECONDS 秒

  • workQueue:線程池等待隊列,使用 LinkedBlockingQueue阻塞隊列

同時還用了synchronized 來加鎖,保證數據不會被重復推送:

  synchronized (PushProcessServiceImpl.class) {}

ps:這個例子只是簡單地進行了數據推送,實際上還可以結合其他的業務,像什么數據清洗啊、數據統計啊,都可以套用。

3.能簡單說一下線程池的工作流程嗎?

用一個通俗的比喻:

有一個營業廳,總共有六個窗口,現在開放了三個窗口,現在有三個窗口坐着三個營業員小姐姐在營業。

老三去辦業務,可能會遇到什么情況呢?

  1. 老三發現有空間的在營業的窗口,直接去找xjj辦理業務。

直接辦理

  1. 老三發現沒有空閑的窗口,就在排隊區排隊等。

排隊等待

  1. 老三發現沒有空閑的窗口,等待區也滿了,蚌埠住了,經理一看,就讓休息的小姐姐趕緊回來上班,等待區號靠前的趕緊去新窗口辦,老三去排隊區排隊。小姐姐比較辛苦,假如一段時間發現他們可以不用接着營業,經理就讓她們接着休息。

排隊區滿

  1. 老三一看,六個窗口都滿了,等待區也沒位置了。老三急了,要鬧,經理趕緊出來了,經理該怎么辦呢?

等待區,排隊區都滿

  1. 我們銀行系統已經癱瘓

  2. 誰叫你來辦的你找誰去

  3. 看你比較急,去隊里加個塞

  4. 今天沒辦法,不行你看改一天

上面的這個流程幾乎就跟 JDK 線程池的大致流程類似,

  1. 營業中的 3個窗口對應核心線程池數:corePoolSize
  2. 總的營業窗口數6對應:maximumPoolSize
  3. 打開的臨時窗口在多少時間內無人辦理則關閉對應:unit
  4. 排隊區就是等待隊列:workQueue
  5. 無法辦理的時候銀行給出的解決方法對應:RejectedExecutionHandler
  6. threadFactory 該參數在 JDK 中是 線程工廠,用來創建線程對象,一般不會動。

所以我們線程池的工作流程也比較好理解了:

  1. 線程池剛創建時,里面沒有一個線程。任務隊列是作為參數傳進來的。不過,就算隊列里面有任務,線程池也不會馬上執行它們。
  2. 當調用 execute() 方法添加一個任務時,線程池會做如下判斷:
  • 如果正在運行的線程數量小於 corePoolSize,那么馬上創建線程運行這個任務;
  • 如果正在運行的線程數量大於或等於 corePoolSize,那么將這個任務放入隊列;
  • 如果這時候隊列滿了,而且正在運行的線程數量小於 maximumPoolSize,那么還是要創建非核心線程立刻運行這個任務;
  • 如果隊列滿了,而且正在運行的線程數量大於或等於 maximumPoolSize,那么線程池會根據拒絕策略來對應處理。

線程池執行流程

  1. 當一個線程完成任務時,它會從隊列中取下一個任務來執行。

  2. 當一個線程無事可做,超過一定的時間(keepAliveTime)時,線程池會判斷,如果當前運行的線程數大於 corePoolSize,那么這個線程就被停掉。所以線程池的所有任務完成后,它最終會收縮到 corePoolSize 的大小。

4.線程池主要參數有哪些?

  1. corePoolSize

此值是用來初始化線程池中核心線程數,當線程池中線程池數< corePoolSize時,系統默認是添加一個任務才創建一個線程池。當線程數 = corePoolSize時,新任務會追加到workQueue中。

  1. maximumPoolSize

maximumPoolSize表示允許的最大線程數 = (非核心線程數+核心線程數),當BlockingQueue也滿了,但線程池中總線程數 < maximumPoolSize時候就會再次創建新的線程。

  1. keepAliveTime

非核心線程 =(maximumPoolSize - corePoolSize ) ,非核心線程閑置下來不干活最多存活時間。

  1. unit

線程池中非核心線程保持存活的時間的單位

  • TimeUnit.DAYS; 天
  • TimeUnit.HOURS; 小時
  • TimeUnit.MINUTES; 分鍾
  • TimeUnit.SECONDS; 秒
  • TimeUnit.MILLISECONDS; 毫秒
  • TimeUnit.MICROSECONDS; 微秒
  • TimeUnit.NANOSECONDS; 納秒
  1. workQueue

線程池等待隊列,維護着等待執行的Runnable對象。當運行當線程數= corePoolSize時,新的任務會被添加到workQueue中,如果workQueue也滿了則嘗試用非核心線程執行任務,等待隊列應該盡量用有界的。

  1. threadFactory

創建一個新線程時使用的工廠,可以用來設定線程名、是否為daemon線程等等。

  1. handler

corePoolSizeworkQueuemaximumPoolSize都不可用的時候執行的飽和策略。

5.線程池的拒絕策略有哪些?

類比前面的例子,無法辦理業務時的處理方式,幫助記憶:

四種策略

  • AbortPolicy :直接拋出異常,默認使用此策略
  • CallerRunsPolicy:用調用者所在的線程來執行任務
  • DiscardOldestPolicy:丟棄阻塞隊列里最老的任務,也就是隊列里靠前的任務
  • DiscardPolicy :當前任務直接丟棄

想實現自己的拒絕策略,實現RejectedExecutionHandler接口即可。

6.線程池有哪幾種工作隊列?

常用的阻塞隊列主要有以下幾種:

  • ArrayBlockingQueue:ArrayBlockingQueue(有界隊列)是一個用數組實現的有界阻塞隊列,按FIFO排序量。
  • LinkedBlockingQueue:LinkedBlockingQueue(可設置容量隊列)是基於鏈表結構的阻塞隊列,按FIFO排序任務,容量可以選擇進行設置,不設置的話,將是一個無邊界的阻塞隊列,最大長度為Integer.MAX_VALUE,吞吐量通常要高於ArrayBlockingQuene;newFixedThreadPool線程池使用了這個隊列
  • DelayQueue:DelayQueue(延遲隊列)是一個任務定時周期的延遲執行的隊列。根據指定的執行時間從小到大排序,否則根據插入到隊列的先后排序。newScheduledThreadPool線程池使用了這個隊列。
  • PriorityBlockingQueue:PriorityBlockingQueue(優先級隊列)是具有優先級的無界阻塞隊列
  • SynchronousQueue:SynchronousQueue(同步隊列)是一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene,newCachedThreadPool線程池使用了這個隊列。

7.線程池提交execute和submit有什么區別?

  1. execute 用於提交不需要返回值的任務
threadsPool.execute(new Runnable() { 
    @Override public void run() { 
        // TODO Auto-generated method stub } 
    });
  1. submit()方法用於提交需要返回值的任務。線程池會返回一個future類型的對象,通過這個 future對象可以判斷任務是否執行成功,並且可以通過future的get()方法來獲取返回值
Future<Object> future = executor.submit(harReturnValuetask); 
try { Object s = future.get(); } catch (InterruptedException e) { 
    // 處理中斷異常 
} catch (ExecutionException e) { 
    // 處理無法執行任務異常 
} finally { 
    // 關閉線程池 executor.shutdown();
}

8.線程池怎么關閉知道嗎?

可以通過調用線程池的shutdownshutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工作線程,然后逐個調用線程的interrupt方法來中斷線程,所以無法響應中斷的任務可能永遠無法終止。

  1. shutdown() 將線程池狀態置為shutdown,並不會立即停止:
  1. 停止接收外部submit的任務
  2. 內部正在跑的任務和隊列里等待的任務,會執行完
  3. 等到第二步完成后,才真正停止
  1. shutdownNow() 將線程池狀態置為stop。一般會立即停止,事實上不一定:
  1. 和shutdown()一樣,先停止接收外部提交的任務
  2. 忽略隊列里等待的任務
  3. 嘗試將正在跑的任務interrupt中斷
  4. 返回未執行的任務列表

shutdown 和shutdownnow簡單來說區別如下:

shutdownNow()能立即停止線程池,正在跑的和正在等待的任務都停下了。這樣做立即生效,但是風險也比較大。shutdown()只是關閉了提交通道,用submit()是無效的;而內部的任務該怎么跑還是怎么跑,跑完再徹底停止線程池。

9.線程池的線程數應該怎么配置?

線程在Java中屬於稀缺資源,線程池不是越大越好也不是越小越好。任務分為計算密集型、IO密集型、混合型。

  1. 計算密集型:大部分都在用CPU跟內存,加密,邏輯操作業務處理等。
  2. IO密集型:數據庫鏈接,網絡通訊傳輸等。
  1. 計算密集型一般推薦線程池不要過大,一般是CPU數 + 1,+1是因為可能存在頁缺失(就是可能存在有些數據在硬盤中需要多來一個線程將數據讀入內存)。如果線程池數太大,可能會頻繁的 進行線程上下文切換跟任務調度。獲得當前CPU核心數代碼如下:
Runtime.getRuntime().availableProcessors();
  1. IO密集型:線程數適當大一點,機器的Cpu核心數*2。
  2. 混合型:可以考慮根絕情況將它拆分成CPU密集型和IO密集型任務,如果執行時間相差不大,拆分可以提升吞吐量,反之沒有必要。

當然,實際應用中沒有固定的公式,需要結合測試和監控來進行調整。

10.有哪幾種常見的線程池?

主要有四種,都是通過工具類Excutors創建出來的,阿里巴巴《Java開發手冊》里禁止使用這種方式來創建線程池。

  • newFixedThreadPool (固定數目線程的線程池)

  • newCachedThreadPool (可緩存線程的線程池)

  • newSingleThreadExecutor (單線程的線程池)

  • newScheduledThreadPool (定時及周期執行的線程池)

11.能說一下四種常見線程池的原理嗎?

前三種線程池的構造直接調用ThreadPoolExecutor的構造方法。

newSingleThreadExecutor

  public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

線程池特點

  • 核心線程數為1
  • 最大線程數也為1
  • 阻塞隊列是無界隊列LinkedBlockingQueue,可能會導致OOM
  • keepAliveTime為0

SingleThreadExecutor運行流程

工作流程:

  • 提交任務
  • 線程池是否有一條線程在,如果沒有,新建線程執行任務
  • 如果有,將任務加到阻塞隊列
  • 當前的唯一線程,從隊列取任務,執行完一個,再繼續取,一個線程執行任務。

適用場景

適用於串行執行任務的場景,一個任務一個任務地執行。

newFixedThreadPool

  public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

線程池特點:

  • 核心線程數和最大線程數大小一樣
  • 沒有所謂的非空閑時間,即keepAliveTime為0
  • 阻塞隊列為無界隊列LinkedBlockingQueue,可能會導致OOM

FixedThreadPool

工作流程:

  • 提交任務
  • 如果線程數少於核心線程,創建核心線程執行任務
  • 如果線程數等於核心線程,把任務添加到LinkedBlockingQueue阻塞隊列
  • 如果線程執行完任務,去阻塞隊列取任務,繼續執行。

使用場景

FixedThreadPool 適用於處理CPU密集型的任務,確保CPU在長期被工作線程使用的情況下,盡可能的少的分配線程,即適用執行長期的任務。

newCachedThreadPool

   public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

線程池特點:

  • 核心線程數為0
  • 最大線程數為Integer.MAX_VALUE,即無限大,可能會因為無限創建線程,導致OOM
  • 阻塞隊列是SynchronousQueue
  • 非核心線程空閑存活時間為60秒

當提交任務的速度大於處理任務的速度時,每次提交一個任務,就必然會創建一個線程。極端情況下會創建過多的線程,耗盡 CPU 和內存資源。由於空閑 60 秒的線程會被終止,長時間保持空閑的 CachedThreadPool 不會占用任何資源。

CachedThreadPool執行流程

工作流程:

  • 提交任務
  • 因為沒有核心線程,所以任務直接加到SynchronousQueue隊列。
  • 判斷是否有空閑線程,如果有,就去取出任務執行。
  • 如果沒有空閑線程,就新建一個線程執行。
  • 執行完任務的線程,還可以存活60秒,如果在這期間,接到任務,可以繼續活下去;否則,被銷毀。

適用場景

用於並發執行大量短期的小任務。

newScheduledThreadPool

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

線程池特點

  • 最大線程數為Integer.MAX_VALUE,也有OOM的風險
  • 阻塞隊列是DelayedWorkQueue
  • keepAliveTime為0
  • scheduleAtFixedRate() :按某種速率周期執行
  • scheduleWithFixedDelay():在某個延遲后執行

ScheduledThreadPool執行流程

工作機制

  • 線程從DelayQueue中獲取已到期的ScheduledFutureTask(DelayQueue.take())。到期任務是指ScheduledFutureTask的time大於等於當前時間。
  • 線程執行這個ScheduledFutureTask。
  • 線程修改ScheduledFutureTask的time變量為下次將要被執行的時間。
  • 線程把這個修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。

ScheduledThreadPoolExecutor執行流程

使用場景

周期性執行任務的場景,需要限制線程數量的場景

12.使用無界隊列的線程池會導致什么問題嗎?

例如newFixedThreadPool使用了無界的阻塞隊列LinkedBlockingQueue,如果線程獲取一個任務后,任務的執行時間比較長,會導致隊列的任務越積越多,導致機器內存使用不停飆升,最終導致OOM。

13.線程池異常怎么處理知道嗎?

在使用線程池處理任務的時候,任務代碼可能拋出RuntimeException,拋出異常后,線程池可能捕獲它,也可能創建一個新的線程來代替異常的線程,我們可能無法感知任務出現了異常,因此我們需要考慮線程池異常情況。

常見的異常處理方式:

線程池異常處理

14.能說一下線程池有幾種狀態嗎?

線程池有這幾個狀態:RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。

   //線程池狀態
   private static final int RUNNING    = -1 << COUNT_BITS;
   private static final int SHUTDOWN   =  0 << COUNT_BITS;
   private static final int STOP       =  1 << COUNT_BITS;
   private static final int TIDYING    =  2 << COUNT_BITS;
   private static final int TERMINATED =  3 << COUNT_BITS;

線程池各個狀態切換圖:

線程池狀態切換圖

RUNNING

  • 該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
  • 調用線程池的shutdown()方法,可以切換到SHUTDOWN狀態;
  • 調用線程池的shutdownNow()方法,可以切換到STOP狀態;

SHUTDOWN

  • 該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
  • 隊列為空,並且線程池中執行的任務也為空,進入TIDYING狀態;

STOP

  • 該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,而且會中斷正在運行的任務;
  • 線程池中執行的任務為空,進入TIDYING狀態;

TIDYING

  • 該狀態表明所有的任務已經運行終止,記錄的任務數量為0。
  • terminated()執行完畢,進入TERMINATED狀態

TERMINATED

  • 該狀態表示線程池徹底終止

15.線程池如何實現參數的動態修改?

線程池提供了幾個 setter方法來設置線程池的參數。

JDK 線程池參數設置接口來源參考[7]

這里主要有兩個思路:

動態修改線程池參數

  • 在我們微服務的架構下,可以利用配置中心如Nacos、Apollo等等,也可以自己開發配置中心。業務服務讀取線程池配置,獲取相應的線程池實例來修改線程池的參數。

  • 如果限制了配置中心的使用,也可以自己去擴展ThreadPoolExecutor,重寫方法,監聽線程池參數變化,來動態修改線程池參數。

16.線程池調優了解嗎?

線程池配置沒有固定的公式,通常事前會對線程池進行一定評估,常見的評估方案如下:

線程池評估方案 來源參考[7]

上線之前也要進行充分的測試,上線之后要建立完善的線程池監控機制。

事中結合監控告警機制,分析線程池的問題,或者可優化點,結合線程池動態參數配置機制來調整配置。

事后要注意仔細觀察,隨時調整。

線程池調優

具體的調優案例可以查看參考[7]美團技術博客。

17.你能設計實現一個線程池嗎?

⭐這道題在阿里的面試中出現頻率比較高

線程池實現原理可以查看 要是以前有人這么講線程池,我早就該明白了! ,當然,我們自己實現, 只需要抓住線程池的核心流程-參考[6]:

線程池主要實現流程

我們自己的實現就是完成這個核心流程:

  • 線程池中有N個工作線程
  • 把任務提交給線程池運行
  • 如果線程池已滿,把任務放入隊列
  • 最后當有空閑時,獲取隊列中任務來執行

實現代碼[6]:

public class MyThreadPoolExecutor implements Executor {

    //記錄線程池中線程數量
    private final AtomicInteger ctl = new AtomicInteger(0);

    //核心線程數
    private volatile int corePoolSize;
    //最大線程數
    private volatile int maximumPoolSize;

    //阻塞隊列
    private final BlockingQueue<Runnable> workQueue;

    public MyThreadPoolExecutor(int corePoolSize, int maximumPoolSize, BlockingQueue<Runnable> workQueue) {
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
    }

    /**
     * 執行
     *
     * @param command
     */
    @Override
    public void execute(Runnable command) {
        //工作線程數
        int c = ctl.get();
        //小於核心線程數
        if (c < corePoolSize) {
            //添加任務失敗
            if (!addWorker(command)) {
                //執行拒絕策略
                reject();
            }
            return;
        }
        //任務隊列添加任務
        if (!workQueue.offer(command)) {
            //任務隊列滿,嘗試啟動線程添加任務
            if (!addWorker(command)) {
                reject();
            }
        }
    }

    /**
     * 飽和拒絕
     */
    private void reject() {
        //直接拋出異常
        throw new RuntimeException("Can not execute!ctl.count:"
                + ctl.get() + "workQueue size:" + workQueue.size());
    }

    /**
     * 添加任務
     *
     * @param firstTask
     * @return
     */
    private boolean addWorker(Runnable firstTask) {
        if (ctl.get() >= maximumPoolSize) return false;
        Worker worker = new Worker(firstTask);
        //啟動線程
        worker.thread.start();
        ctl.incrementAndGet();
        return true;
    }

    /**
     * 線程池工作線程包裝類
     */
    private final class Worker implements Runnable {
        final Thread thread;
        Runnable firstTask;

        public Worker(Runnable firstTask) {
            this.thread = new Thread(this);
            this.firstTask = firstTask;
        }

        @Override
        public void run() {
            Runnable task = firstTask;
            try {
                //執行任務
                while (task != null || (task = getTask()) != null) {
                    task.run();
                    //線程池已滿,跳出循環
                    if (ctl.get() > maximumPoolSize) {
                        break;
                    }
                    task = null;
                }
            } finally {
                //工作線程數增加
                ctl.decrementAndGet();
            }
        }

        /**
         * 從隊列中獲取任務
         *
         * @return
         */
        private Runnable getTask() {
            for (; ; ) {
                try {
                    System.out.println("workQueue size:" + workQueue.size());
                    return workQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //測試
    public static void main(String[] args) {
        MyThreadPoolExecutor myThreadPoolExecutor = new MyThreadPoolExecutor(2, 2,
                new ArrayBlockingQueue<Runnable>(10));
        for (int i = 0; i < 10; i++) {
            int taskNum = i;
            myThreadPoolExecutor.execute(() -> {
                try {
                    Thread.sleep(1500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("任務編號:" + taskNum);
            });
        }
    }
}

這樣,一個實現了線程池主要流程的類就完成了。

18.單機線程池執行斷電了應該怎么處理?


我們可以對正在處理和阻塞隊列的任務做事務管理或者對阻塞隊列中的任務持久化處理,並且當斷電或者系統崩潰,操作無法繼續下去的時候,可以通過回溯日志的方式來撤銷正在處理的已經執行成功的操作。然后重新執行整個阻塞隊列。

也就是:阻塞隊列持久化;正在處理任務事務控制;斷電之后正在處理任務的回滾,通過日志恢復該次操作;服務器重啟后阻塞隊列中的數據再加載。


參考:

[1]. 《Java並發編程的藝術》

[2]. 《Java發編程實戰》

[3]. 講真 這次絕對讓你輕松學習線程池

[4]. 面試必備:Java線程池解析

[5]. 面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!

[6]. 小傅哥 《Java面經手冊》

[7]. Java線程池實現原理及其在美團業務中的實踐



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM