Java並發(11)- 有關線程池的10個問題


引言

在日常開發中,線程池是使用非常頻繁的一種技術,無論是服務端多線程接收用戶請求,還是客戶端多線程處理數據,都會用到線程池技術,那么全面的了解線程池的使用、背后的實現原理以及合理的優化線程池的大小等都是非常有必要的。這篇文章會通過對一系列的問題的解答來講解線程池的基本功能以及背后的原理,希望能對大家有所幫助。

  • 舉個例子來說明為什么要使用線程池,有什么好處?
  • jdk1.8中提供了哪幾種基本的線程池?
  • 線程池幾大組件的關系?
  • ExecutorService的生命周期?
  • 線程池中的線程能設置超時嗎?
  • 怎么取消線程池中的線程?
  • 如何設置一個合適的線程池大小?
  • 當使用有界隊列時,如何設置一個合適的隊列大小?
  • 當使用有界隊列時,如果隊列已滿,如何選擇合適的拒絕策略?
  • 如何統計線程池中的線程執行時間?

舉個例子來說明為什么要使用線程池,有什么好處?

先來看這樣一個場景,服務端在一個線程內通過監聽8888端口來接收多個客戶端的消息。為了避免阻塞主線程,每收到一個消息,就開啟一個新的線程來處理,這樣主線程就可以不停的接收新的消息。不使用線程池時代碼的簡單實現如下:

public static void main(String[] args) throws IOException {
    ServerSocket serverSocket = new ServerSocket(8888);

    while (true) {
        try {
            Socket socket = serverSocket.accept();

            new Thread(() -> {
                try {
                    InputStream inputStream = socket.getInputStream();
                    //do something
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();

        } catch (IOException e) {
        }
    }
}

通過每次new一個新的線程的方式,不會阻塞主線程,提高了服務端接收消息的能力。但是存在幾個非常明顯的問題:

  • 不停初始化線程的內存消耗,任何時候資源都是有限的,無限制的新建線程會占用大量的內存空間。
  • 在CPU資源有限的情況下,新建更多的線程不僅不能達到並發處理客戶端消息的目的,相反由於線程間的切換更加頻繁,會導致處理時間更長,效率更加低下。
  • 線程本身的創建與銷毀都需要耗費服務器資源。
  • 不方便對線程進行集中管理。
    而這些問題都是可以通過使用線程池得倒解決的。

jdk1.8中提供了哪幾種基本的線程池以及它們的使用場景?

  • newFixedThreadPool,固定線程數的線程池。它的核心線程數(corePoolSize)和最大線程數(maximumPoolSize)是相等的。同時它使用一個無界的阻塞隊列LinkedBlockingQueue來存儲額外的任務,也就是說當達到nThreads的線程數在運行之后,所有的后續線程都會進入LinkedBlockingQueue中,不會再創建新的線程。

    使用場景:因為線程數固定,一般適用於可預測的並行任務執行環境。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}
  • newCachedThreadPool,可緩存線程的線程池。默認核心線程數(corePoolSize)為0,最大線程數(maximumPoolSize)為Integer.MAX_VALUE,它還有一個過期時間為60秒,當線程閑置超過60秒之后會被回收。內部使用SynchronousQueue作為阻塞隊列。

    使用場景:由於SynchronousQueue無容量的特性,導致了newCachedThreadPool不適合做長時間的任務。因為如果單個任務執行時間過長,每當無空閑線程時,會導致開啟新線程,而線程數量可以達到Integer.MAX_VALUE,存儲隊列又不能緩存任務,很容易導致OOM的問題。所以他的使用場景一般在大量短時間任務的執行上。

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
  • newSingleThreadExecutor,單線程線程池。默認核心線程數(corePoolSize)和最大線程數(maximumPoolSize)都為1,使用無界阻塞隊列LinkedBlockingQueue。

    使用場景:由於只能有一個線程在執行,而且其他任務都會排隊,適用於單線程串行執行有序任務的環境。

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  • newScheduledThreadPool與newSingleThreadScheduledExecutor,執行延時或者周期性任務的線程池,使用了一個內部實現的DelayedWorkQueue阻塞隊列。可以看到它的返回結果是ScheduledExecutorService,它擴展了ExecutorService接口,提供了用於延時和周期執行任務的方法。

    使用場景:用於延時啟動任務,或需要周期性執行的任務。

    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
  • newWorkStealingPool,它是jdk1.8提供的一種線程池,用於執行並行任務。默認並行級別為當前可用最大可用cpu數量的線程。

    使用場景:用於大耗時同時可以分段並行的任務。

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

線程池幾大組件的關系?

線程池簡單來說可以分為四大組件:Executor、ExecutorService、Executors以及ThreadPoolExecutor。

  • Executor接口定義了一個以Runnable為參數的execute方法。這也是對線程池框架的一個抽象,它將線程池能做什么和具體怎么做拆分開來,也可以看做是一個生產者和消費者模式,Executor負責生產任務,具體的線程池負責消費任務,讓使用者能夠更加靈活的切換線程池具體策略,它也是線程池多樣性的基礎。
public interface Executor {
    void execute(Runnable command);
}
那么在ThreadPoolExecutor中,是怎么實現execute方法的呢?來看下ThreadPoolExecutor中execute方法的源碼,里面的注釋實在太詳細了,簡直時良好注釋的典范。這里只做個簡單總結:首先當工作線程小於核心線程數時會嘗試添加worker到隊列中去運行,如果核心線程不夠用會將任務加入隊列中,如果入隊也不成功,會采取拒絕策略。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
        * Proceed in 3 steps:
        *
        * 1. If fewer than corePoolSize threads are running, try to
        * start a new thread with the given command as its first
        * task.  The call to addWorker atomically checks runState and
        * workerCount, and so prevents false alarms that would add
        * threads when it shouldn't, by returning false.
        *
        * 2. If a task can be successfully queued, then we still need
        * to double-check whether we should have added a thread
        * (because existing ones died since last checking) or that
        * the pool shut down since entry into this method. So we
        * recheck state and if necessary roll back the enqueuing if
        * stopped, or start a new thread if there are none.
        *
        * 3. If we cannot queue task, then we try to add a new
        * thread.  If it fails, we know we are shut down or saturated
        * and so reject the task.
        */
    //ctl通過位運算同時標記了線程數量以及線程狀態
    int c = ctl.get();
    //workerCountOf方法用來統計當前運行的線程數量
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
  • ExecutorService接口繼承自Executor接口,提供了更加完善的線程池控制功能。並將線程池的狀態分為運行中,關閉,終止三種。同時提供了帶返回值的提交,方便更好的控制提交的任務。
public interface ExecutorService extends Executor {
    //關閉線程池,關閉狀態
    void shutdown();
    //立即關閉線程池,關閉狀態
    List<Runnable> shutdownNow();
    
    boolean isShutdown();
    
    boolean isTerminated();
    //提交一個Callable類型的任務,帶Future返回值
    <T> Future<T> submit(Callable<T> task);
    //提交一個Runnable類型的任務,帶Future返回值
    Future<?> submit(Runnable task);
    //一段時間后終止線程池,終止狀態
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    ......
}
還是通過ThreadPoolExecutor來說明,ThreadPoolExecutor中將線程池狀態進行了擴展,定義了5種狀態,這5種狀態通過Integer.SIZE的高3位來表示。代碼如下:
* The runState provides the main lifecycle control, taking on values:
*   能夠接受新任務也能處理隊列中的任務
*   RUNNING:  Accept new tasks and process queued tasks
*   不能接受新任務,但能處理隊列中的任務
*   SHUTDOWN: Don't accept new tasks, but process queued tasks
    不能接受新任務,也不能處理隊列中的任務,同時會中斷正在執行的任務
*   STOP:     Don't accept new tasks, don't process queued tasks,
*             and interrupt in-progress tasks
    所有的任務都被終止,工作線程為0
*   TIDYING:  All tasks have terminated, workerCount is zero,
*             the thread transitioning to state TIDYING
*             will run the terminated() hook method
    terminated方法執行完成
*   TERMINATED: terminated() has completed
private static final int COUNT_BITS = Integer.SIZE - 3;

private static final int RUNNING    = -1 << COUNT_BITS;//101
private static final int SHUTDOWN   =  0 << COUNT_BITS;//000
private static final int STOP       =  1 << COUNT_BITS;//001
private static final int TIDYING    =  2 << COUNT_BITS;//010
private static final int TERMINATED =  3 << COUNT_BITS;//011
再來看看通過ExecutorService接口對這5種狀態的轉換:
public interface ExecutorService extends Executor {
    //關閉線程池,線程池狀態會從RUNNING變為SHUTDOWN
    void shutdown();
    //立即關閉線程池RUNNING或者SHUTDOWN到STOP
    List<Runnable> shutdownNow();
    //STOP、TIDYING以及TERMINATED都返回true
    boolean isShutdown();
    //TERMINATED狀態返回true
    boolean isTerminated();
    //一段時間后終止線程池,TERMINATED
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    ......
}
  • Executors提供了一系列獲取線程池的靜態方法,相當於線程池工廠,是對ThreadPoolExecutor的一個封裝,簡化了用戶切換Executor和ExecutorService的各種實現細節。

  • ThreadPoolExecutor是對Executor以及ExecutorService的實現,提供了具體的線程池實現。

ExecutorService的生命周期?

這個問題在上面已經做了解說,ExecutorService的生命周期通過接口定義可以分為運行中,關閉,終止三種狀態。

ThreadPoolExecutor在具體實現上提供了更加詳細的五種狀態:RUNNING、SHUTDOWN、STOP、TIDYING以及TERMINATED。各種狀態的說明以及轉換可以看上一個問題的答案。

線程池中的線程能設置超時嗎?

線程池中的線程是可以進行超時控制的,通過ExecutorService的submit來提交任務,這樣會返回一個Future類型的結果,來看看Future接口的代碼:

public interface Future<V> {
    
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();
    //獲取返回結果,並在出現錯誤或者中斷時throws Exception
    V get() throws InterruptedException, ExecutionException;
    //timeout時間內獲取返回結果,並在出現錯誤、中斷以及超時時throws Exception
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Future定義了get()以及get(long timeout, TimeUnit unit)方法,get()方法會阻塞當前調用,一直到獲取到返回結果,get(long timeout, TimeUnit unit)會在指定時間內阻塞,當超時后會拋出TimeoutException錯誤。這樣就可以達到線程超時控制的目的。簡單使用示例如下:

Future<String> future = executor.submit(callable);
try {
    future.get(2000, TimeUnit.SECONDS);
} catch (InterruptedException e1) {
    //中斷后處理
} catch (ExecutionException e1) {
    //拋出異常處理
} catch (TimeoutException e1) {
    //超時處理
}

這里有一個問題就是因為get方法是阻塞的---通過LockSupport.park實現,那么線程池中線程比較多的情況下要怎么獲取每個線程的超時呢?這里除了自定義線程池實現或者自定義線程工廠來實現之外,使用ThreadPoolExecutor本身的功能我也沒想到更好的辦法。有一個非常笨的解決方案是開啟同線程池數量相等的線程進行監聽。大家如果有更好的辦法可以留言提出。

怎么取消線程池中的線程?

這個問題和上面的問題解決方案一樣,同樣也是通過ExecutorService的submit來提交任務,獲取Future,調用Future中的cancel方法來達到目的。

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
}

cancel方法有一個mayInterruptIfRunning參數,當為true時,代表任務能接受並處理中斷,調用了interrupt方法。如果為false,代表如果任務沒啟動就不要運行它,不會調用interrupt方法。

取消的本質實際上還是通過interrupt來實現的,這就是說,如果線程本身不能響應中斷,就算調用了cancel方法也沒用。一般情況下通過lockInterruptibly、park和await方法阻塞的線程都是能響應中斷的,運行中的線程就需要開發者自己實現中斷了。

如何設置一個合適的線程池大小?

如何設置一個合適的線程池大小,這個問題我覺得是沒有一個固定公式的。或者可以說,只有一些簡單的設置規則,但放到具體業務中,又各有不同,只能根據現場環境測試過后再來分析。

設置合適的線程池大小分為兩部分,一部分是最大線程池大小,一部分是最小線程池大小。在ThreadPoolExecutor中體現在最大線程數(maximumPoolSize)和核心線程數(corePoolSize)。

最大線程池大小的設置首先跟當前機器cpu核心數密切相關,一般情況來說要想最大化利用cpu,設置為cpu核心數就可以了,比如4核cpu服務器可以設置為4。但實際情況又大有不同,因為往往我們執行的任務都會涉及到IO,比如任務中執行了一個從數據庫查詢數據的操作,那么這段時間cpu實際上是沒有最大化利用的,這樣我們就可以適當擴大maximumPoolSize的大小。在有些情況下任務會是cpu密集型的,如果這樣設置更多的線程不僅不會提高效率,反而因為線程的創建銷毀以及切換開銷而大大降低了效率,所以說最大線程池的大小需要根據業務情況具體測試后才能設置一個合適的大小。

最小線程池大小相比較最大線程池大小設置起來相對容易一些,因為最小線程一般來說是可以根據業務情況來預估進行設置,比如大多數情況下會有2個任務在運行,很小概率會有超過2個任務運行,那么直接設置最小線程池大小為2就可以。但有一點需要知道的是每間隔多長時間會有超過2個任務,如果每2分鍾會有一次超過2個任務的情況,那么我們可以將線程過期時間設置的稍微久一點,比如4分鍾,這樣就算頻繁的超過2個任務,也可以利用緩存的線程池。

總的來說設置最大和最小線程池都是一個沒有固定公式的問題,都需要考慮實際業務情況和機器配置,根據實際業務情況多做測試才能做到最優化設置。在一切沒有決定之前,可以使用軟件架構的KISS原則,設置最大以及最小線程數都為cpu核心數即可,后續在做優化。

當使用有界隊列時,如何設置一個合適的隊列大小?

要設置合適的隊列大小,先要明白隊列什么時候會被使用。在ThreadPoolExecutor的實現中,使用隊列的情況有點特殊。它會先使用核心線程池大小的線程,之后會將任務加入隊列中,再之后隊列滿了之后才會擴大到最大線程池大小的線程。也就是說隊列的使用並不是等待線程不夠用了才使用,而是等待核心線程不夠用了就使用。我不是太能理解這樣設計的意圖,按《Java性能權威權威指南》一書中的說法是這樣提供了兩個節流閥,第一個是隊列,第二個是最大線程池。但這樣做並不能給使用者最優的體驗,既然要使用最大線程池,那為什么不在第一次就使用呢?

知道了ThreadPoolExecutor使用線程池的時機,那么再來預估合適的隊列大小就很方便了。如果單個任務執行時間在100ms,最小線程數是2,使用者能忍受的最大延時在2s,那么我們可以這樣簡單推算出隊列大小:2/2s/100ms=10,這樣滿隊列時最大延時就在2s之內。當然還有其他一些影響因素,比如部分任務超過或者小於100ms,最大線程池的利用等等,可以在這基礎上做簡單調整。

當使用有界隊列時,如果隊列已滿,如何選擇合適的拒絕策略?

ThreadPoolExecutor中提供了四種RejectedExecutionHandler,每種分工都比較明確,選擇起來並不困難。它們分別是:AbortPolicy、DiscardPolicy、DiscardOldestPolicy以及CallerRunsPolicy。下面貼出了他們的源碼並做了簡單說明,使用的時候可以根據需要自行選擇。

//AbortPolicy
//默認的拒絕策略,直接拋出RejectedExecutionException異常供調用者做后續處理
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    throw new RejectedExecutionException("Task " + r.toString() +
                                            " rejected from " +
                                            e.toString());
}

//DiscardPolicy
//不做任何處理,將任務直接拋棄掉
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}

//DiscardOldestPolicy
//拋棄隊列中的下一個任務,然后嘗試做提交。這個使用我覺得應該是在知道當前要提交的任務比較重要,必須要被執行的場景
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        e.getQueue().poll();
        e.execute(r);
    }
}

//CallerRunsPolicy
//直接使用調用者線程執行,相當於同步執行,會阻塞調用者線程,不太友好感覺。
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    if (!e.isShutdown()) {
        r.run();
    }
}

如何統計線程池中的線程執行時間?

要統計線程池中的線程執行時間,就需要了解線程池中的線程是在什么地方,什么時機執行的?知道了線程的執行狀態,然后在線程執行前后添加自己的處理就可以了,所以先來找到ThreadPoolExecutor中線程具體執行的代碼:

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task); //執行task.run()的前置方法
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);//執行task.run()的后置方法
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

可以看到runWorker方法中在task.run()也就是任務執行前后分別執行了beforeExecute以及afterExecute方法,着兩個方法在ThreadPoolExecutor的繼承類中都是可重寫的,提供了極大的靈活性。我們可以在繼承ThreadPoolExecutor之后在任務執行前后做任何自己需要做的事情,當然也就包括任務執行的時間統計了。

順便說一句,熟悉spring源碼的同學看到這里是不是發現和spring中的postprocesser前后置處理器有異曲同工之妙?區別在於一個是通過繼承來覆蓋,一個是通過接口來實現。

總結

其實線程池框架涉及到的問題遠不止這些,包括ThreadFactory、ForkJoinPool等等還有很多值得花時間研究的地方。本文也只是閱讀jdk源碼、《Java並發編程實戰》以及《Java性能優化權威指南》之后的一點點總結,如有錯誤遺漏的地方,希望大家能多多指出。

參考資料:

  • 《Java並發編程實戰》
  • 《Java性能優化權威指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM