初學者很容易看錯,如果沒有看到spring或者JUC源碼的人肯定是不太了解的。
ThreadPoolTaskExecutor是spring core包中的,而ThreadPoolExecutor是JDK中的JUC。ThreadPoolTaskExecutor是對ThreadPoolExecutor進行了封裝處理。
自己在之前寫多線程代碼的時候都是這么玩的executor=Executors.newCachedThreadPool();但是有一次在大量數據的時候由於入庫速度遠大於出庫速度導致內存急劇膨脹最后悲劇了重寫代碼,原來spring 早就給我們做好封裝了。
來看一下ThreadPoolExecutor結構,祖類都是調用Executor接口:
再來看一下ThreadPoolTaskExecutor結構,祖類都是調用Executor接口:
再來看一下源碼:
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements SchedulingTaskExecutor { private final Object poolSizeMonitor = new Object(); private int corePoolSize = 1; private int maxPoolSize = 2147483647; private int keepAliveSeconds = 60; private boolean allowCoreThreadTimeOut = false; private int queueCapacity = 2147483647; private ThreadPoolExecutor threadPoolExecutor; //這里就用到了ThreadPoolExecutor
}
//下面是設置完配置需要調用initialize方法初始化
@Override protected ExecutorService initializeExecutor( ThreadFactory threadFactory, RejectedExecutionHandler rejectedExecutionHandler) { BlockingQueue<Runnable> queue = createQueue(this.queueCapacity); ThreadPoolExecutor executor = new ThreadPoolExecutor( this.corePoolSize, this.maxPoolSize, this.keepAliveSeconds, TimeUnit.SECONDS, queue, threadFactory, rejectedExecutionHandler); if (this.allowCoreThreadTimeOut) { executor.allowCoreThreadTimeOut(true); } this.threadPoolExecutor = executor; return executor; }
這是ThreadPoolTaskExecutor用來初始化threadPoolExecutor的方法,BlockingQueue是一個阻塞隊列,這個我們先不管。由於ThreadPoolTaskExecutor的實現方式完全是使用threadPoolExecutor進行實現,我們需要知道這個threadPoolExecutor的一些參數。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
int corePoolSize:線程池維護線程的最小數量.
int maximumPoolSize:線程池維護線程的最大數量.
long keepAliveTime:空閑線程的存活時間.
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值.
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列.
RejectedExecutionHandler handler:
用來拒絕一個任務的執行,有兩種情況會發生這種情況。
一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即線程池已經飽和;
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。
ThreadPoolExecutor池子的處理流程如下:
1)當池子大小小於corePoolSize就新建線程,並處理請求
2)當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理
3)當workQueue放不下新入的任務時,新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理
4)另外,當池子的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀
其會優先創建 CorePoolSiz 線程, 當繼續增加線程時,先放入Queue中,當 CorePoolSiz 和 Queue 都滿的時候,就增加創建新線程,當線程達到MaxPoolSize的時候,就會拋出錯 誤 org.springframework.core.task.TaskRejectedException
另外MaxPoolSize的設定如果比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常。
<!-- 異步線程池 --> <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心線程數,默認為1 --> <property name="corePoolSize" value="3" /> <!-- 最大線程數,默認為Integer.Max_value --> <property name="maxPoolSize" value="10" /> <!-- 隊列最大長度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="25" /> <!-- 線程池維護線程所允許的空閑時間 --> <property name="keepAliveSeconds" value="300" /> <!-- 線程池對拒絕任務(無線程可用)的處理策略 ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄. --> <property name="rejectedExecutionHandler"> <!-- AbortPolicy:直接拋出java.util.concurrent.RejectedExecutionException異常 --> <!-- CallerRunsPolicy:若已達到待處理隊列長度,將由主線程直接處理請求 --> <!-- DiscardOldestPolicy:拋棄舊的任務;會導致被丟棄的任務無法再次被執行 --> <!-- DiscardPolicy:拋棄當前任務;會導致被丟棄的任務無法再次被執行 --> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean>
Reject策略預定義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程).
關於callable回調方法(因為為隊列阻塞,如果到取值某個執行的值會等待執行完成)
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setCorePoolSize(5); threadPoolTaskExecutor.setMaxPoolSize(50); threadPoolTaskExecutor.initialize(); List<String> paymentSeqNoList = new ArrayList<>(); for (int i = 0; i < 100; i++) { paymentSeqNoList.add(String.valueOf(i)); } Long startTime = System.currentTimeMillis(); Map<String, FutureTask<String>> futureMap = new HashMap<String, FutureTask<String>>(); //線程池提交返回 for (String paymentSeqNo : paymentSeqNoList) { FutureTask<String> futureTask = new FutureTask<String>(new MyTestCallable(paymentSeqNo)); futureMap.put(paymentSeqNo, futureTask); // submit提交執行 threadPoolTaskExecutor.submit(futureTask); } Long endTime = System.currentTimeMillis(); System.out.println("耗時1:" + (endTime - startTime));
private static class MyTestCallable implements Callable<String> {
private final String paymentSeqNo;
/**
* 構造
*
* @param
*/
public MyTestCallable(String paymentSeqNo) {
this.paymentSeqNo = paymentSeqNo;
}
/*
* (non-Javadoc)
* @see java.util.concurrent.Callable#call()
*/
public String call() throws Exception {
ControlRequest controlRequest = new ControlRequest(paymentSeqNo, CtrlType.RETRY, null);
//System.out.println(controlRequest);
return "成功";
}
}
關於callable回調值監聽是否成功,JDK1.8 也開始支持guava方法了,guava有ListenableFuture 返回優化如下:
Long startTime2 = System.currentTimeMillis(); ListenableFuture<String> listenableFuture = null; for (String paymentSeqNo : paymentSeqNoList) { ListeningExecutorService executorService = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); listenableFuture = executorService.submit(new Callable<String>() { @Override public String call() throws Exception { return "成功"; } }); }
//監聽事件 Futures.addCallback(listenableFuture, new FutureCallback<String>() { @Override public void onSuccess(String result) { System.out.println("get listenable future's result with callback " + result); } @Override public void onFailure(Throwable t) { t.printStackTrace(); } }); Long endTime2 = System.currentTimeMillis(); System.out.println("耗時2:" + (endTime2 - startTime2));