基於ThreadPoolExecutor,自定義線程池簡單實現


一、線程池作用

  在上一篇隨筆中有提到多線程具有同一時刻處理多個任務的特點,即並行工作,因此多線程的用途非常廣泛,特別在性能優化上顯得尤為重要。然而,多線程處理消耗的時間包括創建線程時間T1、工作時間T2、銷毀線程時間T3,創建和銷毀線程需要消耗一定的時間和資源,如果能夠減少這部分的時間消耗,性能將會進一步提高,線程池就能夠很好解決問題。線程池在初始化時會創建一定數量的線程,當需要線程執行任務時,從線程池取出線程,當任務執行完成后,線程置回線程池成為空閑線程,等待下一次任務。JDK1.5提供了一個Executors工廠類來產生線程池,該工廠類提供5種靜態方法來創建線程池,詳細請參見:http://www.cnblogs.com/firstsheng618/p/3861097.html。

二、認識隊列

  隊列具有先進先出(FIFO)的特點,不同於堆的后進先出(LIFO),隊列(Queue)是只允許在一端進行插入,而在另一端進行刪除的運算受限的線性表。線程池就是通過隊列的方式實現任務的調用。下面介紹幾個常用的隊列:

  1、ArrayBlockingQueue:一個有數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序,隊列的頭部 是在隊列中存在時間最長的元素,隊列的尾部 是在隊列中存在時間最短的元素,新元素插入到隊列的尾部,隊列獲取操作則是從隊列頭部開始獲得元素。

  2、ConcurrentLinkedQueue:一個基於鏈接節點的無界線程安全隊列。此隊列按照 FIFO(先進先出)原則對元素進行排序,隊列的頭部 是隊列中時間最長的元素,隊列的尾部 是隊列中時間最短的元素,新的元素插入到隊列的尾部,隊列獲取操作從隊列頭部獲得元素,當多個線程共享訪問一個公共 collection 時,ConcurrentLinkedQueue 是一個恰當的選擇,此隊列不允許使用 null 元素。 

  3、LinkedBlockingQueue:一個基於已鏈接節點的、范圍任意的BlockingQueue。此隊列按 FIFO(先進先出)排序元素,隊列的頭部 是在隊列中時間最長的元素,隊列的尾部 是在隊列中時間最短的元素,新元素插入到隊列的尾部,並且隊列獲取操作會獲得位於隊列頭部的元素,鏈接隊列的吞吐量通常要高於基於數組的隊列,但是在大多數並發應用程序中,其可預知的性能要低。

  4、SynchronousQueue:一種阻塞隊列,其中每個插入操作必須等待另一個線程的對應移除操作 ,反之亦然。 

三、隊列排隊策略

  1、ThreadPoolExecutor.AbortPolicy:用於被拒絕任務的處理程序,它將拋出 RejectedExecutionException,線程池默認被拒絕任務的處理策略。

  2、ThreadPoolExecutor.CallerRunsPolicy:用於被拒絕任務的處理程序,它直接在 execute 方法的調用線程(上一層線程)中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務。 

  3、ThreadPoolExecutor.DiscardOldestPolicy:用於被拒絕任務的處理程序,它放棄最舊的未處理請求,然后重試 execute;如果執行程序已關閉,則會丟棄該任務。

  4、ThreadPoolExecutor.DiscardPolicy:用於被拒絕任務的處理程序,默認情況下它將丟棄被拒絕的任務。

四、自定義線程

  線程池工作策略:A. 如果運行的線程少於 corePoolSize,則 Executor 始終首選添加新的線程,而不進行排隊。B. 如果運行的線程等於或多於 corePoolSize,則 Executor 始終首選將請求加入隊列,而不添加新的線程。C. 如果無法將請求加入隊列,則創建新的線程,除非創建此線程超出 maximumPoolSize,在這種情況下,任務將被拒絕。

  ArrayBlockingQueue和LinkedBlockingQueue是兩個最常用的阻塞隊列,一般情況下足以處理多線程間的生產者和消費者問題,LinkedBlockingQueue內部分別采用獨立鎖來控制數據同步,實現生產者端和消費者端並行工作,高效的處理並發數據。

  基於ThreadPoolExecutor自定義線程池如下:

public class LinkedBqThreadPool extends ThreadPoolExecutor {
    protected Logger log = Logger.getLogger(getClass());
    /**
     * 正在執行任務數量
     */
    private AtomicInteger taskNum = new AtomicInteger(0);

