一、spring異步線程池類圖
二、簡單介紹
2.1. TaskExecutor---Spring異步線程池的接口類,其實質是java.util.concurrent.Executor
以下是官方已經實現的全部7個TaskExecuter。Spring宣稱對於任何場景,這些TaskExecuter完全夠用了:
名字 | 特點 |
---|---|
SimpleAsyncTaskExecutor | 每次請求新開線程,沒有最大線程數設置.不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程。 --【1】 |
SyncTaskExecutor | 不是異步的線程.同步可以用SyncTaskExecutor,但這個可以說不算一個線程池,因為還在原線程執行。這個類沒有實現異步調用,只是一個同步操作。 |
ConcurrentTaskExecutor | Executor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類。 |
SimpleThreadPoolTaskExecutor | 監聽Spring’s lifecycle callbacks,並且可以和Quartz的Component兼容.是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才需要使用此類。 |
ThreadPoolTaskExecutor | 最常用。要求jdk版本大於等於5。可以在程序而不是xml里修改線程池的配置.其實質是對java.util.concurrent.ThreadPoolExecutor的包裝。 |
TimerTaskExecutor | |
WorkManagerTaskExecutor |
三、Spring中的同步執行器
1. SyncTaskExecutor:同步可以用SyncTaskExecutor,但這個可以說不算一個線程池,因為還在原線程執行。這個類沒有實現異步調用,只是一個同步操作。
2.也可以用ThreadPoolTaskExecutor結合FutureTask做到同步。
3.2. SyncTaskExecutor與ThreadPoolTaskExecutor區別
前者是同步執行器,執行任務同步,后者是線程池,執行任務異步。
四、Spring中的異步執行器
異步執行用戶任務的SimpleAsyncTaskExecutor。每次執行客戶提交給它的任務時,它會啟動新的線程,並允許開發者控制並發線程的上限(concurrencyLimit),從而起到一定的資源節流作用。默認時,concurrencyLimit取值為-1,即不啟用資源節流。
SimpleAsyncTaskExecutor
<bean id="simpleAsyncTaskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor"> <property name="daemon" value="true"/> <property name="concurrencyLimit" value="2"/> <property name="threadNamePrefix" value="simpleAsyncTaskExecutor"/> </bean>
主要實現:1.支持限流處理 2.異步注冊線程返回結果
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable { //限流主要實現 private final SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter concurrencyThrottle = new SimpleAsyncTaskExecutor.ConcurrencyThrottleAdapter(); private ThreadFactory threadFactory; //設置最大的線程數量 public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit); } //是否開啟了限流 限流數量大於0? public final boolean isThrottleActive() { return this.concurrencyThrottle.isThrottleActive(); } //1.是否開啟限流 否則不開啟限流處理 //2.執行開始之前檢測是否可以滿足要求 當前數量++ //3.開啟限流將執行的Runable進行封裝,執行完成調用final方法 當前數量-- public void execute(Runnable task, long startTimeout) { Assert.notNull(task, "Runnable must not be null"); if(this.isThrottleActive() && startTimeout > 0L) { this.concurrencyThrottle.beforeAccess(); this.doExecute(new SimpleAsyncTaskExecutor.ConcurrencyThrottlingRunnable(task)); } else { this.doExecute(task); } } //異步提交有返回值 public Future<?> submit(Runnable task) { FutureTask future = new FutureTask(task, (Object)null); this.execute(future, 9223372036854775807L); return future; } public <T> Future<T> submit(Callable<T> task) { FutureTask future = new FutureTask(task); this.execute(future, 9223372036854775807L); return future; } public ListenableFuture<?> submitListenable(Runnable task) { ListenableFutureTask future = new ListenableFutureTask(task, (Object)null); this.execute(future, 9223372036854775807L); return future; } public <T> ListenableFuture<T> submitListenable(Callable<T> task) { ListenableFutureTask future = new ListenableFutureTask(task); this.execute(future, 9223372036854775807L); return future; } //擁有工廠?沒有的話調用父類可以設置各種參數的創建線程 protected void doExecute(Runnable task) { Thread thread = this.threadFactory != null?this.threadFactory.newThread(task):this.createThread(task); thread.start(); } //父類的方法,方便配置線程,方便xml設置線程參數CustomizableThreadCreator public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; } }
4.2.限流處理其實就是在執行任務之前和之后對於當前線程數量進行統計
內部類的實現
//下面只是對於操作進行簡單的封裝,最真的實現還是抽象的ConcurrencyThrottleSupport private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport { private ConcurrencyThrottleAdapter() { } protected void beforeAccess() { super.beforeAccess(); } protected void afterAccess() { super.afterAccess(); } }
更多關於限流功能源碼見:《spring控制並發數的工具類ConcurrencyThrottleSupport和ConcurrencyThrottleInterceptor》
//這里是對於Runable對象執行在次封裝,在執行完畢后處理限流操作 private class ConcurrencyThrottlingRunnable implements Runnable { private final Runnable target; public ConcurrencyThrottlingRunnable(Runnable target) { this.target = target; } public void run() { try { this.target.run(); } finally { SimpleAsyncTaskExecutor.this.concurrencyThrottle.afterAccess(); } } }
簡單的通過synchronized和wati and notify達到控制線程數量的效果,從而實現限流的策略。
4.3.SimpleAsyncTaskExecutor中,執行任務時,每次都會新建一個線程,不使用線程池
看SimpleAsyncTaskExecutor.java的關鍵源代碼:
protected void doExecute(Runnable task) { Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task)); thread.start(); }
createThread()在父類中CustomizableThreadCreator.java中
public Thread createThread(Runnable runnable) { Thread thread = new Thread(getThreadGroup(), runnable, nextThreadName()); thread.setPriority(getThreadPriority()); thread.setDaemon(isDaemon()); return thread; }
4.4.步監聽獲取線程的結果,其實這個不算這里面的實現
ListenableFutureTask 其實主要是依靠FutureTask這個JDK的封裝,覆蓋了原始的run方法,在run中封裝可以獲取到線程的返回值。
ListenableFutureTask 在次封裝,由於FutureTask執行完成之后會調用done()空方法,ListenableFutureTask覆蓋done方法可以獲取到執行的結果,然后在調用前期注冊的錯誤處理或者成功處理的方法,即可到達異步處理的效果。
類似於回調的效果
public interface SuccessCallback<T> { /** * Called when the {@link ListenableFuture} successfully completes. * @param result the result */ void onSuccess(T result); } public interface FailureCallback { /** * Called when the {@link ListenableFuture} fails to complete. * @param ex the exception that triggered the failure */ void onFailure(Throwable ex); } public interface ListenableFuture<T> extends Future<T> { //成功和失敗的集合 void addCallback(ListenableFutureCallback<? super T> var1); void addCallback(SuccessCallback<? super T> var1, FailureCallback var2); }
實現類(ListenableFutureTask)可有返回值,可被監聽的,注冊監聽,這里可以注冊監聽者放在一個單獨的類中去處理,很好的分配工作ListenableFutureCallbackRegistry
public class ListenableFutureTask<T> extends FutureTask<T> implements ListenableFuture<T> { private final ListenableFutureCallbackRegistry<T> callbacks = new ListenableFutureCallbackRegistry(); public ListenableFutureTask(Callable<T> callable) { super(callable); } public ListenableFutureTask(Runnable runnable, T result) { super(runnable, result); } public void addCallback(ListenableFutureCallback<? super T> callback) { this.callbacks.addCallback(callback); } public void addCallback(SuccessCallback<? super T> successCallback, FailureCallback failureCallback) { this.callbacks.addSuccessCallback(successCallback); this.callbacks.addFailureCallback(failureCallback); } //FutureTask執行完成后的回調,調用監聽接口的實現類的方法 protected final void done() { Object cause; try { Object ex = this.get(); //回調實現類的方法 this.callbacks.success(ex); return; } catch (InterruptedException var3) { Thread.currentThread().interrupt(); return; } catch (ExecutionException var4) { cause = var4.getCause(); if(cause == null) { cause = var4; } } catch (Throwable var5) { cause = var5; } this.callbacks.failure((Throwable)cause); } }
注冊監聽,還維護了一個狀態量的信息,很標准的寫法,維護隊列的添加和成功消息和失敗消息的處理
public class ListenableFutureCallbackRegistry<T> { private final Queue<SuccessCallback<? super T>> successCallbacks = new LinkedList<SuccessCallback<? super T>>(); private final Queue<FailureCallback> failureCallbacks = new LinkedList<FailureCallback>(); private State state = State.NEW; private Object result = null; private final Object mutex = new Object(); /** * Add the given callback to this registry. * @param callback the callback to add */ public void addCallback(ListenableFutureCallback<? super T> callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.successCallbacks.add(callback); this.failureCallbacks.add(callback); break; case SUCCESS: callback.onSuccess((T) this.result); break; case FAILURE: callback.onFailure((Throwable) this.result); break; } } } /** * Add the given success callback to this registry. * @param callback the success callback to add * @since 4.1 */ public void addSuccessCallback(SuccessCallback<? super T> callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.successCallbacks.add(callback); break; case SUCCESS: callback.onSuccess((T) this.result); break; } } } /** * Add the given failure callback to this registry. * @param callback the failure callback to add * @since 4.1 */ public void addFailureCallback(FailureCallback callback) { Assert.notNull(callback, "'callback' must not be null"); synchronized (this.mutex) { switch (this.state) { case NEW: this.failureCallbacks.add(callback); break; case FAILURE: callback.onFailure((Throwable) this.result); break; } } } /** * Trigger a {@link ListenableFutureCallback#onSuccess(Object)} call on all * added callbacks with the given result. * @param result the result to trigger the callbacks with */ public void success(T result) { synchronized (this.mutex) { this.state = State.SUCCESS; this.result = result; while (!this.successCallbacks.isEmpty()) { this.successCallbacks.poll().onSuccess(result); } } } public void failure(Throwable ex) { synchronized (this.mutex) { this.state = State.FAILURE; this.result = ex; while (!this.failureCallbacks.isEmpty()) { this.failureCallbacks.poll().onFailure(ex); } } } private enum State {NEW, SUCCESS, FAILURE} }
五、使用ThreadPoolTaskExecutor
5.1、(傳統方式)
比起從線程池取一個線程再執行, 你僅僅需要把你的Runnable類加入到隊列中,然后TaskExecutor用它內置的規則決定何時開始取一個線程並執行該Runnable類
先在xml中添加bean的配置:
<bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"> <property name="corePoolSize" value="5" /> <property name="maxPoolSize" value="10" /> <property name="queueCapacity" value="25" /> </bean> <bean id="taskExecutorExample" class="TaskExecutorExample"> <constructor-arg ref="taskExecutor" /> </bean>
配置解釋
當一個任務通過execute(Runnable)方法欲添加到線程池時:
1、 如果此時線程池中的數量小於corePoolSize,即使線程池中的線程都處於空閑狀態,也要創建新的線程來處理被添加的任務。
2、 如果此時線程池中的數量等於 corePoolSize,但是緩沖隊列 workQueue未滿,那么任務被放入緩沖隊列。
3、如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量小於maximumPoolSize,建新的線程來處理被添加的任務。
4、 如果此時線程池中的數量大於corePoolSize,緩沖隊列workQueue滿,並且線程池中的數量等於maximumPoolSize,那么通過 handler所指定的策略來處理此任務。也就是:處理任務的優先級為:核心線程corePoolSize、任務隊列workQueue、最大線程 maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
5、 當線程池中的線程數量大於 corePoolSize時,如果某線程空閑時間超過keepAliveTime,線程將被終止。這樣,線程池可以動態的調整池中的線程數。
具體調用:
import org.springframework.core.task.TaskExecutor; public class TaskExecutorExample { private class MessagePrinterTask implements Runnable { private String message; public MessagePrinterTask(String message) { this.message = message; } public void run() { System.out.println(message); } } private TaskExecutor taskExecutor; public TaskExecutorExample(TaskExecutor taskExecutor) { this.taskExecutor = taskExecutor; } public void printMessages() { for(int i = 0; i < 25; i++) { taskExecutor.execute(new MessagePrinterTask("Message" + i)); } } }
5.2.推薦 - 使用ThreadPoolTaskExecutor(注解方式)
首先,為了以注解方式使用異步功能,我們需要在Spring的xml配置中定義相關的bean:
1.必須在*-servlet.xml而不是applicationContext.xml中定義@Async相關配置
引自http://stackoverflow.com/questions/6610563/spring-async-not-working
2 不使用線程池版本
引自http://www.springframework.org/schema/task/spring-task-3.2.xsd
所以,如果我們僅僅添加<task:annotation-driven/>
,也可以使用@Async標簽。然而,此時使用的是SimpleAsyncTaskExecutor。如“官方文檔27章:Task Execution”中所述,SimpleAsyncTaskExecutor不會使用線程池,僅僅是為每一個請求新開一個線程。這樣在大並發的業務場景下,發生OutOfMemory是不足為奇的。
<?xml version="1.0" encoding="UTF-8"?> <!--Spring框架的xml標簽定義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標簽文檔--> <beans xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd"> <!--掃描項目實例化@Component,@Service,@Controller修飾的類--> <context:component-scan base-package="com.your_app" /> <!--create a SimpleAsyncTaskExecutor instance--> <task:annotation-driven/> </beans>
3 推薦 - 使用線程池版本
<?xml version="1.0" encoding="UTF-8"?> <!--Spring框架的xml標簽定義文檔, 可訪問http://www.springframework.org/schema/task/查看最新task組件的xml標簽文檔--> <beans xmlns:task="http://www.springframework.org/schema/task" xsi:schemaLocation=" http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.2.xsd"> <!--掃描項目實例化@Component,@Service,@Controller修飾的類--> <context:component-scan base-package="com.your_app" /> <!-- 在代碼中@Async不加參數就會使用task:annotation-driven標簽定義的executor--> <task:annotation-driven executor="myExecutor"/> <!-- 在代碼中@Async("myExecutor")可以顯式指定executor為"myExecutor"--> <task:executor id="myExecutor" pool-size="5-25" queue-capacity="100" rejection-policy="CALLER_RUNS"/> </beans>
其中,注意到屬性pool-size的值”5-25”是一個范圍,這對應的是線程池的min和max容量,它們的意義請參考本文上一節的“配置說明”里的第3、4點。如果只有一個值,如pool-size=n
, 意味着minSize==maxSize==n
而關於rejection-policy,官方文檔里說:
總結如下:
池滿時的拒絕策略 | 效果 |
---|---|
AbortPolicy(默認) | 拋異常 |
DiscardPolicy or DiscardOldestPolicy | 放棄該線程 |
CallerRunsPolicy | 通知該線程的創建者,讓其不要提交新的線程 |
轉自:https://blog.csdn.net/caib1109/article/details/51623089