前言:
Java SE 5.0引入了ThreadPoolExecutor、ScheduledThreadPoolExecutor。Spring 2.x借助ConcurrentTaskExecutor和ThreadPoolTaskExecutor能夠通過IoC配置形式自定義它們暴露的各個屬性。
多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:
1.了解 TaskExecutor接口
Spring的TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。
2. TaskExecutor接口的實現類
(1)SimpleAsyncTaskExecutor 類
這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。
(2)SyncTaskExecutor類
這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。
(3)ConcurrentTaskExecutor 類
這個實現是對Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。
<bean id="concurrentTaskExecutor" class="org.springframework.scheduling.concurrent.ConcurrentTaskExecutor"/>
(4)SimpleThreadPoolTaskExecutor 類
這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。
(5)ThreadPoolTaskExecutor 類
它不支持任何對java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用ConcurrentTaskExecutor來替代。
下面先學習下JDK中的ThreadPoolExecutor中的相關信息.ThreadPoolExecutor構造函數如下:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
下面分別說下各項代表的具體意義:
int corePoolSize:線程池維護線程的最小數量.
int maximumPoolSize:線程池維護線程的最大數量.
long keepAliveTime:空閑線程的存活時間.
TimeUnit unit: 時間單位,現有納秒,微秒,毫秒,秒枚舉值.
BlockingQueue<Runnable> workQueue:持有等待執行的任務隊列.
RejectedExecutionHandler handler: 用來拒絕一個任務的執行,有兩種情況會發生這種情況:
一是在execute方法中若addIfUnderMaximumPoolSize(command)為false,即線程池已經飽和;
二是在execute方法中, 發現runState!=RUNNING || poolSize == 0,即已經shutdown,就調用ensureQueuedTaskHandled(Runnable command),在該方法中有可能調用reject。
Reject策略預定義有四種:
(1)ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException。
(2)ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.
(3)ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
(4)ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如果再次失敗,則重復此過程).
ThreadPoolExecutor執行器的處理流程:
(1)當線程池大小小於corePoolSize就新建線程,並處理請求.
(2)當線程池大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去從workQueue中取任務並處理.
(3)當workQueue放不下新入的任務時,新建線程加入線程池,並處理請求,如果池子大小撐到了maximumPoolSize就用RejectedExecutionHandler來做拒絕處理.
(4)另外,當線程池的線程數大於corePoolSize的時候,多余的線程會等待keepAliveTime長的時間,如果無請求可處理就自行銷毀.
了解清楚了ThreadPoolExecutor的執行流程,開頭提到的org.springframework.core.task.TaskRejectedException異常也就好理解和解決了.ThreadPoolTaskExecutor類中使用的
就是ThreadPoolExecutor.AbortPolicy()策略,直接拋出異常.
spring中ThreadPoolTaskExecutor最常用方式就是做為BEAN注入到容器中,其暴露的各個屬性其實是ThreadPoolExecutor的屬性,而且這體現了DI容器的優勢。如下代碼:
<bean id="threadPoolTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="2"/> <property name="keepAliveSeconds" value="200"/> <property name="maxPoolSize" value="10"/> <property name="queueCapacity" value="60"/> </bean>
(6)TimerTaskExecutor類
這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。
(7)WorkManagerTaskExecutor類
這個實現使用了CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口,因此可以直接作為WorkManager使用。
