spring-線程池(1)


多線程並發處理起來通常比較麻煩,如果你使用spring容器來管理業務bean,事情就好辦了多了。spring封裝了java的多線程的實現,你只需要關注於並發事物的流程以及一些並發負載量等特性,具體來說如何使用spring來處理並發事務:

1.了解 TaskExecutor接口

Spring的 TaskExecutor接口等同於java.util.concurrent.Executor接口 實際上,它存在的主要原因是為了在使用線程池的時候,將對Java 5的依賴抽象出來。 這個接口只有一個方法execute(Runnable task),它根據線程池的語義和配置,來接受一個執行任務。最初創建TaskExecutor是為了在需要時給其他Spring組件提供一個線程池的抽 象。 例如ApplicationEventMulticaster組件、JMS的 AbstractMessageListenerContainer和對Quartz的整合都使用了TaskExecutor抽象來提供線程池。 當然,如果你的bean需要線程池行為,你也可以使用這個抽象層。

2. TaskExecutor接口的實現類

(1)SimpleAsyncTaskExecutor 類

這個實現不重用任何線程,或者說它每次調用都啟動一個新線程。但是,它還是支持對並發總數設限,當超過線程並發總數限制時,阻塞新的調用,直到有位置被釋放。如果你需要真正的池,請繼續往下看。

(2)SyncTaskExecutor類

這個實現不會異步執行。相反,每次調用都在發起調用的線程中執行。它的主要用處是在不需要多線程的時候,比如簡單的test case。

(3)ConcurrentTaskExecutor 類

這個實現是對 Java 5 java.util.concurrent.Executor類的包裝。有另一個備選, ThreadPoolTaskExecutor類,它暴露了Executor的配置參數作為bean屬性。很少需要使用 ConcurrentTaskExecutor, 但是如果ThreadPoolTaskExecutor不敷所需,ConcurrentTaskExecutor是另外一個備選。

(4)SimpleThreadPoolTaskExecutor 類

這個實現實際上是Quartz的SimpleThreadPool類的子類,它會監聽Spring的生命周期回調。當你有線程池,需要在Quartz和非Quartz組件中共用時,這是它的典型用處。

(5)ThreadPoolTaskExecutor 類

它不支持任何對 java.util.concurrent包的替換或者下行移植。Doug Lea和Dawid Kurzyniec對java.util.concurrent的實現都采用了不同的包結構,導致它們無法正確運行。 這個實現只能在Java 5環境中使用,但是卻是這個環境中最常用的。它暴露的bean properties可以用來配置一個java.util.concurrent.ThreadPoolExecutor,把它包裝到一個 TaskExecutor中。如果你需要更加先進的類,比如ScheduledThreadPoolExecutor,我們建議你使用 ConcurrentTaskExecutor來替代。

(6)TimerTaskExecutor類

這個實現使用一個TimerTask作為其背后的實現。它和SyncTaskExecutor的不同在於,方法調用是在一個獨立的線程中進行的,雖然在那個線程中是同步的。

(7)WorkManagerTaskExecutor類

這個實現使用了 CommonJ WorkManager作為其底層實現,是在Spring context中配置CommonJ WorkManager應用的最重要的類。和SimpleThreadPoolTaskExecutor類似,這個類實現了WorkManager接口, 因此可以直接作為WorkManager使用。 

 

3. 簡單hellword

 

3.1 線程類

public class MessagePrinterTask implements Runnable {
    private String message;

    public MessagePrinterTask() {

    }
    public MessagePrinterTask(String message) {
        this.message = message;
    }

    public void run() {
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        System.out.println(message);
    }
}

 

3.1 線程管理工具類

import org.springframework.core.task.TaskExecutor;

public class TaskExecutorUtil {
    
    private TaskExecutor taskExecutor;
    public TaskExecutor getTaskExecutor() {
        return taskExecutor;
    }

    public void setTaskExecutor(TaskExecutor taskExecutor) {
        this.taskExecutor = taskExecutor;
    }

    public void printMessages(Runnable r,int i) {
            taskExecutor.execute(r);
            System.out.println("add Thread:"+i);
    }


}

 

applicationContext.xml

<?xml version="1.0" encoding="UTF-8"?>  
<beans  
xmlns="http://www.springframework.org/schema/beans"  
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
xmlns:context="http://www.springframework.org/schema/context"  
xsi:schemaLocation="  
http://www.springframework.org/schema/beans        
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
http://www.springframework.org/schema/context                
http://www.springframework.org/schema/context/spring-context-3.0.xsd">  
  <!-- 異步線程池 --> 
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> 
    <!-- 核心線程數  -->  
    <property name="corePoolSize" value="2" />  
    <!-- 最大線程數 -->  
    <property name="maxPoolSize" value="3" />  
    <!-- 隊列最大長度 >=mainExecutor.maxSize -->  
    <property name="queueCapacity" value="10" />  
    <!-- 線程池維護線程所允許的空閑時間 -->  
    <property name="keepAliveSeconds" value="300" />  
    <!-- 線程池對拒絕任務(無線程可用)的處理
    策略 --> <!-- 若不作該處理,當線程滿了,隊列滿了之后,繼續往下增加任務,則拋出異常,拒絕該任務 --> 
    <property name="rejectedExecutionHandler">  
      <bean class="java.util.concurrent.ThreadPoolExecutor$CallerRunsPolicy" />  
    </property>
</bean>
  
<bean id="taskExecutorUtil" class="TaskExecutorUtil">    
    <!-- <constructor-arg ref="taskExecutor" />  --> 
    <property name="taskExecutor" ref="taskExecutor" /> 
</bean> 

<!-- 托管線程 -->
<bean id="messagePrinterTask" class="MessagePrinterTask">    
</bean> 

  
  
</beans>

 

測試

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Hello world!
 *
 */
public class App 
{
    public static void main( String[] args )
    {
        ApplicationContext appContext = new ClassPathXmlApplicationContext("applicationContext.xml");    
        TaskExecutorUtil te = (TaskExecutorUtil)appContext.getBean("taskExecutorUtil");   
        
        for (int i = 0; i < 25; i++) {
            MessagePrinterTask m=new MessagePrinterTask("Message" + i);
            te.printMessages(m,i); 
        } 
        System.out.println("11111111111111111111111"); 
    }
}

結果輸出

add Thread:0
add Thread:1
add Thread:2
add Thread:3
add Thread:4
add Thread:5
add Thread:6
add Thread:7
add Thread:8
add Thread:9
add Thread:10
add Thread:11
add Thread:12
Message1
Message0
Message13
add Thread:13
Message12
add Thread:14
add Thread:15
add Thread:16
Message2
Message4
Message17
add Thread:17
Message3
add Thread:18
add Thread:19
add Thread:20
Message5
Message7
Message21
add Thread:21
Message6
add Thread:22
add Thread:23
add Thread:24
11111111111111111111111
Message9
Message8
Message10
Message14
Message15
Message11
Message18
Message19
Message16
Message22
Message23
Message20
Message24

 

解釋:

1.因為線程類睡眠5秒,所以在5秒前add Thread增加了13個線程(3個最大線程+10個在隊列中等候),由於在做了對了滿了的策略(rejectedExecutionHandler),所以不會拋出異常,

其他線程等候加入隊列

2.五秒后打印,然后繼續addThread,打印message,注意當前最大線程執行是3個,,其余在隊列等候,然后打印剩余線程,111111111111是主線程

 

 

4、配置解釋
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

 

 

 

 

5. 線程類為 java.util.concurrent.ThreadPoolExecutor:

線程池類為 java.util.concurrent.ThreadPoolExecutor,常用構造方法為:

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler)

corePoolSize: 線程池維護線程的最少數量
maximumPoolSize:線程池維護線程的最大數量
keepAliveTime: 線程池維護線程所允許的空閑時間
unit: 線程池維護線程所允許的空閑時間的單位
workQueue: 線程池所使用的緩沖隊列
handler: 線程池對拒絕任務的處理策略

一個任務通過 execute(Runnable)方法被添加到線程池,任務就是一個 Runnable類型的對象,任務的執行方法就是 Runnable類型對象的run()方法。

當一個任務通過execute(Runnable)方法欲添加到線程池時:

如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。

如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。

如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。

如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。

也就是:處理任務的優先級為:
核心線程corePoolSize、任務隊列workQueue、最大線程maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。

當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。

unit可選的參數為java.util.concurrent.TimeUnit中的幾個靜態屬性:
NANOSECONDS、MICROSECONDS、MILLISECONDS、SECONDS。

workQueue我常用的是:java.util.concurrent.ArrayBlockingQueue

handler有四個選擇:
ThreadPoolExecutor.AbortPolicy()
拋出java.util.concurrent.RejectedExecutionException異常
ThreadPoolExecutor.CallerRunsPolicy()
重試添加當前的任務,他會自動重復調用execute()方法
ThreadPoolExecutor.DiscardOldestPolicy()
拋棄舊的任務
ThreadPoolExecutor.DiscardPolicy()
拋棄當前的任務

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM