1.ExecutorService
private static ExecutorService exec = null;
public static ExecutorService getExecutorServiceInstance(){
if(exec == null){
exec = Executors.newCachedThreadPool();
}
return exec;
}
public void threadNoticeOrMessageOrShortMessage (Integer type, Map<String, String> map, List<String> replaceParameter, List<String> list, Integer saveFlag){
exec = getExecutorServiceInstance();
NoticeOrMessageOrShortMessage noticeOrMessageOrShortMessage = new NoticeOrMessageOrShortMessage(getMessagePushInstance(), type, map, replaceParameter, list, saveFlag,
messagePushService, sendPushService, sendSmsService, sendMessageService);
exec.execute(noticeOrMessageOrShortMessage);
}
2.ThreadPoolTaskExecutor
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="${task.core_pool_size}" />
<property name="maxPoolSize" value="${task.max_pool_size}" />
<property name="queueCapacity" value="${task.queue_capacity}" />
<property name="keepAliveSeconds" value="${task.keep_alive_seconds}" />
<!-- 新增 -->
- <!-- 線程池對拒絕任務(無線程可用)的處理策略 -->
- <property name="rejectedExecutionHandler">
- <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
- </property>
</bean>
@Resource(name = "taskExecutor")
private TaskExecutor taskExecutor;
private void addSendTask(final MimeMessage mimeMessage) {
try {
taskExecutor.execute(new Runnable() {
public void run() {
javaMailSender.send(mimeMessage);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
另外一種方式(未驗證代碼准確性)
private static ThreadPoolTaskExecutor threadPoolTaskExecutor = null;
public static ThreadPoolTaskExecutor getThreadPoolTaskExecutor Instance(){
if(threadPoolTaskExecutor == null){
threadPoolTaskExecutor.setCorePoolSize(5);
threadPoolTaskExecutor.setMaxPoolSize(50);
threadPoolTaskExecutor.setQueueCapacity(1000);
threadPoolTaskExecutor.setKeepAliveSeconds(60);
try {
taskExecutor.execute(new Runnable() {
public void run() {
javaMailSender.send(mimeMessage);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
return exec;
}
下面分別說下各項代表的具體意義:
int corePoolSize:線程池維護線程的最小數量.
int maximumPoolSize:線程池維護線程的最大數量.
long keepAliveTime:空閑線程的存活時間.
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值.
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列.
RejectedExecutionHandler handler:
用來拒絕一個任務的執行,有兩種情況會發生這種情況。
一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即線程池已經飽和;
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。
Reject策略預定義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程).
ThreadPoolExecutor執行器的處理流程:
(1)當線程池大小小於corePoolSize就新建線程,並處理請求.
(2)當線程池大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理.
(3)當workQueue放不下新入的任務時,新建線程加入線程池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理.
(4)另外,當線程池的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀.
CachedThreadPool
CachedThreadPool會創建一個緩存區,將初始化的線程緩存起來。會終止並且從緩存中移除已有60秒未被使用的線程。
如果線程有可用的,就使用之前創建好的線程,
如果線程沒有可用的,就新創建線程。
- 重用:緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse;如果沒有,就建一個新的線程加入池中
- 使用場景:緩存型池子通常用於執行一些生存期很短的異步型任務,因此在一些面向連接的daemon型SERVER中用得不多。
- 超時:能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
- 結束:注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。
FixedThreadPool
在FixedThreadPool中,有一個固定大小的池。
如果當前需要執行的任務超過池大小,那么多出的任務處於等待狀態,直到有空閑下來的線程執行任務,
如果當前需要執行的任務小於池大小,空閑的線程也不會去銷毀。
重用:fixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程
固定數目:其獨特之處在於,任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子
超時:和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),
使用場景:所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器
源碼分析:從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:
1.fixed池線程數固定,並且是0秒IDLE(無IDLE)
2.cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE
創建了一個固定大小的線程池,容量為3,然后循環執行了4個任務。由輸出結果可以看到,前3個任務首先執行完,然后空閑下來的線程去執行第4個任務。
SingleThreadExecutor
SingleThreadExecutor得到的是一個單個的線程,這個線程會保證你的任務執行完成。
如果當前線程意外終止,會創建一個新線程繼續執行任務,這和我們直接創建線程不同,也和newFixedThreadPool(1)不同。
ScheduledThreadPool
ScheduledThreadPool是一個固定大小的線程池,與FixedThreadPool類似,執行的任務是定時執行。
參考:http://www.cnblogs.com/cuiliang/p/3327039.html
http://zy116494718.iteye.com/blog/1704344