    /**
     * 構建線程池
     * @param corePoolSize    池中所保存的核心線程數
     * @param maximumPoolSize    池中允許的最大線程數
     * @param keepActiveTime    非核心線程空閑等待新任務的最長時間
     * @param timeunit    keepActiveTime參數的時間單位
     * @param blockingqueue    任務隊列
     */
    public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, TimeUnit timeunit,
            BlockingQueue<Runnable> blockingqueue) {
        super(corePoolSize, maximumPoolSize, keepActiveTime, timeunit, blockingqueue);
    }
    
    /**
     * 構建線程池
     * @param corePoolSize    池中所保存的核心線程數
     * @param maximumPoolSize    池中允許的最大線程數
     * @param keepActiveTime    非核心線程空閑等待新任務的最長時間(單位:秒)
     * @param blockingqueue    任務隊列
     */
    public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, 
            BlockingQueue<Runnable> blockingqueue) {
        this(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, blockingqueue);
    }
    
    /**
     * 構建線程池
     * @param corePoolSize    池中所保存的核心線程數
     * @param maximumPoolSize    池中允許的最大線程數
     * @param keepActiveTime    非核心線程空閑等待新任務的最長時間(單位:秒)
     */
    public LinkedBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime) {
        this(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    /**
     * 構建單線程的線程池
     */
    public LinkedBqThreadPool() {
        this(1, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    /**
     * 任務執行,以原子方式將當前值加 1
     */
    public void execute(Runnable task) {
        taskNum.getAndIncrement();
        super.execute(task);
    }
    
    /**
     * 任務執行之后
     */
    public void afterExecute(Runnable task, Throwable throwable) {
        taskNum.decrementAndGet();
        log.debug("task : " + task.getClass().getSimpleName() 
                +  " completed,Throwable:" + throwable + ",taskNum:" + getTaskNum());
        synchronized(this) {
            notifyAll();
        }
    }
    
    /**
     * 掛起當前線程,直到所有任務執行完成
     */
    public void waitComplete() {
        try {
            synchronized(this){
                while(getTaskNum() > 0){
                    wait(500);
                }
            }
        } catch (InterruptedException e) {
            log.error(e + ", taskNum:" + getTaskNum());
        }
    }
    
    /**
     * @return    未執行的任務數
     */
    public int getTaskNum() {
        return taskNum.get();
    }

    /**
     * @param time    非核心線程空閑等待新任務的最長時間(單位:秒)
     */
    public void setKeepAliveTime(int time) {
        super.setKeepAliveTime(time, TimeUnit.SECONDS);
    }

    /**
     * @param size    池中所保存的核心線程數
     */
    public void setCorePoolSize(int size) {
        super.setCorePoolSize(size);
    }
    
    /**
     * @param size    池中允許的最大線程數
     */
    public void setMaximumPoolSize(int size) {
        super.setMaximumPoolSize(size);
    }
}
LinkedBlockingQueue隊列實現
public class ArrayBqThreadPool extends ThreadPoolExecutor {
    protected Logger log = Logger.getLogger(getClass());
    /**
     * 待執行任務數量
     */
    private AtomicInteger taskNum = new AtomicInteger(0);

    /**
     * 構建線程池
     * @param corePoolSize    池中所保存的核心線程數
     * @param maximumPoolSize    池中允許的最大線程數
     * @param keepActiveTime    非核心線程空閑等待新任務的最長時間(單位:秒)
     * @param queueCapacity    隊列容量,即等待執行任務數
     */
    public ArrayBqThreadPool(int corePoolSize, int maximumPoolSize, long keepActiveTime, int queueCapacity) {
        super(corePoolSize, maximumPoolSize, keepActiveTime, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(queueCapacity));
    }
    
    /**
     * 構建單線程的線程池
     */
    public ArrayBqThreadPool(int queueCapacity) {
        this(1, 1, 1, queueCapacity);
    }
    
    /**
     * 任務執行,以原子方式將當前值加 1
     */
    public void execute(Runnable task) {
        taskNum.getAndIncrement();
        super.execute(task);
    }
    
    /**
     * 任務執行之后
     */
    public void afterExecute(Runnable task, Throwable throwable) {
        taskNum.decrementAndGet();
        log.debug("task : " + task.getClass().getSimpleName() 
                +  " completed,Throwable:" + throwable + ",taskNum:" + getTaskNum());
        synchronized(this) {
            notifyAll();
        }
    }
    
    /**
     * 掛起當前線程,直到所有任務執行完成
     */
    public void waitComplete() {
        try {
            synchronized(this){
                while(getTaskNum() > 0){
                    wait(500);
                }
            }
        } catch (InterruptedException e) {
            log.error(e + ", taskNum:" + getTaskNum());
        }
    }
    
    /**
     * @return    待執行的任務數
     */
    public int getTaskNum() {
        return taskNum.get();
    }

    /**
     * @param time    非核心線程空閑等待新任務的最長時間(單位:秒)
     */
    public void setKeepAliveTime(int time) {
        super.setKeepAliveTime(time, TimeUnit.SECONDS);
    }

    /**
     * @param size    池中所保存的核心線程數
     */
    public void setCorePoolSize(int size) {
        super.setCorePoolSize(size);
    }
    
    /**
     * @param size    池中允許的最大線程數
     */
    public void setMaximumPoolSize(int size) {
        super.setMaximumPoolSize(size);
    }
}
ArrayBlockingQueue隊列實現

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM