SpringBoot:整合線程池


hello world

pom依賴:(我這里用的boot版本是2.4.2)

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

測試Controller:

@RestController
public class TestController {

    @Autowired
    ThreadPoolTaskExecutor threadPoolTaskExecutor;

    @RequestMapping("test")
    public String test() throws InterruptedException {

        threadPoolTaskExecutor.execute(()->{
            System.out.println("開始");
            System.out.println(Thread.currentThread().getName());
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("結束");
        });
        return "hello";
    }
}

啟動項目后,我們發現ThreadPoolTaskExecutor可以注入到TestController,說明這個類也是被boot自動配置的,並且該方法execute確實是異步執行的

@EnableAsync和@Async

無返回值異步

我們在啟動類上標注@EnableAsync注解

@EnableAsync
@SpringBootApplication
public class BootAsyncApplication {

    public static void main(String[] args) {
        SpringApplication.run(BootAsyncApplication.class, args);
    }

}

編寫controller和service

@RestController
public class TestController {
    @Autowired
    private TestService testService;

    @Async
    @RequestMapping("test")
    public void test() throws InterruptedException {
        System.out.println("開始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("結束");
    }

    @RequestMapping("test2")
    public String test2(){
        testService.ok();
        return "ok";
    }
}

@Service
public class TestService {

    @Async
    public void ok() {
        System.out.println("service開始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("service結束");

    }
}

我在controller的一個方法和service的一個方法上加上了@Async注解,調用接口觀察效果

訪問接口后,立馬返回結果,后台線程異步執行並打印結果。

有返回值異步

上面測試的是無返回值異步調用。

下面介紹有返回值異步調用,並接受返回值

我們知道在juc中通過Future和Callable可以獲取異步調用返回值,spring中支持了AsyncResult類支持獲取方法返回值,但其實該類實現的也是Future。

修改test2方法:

    @RequestMapping("test2")
    public String test2() throws ExecutionException, InterruptedException {
        Future<String> ok = testService.ok();
        //Future的get會被阻塞,需要注意
        return ok.get();
    }

@Service
public class TestService {

    @Async
    public Future<String> ok() {
        System.out.println("Execute method asynchronously - "
                + Thread.currentThread().getName());
        try {
            Thread.sleep(5000);
            return new AsyncResult<>("hello world !!!!");
        } catch (InterruptedException ignored) {
        }
        return null;
    }
}

測試效果:

自動配置類:TaskExecutionAutoConfiguration

image-20210128093624185

大致就是先創建一個TaskExecutorBuilder,然后通過它去創建一個ThreadPoolTaskExecutor。

注意了,如果容器中有Executor,那么默認的ThreadPoolTaskExecutor就不會自動創建了。

注意:這里有一個大坑!!!我們注意到,默認的ThreadPoolTaskExecutor的參數是從TaskExecutionProperties.Pool中取得的,點進去。

image-20210128094032767

竟然發現阻塞隊列大小和線程數最大值都是Integer.MAX_VALUE,這種配置極其容易造成生產事故(生產任務大量堆積,OOM異常等)

所以一定不要使用springboot默認配置的線程池!!!

自定義springboot線程池

springboot給了我們兩種解決方案

配置參數

image-20210128094649536

我們可以通過配置spring.task.execution來修改配置

spring.task.execution.pool.core-size=10
spring.task.execution.pool.max-size=100
spring.task.execution.pool.queue-capacity=30

自定義線程池

這種方式比上一種更好,我們可以自定義線程池,定制化更好,包括我們可以自定義線程異常處理器

我們可以實現AsyncConfigurer接口,並重寫接口中的方法

@Slf4j
@EnableAsync
@Configuration
public class ThreadPoolConfig implements AsyncConfigurer {

    @Bean("WjTaskExecutor")
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(5);
        threadPoolTaskExecutor.setMaxPoolSize(50);
        threadPoolTaskExecutor.setAllowCoreThreadTimeOut(true);
        threadPoolTaskExecutor.setQueueCapacity(20);
        threadPoolTaskExecutor.setKeepAliveSeconds(300);
        threadPoolTaskExecutor.setThreadNamePrefix("wj-");
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //設置線程池關閉的時候等待所有任務都完成再繼續銷毀其他的bean
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //設置線程池中任務的等待時間,如果超過這個時候還沒有就強制銷毀,以確保應用最后能夠關閉而不是阻塞住
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        return threadPoolTaskExecutor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex, method, params)->{
            log.error("調用異步方法異常: \n" + method, ex);
        };
    }
}

修改controller的test方法:

    @Async
    @RequestMapping("test")
    public void test() throws InterruptedException {
        System.out.println("開始");
        System.out.println(Thread.currentThread().getName());
        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        int i = 1/0;
        System.out.println("結束");
    }

測試:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM