SpringBoot中的異步操作與線程池
線程池類型
Java通過 java.util.concurrent.Executors 的靜態方法提供五種線程池
- newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
- newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
- newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
- newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
- newWorkStealingPool 這是java8新增的線程池類型。創建一個含有足夠多線程的線程池,來維持相應的並行級別,它會通過工作竊取的方式,使得多核的CPU不會閑置,總會有活着的線程讓CPU去運行。
五種線程池的底層實現
- ThreadPoolExecutor 是CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 這四種類型線程池的底層實現
- ForkJoinPool (java7已有) 是WorkStealingPool線程池的底層實現
使用線程池的優點
- 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
- 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
- 提供定時執行、定期執行、單線程、並發數控制等功能。
如何在SpringBoot中優雅的使用線程池
注冊線程池
在config目錄下創建 AsyncConfig 配置類,在配置類中定義線程池
package com.example.async_demo.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig implements SchedulingConfigurer {
//第一種線程池定義方式,可代替CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種
// Spring線程池
@Lazy //線程池懶加載
@Bean(name="threadPoolTaskExecutor",destroyMethod="shutdown") //name為線程池名稱,destroyMethod="shutdown"在spring bean回收后釋放資源
public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
//封裝的是原生的ThreadPoolExecutor類型線程池
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//核心線程數(獲取硬件):線程池創建時候初始化的線程數
int corePoolSize = Runtime.getRuntime().availableProcessors();
System.out.println(corePoolSize);
executor.setCorePoolSize(corePoolSize);
//最大線程數+5:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
executor.setMaxPoolSize(corePoolSize+5);
//緩沖隊列500:用來緩沖執行任務的隊列
executor.setQueueCapacity(500);
//允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
executor.setKeepAliveSeconds(60);
//線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
executor.setThreadNamePrefix("MyAsync-");
executor.initialize();
return executor;
}
//第二種線程池定義方式,使用的是WorkStealingPool
//java8 搶占式線程池
@Lazy
@Bean(name="workStealingPool",destroyMethod="shutdown")
public ExecutorService workStealingPool(){
ExecutorService executorService = Executors.newWorkStealingPool();
return executorService;
}
//第三種線程池定義方式,為周期任務線程池
//周期任務線程池
@Lazy
@Bean(name="scheduledThreadPool",destroyMethod="shutdown")
public ExecutorService scheduledThreadPool() {
return Executors.newScheduledThreadPool(3);
}
@Override
public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
scheduledTaskRegistrar.setScheduler(scheduledThreadPool());
}
}
我在上述案例代碼中定義了三種類型的線程池
- 第一種是ThreadPoolTaskExecutor線程池,他是Spring中的 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 線程池,
底層是對 java.util.concurrent.ThreadPoolExecutor 的封裝,綜合了CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種線程池的優點; - 第二種是java8新增的 workStealingPool 線程池。第一種和第二種使用時可以在配置類上使用@EnableAsync注解,這樣就能優雅的使用@Async注解方法來實現線程run邏輯了;
- 第三種是ScheduledThreadPool線程池,不過在Spring中使用需要配置類實現SchedulingConfigurer接口,重寫configureTasks方法。在配置類上使用
@EnableScheduling注解,就可以優雅的使用@Scheduled注解方法來實現周期邏輯了
使用線程池
對第一種和第二種線程池在service中實現線程run的邏輯
package com.example.async_demo.service;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@Service
public class AsyncService {
//使用名為threadPoolTaskExecutor的線程池,返回Future
@Async("threadPoolTaskExecutor")
public Future<Double> service1(){
double result = getRand(3000);
return AsyncResult.forValue(result);
}
//使用名為threadPoolTaskExecutor的線程池,返回CompletableFuture
@Async("threadPoolTaskExecutor")
public CompletableFuture<Double> service2(){
double result = getRand(3000);
return CompletableFuture.completedFuture(result);
}
//使用名為workStealingPool的線程池,返回CompletableFuture
@Async("workStealingPool")
public CompletableFuture<Double> service3(){
double result = getRand(3000);
return CompletableFuture.completedFuture(result);
}
private double getRand(long sleep){
System.out.println(Thread.currentThread().getId()+"-start");
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
e.printStackTrace();
}
double result = Math.random();//方法返回的結果
return result;
}
}
測試第一種和第二種線程池
@SpringBootTest
class AsyncDemoApplicationTests {
@Autowired
private AsyncService asyncService;
@Test
void test1() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
Future<Double> result1 = asyncService.service1();
Future<Double> result2 = asyncService.service1();
Future<Double> result3 = asyncService.service1();
//讓主線程等待子線程結束之后才能繼續運行
while (!(result1.isDone()&&result2.isDone()&&result3.isDone())){
Thread.sleep(500);
}
long end = System.currentTimeMillis();
System.out.println(end-start+"ms");
System.out.println(result1.get());
System.out.println(result2.get());
System.out.println(result3.get());
}
@Test
void test2() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CompletableFuture<Double> result1 = asyncService.service2();
CompletableFuture<Double> result2 = asyncService.service2();
CompletableFuture<Double> result3 = asyncService.service2();
//join() 的作用:讓主線程等待子線程結束之后才能繼續運行
CompletableFuture.allOf(result1,result2,result3).join();
long end = System.currentTimeMillis();
System.out.println(end-start+"ms");
System.out.println(result1.get());
System.out.println(result2.get());
System.out.println(result3.get());
}
@Test
void test3() throws ExecutionException, InterruptedException {
long start = System.currentTimeMillis();
CompletableFuture<Double> result1 = asyncService.service3();
CompletableFuture<Double> result2 = asyncService.service3();
CompletableFuture<Double> result3 = asyncService.service3();
//join() 的作用:讓主線程等待子線程結束之后才能繼續運行
CompletableFuture.allOf(result1,result2,result3).join();
long end = System.currentTimeMillis();
System.out.println(end-start+"ms");
System.out.println(result1.get());
System.out.println(result2.get());
System.out.println(result3.get());
}
}
test1測試結果
test2測試結果
test3測試結果
通過測試發現Future返回類型不適合主線等待多個子線程全部完成的操作,
因為需要用到while循環去阻塞主線程,而CompletableFuture可以通過CompletableFuture.allOf(cf1,cf2,cf3).join()
去完成這個操作,所以推薦使用CompletableFuture作為返回類型
注意:@Async注解的方法不能在本類中被調用,只能在其他類中調用,如Controller類
對第三種線程池在service中實現線程的邏輯
package com.example.async_demo.service;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@Service
public class AsyncService {
//cron表達式 每5秒執行一次
//@Scheduled(cron = "*/5 * * * * ?")
@Scheduled(cron = "${cron.sec5}") //表達式寫在application.yml文件中,則以這種方式取出。
public void service4(){
System.out.println("5s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
}
//cron表達式 每3秒執行一次
@Scheduled(cron = "${cron.sec3}")
public void service5(){
System.out.println("3s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
}
}
application.yml 文件
cron:
sec5: '*/5 * * * * ?'
sec3: '*/3 * * * * ?'
周期任務測試結果(啟動Application類)
通過測試結果可發現兩個周期任務使用了三個線程,
線程id分別是20、21、25。兩個周期任務分別以3s和5s執行一次,
但不固定在某個線程中執行,而是哪個線程空閑則使用哪個線程
注意:若不為周期任務配置線程池,只使用@EnableScheduling和@Scheduled注解的話,
則所有周期任務共用一個子線程,若出現下一個周期開始上一個周期任務還沒結束的情況,
則線程阻塞,直到前一個任務完成
CRON表達式
- cron表達式是定義任務周期的一種表達式
- 這里不多介紹,可以參考這篇博客