多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了Java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:
1.了解 TaskExecutor接口
Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。
2. TaskExecutor接口的實現類
(1)SimpleAsyncTaskExecutor 類
這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。
(2)SyncTaskExecutor類
這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。
(3)ConcurrentTaskExecutor 類
這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。
(4)SimpleThreadPoolTaskExecutor 類
這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。
(5)ThreadPoolTaskExecutor 類
它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用ConcurrentTaskExecutor來替代。
(6)TimerTaskExecutor類
這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。
(7)WorkManagerTaskExecutor類
這個實現使用了CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口,因此可以直接作為WorkManager使用。
案例
注冊TaskExecutor
1 @Configuration 2 public class WebMvcConfigurerAdpter extends AbstractWebMvcConfigurerAdpter { 3 4 @Override 5 public void configureMessageConverters(List<HttpMessageConverter<?>> converters) { 6 super.configureMessageConverters(converters); 7 WafJsonMapper.getMapper().enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS); 8 } 9 10 11 @Bean 12 public TaskExecutor taskExecutor() { 13 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 14 executor.setCorePoolSize(5); 15 executor.setMaxPoolSize(10); 16 return executor; 17 } 18 }
使用:
1 @Service 2 public class TaskService { 3 4 @Autowired 5 private TaskExecutor executor; 6 7 public void execute() { 8 executor.execute(new Runnable() { 9 @Override 10 public void run() { 11 for (int i = 0; i < 10; i++) { 12 try { 13 Thread.sleep(1000); 14 System.out.println("task running ..."); 15 } catch (Exception e) { 16 17 } 18 } 19 } 20 }); 21 } 22 }
1 @RestController 2 @RequestMapping(value = "/v0.1") 3 public class TaskController { 4 5 @Autowired 6 private TaskService taskService; 7 8 @RequestMapping() 9 public Object execute() { 10 taskService.execute(); 11 Map res = new HashMap(); 12 res.put("result", "success"); 13 return res; 14 } 15 }
程序不會等到10個線程都跑完才返回結果,不是阻塞程序,返回結果后,線程仍然在執行。
案例:
1 ThreadPoolTaskExecutor poolTaskExecutor = new ThreadPoolTaskExecutor(); 2 //線程池所使用的緩沖隊列 3 poolTaskExecutor.setQueueCapacity(200); 4 //線程池維護線程的最少數量 5 poolTaskExecutor.setCorePoolSize(5); 6 //線程池維護線程的最大數量 7 poolTaskExecutor.setMaxPoolSize(1000); 8 //線程池維護線程所允許的空閑時間 9 poolTaskExecutor.setKeepAliveSeconds(30000); 10 poolTaskExecutor.initialize();
1 <!-- 配置線程池 --> 2 <bean id ="taskExecutor" class ="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor" > 3 <!-- 線程池維護線程的最少數量 --> 4 <span style="white-space:pre"> </span><property name ="corePoolSize" value ="5" /> 5 <!-- 線程池維護線程所允許的空閑時間 --> 6 <span style="white-space:pre"> </span><property name ="keepAliveSeconds" value ="30000" /> 7 <!-- 線程池維護線程的最大數量 --> 8 <span style="white-space:pre"> </span><property name ="maxPoolSize" value ="1000" /> 9 <!-- 線程池所使用的緩沖隊列 --> 10 <span style="white-space:pre"> </span><property name ="queueCapacity" value ="200" /> 11 </bean>
1 ApplicationContext ctx = new ClassPathXmlApplicationContext("applicationContext.xml"); 2 ThreadPoolTaskExecutor poolTaskExecutor = (ThreadPoolTaskExecutor)ctx.getBean("taskExecutor"); 3 4 Thread udpThread = new Thread(udp); 5 poolTaskExecutor.execute(udpThread); 6 獲取當前線程池活動的線程數: 7 int count = poolTaskExecutor.getActiveCount(); 8 logger.debug("[x] - now threadpool active threads totalNum : " +count);
配置解釋
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。