線程池深入(li)


  java線程池。在jdk5之后為我們提供了線程池,只需要使用API,不用去考慮線程池里特殊的處理機制。jdk5線程池分好多種,固定尺寸的線程池、可變尺寸連接池等。常用的是ThreadPoolExecutor,它的構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

  參數說明:

  1.corePoolSize:線程池維護線程的最少數量,有可能是空閑的線程。

  2.maximunPoolSize:線程池維護線程的最大數量。

  3.keepAliveTime:線程池維護線程所允許的空閑時間。

  4.TimeUnit:程池維護線程所允許的空閑時間的單位。

  5.workQueue:線程池所使用的緩沖隊列,改緩沖隊列的長度決定了能夠緩沖的最大數量。

  6.RejectedExecutionHandler :拒絕任務的處理方式。

拒絕任務,是指當線程池里面的線程數量達到 maximumPoolSize  workQueue 隊列已滿的情況下被嘗試添加進來的任務。 ThreadPoolExecutor 里面定義了 4  handler 策略,分別是:

    1.CallerRunsPolicy :這個策略重試添加當前的任務,他會自動重復調用 execute() 方法,直到成功。

    2.AbortPolicy :對拒絕任務拋棄處理,並且拋出異常。

    3.DiscardPolicy :對拒絕任務直接無聲拋棄,沒有異常信息。

    4.DiscardOldestPolicy :對拒絕任務不拋棄,而是拋棄隊列里面等待最久的一個線程,然后把拒絕任務加到隊列。

一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個Runnable類型的對象,任務的執行方法就是 Runnable 類型對象的run()方法。當一個任務通過 execute(Runnable) 方法欲添加到線程池時,線程池采用的策略如下:

  1.如果此時線程池中的數量小於 corePoolSize ,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。

  2.如果此時線程池中的數量等於 corePoolSize ,但是緩沖隊列 workQueue 未滿,那么任務被放入緩沖隊列。

  3.如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量小於maximumPoolSize ,建新的線程來處理被添加的任務。

  4.如果此時線程池中的數量大於 corePoolSize ,緩沖隊列 workQueue 滿,並且線程池中的數量等於maximumPoolSize ,那么通過 handler 所指定的策略來處理此任務。

處理任務的優先級為:核心線程 corePoolSize 、任務隊列 workQueue 、最大線程 maximumPoolSize ,如果三者都滿了,使用handler 處理被拒絕的任務。當線程池中的線程數量大於 corePoolSize 時,如果某線程空閑時間超過keepAliveTime ,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ThreadExecute {

    private static final int  corePoolSize         = 2; // 線程池維護線程的最少數量
    private static final int  maximumPoolSize      = 4; // 線程池維護線程的最大數量
    private static final long keepAliveTime        = 3; // 線程池維護線程所允許的空閑時間
    private static final int  PRODUCETASKMAXNUMBER = 10;

    private static void processMessageTask() {
        // 創建等待隊列
        BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(2);
        // 構造一個線程池{這個策略重試添加當前的任務,他會自動重復調用 execute() 方法,直到成功}
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime,
                                                               TimeUnit.SECONDS, bqueue,
                                                               new ThreadPoolExecutor.DiscardOldestPolicy());

        for (int i = 1; i <= PRODUCETASKMAXNUMBER; i++) {
            try {
                threadPool.execute(new MyThread());
            } catch (Exception e) {
                System.err.println("thread pool is error, content::" + e);
            }
        }
    }

    public static void main(String[] args) {
        processMessageTask();
    }
}

// 子類不能比父類拋出更多的異常
class MyThread implements Runnable {

    @Override
    public void run() {
        while (true) {
            try {
                Thread.sleep(2 * 1000);
                System.out.println(Thread.currentThread().getName() + "正在執行。。。");
            } catch (Exception e) {

            }
        }
    }
}

上面的代碼,每兩秒執行一次,並且線程會一直運行。

  總結一下,Java里面線程池的頂級接口是Executor,但是嚴格意義上講Executor並不是一個線程池,而只是一個執行線程的工具。真正的線程池接口是ExecutorService。下面這張圖完整描述了線程池的類體系結構:

  ExecutorService:真正的線程池接口。

  ScheduledExecutorService:能和Timer/TimerTask類似,解決那些需要任務重復執行的問題。

  ThreadPoolExecutor:ExecutorService的默認實現。

  ScheduledThreadPoolExecutor: 繼承ThreadPoolExecutor的ScheduledExecutorService接口實現,周期性任務調度的類實現。

