SpringBoot 異步與多線程


1. @Async可以開啟異步,但是要在 main 中EnableAsync

2.@Async既可以注解在方法上,也可以注解到類上

3.使用@Async時,請注意一定要對應bean name,否則或調用系統默認的SampleTaskExecutor,容易造成OOM

4.本人使用的SpringBoot 2.3.4 ,默認值  maxPoolSize = 2147483647,queueCapacity = 2147483647, 建議在初始化時設置corePoolSize即可(百度到的例子中,大多數沒有講這一塊)

5.線程池對拒絕任務的處理策略處理,默認為 new ThreadPoolExecutor.CallerRunsPolicy(),建議使用 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

6.如果Executor后台線程池還沒有完成Callable的計算,這時調用返回Future對象的get()方法,會阻塞直到計算完成。

 

我為什么要在這里重點提第四點和第五點,目前百度到的大多文章都是相互抄的,在定義executor主動定義了queueCapacity ,maxPoolSize  並沒有去看源碼中對於queueCapacity ,maxPoolSize  的處理。

我的建議是,這倆值無需自定義,為了提高多線程的並發效率,可以考慮直接放大corePoolSize。

 

關於executort的使用代碼我就不在此處多講,各位可以用此代碼,測試系統中指定bean的taskExecutor中到底有多少任務在執行。

getBean見 https://www.jianshu.com/p/3cd2d4e73eb7

使用方式如下

@Component
@Slf4j
public class TaskSchedule {

    @Autowired
    ApplicationContextProvider applicationContextProvider;

//    @Scheduled(fixedRate = 2000L, initialDelay = 5)
    public void getTaskExecutorState(){
        Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class;
        ThreadPoolTaskExecutor threadPoolTaskExecutor  = applicationContextProvider.getBean("taskExecutor", clas);
        ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor();
        log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]",
                threadPoolTaskExecutor.getThreadNamePrefix(),
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getMaximumPoolSize(),
                threadPoolExecutor.getLargestPoolSize());
    }
}

 

controller

@Autowired
private AsyncTask task;

@Autowired
private TaskSchedule taskSchedule;

@PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名稱 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //調用返回Future對象的get()方法,會阻塞直到計算完成 // task.getTest1(); return jsonObject; }

 

 

import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 功能描述:異步任務業務類(@Async也可添加在方法上)
 */
@Component
@Async("taskExecutor")
@Slf4j
public class AsyncTask {

    //獲取異步結果
    public Future<String> task4(int index) throws InterruptedException {
        log.info("開始執行任務 task4 index:{}",index);
        long begin = System.currentTimeMillis();
//        Thread.sleep(1000L*60*2);
//        int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5);
        int sleepTime = RandomUtil.randomInt(1000*30, 1000*60);
        log.info(" sleepTime is {}",sleepTime);
        Thread.sleep(sleepTime);
        long end = System.currentTimeMillis();
        log.info("任務4執行完畢 index:"+index+" 耗時=" + (end - begin));
        return new AsyncResult<String>("任務4");
    }

}

 

各位可以在代碼中注釋掉

        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
或者使用不同的拒絕策略測試效果。
如本人設置的參數core=3, max=5, queue=10, 通過postman構造對應的請求,會在第16個請求開始阻塞,由接收請求的線程本身http-nio-80-exec負責執行任務,其執行時間即postman請求消耗的時間
http-nio-80-exec即SpringBoot中tomcat本身默認的executor。
關於拒絕策略可參考:https://www.jianshu.com/p/f3322daa2ad0
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class ThreadPoolTaskConfig {

    private static final int corePoolSize = 2;               // 核心線程數(默認線程數)線程池創建時候初始化的線程數
    private static final int maxPoolSize = 5;                // 最大線程數 線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
    private static final int keepAliveTime = 10;            // 允許線程空閑時間(單位:默認為秒)當超過了核心線程之外的線程在空閑時間到達之后會被銷毀
    private static final int queueCapacity = 10;            // 緩沖隊列數 用來緩沖執行任務的隊列
    private static final String threadNamePrefix = "Async-Service-"; // 線程池名前綴 方便我們定位處理任務所在的線程池

    @Bean("taskExecutor") // bean的名稱,默認為首字母小寫的方法名
//    public ThreadPoolTaskExecutor taskExecutor(){
    public ThreadPoolTaskExecutor taskExecutor(){
//    public AsyncTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
//        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);

        // 線程池對拒絕任務的處理策略 采用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在 execute 方法的調用線程中運行被拒絕的任務;如果執行程序已關閉,則會丟棄該任務
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

//        executor.setRejectedExecutionHandler(
//                new RejectedExecutionHandler(){
//                    @Override
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        try {
//                            //繼續加入阻塞隊列執行,可自定義
//                            log.info("繼續加入阻塞隊列執行,可自定義");
//                            executor.getQueue().put(r);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
//                    }
//                }
//
//        );
        // 初始化
        executor.initialize();
        return executor;
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM