spring 線程異步執行


多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了Java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:

 

1.了解 TaskExecutor接口

Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。

 

2. TaskExecutor接口的實現類

(1)SimpleAsyncTaskExecutor 類

這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。

(2)SyncTaskExecutor類

這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。

(3)ConcurrentTaskExecutor 類

這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。

(4)SimpleThreadPoolTaskExecutor 類

這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。

(5)ThreadPoolTaskExecutor 類

它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用ConcurrentTaskExecutor來替代。

(6)TimerTaskExecutor類

這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。

(7)WorkManagerTaskExecutor類

這個實現使用了CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口,因此可以直接作為WorkManager使用。  

 

案例

注冊TaskExecutor

復制代碼
 1 @Configuration
 2 public class WebMvcConfigurerAdpter extends AbstractWebMvcConfigurerAdpter {
 3 
 4     @Override
 5     public void configureMessageConverters(List<HttpMessageConverter<?>> converters) {
 6         super.configureMessageConverters(converters);
 7         WafJsonMapper.getMapper().enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS);
 8     }
 9 
10 
11     @Bean
12     public TaskExecutor taskExecutor() {
13         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
14         executor.setCorePoolSize(5);
15         executor.setMaxPoolSize(10);
16         return executor;
17     }
18 }
復制代碼

使用:

復制代碼
 1 @Service
 2 public class TaskService {
 3 
 4     @Autowired
 5     private TaskExecutor executor;
 6 
 7     public void execute() {
 8         executor.execute(new Runnable() {
 9             @Override
10             public void run() {
11                 for (int i = 0; i < 10; i++) {
12                     try {
13                         Thread.sleep(1000);
14                         System.out.println("task running ...");
15                     } catch (Exception e) {
16 
17                     }
18                 }
19             }
20         });
21     }
22 }
復制代碼

 

復制代碼
 1 @RestController
 2 @RequestMapping(value = "/v0.1")
 3 public class TaskController {
 4 
 5     @Autowired
 6     private TaskService taskService;
 7 
 8     @RequestMapping()
 9     public Object execute() {
10         taskService.execute();
11         Map res = new HashMap();
12         res.put("result", "success");
13         return res;
14     }
15 }
復制代碼

程序不會等到10個線程都跑完才返回結果,不是阻塞程序,返回結果后,線程仍然在執行。

 

案例:

復制代碼
 1 ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor();  
 2 //線程池所使用的緩沖隊列  
 3 poolTaskExecutor.setQueueCapacity(200);  
 4 //線程池維護線程的最少數量  
 5 poolTaskExecutor.setCorePoolSize(5);  
 6 //線程池維護線程的最大數量  
 7 poolTaskExecutor.setMaxPoolSize(1000);  
 8 //線程池維護線程所允許的空閑時間  
 9 poolTaskExecutor.setKeepAliveSeconds(30000);  
10 poolTaskExecutor.initialize();  
復制代碼
復制代碼
 1 <!-- 配置線程池 -->  
 2 <bean id ="taskExecutor"  class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" >  
 3     <!-- 線程池維護線程的最少數量 -->  
 4 <span style="white-space:pre">  </span><property name ="corePoolSize" value ="5" />  
 5     <!-- 線程池維護線程所允許的空閑時間 -->  
 6 <span style="white-space:pre">  </span><property name ="keepAliveSeconds" value ="30000" />  
 7     <!-- 線程池維護線程的最大數量 -->  
 8 <span style="white-space:pre">  </span><property name ="maxPoolSize" value ="1000" />  
 9     <!-- 線程池所使用的緩沖隊列 -->  
10 <span style="white-space:pre">  </span><property name ="queueCapacity" value ="200" />  
11 </bean>  
復制代碼
復制代碼
1 ApplicationContext ctx =  new ClassPathXmlApplicationContext("applicationContext.xml");
2 ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor)ctx.getBean("taskExecutor");
3 
4 Thread udpThread = new Thread(udp);
5 poolTaskExecutor.execute(udpThread);
6 獲取當前線程池活動的線程數:
7 int count = poolTaskExecutor.getActiveCount();
8 logger.debug("[x] - now threadpool active threads totalNum : " +count);
復制代碼

 

配置解釋


當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM