在編程過程中大家都接觸過線程池吧,在JAVA中它的實現類是ThreadPoolExecutor,常見構造如下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); }
- corePoolSize: 線程池維護線程的最少數量
- maximumPoolSize:線程池維護線程的最大數量
- keepAliveTime: 線程池維護線程所允許的空閑時間
- unit: 線程池維護線程所允許的空閑時間的單位
- workQueue: 線程池所使用的緩沖隊列
- handler: 線程池對拒絕任務的處理策略
正式使用中一般都會設置一個最大緩沖隊列容量,如果線程池滿它會對繼續添加的任務線程執行指定的拒絕策略,ThreadPoolExcetor 的最后一個參數指定了拒絕策略,JDK提供了四種拒絕策略:
AbortPolicy 策略、CallerRunsPolicy策略、 DiscardOledestPolicy策略、DiscardPolicy策略。
AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工作。
CallerRunsPolicy 策略:只要線程池未關閉,該策略直接在調用者線程中,運行當前的被丟棄的任務。
DiscardOleddestPolicy策略: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。
DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。
可以看到默認提供的四種策略似乎都不太友好,要么放棄要么拋異常,而直接在調用者線程中執行或許也不是你想要的,因為它破壞了線程的執行順序。
有時候我們需要保證任務添加不會失敗,並且只要被添加的任務能依次順序執行就好了,而不需要這個添加動作立即響應,即讓線程池等待池中的任務完成后再繼續添加新任務,此時JDK提供的四種策略無法滿足需求,
需要自定義拒絕策略,代碼如下:
package com.montnets.task; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; /** * 自定義阻塞型線程池 當池滿時會阻塞任務提交 * * @ClassName: BlockThreadPool * @Description: TODO * @author: wangs * @date: 2018-1-24 下午5:24:54 */ public class BlockThreadPool { private ThreadPoolExecutor pool = null; public BlockThreadPool(int poolSize) { pool = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), new CustomThreadFactory(), new CustomRejectedExecutionHandler()); } public void destory() { if (pool != null) { pool.shutdownNow(); } } private class CustomThreadFactory implements ThreadFactory { private AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread t = new Thread(r); String threadName = BlockThreadPool.class.getSimpleName() + count.addAndGet(1); t.setName(threadName); return t; } } private class CustomRejectedExecutionHandler implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // 核心改造點,由blockingqueue的offer改成put阻塞方法 executor.getQueue().put(r); } catch (InterruptedException e) { e.printStackTrace(); } } } public void execute(Runnable runnable) { this.pool.execute(runnable); } // 測試構造的線程池 public static void main(String[] args) { BlockThreadPool pool = new BlockThreadPool(3); for (int i = 1; i < 100; i++) { System.out.println("提交第" + i + "個任務!"); pool.execute(new Runnable() { @Override public void run() { try { System.out.println(Thread.currentThread().getId() + "=====開始"); TimeUnit.SECONDS.sleep(10); System.out.println(Thread.currentThread().getId() + "=====【結束】"); } catch (InterruptedException e) { e.printStackTrace(); } } }); System.out.println("【提交第" + i + "個任務成功!】"); } // 2.銷毀----此處不能銷毀,因為任務沒有提交執行完,如果銷毀線程池,任務也就無法執行了 // exec.destory(); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } } }
該類的核心實現還是ThreadPoolExecutor,只是自定義了拒絕策略CustomRejectedExecutionHandler,實現了阻塞功能。
當核心池和緩存隊列滿了之后外部再調用execute時就會阻塞住,一直等到池里某個任務完成后釋放出空閑線程以后,再將該任務添加到緩存隊列,而不會拋異常或丟棄該任務。
適用於一些定時掃描觸發任務類場景。
多情為何忘情,無心怎去用心,空有知人之智,恨無自知之明。