要配置一個線程池是比較復雜的,尤其是對於線程池的原理不是很清楚的情況下,很有可能配置的線程池不是較優的,因此在Executors類里面提供了一些靜態工廠,生成一些常用的線程池

  newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。

  newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。

  newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

  newScheduledThreadPool:創建一個大小無限的線程池。此線程池支持定時以及周期性執行任務的需求。

  newSingleThreadExecutor:創建一個單線程的線程池。此線程池支持定時以及周期性執行任務的需求。

當然,new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);用構造方法創建線程池。corePoolSize(線程池的基本大小):當提交一個任務到線程池時,線程池會創建一個線程來執行任務,即使其他空閑的基本線程能夠執行新任務也會創建線程,等到需要執行的任務數大於線程池基本大小時就不再創建。如果調用了線程池的prestartAllCoreThreads方法,線程池會提前創建並啟動所有基本線程。runnableTaskQueue(任務隊列):用於保存等待執行的任務的阻塞隊列。可以選擇以下幾個阻塞隊列:ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序;LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按FIFO (先進先出) 排序元素,吞吐量通常要高於ArrayBlockingQueue。靜態工廠方法Executors.newFixedThreadPool()使用了這個隊列。SynchronousQueue:一個不存儲元素的阻塞隊列。每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQueue,靜態工廠方法。Executors.newCachedThreadPool使用了這個隊列。PriorityBlockingQueue:一個具有優先級的無限阻塞隊列。maximumPoolSize(線程池最大大小):線程池允許創建的最大線程數。如果隊列滿了,並且已創建的線程數小於最大線程數,則線程池會再創建新的線程執行任務。值得注意的是如果使用了無界的任務隊列這個參數就沒什么效果。ThreadFactory:用於設置創建線程的工廠,可以通過線程工廠給每個創建出來的線程設置更有意義的名字。RejectedExecutionHandler(飽和策略):當隊列和線程池都滿了,說明線程池處於飽和狀態,那么必須采取一種策略處理提交的新任務。這個策略默認情況下是AbortPolicy,表示無法處理新任務時拋出異常。以下是JDK1.5提供的四種策略:AbortPolicy:直接拋出異常;CallerRunsPolicy:只用調用者所在線程來運行任務;DiscardOldestPolicy:丟棄隊列里最近的一個任務,並執行當前任務;DiscardPolicy:不處理,丟棄掉;當然也可以根據應用場景需要來實現RejectedExecutionHandler接口自定義策略。如記錄日志或持久化不能處理的任務;keepAliveTime(線程活動保持時間):線程池的工作線程空閑后,保持存活的時間。所以如果任務很多,並且每個任務執行的時間比較短,可以調大這個時間,提高線程的利用率;TimeUnit(線程活動保持時間的單位):可選的單位有天(DAYS),小時(HOURS),分鍾(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。

最后,說說線程池在項目中的使用。

public interface ThreadPool {

    /**
     * 線程池的初始化
     */
    public void init(int poolSize, int watermark);

    /**
     * 將一個可執行的對象推到線程池隊列中,在有空閑線程的情況下立刻執行
     */
    public boolean schedule(Runnable runnable);

    /**
     * 關閉線程池,調用此方法將放棄所有未執行都已經在線程隊列中的Action
     */
    public void close();
}

定義了接口ThreadPool,並且提供了三個方法:初始化方法、執行方法、關閉方法。

public class BlockedThreadPoolExecutor extends ThreadPoolExecutor {

    // 利用Semaphore實現的帶阻塞的ThreadPoolExecutor
    private Semaphore semaphore = null;

    public BlockedThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,

    BlockingQueue<Runnable> workQueue, int watermark){
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.semaphore = new Semaphore(watermark);

    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        semaphore.release();
        super.afterExecute(r, t);

    }

    @Override
    public void execute(Runnable command) {
        try {
            semaphore.acquire();
            super.execute(command);
        } catch (InterruptedException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }
}

BlockedThreadPoolExecutor實現了帶阻塞的ThreadPoolExecutor。

public class BlockedThreadPoolImpl implements ThreadPool {

    protected ExecutorService pool;

    @Override
    public void init(int poolSize, int watermark) {
        this.pool = new BlockedThreadPoolExecutor(poolSize, poolSize, 0L, TimeUnit.MILLISECONDS,
                                                  new LinkedBlockingQueue<Runnable>(), watermark);
    }

    @Override
    public boolean schedule(Runnable runnable) {
        boolean ret = false;
        try {
            pool.execute(runnable);
            ret = true;

        } catch (Throwable t) {
            System.out.println(t.getMessage() + t);
        }
        return ret;
    }

    @Override
    public void close() {
        // 禁止新的線程從入口進入
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                // 取消當前正在運行的線程
                pool.shutdownNow();
                // 等待取消的線程回應
                if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                    System.err.println("ThreadPoolImpl did not terminate!");
                }
            }
        } catch (InterruptedException ie) {
            // 如果遇到異常,重新嘗試停止線程
            pool.shutdownNow();
            // 中斷當前線程
            Thread.currentThread().interrupt();
        }
    }
}

BlockedThreadPoolImpl實現了接口ThreadPool,並對init、schedule、close方法進行了實現。

  為什么要使用線程池?在Java中,如果每當一個請求到達就創建一個新線程,開銷是相當大的。在實際使用中,每個請求創建新線程的服務器在創建和銷毀線程上花費的時間和消耗的系統資源,甚至可能要比花在實際處理實際的用戶請求的時間和資源要多的多。除了創建和銷毀線程的開銷之外,活動的線程也需要消耗系統資源。如果在一個JVM中創建太多的線程,可能會導致系統由於過度消耗內存或者“切換過度”而導致系統資源不足。為了防止資源不足,服務器應用程序需要一些辦法來限制任何給定時刻處理的請求數目,盡可能減少創建和銷毀線程的次數,特別是一些資源耗費比較大的線程的創建和銷毀,盡量利用已有對象來進行服務,這就是“池化資源”技術產生的原因。線程池主要用來解決線程生命周期開銷問題和資源不足問題,通過對多個任務重用線程,線程創建的開銷被分攤到多個任務上了,而且由於在請求到達時線程已經存在,所以消除了創建所帶來的延遲。這樣,就可以立即請求服務,使應用程序響應更快。另外,通過適當的調整線程池中的線程數據可以防止出現資源不足的情況。

  Executors類常用的靜態方法有哪些?Executors類里面提供了一些靜態工廠,生成一些常用的線程池:newSingleThreadExecutor:創建一個單線程的線程池。這個線程池只有一個線程在工作,也就是相當於單線程串行執行所有任務。如果這個唯一的線程因為異常結束,那么會有一個新的線程來替代它。此線程池保證所有任務的執行順序按照任務的提交順序執行。newFixedThreadPool:創建固定大小的線程池。每次提交一個任務就創建一個線程,直到線程達到線程池的最大大小。線程池的大小一旦達到最大值就會保持不變,如果某個線程因為執行異常而結束,那么線程池會補充一個新線程。newCachedThreadPool:創建一個可緩存的線程池。如果線程池的大小超過了處理任務所需要的線程,那么就會回收部分空閑(60秒不執行任務)的線程,當任務數增加時,此線程池又可以智能的添加新線程來處理任務。此線程池不會對線程池大小做限制,線程池大小完全依賴於操作系統(或者說JVM)能夠創建的最大線程大小。

 

//2016-07-25 01:00:24,411 ERROR taobao.hsf -HSF-Provider HSFthread pool is full.

//2016-07-258 01:00:24,485 ERROR taobao.hsf -HSF-Provider HSF thread pool is full.

//2016-07-25 01:00:24,644 ERROR taobao.hsf -HSF-Provider HSF thread pool is full.

//2016-07-25 01:00:24,889 ERROR taobao.hsf -HSF-Provider HSF thread pool is full.

  線程池滿了,該如何解決?可以看到大量的pool full,從錯誤可以看出是hsf provider線程被占滿(HSF默認線程池數量為600),可以定位出是外部調用tripwb系統的hsf服務所致。HSF會在出現pool full的同時,打印出堆棧到 /home/admin/hsf/HSF_JStack.log。這次出的問題,就能看出是調用httpclient這塊出問題了,並且給出咱們出問題的類了,Review AMapUtil.java 果然發現是調用高德地圖由於httpclient 超時出現線程的阻塞,由於代碼沒有設置timeout,線程就一直卡在這,而一直累積直到超過600,系統就崩潰了。

  當在使用線程的場景處,當不確定線程數的時候,盡量使用線程池。因此,針對受限的資源(線程,文件,數據庫鏈接等),在使用時需要加以限制,如使用線程池限制線程數,數據庫連接池限制鏈接數。當代碼中執行一個批量操作,由於每次執行的時間較長,因此每次執行時都創建一個新線程異步執行。但在高訪問下,該代碼將會導致線程數過多。合理的方式時使用一個線程池對線程加以限制,當線程池耗盡時拒絕新啟線程。

  多線程的另一處使用場景,是用來處理各種socket請求。由於Runnable接口中只提供了一個不帶返回值run方法,因此當任務需要返回值時就不能滿足需求了,於是出現了ExecutorService,這個接口繼承了Executor,對提交任務的接口進行了擴展,引入了Callable接口,該接口定義如下:

public interface Callable<V> {
    V call() throws Exception;
}

同時接口將任務執行過程進行管理,分為三個狀態,提交,shutdown,terminate。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM