1 線程池簡介
1.1 為什么使用線程池
- 降低系統資源消耗,通過重用已存在的線程,降低線程創建和銷毀造成的消耗;
- 提高系統響應速度,當有任務到達時,通過復用已存在的線程,無需等待新線程的創建便能立即執行;
- 方便線程並發數的管控,因為線程若是無限制的創建,可能會導致內存占用過多而產生
OOM
,並且會造成cpu
過度切換(cpu
切換線程是有時間成本的(需要保持當前執行線程的現場,並恢復要執行線程的現場) - 提供更強大的功能,延時定時線程池
1.2 線程池為什么需要使用隊列
因為線程若是無限制的創建,可能會導致內存
占用過多而產生OOM
,並且會造成cpu
過度切換。
創建線程池的消耗較高或者線程池創建線程需要獲取mainlock
這個全局鎖,影響並發效率,阻塞隊列可以很好的緩沖
1.3 線程池為什么要使用阻塞隊列而不使用非阻塞隊列
阻塞隊列
可以保證任務隊列中沒有任務時阻塞獲取任務的線程,使得線程進入wait
狀態,釋放cpu
資源,當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執行。
使得在線程不至於一直占用cpu
資源。(線程執行完任務后通過循環再次從任務隊列中取出任務進行執行,代碼片段如:while (task != null || (task = getTask()) != null) {}
)。
不用阻塞隊列也是可以的,不過實現起來比較麻煩而已,有好用的為啥不用呢
1.4 如何配置線程池
-
CPU
密集型任務
盡量使用較小的線程池,一般為CPU
核心數+1
。 因為CPU
密集型任務使得CPU
使用率很高,若開過多的線程數,會造成CPU
過度切換 -
IO
密集型任務
可以使用稍大的線程池,一般為2*CPU
核心數。IO
密集型任務CPU
使用率並不高,因此可以讓CPU
在等待IO
的時候有其他線程去處理別的任務,充分利用CPU
時間 -
混合型任務
可以將任務分成IO
密集型和CPU
密集型任務,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執行時間相差不大,那么就會比串行執行來的高效
因為如果划分之后兩個任務執行時間有數據級的差距,那么拆分沒有意義。
因為先執行完的任務就要等后執行完的任務,最終的時間仍然取決於后執行完的任務,而且還要加上任務拆分與合並的開銷,得不償失
1.5 execute()和submit()方法
execute()
,執行一個任務,沒有返回值submit()
,提交一個線程任務,有返回值
submit(Callable<T> task)
能獲取到它的返回值,通過future.get()
獲取(阻塞
直到任務執行完)。一般使用FutureTask+Callable
配合使用
submit(Runnable task, T result)
能通過傳入的載體result
間接獲得線程的返回值。
submit(Runnable task)
則是沒有返回值的,就算獲取它的返回值也是null
Future.get()
方法會使取結果的線程進入阻塞狀態
,直到線程執行完成之后,喚醒取結果的線程,然后返回結果
1.6 Spring線程池
Spring
通過任務執行器(TaskExecutor
)來實現多線程和並發編程,使用ThreadPoolTaskExecutor
實現一個基於線程池的TaskExecutor
,
還得需要使用@EnableAsync
開啟異步,並通過在需要的異步方法那里使用注解@Async
聲明是一個異步任務
Spring
已經實現的異常線程池:
SimpleAsyncTaskExecutor
:不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程。SyncTaskExecutor
:這個類沒有實現異步調用,只是一個同步操作。只適用於不需要多線程的地方ConcurrentTaskExecutor
:Executor
的適配類,不推薦使用。如果ThreadPoolTaskExecutor
不滿足要求時,才用考慮使用這個類SimpleThreadPoolTaskExecutor
:是Quartz的SimpleThreadPool
的類。線程池同時被quartz和非quartz使用,才需要使用此類ThreadPoolTaskExecutor
:最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor
的包裝
1.7 @Async調用中的事務處理機制
在@Async
標注的方法,同時也使用@Transactional
進行標注;在其調用數據庫操作之時,將無法產生事務管理的控制,原因就在於其是基於異步處理
的操作。
那該如何給這些操作添加事務管理呢?
可以將需要事務管理操作的方法放置到異步方法內部,在內部被調用的方法上添加@Transactional
示例:
方法A
, 使用了@Async/@Transactional
來標注,但是無法產生事務控制的目的。方法B
, 使用了@Async
來標注,B
中調用了C、D
,C/D
分別使用@Transactional
做了標注,則可實現事務控制的目的
2 示例
2.1 線程池配置類
package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync //開啟異步操作
public class TaskExecutorConfig implements AsyncConfigurer {
/**
* 通過getAsyncExecutor方法配置ThreadPoolTaskExecutor,獲得一個基於線程池TaskExecutor
*
* @return
*/
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
pool.setCorePoolSize(5);//核心線程數
pool.setMaxPoolSize(10);//最大線程數
pool.setQueueCapacity(25);//線程隊列
pool.initialize();//線程初始化
return pool;
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return null;
}
}
配置類中方法說明:
Spring
中的ThreadPoolExecutor
是借助JDK
並發包中的java.util.concurrent.ThreadPoolExecutor
來實現的。其中一些值的含義如下:
int corePoolSize:
線程池維護線程的最小數量int maximumPoolSize:
線程池維護線程的最大數量,線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,並且當前線程個數小於maximumPoolSize
,那么會創建新的線程來執行任務。long keepAliveTime:
空閑線程的存活時間TimeUnit
unit:
時間單位,現由納秒,微秒,毫秒,秒BlockingQueue workQueue:
持有等待執行的任務隊列RejectedExecutionHandler
handler 線程池的拒絕策略,是指當任務添加到線程池中被拒絕,而采取的處理措施。
當任務添加到線程池中之所以被拒絕,可能是由於:第一,線程池異常關閉。第二,任務數量超過線程池的最大限制。
Reject
策略預定義有四種:
ThreadPoolExecutor.AbortPolicy
策略,是默認的策略,處理程序遭到拒絕將拋出運行時RejectedExecutionException
ThreadPoolExecutor.CallerRunsPolicy
策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.ThreadPoolExecutor.DiscardPolicy
策略,不能執行的任務將被丟棄.ThreadPoolExecutor.DiscardOldestPolicy
策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如
果再次失敗,則重復此過程)
2.2 異步方法
@Async
注解可以用在方法上,表示該方法是個異步方法,也可以用在類上,那么表示此類的所有方法都是異步方法
異步方法會自動注入使用ThreadPoolTaskExecutor
作為TaskExecutor
package cn.jzh.thread;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.Future;
@Service
public class AsyncTaskService {
/**
*
* @param i
*/
@Async
public void executeAsync(Integer i) throws Exception{
System.out.println("線程ID:" + Thread.currentThread().getId() + "線程名字:" +Thread.currentThread().getName()+"執行異步任務:" + i);
}
@Async
public Future<String> executeAsyncPlus(Integer i) throws Exception {
System.out.println("線程ID:" + Thread.currentThread().getId() +"線程名字:" +Thread.currentThread().getName()+ "執行異步有返回的任務:" + i);
return new AsyncResult<>("success:"+i);
}
}
2.3 啟動測試
package cn.jzh.thread;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.Future;
public class MainApp {
public static void main(String[] args) throws Exception{
System.out.println("主線程id:" + Thread.currentThread().getId() + "開始執行調用任務...");
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
AsyncTaskService service = context.getBean(AsyncTaskService.class);
for (int i = 0;i<10;i++){
service.executeAsync(i);
Future<String> result = service.executeAsyncPlus(i);
System.out.println("異步程序執行結束,獲取子線程返回內容(會阻塞當前main線程)" + result.get());
}
context.close();
System.out.println("主線程id:" + Thread.currentThread().getId() + "程序結束!!");
}
}
注意:
- 是否影響主線程
如果main
主線程不去獲取子線程的結果(Future.get()
),那么主線程完全可以不阻塞。那么,此時,主線程和子線程完全異步。此功能,可以做成類似MQ
消息中間件之類的,消息異步進行發送 - 判斷是否執行完畢
當返回的數據類型為Future
類型,其為一個接口。具體的結果類型為AsyncResult
,這個是需要注意的地方。
調用返回結果的異步方法,判斷是否執行完畢時需要使用future.isDone()
來判斷是否執行完畢
public void testAsyncAnnotationForMethodsWithReturnType()
throws InterruptedException, ExecutionException {
System.out.println("Invoking an asynchronous method. " + Thread.currentThread().getName());
Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();
while (true) { ///這里使用了循環判斷,等待獲取結果信息
if (future.isDone()) { //判斷是否執行完畢
System.out.println("Result from asynchronous process - " + future.get());
break;
}
System.out.println("Continue doing something else. ");
Thread.sleep(1000);
}
}
這些獲取異步方法的結果信息,是通過不停的檢查Future
的狀態來獲取當前的異步方法是否執行完畢來實現的