目錄
@Async實現異步調用
pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
啟動類
@EnableAsync
@SpringBootApplication
public class LearnutilsApplication {
public static void main(String[] args) {
SpringApplication.run(LearnutilsApplication.class, args);
}
/**
* 核心線程數10:線程池創建時初始化的線程數
* 最大線程數20:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
* 緩沖隊列200:用來緩沖執行任務的隊列
* 允許線程的空閑時間60秒:超過了核心線程數之外的線程,在空閑時間到達之后會被銷毀
* 線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
* 線程池對拒絕任務的處理策略:此處采用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在execute方法的調用線程中運行被拒絕的任務;如果執行程序已被關閉,則會丟棄該任務
* 設置線程池關閉的時候等待所有任務都完成再繼續銷毀其他的Bean
* 設置線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住
*/
@EnableAsync
@Configuration
class TaskPoolConfig{
@Bean("taskExecutor")
public Executor taskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(10);
executor.setMaxPoolSize(20);
executor.setQueueCapacity(200);
executor.setKeepAliveSeconds(60);
executor.setThreadNamePrefix("TaskExecutor-");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setWaitForTasksToCompleteOnShutdown(true);
executor.setAwaitTerminationSeconds(60);
return executor;
}
}
}
定義controller
@RequestMapping(value = "/AsyncController")
@RestController
public class AsyncController {
@Autowired
private AsyncService asyncService;
@Autowired
private AsyncService2 asyncService2;
@Autowired
private AsyncService3 asyncService3;
@GetMapping(value = "/sendSms")
public String sendSms() throws Exception{
Future<String> sms = asyncService.sendSms();
Future<String> sms2 = asyncService2.sendSms();
Future<String> sms3 = asyncService3.sendSms();
int i = 0;
for (;;) {
//如果都執行完就跳出循環,isDone方法,如果此線程執行完,true
if (sms.isDone() && sms2.isDone() && sms3.isDone()) {
break;
}
}
//get是獲取結果集
return sms.get()+sms2.get()+sms3.get();
}
}
定義接口
public interface AsyncService {
Future<String> sendSms();
}
實現類
@Service
public class AsyncServiceImpl implements AsyncService {
//Future<String> 返回結果 AsyncResult<String>
@Async("taskExecutor")
@Override
public Future<String> sendSms() {
return new AsyncResult<>("000000");
}
}
將isDone換程CountDownLatch來判斷線程是否執行完實例化CountDownLatch並且制定線程個數,線程個數就是從本地異步調用的方法個輸,並且傳入線程任務中,每個線程執行完畢就調用countDown()方法。最后在調用await()方法。這樣在線程計數為零之前,線程就會一直等待。
AsyncResult用來封裝結果集,否則結果集無法返回
@GetMapping(value = "/sendSms2")
public String sendSms2() throws Exception{
CountDownLatch downLatch = new CountDownLatch(3);
Future<String> s = asyncService4.sendSms(downLatch);
Future<String> s1 = asyncService5.sendSms(downLatch);
Future<String> s2 = asyncService6.sendSms(downLatch);
downLatch.await();
return s.get()+s1.get()+s2.get();
}
將CountDownLatch傳給方法
public interface AsyncService4 {
Future<String> sendSms(CountDownLatch downLatch);
}
方法
@Service
public class AsyncService4Impl implements AsyncService4 {
@Async("taskExecutor")
@Override
public Future<String> sendSms(CountDownLatch downLatch) {
downLatch.countDown();
return new AsyncResult<>("11111");
}
}
TaskExecutor的使用
注冊TaskExecutor
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;
/**
* @author yanjun
* @date 2019/8/1 16:04
**/
@Configuration
public class MainConfiguration {
@Bean
public TaskExecutor getTaskExecutor(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 設置核心線程數
executor.setCorePoolSize(5);
// 設置最大線程數
executor.setMaxPoolSize(10);
// 設置隊列容量
executor.setQueueCapacity(20);
// 設置線程活躍時間(秒)
executor.setKeepAliveSeconds(60);
// 設置默認線程名稱
executor.setThreadNamePrefix("post-lending-");
// 設置拒絕策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
// 等待所有任務結束后再關閉線程池
executor.setWaitForTasksToCompleteOnShutdown(true);
return executor;
}
}
使用TaskExecutor
@Autowired
private TaskExecutor taskExecutor;
public ResultVO findHandlingRecordByAssociationId(Integer associationId) throws InterruptedException{
Map<String, Object> map = new HashMap<>(2);
//線程計數器(等待所有線程執行完統一返回)
CountDownLatch countDownLatch = new CountDownLatch(10);
taskExecutor.execute(() -> {
try {
//service調用
map.put("HandlingRecord", legalLitigationService.findHandlingRecordByAssociationId(associationId));
}finally {
countDownLatch.countDown();
}
});
taskExecutor.execute(() -> {
try {
map.put("CaseBasic", legalLitigationService.findCaseDetailsById(associationId));
}finally {
countDownLatch.countDown();
}
});
countDownLatch.await();
return ResultVO.putSuccess(map);
}