springboot多線程TaskExecutor的使用,以及使用@Async實現異步調用


@Async實現異步調用

pom.xml

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>

啟動類

@EnableAsync
@SpringBootApplication
public class LearnutilsApplication {
 
   public static void main(String[] args) {
      SpringApplication.run(LearnutilsApplication.class, args);
   }
 
   /**
    * 核心線程數10:線程池創建時初始化的線程數
    * 最大線程數20:線程池最大的線程數,只有在緩沖隊列滿了之后才會申請超過核心線程數的線程
    * 緩沖隊列200:用來緩沖執行任務的隊列
    * 允許線程的空閑時間60秒:超過了核心線程數之外的線程,在空閑時間到達之后會被銷毀
    * 線程池名的前綴:設置好了之后可以方便我們定位處理任務所在的線程池
    * 線程池對拒絕任務的處理策略:此處采用了CallerRunsPolicy策略,當線程池沒有處理能力的時候,該策略會直接在execute方法的調用線程中運行被拒絕的任務;如果執行程序已被關閉,則會丟棄該任務
    * 設置線程池關閉的時候等待所有任務都完成再繼續銷毀其他的Bean
    * 設置線程池中任務的等待時間,如果超過這個時候還沒有銷毀就強制銷毀,以確保應用最后能夠被關閉,而不是阻塞住
    */
   @EnableAsync
   @Configuration
   class TaskPoolConfig{
      @Bean("taskExecutor")
      public Executor taskExecutor(){
         ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
         executor.setCorePoolSize(10);
         executor.setMaxPoolSize(20);
         executor.setQueueCapacity(200);
         executor.setKeepAliveSeconds(60);
         executor.setThreadNamePrefix("TaskExecutor-");
         executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
         executor.setWaitForTasksToCompleteOnShutdown(true);
         executor.setAwaitTerminationSeconds(60);
         return executor;
      }
   }
 
}

定義controller

@RequestMapping(value = "/AsyncController")
@RestController
public class AsyncController {
 
    @Autowired
    private AsyncService asyncService;
 
    @Autowired
    private AsyncService2 asyncService2;
 
    @Autowired
    private AsyncService3 asyncService3;
 
    @GetMapping(value = "/sendSms")
    public String sendSms() throws Exception{
       
        Future<String> sms = asyncService.sendSms();
        Future<String> sms2 = asyncService2.sendSms();
        Future<String> sms3 = asyncService3.sendSms();
        int i = 0;
        for (;;) {
            //如果都執行完就跳出循環,isDone方法,如果此線程執行完,true
            if (sms.isDone() && sms2.isDone() && sms3.isDone()) {
                break;
            }
        }
        //get是獲取結果集
        return sms.get()+sms2.get()+sms3.get();
    }
}

定義接口

public interface AsyncService {
    Future<String> sendSms();
}

實現類

@Service
public class AsyncServiceImpl implements AsyncService {
    //Future<String> 返回結果 AsyncResult<String>
    @Async("taskExecutor")
    @Override
    public Future<String> sendSms() {
        return new AsyncResult<>("000000");
    }
}

將isDone換程CountDownLatch來判斷線程是否執行完實例化CountDownLatch並且制定線程個數,線程個數就是從本地異步調用的方法個輸,並且傳入線程任務中,每個線程執行完畢就調用countDown()方法。最后在調用await()方法。這樣在線程計數為零之前,線程就會一直等待。

AsyncResult用來封裝結果集,否則結果集無法返回

@GetMapping(value = "/sendSms2")
public String sendSms2() throws Exception{
    CountDownLatch downLatch = new CountDownLatch(3);
    Future<String> s = asyncService4.sendSms(downLatch);
    Future<String> s1 = asyncService5.sendSms(downLatch);
    Future<String> s2 = asyncService6.sendSms(downLatch);
    downLatch.await();
    return s.get()+s1.get()+s2.get();
}

將CountDownLatch傳給方法

public interface AsyncService4 {
    Future<String> sendSms(CountDownLatch downLatch);
}

方法

@Service
public class AsyncService4Impl implements AsyncService4 {

    @Async("taskExecutor")
    @Override
    public Future<String> sendSms(CountDownLatch downLatch) {
        downLatch.countDown();
        return new AsyncResult<>("11111");
    }
}

TaskExecutor的使用

注冊TaskExecutor

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
 
import java.util.concurrent.ThreadPoolExecutor;
 
/**
 * @author yanjun
 * @date 2019/8/1 16:04
 **/
@Configuration
public class MainConfiguration {
    @Bean
    public TaskExecutor getTaskExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        // 設置核心線程數
        executor.setCorePoolSize(5);
        // 設置最大線程數
        executor.setMaxPoolSize(10);
        // 設置隊列容量
        executor.setQueueCapacity(20);
        // 設置線程活躍時間(秒)
        executor.setKeepAliveSeconds(60);
        // 設置默認線程名稱
        executor.setThreadNamePrefix("post-lending-");
        // 設置拒絕策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // 等待所有任務結束后再關閉線程池
        executor.setWaitForTasksToCompleteOnShutdown(true);
        return executor;
    }
}

使用TaskExecutor

@Autowired
private TaskExecutor taskExecutor;
 
public ResultVO findHandlingRecordByAssociationId(Integer associationId) throws InterruptedException{
    Map<String, Object> map = new HashMap<>(2);
   //線程計數器(等待所有線程執行完統一返回)
    CountDownLatch countDownLatch = new CountDownLatch(10);
    taskExecutor.execute(() -> {
        try {
            //service調用
            map.put("HandlingRecord", legalLitigationService.findHandlingRecordByAssociationId(associationId));
        }finally {
            countDownLatch.countDown();
        }
    });
    taskExecutor.execute(() -> {
        try {
            map.put("CaseBasic", legalLitigationService.findCaseDetailsById(associationId));
        }finally {
            countDownLatch.countDown();
        }
    });
    countDownLatch.await();
    return ResultVO.putSuccess(map);
}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM