SpringBoot 异步与多线程


1. @Async可以开启异步,但是要在 main 中EnableAsync

2.@Async既可以注解在方法上,也可以注解到类上

3.使用@Async时,请注意一定要对应bean name,否则或调用系统默认的SampleTaskExecutor,容易造成OOM

4.本人使用的SpringBoot 2.3.4 ,默认值  maxPoolSize = 2147483647,queueCapacity = 2147483647, 建议在初始化时设置corePoolSize即可(百度到的例子中,大多数没有讲这一块)

5.线程池对拒绝任务的处理策略处理,默认为 new ThreadPoolExecutor.CallerRunsPolicy(),建议使用 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

6.如果Executor后台线程池还没有完成Callable的计算,这时调用返回Future对象的get()方法,会阻塞直到计算完成。

 

我为什么要在这里重点提第四点和第五点,目前百度到的大多文章都是相互抄的,在定义executor主动定义了queueCapacity ,maxPoolSize  并没有去看源码中对于queueCapacity ,maxPoolSize  的处理。

我的建议是,这俩值无需自定义,为了提高多线程的并发效率,可以考虑直接放大corePoolSize。

 

关于executort的使用代码我就不在此处多讲,各位可以用此代码,测试系统中指定bean的taskExecutor中到底有多少任务在执行。

getBean见 https://www.jianshu.com/p/3cd2d4e73eb7

使用方式如下

@Component
@Slf4j
public class TaskSchedule {

    @Autowired
    ApplicationContextProvider applicationContextProvider;

//    @Scheduled(fixedRate = 2000L, initialDelay = 5)
    public void getTaskExecutorState(){
        Class<ThreadPoolTaskExecutor> clas = ThreadPoolTaskExecutor.class;
        ThreadPoolTaskExecutor threadPoolTaskExecutor  = applicationContextProvider.getBean("taskExecutor", clas);
        ThreadPoolExecutor threadPoolExecutor = threadPoolTaskExecutor.getThreadPoolExecutor();
        log.info("{}, taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}], MaximumPoolSize[{}], largestPoolSize[{}]",
                threadPoolTaskExecutor.getThreadNamePrefix(),
                threadPoolExecutor.getTaskCount(),
                threadPoolExecutor.getCompletedTaskCount(),
                threadPoolExecutor.getActiveCount(),
                threadPoolExecutor.getQueue().size(),
                threadPoolExecutor.getMaximumPoolSize(),
                threadPoolExecutor.getLargestPoolSize());
    }
}

 

controller

@Autowired
private AsyncTask task;

@Autowired
private TaskSchedule taskSchedule;

@PostMapping("/consume") @ResponseBody public JSONObject consume(@RequestBody JSONObject params) throws InterruptedException, ExecutionException { count ++; JSONObject jsonObject = new JSONObject(); log.info("params flag {} ",params.getString("flag")); log.info("名称 {}", params.getString("loginid")); jsonObject.put("loginidis",params.getString("loginid")); jsonObject.put("count", count); Future<String> task4 = task.task4(count); taskSchedule.getTaskExecutorState(); // task.task4(); // log.info("Future<String> {}", task4.get()); //调用返回Future对象的get()方法,会阻塞直到计算完成 // task.getTest1(); return jsonObject; }

 

 

import cn.hutool.core.util.RandomUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * 功能描述:异步任务业务类(@Async也可添加在方法上)
 */
@Component
@Async("taskExecutor")
@Slf4j
public class AsyncTask {

    //获取异步结果
    public Future<String> task4(int index) throws InterruptedException {
        log.info("开始执行任务 task4 index:{}",index);
        long begin = System.currentTimeMillis();
//        Thread.sleep(1000L*60*2);
//        int sleepTime = RandomUtil.randomInt(1000*60*3, 1000*60*5);
        int sleepTime = RandomUtil.randomInt(1000*30, 1000*60);
        log.info(" sleepTime is {}",sleepTime);
        Thread.sleep(sleepTime);
        long end = System.currentTimeMillis();
        log.info("任务4执行完毕 index:"+index+" 耗时=" + (end - begin));
        return new AsyncResult<String>("任务4");
    }

}

 

各位可以在代码中注释掉

        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
或者使用不同的拒绝策略测试效果。
如本人设置的参数core=3, max=5, queue=10, 通过postman构造对应的请求,会在第16个请求开始阻塞,由接收请求的线程本身http-nio-80-exec负责执行任务,其执行时间即postman请求消耗的时间
http-nio-80-exec即SpringBoot中tomcat本身默认的executor。
关于拒绝策略可参考:https://www.jianshu.com/p/f3322daa2ad0
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@Slf4j
public class ThreadPoolTaskConfig {

    private static final int corePoolSize = 2;               // 核心线程数(默认线程数)线程池创建时候初始化的线程数
    private static final int maxPoolSize = 5;                // 最大线程数 线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    private static final int keepAliveTime = 10;            // 允许线程空闲时间(单位:默认为秒)当超过了核心线程之外的线程在空闲时间到达之后会被销毁
    private static final int queueCapacity = 10;            // 缓冲队列数 用来缓冲执行任务的队列
    private static final String threadNamePrefix = "Async-Service-"; // 线程池名前缀 方便我们定位处理任务所在的线程池

    @Bean("taskExecutor") // bean的名称,默认为首字母小写的方法名
//    public ThreadPoolTaskExecutor taskExecutor(){
    public ThreadPoolTaskExecutor taskExecutor(){
//    public AsyncTaskExecutor taskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(corePoolSize);
        executor.setMaxPoolSize(maxPoolSize);
        executor.setQueueCapacity(queueCapacity);
//        executor.setKeepAliveSeconds(keepAliveTime);
        executor.setThreadNamePrefix(threadNamePrefix);

        // 线程池对拒绝任务的处理策略 采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
//        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());

//        executor.setRejectedExecutionHandler(
//                new RejectedExecutionHandler(){
//                    @Override
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        try {
//                            //继续加入阻塞队列执行,可自定义
//                            log.info("继续加入阻塞队列执行,可自定义");
//                            executor.getQueue().put(r);
//                        } catch (InterruptedException e) {
//                            e.printStackTrace();
//                        }
//                    }
//                }
//
//        );
        // 初始化
        executor.initialize();
        return executor;
    }

}

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM