記錄ThreadPoolTaskExecutor線程池的在項目中的實際應用,講解一下線程池的配置和參數理解。


前言:最近項目中與融360項目中接口對接,有反饋接口(也就是我們接收到請求,需要立即響應,並且還要有一個接口推送給他們其他計算結果),推送過程耗時、或者說兩個接口不能是同時返回,有先后順序。

這時我想到了把自己Controller立即返回接受成功,中間添加一個新的線程去做其他耗時的操作(線程池配置和參數測試講解請閱讀第5步)。

1、Controller代碼如下:

@Autowired
private CallThreadDemo worker;
@RequestMapping("/bandBankConfirm2")
    public void bandBankConfirm2(String jsonString) {
        System.out.println("controller開始--------------");
    //這里是需要調用第三方的接口傳入參數 String method
= "is.api.v3.order.bindcardfeedback"; Map<String, Object> map = new HashMap<>(); map.put("order_no", "254986512848973"); map.put("bind_status", 1); map.put("reason", "");     //這里開始調用線程池的方法 worker.callRong360(method, map); System.out.println("controller end --------------");
    //renderJson這個方法是jfinal的,可以理解為@ReponseBody 需要return的操作了
    renderJson("接口調用完畢");   
}
2、調用線程池的方法 CallThreadDemo 類代碼:
@Component
public class CallThreadDemo {

   //這里是Spring.xml中配置的bean名稱 @Autowired
private ThreadPoolTaskExecutor executor; @Autowired private ServiceTest serviceTest; public void callRong360(final String method, final Map<String, Object> map) { //這個類是我封裝的抽象類,里面有一個公共方法,具體代碼下面有 ServiceParent callRong360Method = new ServiceParent() { @Override public void run() { try { Thread.sleep(20000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("線程開始-------------");
          //這里調用第三方公用接口 JSONObject result
= callBandBankMethod(method, map);

          //這里調用service方法,實現自己的業務邏輯 serviceTest.insertUser(
"111", "222222");
System.out.println(result); System.out.println(
"線程結束-------------"); } };
     //這里線程池方法調用一個線程繼承類或者實現Runable接口的類 executor.execute(callRong360Method); System.out.println(
"當前活動線程數:"+ executor.getActiveCount()); System.out.println("核心線程數:"+ executor.getCorePoolSize()); System.out.println("總線程數:"+ executor.getPoolSize()); System.out.println("最大線程池數量"+executor.getMaxPoolSize()); System.out.println("線程處理隊列長度"+executor.getThreadPoolExecutor().getQueue().size()); }

3、封裝的抽象類代碼如下:

public abstract class ServiceParent implements Runnable {

    public JSONObject callBandBankMethod(String method, Map<String, Object> map) {
//這個方法是調用三方的接口,公用部分
// //輸入 參數如下 : // Map<String, Object> map = new HashMap<>(); // map.put("order_no", "254986512848973"); // map.put("bind_status", 1); // map.put("reason", ""); // net.sf.json.JSONObject ret = callRong360Method.callBandBankMethod("is.api.v3.order.bindcardfeedback", map); // // // 異常情況輸出參數 為 null: OpenapiClient openapiClient = new OpenapiClient(); openapiClient.setMethod(method); for (Map.Entry<String, Object> entry : map.entrySet()) { openapiClient.setField(entry.getKey(), String.valueOf(entry.getValue())); } net.sf.json.JSONObject ret = null; try { ret = openapiClient.execute(); } catch (Exception e) { e.printStackTrace(); System.out.println("調用反饋接口異常---->" + e.getMessage()); } return ret; }   //這里定義為抽象方法,創建匿名內部類或者繼承類必須實現 public abstract void run();

4、ServiceTest接口方法如下:

@Service
public class ServiceTest {

    @Autowired
    private BankMapper bankMapper;

    @Transactional(rollbackFor = {Exception.class})
    public void insertUser(String s, String s1) {
        bankMapper.insertUser(s, s1);

         //這里創建異常,測試事務。已測試,事務生效
        int i = 1 / 0;
    }
}

以上四步,成功在調用Controller方法立即返回結果,也實現了另外的一個線程去調用三方接口返回信息,並且實現了自己的業務邏輯。

5、接着我們繼續說明一下線程池對應配置和參數說明,這里我們通過測試來說明參數的作用(corePoolSize 核心線程數量,maxPoolSize 最大線程數,queueCapacity 處理隊列)

  threadpool.corePoolSize=5
  threadpool.keepAliveSeconds=200
  threadpool.maxPoolSize=10
  threadpool.queueCapacity=2
  <!-- 線程池 -->
    <bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
        <property name="corePoolSize" value="${threadpool.corePoolSize}" />
        <property name="keepAliveSeconds" value="${threadpool.keepAliveSeconds}" />
        <property name="maxPoolSize" value="${threadpool.maxPoolSize}" />
        <property name="queueCapacity" value="${threadpool.queueCapacity}" />
    </bean>
        <!--請在bean xsd中配置task-->
    <task:annotation-driven executor="taskExecutor" />
    

 

下面貼出測試輸出的日志信息:controller開始--------------

當前活動線程數:1 核心線程數:5 總線程數:1 最大線程池數量10 線程處理隊列長度0 controller end --------------

controller開始-------------- 當前活動線程數:2 核心線程數:5 總線程數:2 最大線程池數量10 線程處理隊列長度0 controller end --------------

controller開始-------------- 當前活動線程數:3 核心線程數:5 總線程數:3 最大線程池數量10 線程處理隊列長度0 controller end --------------

controller開始-------------- 當前活動線程數:4 核心線程數:5 總線程數:4 最大線程池數量10 線程處理隊列長度0 controller end --------------

controller開始-------------- 當前活動線程數:5 核心線程數:5 總線程數:5 最大線程池數量10 線程處理隊列長度0 controller end --------------

controller開始-------------- 當前活動線程數:5---------------》因為放入了隊列中,此處活動線程數還是5 核心線程數:5 總線程數:5 最大線程池數量10 線程處理隊列長度1 -------------》這里達到了最大的核心線程數量 corePoolSize=5,開始放入處理隊列中
queueCapacity=2
controller end --------------
controller開始-------------- 當前活動線程數:5 核心線程數:5 總線程數:5 最大線程池數量10 線程處理隊列長度2 ---------------》繼續放入隊列中,達到了最大隊列數量2 controller end --------------

controller開始-------------- 當前活動線程數:6-----------------》這里因為達到了最大隊列數量,所以繼續創建線程去執行,一直到最后到最大線程數量 核心線程數:5 總線程數:6 最大線程池數量10 線程處理隊列長度2 controller end --------------

controller開始-------------- 當前活動線程數:7 核心線程數:5 總線程數:7 最大線程池數量10 線程處理隊列長度2 controller end --------------

controller開始-------------- 當前活動線程數:8 核心線程數:5 總線程數:8 最大線程池數量10 線程處理隊列長度2 controller end --------------

controller開始-------------- 當前活動線程數:9 核心線程數:5 總線程數:9 最大線程池數量10 線程處理隊列長度2 controller end --------------

controller開始-------------- 當前活動線程數:10 --------------》這里活動線程數量達到了最大線程池數量 核心線程數:5 總線程數:10 最大線程池數量10 線程處理隊列長度2 controller end --------------

controller開始-------------- 2018-07-01 20:23:19 -----------------》這里繼續調用,因為最大線程池數量和隊列中都已經到了最大值,拋出了異常 [] [] [WARN]-[Thread: qtp1276504061-60]-[org.springframework.web.servlet.handler.AbstractHandlerExceptionResolver.logException()]: Handler execution resulted in exception org.springframework.core.task.TaskRejectedException: Executor [java.util.concurrent.ThreadPoolExecutor@17d95b77[Running, pool size = 10, active threads = 10, queued tasks = 2, completed tasks = 0]] did not accept task: com.fastx.cooperate.rong360.rong.service.CallThreadDemo$1@5fa6bf1 at org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor.execute(ThreadPoolTaskExecutor.java:245) at com.fastx.cooperate.rong360.rong.service.CallThreadDemo.callRong360(CallThreadDemo.java:43)

這樣,通過輸出的日志,我們可以很容易的理解了各個參數的作用。

corePoolSize: 線程池維護線程的最少數量 
keepAliveSeconds 線程池維護線程所允許的空閑時間 
maxPoolSize 線程池維護線程的最大數量 
queueCapacity 線程池所使用的緩沖隊列

 

 

ThredPoolTaskExcutor的處理流程:

 

  當池子大小小於corePoolSize,就新建線程,並處理請求

 

  當池子大小等於corePoolSize,把請求放入workQueue中,池子里的空閑線程就去workQueue中取任務並處理

 

  當workQueue放不下任務時,就新建線程入池,並處理請求,如果池子大小撐到了maximumPoolSize,就用RejectedExecutionHandler來做拒絕處理

 

  當池子的線程數大於corePoolSize時,多余的線程會等待keepAliveTime長時間,如果無請求可處理就自行銷毀

 

  其會優先創建  CorePoolSiz 線程, 當繼續增加線程時,先放入Queue中,當 CorePoolSiz  和 Queue 都滿的時候,就增加創建新線程,當線程達到MaxPoolSize的時候,就會拋出錯 誤 org.springframework.core.task.TaskRejectedException

 

  另外MaxPoolSize的設定如果比系統支持的線程數還要大時,會拋出java.lang.OutOfMemoryError: unable to create new native thread 異常。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM