ExecutorService與ThreadPoolTaskExecutor


1.ExecutorService 

private static ExecutorService exec = null;
public static ExecutorService getExecutorServiceInstance(){
if(exec == null){
exec = Executors.newCachedThreadPool();
}
return exec;
}
public void threadNoticeOrMessageOrShortMessage (Integer type, Map<String, String> map, List<String> replaceParameter, List<String> list, Integer saveFlag){
exec = getExecutorServiceInstance();
NoticeOrMessageOrShortMessage noticeOrMessageOrShortMessage = new NoticeOrMessageOrShortMessage(getMessagePushInstance(), type, map, replaceParameter, list, saveFlag,
messagePushService, sendPushService, sendSmsService, sendMessageService);
exec.execute(noticeOrMessageOrShortMessage);
}

2.ThreadPoolTaskExecutor

<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="${task.core_pool_size}" />
<property name="maxPoolSize" value="${task.max_pool_size}" />
<property name="queueCapacity" value="${task.queue_capacity}" />
<property name="keepAliveSeconds" value="${task.keep_alive_seconds}" />
  <!-- 新增 -->
  1. <!-- 線程池對拒絕任務(無線程可用)的處理策略 -->  
  2.     <property name="rejectedExecutionHandler">  
  3.       <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
  4.     </property>  

</bean>

@Resource(name = "taskExecutor")
private TaskExecutor taskExecutor;
private void addSendTask(final MimeMessage mimeMessage) {
try {
taskExecutor.execute(new Runnable() {
public void run() {
javaMailSender.send(mimeMessage);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
另外一種方式(未驗證代碼准確性)
private static ThreadPoolTaskExecutor threadPoolTaskExecutor = null;
public static ThreadPoolTaskExecutor getThreadPoolTaskExecutor Instance(){
    if(threadPoolTaskExecutor == null){
    threadPoolTaskExecutor.setCorePoolSize(5);
    threadPoolTaskExecutor.setMaxPoolSize(50);
    threadPoolTaskExecutor.setQueueCapacity(1000);
    threadPoolTaskExecutor.setKeepAliveSeconds(60);
    try {
  taskExecutor.execute(new Runnable() {
   public void run() {
  javaMailSender.send(mimeMessage);
  }
  });
  } catch (Exception e) {
e.printStackTrace();
  }
   }
 return exec;
}

下面分別說下各項代表的具體意義: 

int corePoolSize:線程池維護線程的最小數量. 
int maximumPoolSize:線程池維護線程的最大數量. 
long keepAliveTime:空閑線程的存活時間. 
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值. 
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列. 
RejectedExecutionHandler handler: 
用來拒絕一個任務的執行,有兩種情況會發生這種情況。 
一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即線程池已經飽和; 
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。 

Reject策略預定義有四種: 
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。 
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄. 
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄. 
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程). 
ThreadPoolExecutor執行器的處理流程: 
(1)當線程池大小小於corePoolSize就新建線程,並處理請求. 
(2)當線程池大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理. 
(3)當workQueue放不下新入的任務時,新建線程加入線程池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理. 
(4)另外,當線程池的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀. 


CachedThreadPool

  CachedThreadPool會創建一個緩存區,將初始化的線程緩存起來。會終止並且從緩存中移除已有60秒未被使用的線程。

  如果線程有可用的,就使用之前創建好的線程,

  如果線程沒有可用的,就新創建線程。

  • 重用:緩存型池子,先查看池中有沒有以前建立的線程,如果有,就reuse;如果沒有,就建一個新的線程加入池中
  • 使用場景:緩存型池子通常用於執行一些生存期很短的異步型任務,因此在一些面向連接的daemon型SERVER中用得不多。
  • 超時:能reuse的線程,必須是timeout IDLE內的池中線程,缺省timeout是60s,超過這個IDLE時長,線程實例將被終止及移出池。
  • 結束:注意,放入CachedThreadPool的線程不必擔心其結束,超過TIMEOUT不活動,其會自動被終止。

FixedThreadPool

  在FixedThreadPool中,有一個固定大小的池。

  如果當前需要執行的任務超過池大小,那么多出的任務處於等待狀態,直到有空閑下來的線程執行任務,

  如果當前需要執行的任務小於池大小,空閑的線程也不會去銷毀。

         重用:fixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的線程

        固定數目:其獨特之處在於,任意時間點,最多只能有固定數目的活動線程存在,此時如果有新的線程要建立,只能放在另外的隊列中等待,直到當前的線程中某個線程終止直接被移出池子

       超時:和cacheThreadPool不同,FixedThreadPool沒有IDLE機制(可能也有,但既然文檔沒提,肯定非常長,類似依賴上層的TCP或UDP IDLE機制之類的),

      使用場景:所以FixedThreadPool多數針對一些很穩定很固定的正規並發線程,多用於服務器

     源碼分析:從方法的源代碼看,cache池和fixed 池調用的是同一個底層池,只不過參數不同:

          1.fixed池線程數固定,並且是0秒IDLE(無IDLE)

          2.cache池線程數支持0-Integer.MAX_VALUE(顯然完全沒考慮主機的資源承受能力),60秒IDLE

創建了一個固定大小的線程池,容量為3,然后循環執行了4個任務。由輸出結果可以看到,前3個任務首先執行完,然后空閑下來的線程去執行第4個任務

 

SingleThreadExecutor 

  SingleThreadExecutor得到的是一個單個的線程,這個線程會保證你的任務執行完成。

  如果當前線程意外終止,會創建一個新線程繼續執行任務,這和我們直接創建線程不同,也和newFixedThreadPool(1)不同。

ScheduledThreadPool

ScheduledThreadPool是一個固定大小的線程池,與FixedThreadPool類似,執行的任務是定時執行

 參考:http://www.cnblogs.com/cuiliang/p/3327039.html

    http://zy116494718.iteye.com/blog/1704344


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM