SpringBoot中的異步操作與線程池


SpringBoot中的異步操作與線程池

線程池類型

Java通過 java.util.concurrent.Executors 的靜態方法提供五種線程池

  1. newCachedThreadPool 創建一個可緩存線程池,如果線程池長度超過處理需要,可靈活回收空閑線程,若無可回收,則新建線程。
  2. newFixedThreadPool 創建一個定長線程池,可控制線程最大並發數,超出的線程會在隊列中等待。
  3. newScheduledThreadPool 創建一個定長線程池,支持定時及周期性任務執行。
  4. newSingleThreadExecutor 創建一個單線程化的線程池,它只會用唯一的工作線程來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先級)執行。
  5. newWorkStealingPool 這是java8新增的線程池類型。創建一個含有足夠多線程的線程池,來維持相應的並行級別,它會通過工作竊取的方式,使得多核的CPU不會閑置,總會有活着的線程讓CPU去運行。

五種線程池的底層實現

  • ThreadPoolExecutor 是CachedThreadPool、FixedThreadPool、SingleThreadExecutor、ScheduledThreadPool 這四種類型線程池的底層實現
  • ForkJoinPool (java7已有) 是WorkStealingPool線程池的底層實現

使用線程池的優點

  • 重用存在的線程,減少對象創建、消亡的開銷,性能佳。
  • 可有效控制最大並發線程數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
  • 提供定時執行、定期執行、單線程、並發數控制等功能。

如何在SpringBoot中優雅的使用線程池

注冊線程池

在config目錄下創建 AsyncConfig 配置類,在配置類中定義線程池

package com.example.async_demo.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Lazy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.SchedulingConfigurer;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.scheduling.config.ScheduledTaskRegistrar;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@Configuration
@EnableAsync
@EnableScheduling
public class AsyncConfig implements SchedulingConfigurer {
    //第一種線程池定義方式,可代替CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種
    // Spring線程池
    @Lazy //線程池懶加載
    @Bean(name="threadPoolTaskExecutor",destroyMethod="shutdown") //name為線程池名稱,destroyMethod="shutdown"在spring bean回收后釋放資源
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        //封裝的是原生的ThreadPoolExecutor類型線程池
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心線程數(獲取硬件):線程池創建時候初始化的線程數
        int corePoolSize = Runtime.getRuntime().availableProcessors();
        System.out.println(corePoolSize);
        executor.setCorePoolSize(corePoolSize);
        //最大線程數+5:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
        executor.setMaxPoolSize(corePoolSize+5);
        //緩沖隊列500:用來緩沖執行任務的隊列
        executor.setQueueCapacity(500);
        //允許線程的空閑時間60秒:當超過了核心線程出之外的線程在空閑時間到達之后會被銷毀
        executor.setKeepAliveSeconds(60);
        //線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
        executor.setThreadNamePrefix("MyAsync-");
        executor.initialize();
        return executor;
    }

    //第二種線程池定義方式,使用的是WorkStealingPool
    //java8 搶占式線程池
    @Lazy
    @Bean(name="workStealingPool",destroyMethod="shutdown")
    public ExecutorService workStealingPool(){
        ExecutorService executorService = Executors.newWorkStealingPool();
        return executorService;
    }

    //第三種線程池定義方式,為周期任務線程池
    //周期任務線程池
    @Lazy
    @Bean(name="scheduledThreadPool",destroyMethod="shutdown")
    public ExecutorService scheduledThreadPool() {
        return Executors.newScheduledThreadPool(3);
    }

    @Override
    public void configureTasks(ScheduledTaskRegistrar scheduledTaskRegistrar) {
        scheduledTaskRegistrar.setScheduler(scheduledThreadPool());
    }
}

我在上述案例代碼中定義了三種類型的線程池

  1. 第一種是ThreadPoolTaskExecutor線程池,他是Spring中的 org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor 線程池,
    底層是對 java.util.concurrent.ThreadPoolExecutor 的封裝,綜合了CachedThreadPool、FixedThreadPool、SingleThreadExecutor這三種線程池的優點;
  2. 第二種是java8新增的 workStealingPool 線程池。第一種和第二種使用時可以在配置類上使用@EnableAsync注解,這樣就能優雅的使用@Async注解方法來實現線程run邏輯了;
  3. 第三種是ScheduledThreadPool線程池,不過在Spring中使用需要配置類實現SchedulingConfigurer接口,重寫configureTasks方法。在配置類上使用
    @EnableScheduling注解,就可以優雅的使用@Scheduled注解方法來實現周期邏輯了

使用線程池

對第一種和第二種線程池在service中實現線程run的邏輯

package com.example.async_demo.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

@Service
public class AsyncService {
    
    //使用名為threadPoolTaskExecutor的線程池,返回Future
    @Async("threadPoolTaskExecutor")
    public Future<Double> service1(){
        double result = getRand(3000);
        return AsyncResult.forValue(result);
    }
    
    //使用名為threadPoolTaskExecutor的線程池,返回CompletableFuture
    @Async("threadPoolTaskExecutor")
    public CompletableFuture<Double> service2(){
        double result = getRand(3000);
        return CompletableFuture.completedFuture(result);
    }
    
    //使用名為workStealingPool的線程池,返回CompletableFuture
    @Async("workStealingPool")
    public CompletableFuture<Double> service3(){
        double result = getRand(3000);
        return CompletableFuture.completedFuture(result);
    }

    private double getRand(long sleep){
        System.out.println(Thread.currentThread().getId()+"-start");
        try {
            Thread.sleep(sleep);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        double result = Math.random();//方法返回的結果
        return result;
    }
}

測試第一種和第二種線程池

@SpringBootTest
class AsyncDemoApplicationTests {

    @Autowired
    private AsyncService asyncService;

    @Test
    void test1() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        Future<Double> result1 = asyncService.service1();
        Future<Double> result2 = asyncService.service1();
        Future<Double> result3 = asyncService.service1();
        
        //讓主線程等待子線程結束之后才能繼續運行
        while (!(result1.isDone()&&result2.isDone()&&result3.isDone())){
            Thread.sleep(500);
        }
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }

    @Test
    void test2() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> result1 = asyncService.service2();
        CompletableFuture<Double> result2 = asyncService.service2();
        CompletableFuture<Double> result3 = asyncService.service2();

        //join() 的作用:讓主線程等待子線程結束之后才能繼續運行
        CompletableFuture.allOf(result1,result2,result3).join();
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }

    @Test
    void test3() throws ExecutionException, InterruptedException {
        long start = System.currentTimeMillis();
        CompletableFuture<Double> result1 = asyncService.service3();
        CompletableFuture<Double> result2 = asyncService.service3();
        CompletableFuture<Double> result3 = asyncService.service3();

        //join() 的作用:讓主線程等待子線程結束之后才能繼續運行
        CompletableFuture.allOf(result1,result2,result3).join();
        long end = System.currentTimeMillis();
        System.out.println(end-start+"ms");
        System.out.println(result1.get());
        System.out.println(result2.get());
        System.out.println(result3.get());
    }
}

test1測試結果

test1

test2測試結果

test1

test3測試結果

test1

通過測試發現Future返回類型不適合主線等待多個子線程全部完成的操作,
因為需要用到while循環去阻塞主線程,而CompletableFuture可以通過CompletableFuture.allOf(cf1,cf2,cf3).join()
去完成這個操作,所以推薦使用CompletableFuture作為返回類型

注意:@Async注解的方法不能在本類中被調用,只能在其他類中調用,如Controller類

對第三種線程池在service中實現線程的邏輯

package com.example.async_demo.service;

import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;

@Service
public class AsyncService {
    
    //cron表達式 每5秒執行一次
    //@Scheduled(cron = "*/5 * * * * ?")
    @Scheduled(cron = "${cron.sec5}") //表達式寫在application.yml文件中,則以這種方式取出。
    public void service4(){
        System.out.println("5s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
    }
    
    //cron表達式 每3秒執行一次
    @Scheduled(cron = "${cron.sec3}")
    public void service5(){
        System.out.println("3s-"+Thread.currentThread().getId()+":"+System.currentTimeMillis()/1000);
    }

}

application.yml 文件

cron:
  sec5: '*/5 * * * * ?'
  sec3: '*/3 * * * * ?'

周期任務測試結果(啟動Application類)

test1

通過測試結果可發現兩個周期任務使用了三個線程,
線程id分別是20、21、25。兩個周期任務分別以3s和5s執行一次,
但不固定在某個線程中執行,而是哪個線程空閑則使用哪個線程

注意:若不為周期任務配置線程池,只使用@EnableScheduling和@Scheduled注解的話,
則所有周期任務共用一個子線程,若出現下一個周期開始上一個周期任務還沒結束的情況,
則線程阻塞,直到前一個任務完成

CRON表達式

  • cron表達式是定義任務周期的一種表達式
  • 這里不多介紹,可以參考這篇博客


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM