Java線程池的幾種實現 及 常見問題講解


工作中,經常會涉及到線程。比如有些任務,經常會交與線程去異步執行。抑或服務端程序為每個請求單獨建立一個線程處理任務。線程之外的,比如我們用的數據庫連接。這些創建銷毀或者打開關閉的操作,非常影響系統性能。所以,“池”的用處就凸顯出來了。

 

1. 為什么要使用線程池

在3.6.1節介紹的實現方式中,對每個客戶都分配一個新的工作線程。當工作線程與客戶通信結束,這個線程就被銷毀。這種實現方式有以下不足之處:

  • 服務器創建和銷毀工作的開銷( 包括所花費的時間和系統資源 )很大。這一項不用解釋,可以去查下"線程創建過程"。除了機器本身所做的工作,我們還要實例化,啟動,這些都需要占用堆棧資源。
  • 除了創建和銷毀線程的開銷之外,活動的線程也消耗系統資源。 這個應該是對堆棧資源的消耗,猜測數據庫連接數設置一個合理的值,也有這個考慮。
  • 如果線程數目固定,並且每個線程都有很長的聲明周期,那么線程切換也是相對固定的。不同的操作系統有不同的切換周期,一般20ms左右。這里說的切換是在jvm以及底層操作系統的調度下,線程之間轉讓cpu的使用權。如果頻繁創建和銷毀線程,那么就將頻繁的切換線程,因為一個線程銷毀后,必然要讓出使用權給已經就緒的線程,使該線程獲得運行機會。在這種情況下,線程之間的切換就不在遵循系統的固定切換周期,切換線程的開銷甚至比創建和銷毀的開銷還要大。

相對來說,使用線程池,會預創建一些線程,它們不斷的從工作隊列中取出任務,然后執行該任務。當工作線程執行完一個任務后,就會繼續執行工作隊列中的另一個任務。優點如下:

  • 減少了創建和銷毀的次數,每個工作線程都可以一直被重用,能執行多個任務。
  • 可以根據系統的承載能力,方便的調整線程池中線程的數目,防止因為消耗過量的系統資源而導致系統崩潰。

 

2. 線程池的簡單實現

下面是自己寫的一個簡單的線程池,也是從Java網絡編程這本書上直接照着敲出來的

 

package thread;

import java.util.LinkedList;

/**
 * 線程池的實現,根據常規線程池的長度,最大長度,隊列長度,我們可以增加數目限制實現
 * @author Han
 */
public class MyThreadPool extends ThreadGroup{
    //cpu 數量 ---Runtime.getRuntime().availableProcessors();
    //是否關閉
    private boolean isClosed = false;
    //隊列
    private LinkedList<Runnable> workQueue;
    //線程池id
    private static int threadPoolID;
    private int threadID;
    public MyThreadPool(int poolSize){
        super("MyThreadPool."+threadPoolID);
        threadPoolID++;
        setDaemon(true);
        workQueue = new LinkedList<Runnable>();
        for(int i = 0;i<poolSize;i++){
            new WorkThread().start();
        }
    }
    //這里可以換成ConcurrentLinkedQueue,就可以避免使用synchronized的效率問題
    public synchronized void execute(Runnable task){
        if(isClosed){
            throw new IllegalStateException("連接池已經關閉...");
        }else{
            workQueue.add(task);
            notify();
        }
    }
    
    protected synchronized Runnable getTask() throws InterruptedException {
        while(workQueue.size() == 0){
            if(isClosed){
                return null;
            }
            wait();
        }
        return workQueue.removeFirst();
    }
    
    public synchronized void close(){
        if(!isClosed){
            isClosed = true;
            workQueue.clear();
            interrupt();
        }
    }
    
    public void join(){
        synchronized (this) {
            isClosed = true;
            notifyAll();
        }
        Thread[] threads = new Thread[activeCount()];
        int count = enumerate(threads);
        for(int i = 0;i<count;i++){
            try {
                threads[i].join();
            } catch (Exception e) {
            }
        }
    }
    
    class WorkThread extends Thread{
        public WorkThread(){
            super(MyThreadPool.this,"workThread"+(threadID++));
            System.out.println("create...");
        }
        @Override
        public void run() {
            while(!isInterrupted()){
                System.out.println("run..");
                Runnable task = null;
                try {
                    //這是一個阻塞方法
                    task = getTask();
                    
                } catch (Exception e) {
                    
                }
                if(task != null){
                    task.run();
                }else{
                    break;
                }
            }
        }
    }
}

 

 

該線程池主要定義了一個工作隊列和一些預創建的線程。只要調用execute方法,就可以向線程提交任務。

后面線程在沒有任務的時候,會阻塞在getTask(),直到有新任務進來被喚醒。

join和close都可以用來關閉線程池。不同的是,join會把隊列中的任務執行完,而close則立刻清空隊列,並且中斷所有的工作線程。close()中的interrupt()相當於調用了ThreadGroup中包含子線程的各自的interrupt(),所以有線程處於wait或者sleep時,都會拋出InterruptException

測試類如下:

public class TestMyThreadPool {
    public static void main(String[] args) throws InterruptedException {
        MyThreadPool pool = new MyThreadPool(3);
        for(int i = 0;i<10;i++){
            pool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                    }
                    System.out.println("working...");
                }
            });
        }
        pool.join();
        //pool.close();
    }
}

 

 

3. jdk類庫提供的線程池

 

java提供了很好的線程池實現,比我們自己的實現要更加健壯以及高效,同時功能也更加強大。

類圖如下:

 

關於這類線程池,前輩們已經有很好的講解。任意百度下java線程池,都有寫的非常詳細的例子和教程,這里就不再贅述。

java自帶線程池和隊列詳解

 

4. spring注入線程池

在使用spring框架的時候,如果我們用java提供的方法來創建線程池,在多線程應用中非常不方便管理,而且不符合我們使用spring的思想。(雖然spring可以通過靜態方法注入)

其實,Spring本身也提供了很好的線程池的實現。這個類叫做ThreadPoolTaskExecutor

在spring中的配置如下:

<bean id="executorService" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="${threadpool.corePoolSize}" />
        <!-- 線程池維護線程的最少數量 -->
        <property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" />
        <!-- 線程池維護線程所允許的空閑時間 -->
        <property name="maxPoolSize" value="${threadpool.maxPoolSize}" />
        <!-- 線程池維護線程的最大數量 -->
        <property name="queueCapacity" value="${threadpool.queueCapacity}" />
        <!-- 線程池所使用的緩沖隊列 -->
    </bean>

 

5. 使用線程池的注意事項

  • 死鎖

任何多線程程序都有死鎖的風險,最簡單的情形是兩個線程AB,A持有鎖1,請求鎖2,B持有鎖2,請求鎖1。(這種情況在mysql的排他鎖也會出現,不會數據庫會直接報錯提示)。線程池中還有另一種死鎖:假設線程池中的所有工作線程都在執行各自任務時被阻塞,它們在等待某個任務A的執行結果。而任務A卻處於隊列中,由於沒有空閑線程,一直無法得以執行。這樣線程池的所有資源將一直阻塞下去,死鎖也就產生了。

  • 系統資源不足

 如果線程池中的線程數目非常多,這些線程會消耗包括內存和其他系統資源在內的大量資源,從而嚴重影響系統性能。

  • 並發錯誤

線程池的工作隊列依靠wait()和notify()方法來使工作線程及時取得任務,但這兩個方法難以使用。如果代碼錯誤,可能會丟失通知,導致工作線程一直保持空閑的狀態,無視工作隊列中需要處理的任務。因為最好使用一些比較成熟的線程池。

  • 線程泄漏

使用線程池的一個嚴重風險是線程泄漏。對於工作線程數目固定的線程池,如果工作線程在執行任務時拋出RuntimeException或Error,並且這些異常或錯誤沒有被捕獲,那么這個工作線程就異常終止,使線程池永久丟失了一個線程。(這一點太有意思)

另一種情況是,工作線程在執行一個任務時被阻塞,如果等待用戶的輸入數據,但是用戶一直不輸入數據,導致這個線程一直被阻塞。這樣的工作線程名存實亡,它實際上不執行任何任務了。如果線程池中的所有線程都處於這樣的狀態,那么線程池就無法加入新的任務了。

  • 任務過載

當工作線程隊列中有大量排隊等待執行的任務時,這些任務本身可能會消耗太多的系統資源和引起資源缺乏。

 

綜上所述,使用線程池時,要遵循以下原則:

  1. 如果任務A在執行過程中需要同步等待任務B的執行結果,那么任務A不適合加入到線程池的工作隊列中。如果把像任務A一樣的需要等待其他任務執行結果的加入到隊列中,可能造成死鎖
  2. 如果執行某個任務時可能會阻塞,並且是長時間的阻塞,則應該設定超時時間,避免工作線程永久的阻塞下去而導致線程泄漏。在服務器才程序中,當線程等待客戶連接,或者等待客戶發送的數據時,都可能造成阻塞,可以通過以下方式設置時間:
    調用ServerSocket的setSotimeout方法,設定等待客戶連接的超時時間。
    對於每個與客戶連接的socket,調用該socket的setSoTImeout方法,設定等待客戶發送數據的超時時間。
  3. 了解任務的特點,分析任務是執行經常會阻塞io操作,還是執行一直不會阻塞的運算操作。前者時斷時續的占用cpu,而后者具有更高的利用率。預計完成任務大概需要多長時間,是短時間任務還是長時間任務,然后根據任務的特點,對任務進行分類,然后把不同類型的任務加入到不同的線程池的工作隊列中,這樣就可以根據任務的特點,分配調整每個線程池
  4. 調整線程池的大小。線程池的最佳大小主要取決於系統的可用cpu的數目,以及工作隊列中任務的特點。假如一個具有N個cpu的系統上只有一個工作隊列,並且其中全部是運算性質(不會阻塞)的任務,那么當線程池擁有N或N+1個工作線程時,一般會獲得最大的cpu使用率。
    如果工作隊列中包含會執行IO操作並經常阻塞的任務,則要讓線程池的大小超過可用 cpu的數量,因為並不是所有的工作線程都一直在工作。選擇一個典型的任務,然后估計在執行這個任務的工程中,等待時間與實際占用cpu進行運算的時間的比例WT/ST。對於一個具有N個cpu的系統,需要設置大約N*(1+WT/ST)個線程來保證cpu得到充分利用。
    當然,cpu利用率不是調整線程池過程中唯一要考慮的事項,隨着線程池工作數目的增長,還會碰到內存或者其他資源的限制,如套接字,打開的文件句柄或數據庫連接數目等。要保證多線程消耗的系統資源在系統承受的范圍之內。
  5. 避免任務過載。服務器應根據系統的承載能力,限制客戶並發連接的數目。當客戶的連接超過了限制值,服務器可以拒絕連接,並進行友好提示,或者限制隊列長度.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM