Spring線程池ThreadPoolTaskExecutor的使用


1 線程池簡介

1.1 為什么使用線程池

  • 降低系統資源消耗,通過重用已存在的線程,降低線程創建和銷毀造成的消耗;
  • 提高系統響應速度,當有任務到達時,通過復用已存在的線程,無需等待新線程的創建便能立即執行;
  • 方便線程並發數的管控,因為線程若是無限制的創建,可能會導致內存占用過多而產生OOM,並且會造成cpu過度切換(cpu切換線程是有時間成本的(需要保持當前執行線程的現場,並恢復要執行線程的現場)
  • 提供更強大的功能,延時定時線程池

1.2 線程池為什么需要使用隊列

因為線程若是無限制的創建,可能會導致內存占用過多而產生OOM,並且會造成cpu過度切換。

創建線程池的消耗較高或者線程池創建線程需要獲取mainlock這個全局鎖,影響並發效率,阻塞隊列可以很好的緩沖

1.3 線程池為什么要使用阻塞隊列而不使用非阻塞隊列

阻塞隊列可以保證任務隊列中沒有任務時阻塞獲取任務的線程,使得線程進入wait狀態,釋放cpu資源,當隊列中有任務時才喚醒對應線程從隊列中取出消息進行執行。
使得在線程不至於一直占用cpu資源。(線程執行完任務后通過循環再次從任務隊列中取出任務進行執行,代碼片段如:while (task != null || (task = getTask()) != null) {})。

不用阻塞隊列也是可以的,不過實現起來比較麻煩而已,有好用的為啥不用呢

1.4 如何配置線程池

  • CPU密集型任務
    盡量使用較小的線程池,一般為CPU核心數+1。 因為CPU密集型任務使得CPU使用率很高,若開過多的線程數,會造成CPU過度切換

  • IO密集型任務
    可以使用稍大的線程池,一般為2*CPU核心數。 IO密集型任務CPU使用率並不高,因此可以讓CPU在等待IO的時候有其他線程去處理別的任務,充分利用CPU時間

  • 混合型任務
    可以將任務分成IO密集型和CPU密集型任務,然后分別用不同的線程池去處理。 只要分完之后兩個任務的執行時間相差不大,那么就會比串行執行來的高效
    因為如果划分之后兩個任務執行時間有數據級的差距,那么拆分沒有意義。
    因為先執行完的任務就要等后執行完的任務,最終的時間仍然取決於后執行完的任務,而且還要加上任務拆分與合並的開銷,得不償失

1.5 execute()和submit()方法

  1. execute(),執行一個任務,沒有返回值
  2. submit(),提交一個線程任務,有返回值

submit(Callable<T> task)能獲取到它的返回值,通過future.get()獲取(阻塞直到任務執行完)。一般使用FutureTask+Callable配合使用
submit(Runnable task, T result)能通過傳入的載體result間接獲得線程的返回值。
submit(Runnable task)則是沒有返回值的,就算獲取它的返回值也是null

Future.get()方法會使取結果的線程進入阻塞狀態,直到線程執行完成之后,喚醒取結果的線程,然后返回結果

1.6 Spring線程池

Spring 通過任務執行器(TaskExecutor)來實現多線程和並發編程,使用ThreadPoolTaskExecutor實現一個基於線程池的TaskExecutor
還得需要使用@EnableAsync開啟異步,並通過在需要的異步方法那里使用注解@Async聲明是一個異步任務
Spring 已經實現的異常線程池:

  • SimpleAsyncTaskExecutor:不是真的線程池,這個類不重用線程,每次調用都會創建一個新的線程。
  • SyncTaskExecutor:這個類沒有實現異步調用,只是一個同步操作。只適用於不需要多線程的地方
  • ConcurrentTaskExecutorExecutor的適配類,不推薦使用。如果ThreadPoolTaskExecutor不滿足要求時,才用考慮使用這個類
  • SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的類。線程池同時被quartz和非quartz使用,才需要使用此類
  • ThreadPoolTaskExecutor :最常使用,推薦。 其實質是對java.util.concurrent.ThreadPoolExecutor的包裝

1.7 @Async調用中的事務處理機制

@Async標注的方法,同時也使用@Transactional進行標注;在其調用數據庫操作之時,將無法產生事務管理的控制,原因就在於其是基於異步處理的操作。

那該如何給這些操作添加事務管理呢?
可以將需要事務管理操作的方法放置到異步方法內部,在內部被調用的方法上添加@Transactional

示例:

  • 方法A, 使用了@Async/@Transactional來標注,但是無法產生事務控制的目的。
  • 方法B, 使用了@Async來標注,B中調用了C、DC/D分別使用@Transactional做了標注,則可實現事務控制的目的

2 示例

2.1 線程池配置類

package cn.jzh.thread;
import org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.concurrent.Executor;

@Configuration
@ComponentScan("cn.jzh.thread")
@EnableAsync  //開啟異步操作
public class TaskExecutorConfig implements AsyncConfigurer {

    /**
     * 通過getAsyncExecutor方法配置ThreadPoolTaskExecutor,獲得一個基於線程池TaskExecutor
     *
     * @return
     */
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();
        pool.setCorePoolSize(5);//核心線程數
        pool.setMaxPoolSize(10);//最大線程數
        pool.setQueueCapacity(25);//線程隊列
        pool.initialize();//線程初始化
        return pool;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return null;
    }
}

配置類中方法說明:
Spring 中的ThreadPoolExecutor是借助JDK並發包中的java.util.concurrent.ThreadPoolExecutor來實現的。其中一些值的含義如下:

  • int corePoolSize:線程池維護線程的最小數量
  • int maximumPoolSize:線程池維護線程的最大數量,線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,並且當前線程個數小於maximumPoolSize,那么會創建新的線程來執行任務。
  • long keepAliveTime:空閑線程的存活時間TimeUnit
  • unit:時間單位,現由納秒,微秒,毫秒,秒
  • BlockingQueue workQueue:持有等待執行的任務隊列
  • RejectedExecutionHandler handler 線程池的拒絕策略,是指當任務添加到線程池中被拒絕,而采取的處理措施。
    當任務添加到線程池中之所以被拒絕,可能是由於:第一,線程池異常關閉。第二,任務數量超過線程池的最大限制。
    Reject策略預定義有四種:
  1. ThreadPoolExecutor.AbortPolicy策略,是默認的策略,處理程序遭到拒絕將拋出運行時 RejectedExecutionException
  2. ThreadPoolExecutor.CallerRunsPolicy策略 ,調用者的線程會執行該任務,如果執行器已關閉,則丟棄.
  3. ThreadPoolExecutor.DiscardPolicy策略,不能執行的任務將被丟棄.
  4. ThreadPoolExecutor.DiscardOldestPolicy策略,如果執行程序尚未關閉,則位於工作隊列頭部的任務將被刪除,然后重試執行程序(如
    果再次失敗,則重復此過程)

2.2 異步方法

@Async注解可以用在方法上,表示該方法是個異步方法,也可以用在類上,那么表示此類的所有方法都是異步方法
異步方法會自動注入使用ThreadPoolTaskExecutor作為TaskExecutor

package cn.jzh.thread;

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import java.util.concurrent.Future;

@Service
public class AsyncTaskService {
    /**
     * 
     * @param i
     */
    @Async
    public void executeAsync(Integer i) throws Exception{
        System.out.println("線程ID:" + Thread.currentThread().getId() + "線程名字:" +Thread.currentThread().getName()+"執行異步任務:" + i);
    }

    @Async
    public Future<String> executeAsyncPlus(Integer i) throws Exception {
        System.out.println("線程ID:" + Thread.currentThread().getId() +"線程名字:" +Thread.currentThread().getName()+ "執行異步有返回的任務:" + i);
        return new AsyncResult<>("success:"+i);
    }

}

2.3 啟動測試

package cn.jzh.thread;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.Future;

public class MainApp {
    public static void main(String[] args) throws Exception{
        System.out.println("主線程id:" + Thread.currentThread().getId() + "開始執行調用任務...");
        AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(TaskExecutorConfig.class);
        AsyncTaskService service = context.getBean(AsyncTaskService.class);
        for (int i = 0;i<10;i++){
            service.executeAsync(i);
            Future<String> result = service.executeAsyncPlus(i);
            System.out.println("異步程序執行結束,獲取子線程返回內容(會阻塞當前main線程)" + result.get());
        }
        context.close();

        System.out.println("主線程id:" + Thread.currentThread().getId() + "程序結束!!");
    }
}

注意:

  1. 是否影響主線程
    如果main主線程不去獲取子線程的結果(Future.get()),那么主線程完全可以不阻塞。那么,此時,主線程和子線程完全異步。此功能,可以做成類似MQ消息中間件之類的,消息異步進行發送
  2. 判斷是否執行完畢
    當返回的數據類型為Future類型,其為一個接口。具體的結果類型為AsyncResult,這個是需要注意的地方。
    調用返回結果的異步方法,判斷是否執行完畢時需要使用future.isDone()來判斷是否執行完畢
public void testAsyncAnnotationForMethodsWithReturnType()  
   throws InterruptedException, ExecutionException {  
    System.out.println("Invoking an asynchronous method. "   + Thread.currentThread().getName());  
    Future<String> future = asyncAnnotationExample.asyncMethodWithReturnType();  
   
    while (true) {  ///這里使用了循環判斷,等待獲取結果信息  
        if (future.isDone()) {  //判斷是否執行完畢  
            System.out.println("Result from asynchronous process - " + future.get());  
            break;  
        }  
        System.out.println("Continue doing something else. ");  
        Thread.sleep(1000);  
    }  
}

這些獲取異步方法的結果信息,是通過不停的檢查Future的狀態來獲取當前的異步方法是否執行完畢來實現的


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM