多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:
1.了解 TaskExecutor接口
Spring的 TaskExecutor接口等同於java.util.concurrent.Executor接口。 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽 象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。
2. TaskExecutor接口的實現類
(1)SimpleAsyncTaskExecutor 類
這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。
(2)SyncTaskExecutor類
這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。
(3)ConcurrentTaskExecutor 類
這個實現是對 Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用 ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。
(4)SimpleThreadPoolTaskExecutor 類
這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。
(5)ThreadPoolTaskExecutor 類
它不支持任何對 java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個 TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用 ConcurrentTaskExecutor來替代。
(6)TimerTaskExecutor類
這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。
(7)WorkManagerTaskExecutor類
這個實現使用了 CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口, 因此可以直接作為WorkManager使用。
3. 簡單hellword
3.1 線程類
public class MessagePrinterTask implements Runnable { private String message; public MessagePrinterTask() { } public MessagePrinterTask(String message) { this.message = message; } public void run() { try { Thread.sleep(5000); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } System.out.println(message); } }
3.1 線程管理工具類
import org.springframework.core.task.TaskExecutor; public class TaskExecutorUtil { private TaskExecutor taskExecutor; public TaskExecutor getTaskExecutor() { return taskExecutor; } public void setTaskExecutor(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } public void printMessages(Runnable r,int i) { taskExecutor.execute(r); System.out.println("add Thread:"+i); } }
applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd"> <!-- 異步線程池 --> <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <!-- 核心線程數 --> <property name="corePoolSize" value="2" /> <!-- 最大線程數 --> <property name="maxPoolSize" value="3" /> <!-- 隊列最大長度 >=mainExecutor.maxSize --> <property name="queueCapacity" value="10" /> <!-- 線程池維護線程所允許的空閑時間 --> <property name="keepAliveSeconds" value="300" /> <!-- 線程池對拒絕任務(無線程可用)的處理 策略 --> <!-- 若不作該處理,當線程滿了,隊列滿了之后,繼續往下增加任務,則拋出異常,拒絕該任務 --> <property name="rejectedExecutionHandler"> <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" /> </property> </bean> <bean id="taskExecutorUtil" class="TaskExecutorUtil"> <!-- <constructor-arg ref="taskExecutor" /> --> <property name="taskExecutor" ref="taskExecutor" /> </bean> <!-- 托管線程 --> <bean id="messagePrinterTask" class="MessagePrinterTask"> </bean> </beans>
測試
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; /** * Hello world! * */ public class App { public static void main( String[] args ) { ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml"); TaskExecutorUtil te = (TaskExecutorUtil)appContext.getBean("taskExecutorUtil"); for (int i = 0; i < 25; i++) { MessagePrinterTask m=new MessagePrinterTask("Message" + i); te.printMessages(m,i); } System.out.println("11111111111111111111111"); } }
結果輸出
add Thread:0 add Thread:1 add Thread:2 add Thread:3 add Thread:4 add Thread:5 add Thread:6 add Thread:7 add Thread:8 add Thread:9 add Thread:10 add Thread:11 add Thread:12 Message1 Message0 Message13 add Thread:13 Message12 add Thread:14 add Thread:15 add Thread:16 Message2 Message4 Message17 add Thread:17 Message3 add Thread:18 add Thread:19 add Thread:20 Message5 Message7 Message21 add Thread:21 Message6 add Thread:22 add Thread:23 add Thread:24 11111111111111111111111 Message9 Message8 Message10 Message14 Message15 Message11 Message18 Message19 Message16 Message22 Message23 Message20 Message24
解釋:
1.因為線程類睡眠5秒,所以在5秒前add Thread增加了13個線程(3個最大線程+10個在隊列中等候),由於在做了對了滿了的策略(rejectedExecutionHandler),所以不會拋出異常,
其他線程等候加入隊列
2.五秒后打印,然后繼續addThread,打印message,注意當前最大線程執行是3個,,其余在隊列等候,然后打印剩余線程
,111111111111是主線程
4、配置解釋
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
5. 線程類為 java.util.concurrent.ThreadPoolExecutor:
線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為: ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) corePoolSize: 線程池維護線程的最少數量 maximumPoolSize:線程池維護線程的最大數量 keepAliveTime: 線程池維護線程所允許的空閑時間 unit: 線程池維護線程所允許的空閑時間的單位 workQueue: 線程池所使用的緩沖隊列 handler: 線程池對拒絕任務的處理策略 一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。 當一個任務通過execute(Runnable)方法欲添加到線程池時: 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。 也就是:處理任務的優先級為: 核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。 unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性: NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。 workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue handler有四個選擇: ThreadPoolExecutor.AbortPolicy() 拋出java.util.concurrent.RejectedExecutionException異常 ThreadPoolExecutor.CallerRunsPolicy() 重試添加當前的任務,他會自動重復調用execute()方法 ThreadPoolExecutor.DiscardOldestPolicy() 拋棄舊的任務 ThreadPoolExecutor.DiscardPolicy() 拋棄當前的任務