為什么要用線程池?


為什么要用線程池1?

服務器應用程序中經常出現的情況是:單個任務處理的時間很短而請求的數目卻是巨大的。

構建服務器應用程序的一個過於簡單的模型應該是:每當一個請求到達就創建一個新線程,然后在新線程中為請求服務。實際上,對於原型開發這種方法工作得很好,但如果試圖部署以這種方式運行的服務器應用程序,那么這種方法的嚴重不足就很明顯。

每個請求對應一個線程(thread-per-request)方法的不足之一是:為每個請求創建一個新線程的開銷很大;為每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源要比花在處理實際的用戶請求的時間和資源更多。除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源(線程的生命周期!)。在一個JVM 里創建太多的線程可能會導致系統由於過度消耗內存而用完內存或“切換過度”。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目。

線程池為線程生命周期開銷問題和資源不足問題提供了解決方案。通過對多個任務重用線程,線程創建的開銷被分攤到了多個任務上。其好處是,因為在請求到達時線程已經存在,所以無意中也消除了線程創建所帶來的延遲。這樣,就可以立即為請求服務,使應用程序響應更快。而且,通過適當地調整線程池中的線程數目,也就是當請求的數目超過某個閾值時,就強制其它任何新到的請求一直等待,直到獲得一個線程來處理為止,從而可以防止資源不足。

 

為什么要使用線程池2?


操作系統創建線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情。

大多數實際場景中是這樣的:處理某一次請求的時間是非常短暫的,但是請求數量是巨大的。這種技術背景下,如果我們為每一個請求都單獨創建一個線程,那么物理機的所有資源基本上都被操作系統創建線程、切換線程狀態、銷毀線程這些操作所占用,用於業務請求處理的資源反而減少了。所以最理想的處理方式是,將處理請求的線程數量控制在一個范圍,既保證后續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理本身。

另外,一些操作系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操作系統會變得不穩定。這也是我們要限制線程數量的原因。

----------------

線程池的替代方案

線程池遠不是服務器應用程序內使用多線程的唯一方法。如同上面所提到的,有時,為每個新任務生成一個新線程是十分明智的。然而,如果任務創建過於頻繁而任務的平均處理時間過短,那么為每個任務生成一個新線程將會導致性能問題

另一個常見的線程模型是為某一類型的任務分配一個后台線程與任務隊列。

每個任務對應一個線程方法和單個后台線程(single-background-thread)方法在某些情形下都工作得非常理想。每個任務一個線程方法在只有少量運行時間很長的任務時工作得十分好。而只要調度可預見性不是很重要,則單個后台線程方法就工作得十分好,如低優先級后台任務就是這種情況。然而,大多數服務器應用程序都是面向處理大量的短期任務或子任務,因此往往希望具有一種能夠以低開銷有效地處理這些任務的機制以及一些資源管理和定時可預見性的措施。線程池提供了這些優點。


----------------

工作隊列

就線程池的實際實現方式而言,術語“線程池”有些使人誤解,因為線程池“明顯的”實現在大多數情形下並不一定產生我們希望的結果。術語“線程池”先於 Java 平台出現,因此它可能是較少面向對象方法的產物。然而,該術語仍繼續廣泛應用着。

雖然我們可以輕易地實現一個線程池類,其中客戶機類等待一個可用線程、將任務傳遞給該線程以便執行、然后在任務完成時將線程歸還給池,但這種方法卻存在幾個潛在的負面影響。例如在池為空時,會發生什么呢?試圖向池線程傳遞任務的調用者都會發現池為空,在調用者等待一個可用的池線程時,它的線程將阻塞。我們之所以要使用后台線程的原因之一常常是為了防止正在提交的線程被阻塞。完全堵住調用者,如在線程池的“明顯的”實現的情況,可以杜絕我們試圖解決的問題的發生。

