多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了Java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:
1.了解 TaskExecutor接口
Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。
2. TaskExecutor接口的實現類
(1)SimpleAsyncTaskExecutor 類
這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。
(2)SyncTaskExecutor類
這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。
(3)ConcurrentTaskExecutor 類
這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。
(4)SimpleThreadPoolTaskExecutor 類
這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。
(5)ThreadPoolTaskExecutor 類
它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用ConcurrentTaskExecutor來替代。
(6)TimerTaskExecutor類
這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。
(7)WorkManagerTaskExecutor類
這個實現使用了CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口,因此可以直接作為WorkManager使用。
3.線程池Demo之 ThreadPoolTaskExecutor
(1)編寫測試類
- import org.springframework.core.task.TaskExecutor;
- public class MainExecutor {
- private TaskExecutor taskExecutor;
- public MainExecutor (TaskExecutor taskExecutor) {
- this.taskExecutor = taskExecutor;
- }
- public void printMessages() {
- for(int i = 0; i < 25; i++) {
- taskExecutor.execute(new MessagePrinterTask("Message" + i));
- }
- }
- private class MessagePrinterTask implements Runnable {
- private String message;
- public MessagePrinterTask(String message) {
- this.message = message;
- }
- public void run() {
- System.out.println(message);
- }
- }
- }
在業務代碼中,通常以for循環的方式執行多個事務
for(int k = 0; k < n; k++) {
taskExecutor.execute(new ThreadTransCode());
}
其它繁瑣的線程管理的事情就交給執行器去管理。
值得注意的事有兩點
1, taskExecutor.execute(new ThreadTransCode()); 激活的線程都是守護線程,主線程結束,守護線程就會放棄執行,這個在業務中式符合邏輯的,在單元測試中為了看到執行效果,需要自行阻塞主線程。
2, taskExecutor.execute(new ThreadTransCode()); 的執行也不是完全安全的,在執行的過程中可能會因為需要的線程查過了線程隊列的容量而拋出運行時異常,如有必要需要捕獲。
(2)spring的配置
- <?xml version="1.0" encoding="UTF-8"?>
- <!DOCTYPE beans PUBLIC "-//SPRING//DTD BEAN//EN" "/spring-beans.dtd">
- <beans>
- <!-- 異步線程池 -->
- <bean id="threadPool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
- <!-- 核心線程數 -->
- <property name="corePoolSize" value="10" />
- <!-- 最大線程數 -->
- <property name="maxPoolSize" value="50" />
- <!-- 隊列最大長度 >=mainExecutor.maxSize -->
- <property name="queueCapacity" value="1000" />
- <!-- 線程池維護線程所允許的空閑時間 -->
- <property name="keepAliveSeconds" value="300" />
- <!-- 線程池對拒絕任務(無線程可用)的處理策略 -->
- <property name="rejectedExecutionHandler">
- <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />
- </property>
- </bean>
- <bean id="mainExecutor" class="supben.MainExecutor">
- <property name="threadPool" ref="threadPool" />
- </bean>
- <bean id="springScheduleExecutorTask" class="org.springframework.scheduling.concurrent.ScheduledExecutorTask">
- <property name="runnable" ref="mainExecutor" />
- <!-- 容器加載10秒后開始執行 -->
- <property name="delay" value="10000" />
- <!-- 每次任務間隔 5秒-->
- <property name="period" value="5000" />
- </bean>
- <bean id="springScheduledExecutorFactoryBean" class="org.springframework.scheduling.concurrent.ScheduledExecutorFactoryBean">
- <property name="scheduledExecutorTasks">
- <list>
- <ref bean="springScheduleExecutorTask" />
- </list>
- </property>
- </bean>
- </beans>
(3)調用
- ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");
- MainExecutor te = (MainExecutor)appContext.getBean("taskExecutorExample");
- te.printMessages();
- System.out.println("11111111111111111111111");
(4)效果
案例:
1 @EnableScheduling 2 @Configuration 3 public class WebConfig extends SafWebMvcConfigurerAdapter { 4 5 、、、、 6 7 8 @Bean 9 public TaskExecutor taskExecutor() { 10 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 11 executor.setCorePoolSize(5); 12 executor.setMaxPoolSize(10); 13 return executor; 14 } 15 16 }
1 @Autowired 2 private TaskExecutor taskExecutor; 3 4 5 void save2AllTenantSetting(final ForbidScope forbidScope) { 6 taskExecutor.execute(new Runnable() { 7 @Override 8 public void run() { 9 final List<Tenant> tenants = tenantService.findAll(); 10 11 for (Tenant tenant : tenants) { 12 Runnable save2TenantSettingRunnable 13 = new Save2TenantSettingRunnable(tenant, forbidScope); 14 taskExecutor.execute(save2TenantSettingRunnable); 15 } 16 } 17 }); 18 }
案例:
首先要將TaskExecutor加入spring容器進行管理
1 @Configuration 2 public class WebMvcConfigurerAdpter extends AbstractWebMvcConfigurerAdpter { 3 4 @Override 5 public void configureMessageConverters(List<HttpMessageConverter<?>> converters) { 6 super.configureMessageConverters(converters); 7 WafJsonMapper.getMapper().enable(DeserializationFeature.FAIL_ON_NUMBERS_FOR_ENUMS); 8 } 9 10 @Bean 11 public TaskExecutor taskExecutor() { 12 ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 13 executor.setCorePoolSize(5); 14 executor.setMaxPoolSize(10); 15 return executor; 16 } 17 18 @Bean 19 public static PropertySourcesPlaceholderConfigurer propertyConfig() { 20 return new PropertySourcesPlaceholderConfigurer(); 21 } 22 }
使用:
1 @RestController 2 @RequestMapping(value = "/v0.1/tasks") 3 public class TaskController { 4 5 @Autowired 6 private TaskService taskService; 7 8 @RequestMapping() 9 public Object execute() { 10 taskService.execute(); 11 Map res = new HashMap(); 12 res.put("result", "success"); 13 return res; 14 } 15 }
@Service public class TaskService {@Autowired </span><span style="color: #0000ff">private</span><span style="color: #000000"> TaskExecutor executor; </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> execute() { executor.execute(</span><span style="color: #0000ff">new</span><span style="color: #000000"> Runnable() { @Override </span><span style="color: #0000ff">public</span> <span style="color: #0000ff">void</span><span style="color: #000000"> run() { </span><span style="color: #0000ff">for</span> (<span style="color: #0000ff">int</span> i = 0; i < 10; i++<span style="color: #000000">) { </span><span style="color: #0000ff">try</span><span style="color: #000000"> { Thread.sleep(</span>1000<span style="color: #000000">); System.out.println(</span>"task running ..."<span style="color: #000000">); } </span><span style="color: #0000ff">catch</span><span style="color: #000000"> (Exception e) { } } } }); }
}
每次調用/v0.1/tasks 接口時, 不用等到任務執行結束后才會響應,而是響應后,任務還可能在執行 -- 異步而非同步