我們通常想要的是同一組固定的工作線程相結合的工作隊列,它使用 wait() 和 notify() 來通知等待線程新的工作已經到達了。該工作隊列通常被實現成具有相關監視器對象的某種鏈表。清單 1 顯示了簡單的合用工作隊列的示例。盡管 Thread API 沒有對使用 Runnable 接口強加特殊要求,但使用 Runnable 對象隊列的這種模式是調度程序和工作隊列的公共約定。

 

----------------

使用線程池的風險

雖然線程池是構建多線程應用程序的強大機制,但使用它並不是沒有風險的。用線程池構建的應用程序容易遭受任何其它多線程應用程序容易遭受的所有並發風險,諸如同步錯誤和死鎖,它還容易遭受特定於線程池的少數其它風險,諸如與池有關的死鎖、資源不足,並發錯誤,線程泄漏,請求過載。

 

----------------

有效使用線程池的准則

只要您遵循幾條簡單的准則,線程池可以成為構建服務器應用程序的極其有效的方法:

(1)不要對那些同步等待其它任務結果的任務排隊。這可能會導致上面所描述的那種形式的死鎖,在那種死鎖中,所有線程都被一些任務所占用,這些任務依次等待排隊任務的結果,而這些任務又無法執行,因為所有的線程都很忙。

(2)在為時間可能很長的操作使用合用的線程時要小心。如果程序必須等待諸如 I/O 完成這樣的某個資源,那么請指定最長的等待時間,以及隨后是失效還是將任務重新排隊以便稍后執行。這樣做保證了:通過將某個線程釋放給某個可能成功完成的任務,從而將最終取得 某些 進展。

理解任務。要有效地調整線程池大小,您需要理解正在排隊的任務以及它們正在做什么。它們是 CPU 限制的(CPU-bound)嗎?它們是 I/O 限制的(I/O-bound)嗎?您的答案將影響您如何調整應用程序。如果您有不同的任務類,這些類有着截然不同的特征,那么為不同任務類設置多個工作隊 列可能會有意義,這樣可以相應地調整每個池。

 

----------------

調整池的大小

調整線程池的大小基本上就是避免兩類錯誤:線程太少或線程太多。幸運的是,對於大多數應用程序來說,太多和太少之間的余地相當寬。

線程池的最佳大小取決於可用處理器的數目以及工作隊列中的任務的性質。若在一個具有 N 個處理器的系統上只有一個工作隊列,其中全部是計算性質的任務,在線程池具有 N 或 N+1 個線程時一般會獲得最大的 CPU 利用率。

處理器利用率不是調整線程池大小過程中的唯一考慮事項。隨着線程池的增長,您可能會碰到調度程序、可用內存方面的限制,或者其它系統資源方面的限制,例如套接字、打開的文件句柄或數據庫連接等的數目。

 

----------------

無須編寫您自己的池

Doug Lea 編寫了一個優秀的並發實用程序開放源碼庫 util.concurrent ,它包括互斥、信號量、諸如在並發訪問下執行得很好的隊列和散列表之類集合類以及幾個工作隊列實現。該包中的 PooledExecutor 類是一種有效的、廣泛使用的以工作隊列為基礎的線程池的正確實現。

參考:http://blog.csdn.net/ichsonx/article/details/6265071 java 線程池 較詳細文摘
參考:http://www.ibm.com/developerworks/cn/java/l-threadPool/ 線程池的介紹及簡單實現

Java語言為我們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor 和 ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口本身和“線程池”並沒有直接關系,它的定義更接近“執行器”,而“使用線程管理的方式進行實現”只是其中的一種實現方式)。這篇文章中,我們主要圍繞ThreadPoolExecutor類進行講解。

 

----------------

線程池的創建

我們可以通過java.util.concurrent.ThreadPoolExecutor來創建一個線程池。

常用構造方法為:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)

  1. corePoolSize: 線程池維護線程的最少數量
  2. maximumPoolSize:線程池維護線程的最大數量
  3. keepAliveTime: 線程池維護線程所允許的空閑時間
  4. unit: 線程池維護線程所允許的空閑時間的單位
  5. workQueue: 線程池所使用的緩沖隊列
  6. handler: 線程池對拒絕任務的處理策略

 

處理過程
當一個任務通過execute(Runnable)方法欲添加到線程池時:

  1. 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
  2. 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
  3. 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
  4. 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。

處理任務的優先級為
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

handler有四個選擇

  1. ThreadPoolExecutor.AbortPolicy() :拋出java.util.concurrent.RejectedExecutionException異常
  2. ThreadPoolExecutor.CallerRunsPolicy() : 重試添加當前的任務,他會自動重復調用execute()方法
  3. ThreadPoolExecutor.DiscardOldestPolicy() : 拋棄舊的任務
  4. ThreadPoolExecutor.DiscardPolicy() : 拋棄當前的任務

 舉例:

import java.io.Serializable;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestThreadPool2{

    public static void main(String[] args){

        // 構造一個線程池(池中最少有2個線程,最多4個線程,池中的線程允許空閑3秒,線程池所使用的緩沖隊列ArrayBlockingQueue且容量為3,線程池對拒絕任務的處理策略為:拋棄舊的任務)
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 1; i <= 10; i++){
            try{
                // 產生一個任務,並將其加入到線程池
                String task = "task@ " + i;
                System.out.println("put " + task);
                threadPool.execute(new ThreadPoolTask(task));

                // 便於觀察,等待一段時間
                Thread.sleep(2);

            } catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

/**
 * 線程池執行的任務
 */
class ThreadPoolTask implements Runnable, Serializable{

    private static final long serialVersionUID = 0;

    // 保存任務所需要的數據
    private Object threadPoolTaskData;

    ThreadPoolTask(Object tasks){
        this.threadPoolTaskData = tasks;
    }

    public void run(){

        // 處理一個任務,這里的處理方式太簡單了,僅僅是一個打印語句
        System.out.println(Thread.currentThread().getName());
        System.out.println("start .." + threadPoolTaskData);

        try{
            // //便於觀察,等待一段時間
            Thread.sleep(1000);

        }catch (Exception e){
            e.printStackTrace();
        }
        threadPoolTaskData = null;
    }

    public Object getTask(){
        return this.threadPoolTaskData;
    }
}
View Code

 

說明:

  1. 在這段程序中,一個任務就是一個Runnable類型的對象,也就是一個ThreadPoolTask類型的對象。
  2. 一般來說任務除了處理方式外,還需要處理的數據,處理的數據通過構造方法傳給任務。
  3. 在這段程序中,main()方法相當於一個殘忍的領導,他派發出許多任務,丟給一個叫 threadPool的任勞任怨的小組來做。

這個小組里面隊員至少有兩個,如果他們兩個忙不過來,任務就被放到任務列表里面。如果積壓的任務過多,多到任務列表都裝不下(超過3個)的時候,就雇佣新的隊員來幫忙。但是基於成本的考慮,不能雇佣太多的隊員,至多只能雇佣 4個。如果四個隊員都在忙時,再有新的任務,這個小組就處理不了了,任務就會被通過一種策略來處理,我們的處理方式是不停的派發,直到接受這個任務為止(更殘忍!呵呵)。因為隊員工作是需要成本的,如果工作很閑,閑到 3SECONDS都沒有新的任務了,那么有的隊員就會被解雇了,但是,為了小組的正常運轉,即使工作再閑,小組的隊員也不能少於兩個。

  4、通過調整 produceTaskSleepTime和 consumeTaskSleepTime的大小來實現對派發任務和處理任務的速度的控制,改變這兩個值就可以觀察不同速率下程序的工作情況。
  5、通過調整4中所指的數據,再加上調整任務丟棄策略,換上其他三種策略,就可以看出不同策略下的不同處理方式。
  6、對於其他的使用方法,參看jdk的幫助,很容易理解和使用。

 

再舉一個例子:

import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadPoolExecutorTest {

    private static int queueDeep = 4;

    public void createThreadPool(){

        /* 
         * 創建線程池,最小線程數為2,最大線程數為4,線程池維護線程的空閑時間為3秒, 
         * 使用隊列深度為4的有界隊列,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除, 
         * 然后重試執行程序(如果再次失敗,則重復此過程),里面已經根據隊列深度對任務加載進行了控制。 
         */ 
        ThreadPoolExecutor tpe = new ThreadPoolExecutor(2, 4, 3, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueDeep),
                new ThreadPoolExecutor.DiscardOldestPolicy());

        // 向線程池中添加 10 個任務
        for (int i = 0; i < 10; i++){
            try{
                Thread.sleep(1);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
            while (getQueueSize(tpe.getQueue()) >= queueDeep){

                System.out.println("隊列已滿,等3秒再添加任務");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

            }
            TaskThreadPool ttp = new TaskThreadPool(i);
            System.out.println("put i:" + i);
            tpe.execute(ttp);
        }

        tpe.shutdown();
    }

    private synchronized int getQueueSize(Queue queue) {
        return queue.size();
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.createThreadPool();
    }

    class TaskThreadPool implements Runnable {

        private int index;

        public TaskThreadPool(int index) {
            this.index = index;
        }

        public void run() {
            System.out.println(Thread.currentThread() + " index:" + index);
            try{
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

}
View Code

 

spring線程池ThreadPoolExecutor配置並且得到任務執行的結果

Java並發包:ExecutorService和ThreadPoolExecutor

Java並發包:Java Fork and Join using ForkJoinPool

Java並發包:Exchanger和Semaphore

Java並發包:Lock和ReadWriteLock

Java並發包:ScheduledExecutorService(一種安排任務執行的ExecutorService)

----------------

合理的配置線程池

要想合理的配置線程池,就必須首先分析任務特性,可以從以下幾個角度來進行分析:

  1. 任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
  2. 任務的優先級:高,中和低。
  3. 任務的執行時間:長,中和短。
  4. 任務的依賴性:是否依賴其他系統資源,如數據庫連接。

任務性質不同的任務可以用不同規模的線程池分開處理。

  1. CPU密集型任務配置盡可能小的線程:如配置(N個cpu+1)個線程的線程池。
  2. IO密集型任務則由於線程並不是一直在執行任務,則配置盡可能多的線程,如(2*Ncpu)。
  3. 混合型的任務,如果可以拆分,則將其拆分成一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那么分解后執行的吞吐率要高於串行執行的吞吐率,如果這兩個任務執行時間相差太大,則沒必要進行分解。

  我們可以通過Runtime.getRuntime().availableProcessors()方法獲得當前設備的CPU個數。

優先級不同的任務可以使用優先級隊列PriorityBlockingQueue來處理。

它可以讓優先級高的任務先得到執行,需要注意的是如果一直有優先級高的任務提交到隊列里,那么優先級低的任務可能永遠不能執行。

執行時間不同的任務可以交給不同規模的線程池來處理,或者也可以使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫連接池的任務,因為線程提交SQL后需要等待數據庫返回結果,如果等待的時間越長CPU空閑時間就越長,那么線程數應該設置越大,這樣才能更好的利用CPU。

建議使用有界隊列,有界隊列能增加系統的穩定性和預警能力,可以根據需要設大一點,比如幾千。

有一次我們組使用的后台任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,通過排查發現是數據庫出現了問題,導致執行SQL變得非常緩慢,因為后台任務線程池里的任務全是需要向數據庫查詢和插入數據的,所以導致線程池里的工作線程全部阻塞住,任務積壓在線程池里。

如果當時我們設置成無界隊列,線程池的隊列就會越來越多,有可能會撐滿內存,導致整個系統不可用,而不只是后台任務出現問題。

當然我們的系統所有的任務是用的單獨的服務器部署的,而我們使用不同規模的線程池跑不同類型的任務,但是出現這樣問題時也會影響到其他任務。

 

Java線程池

Java編程中“為了性能”盡量要做到的一些地方

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM