論如何優雅的自定義ThreadPoolExecutor線程池


更好的markDown閱讀體驗可直接訪問我的CSDN博客:https://blog.csdn.net/u012881584/article/details/85221635

前言

線程池想必大家也都用過,JDK的Executors 也自帶一些線程池。但是不知道大家有沒有想過,如何才是最優雅的方式去使用過線程池嗎? 生產環境要怎么去配置自己的線程池才是合理的呢?
今天周末,剛好有時間來總結一下自己所認為的'優雅', 如有問題歡迎大家指正。

線程池使用規則

要使用好線程池,那么一定要遵循幾個規則:

  1. 線程個數大小的設置
  2. 線程池相關參數配置
  3. 利用Hook嵌入你的行為
  4. 線程池的關閉

線程池配置相關

線程池大小的設置

這其實是一個面試的考點,很多面試官會問你線程池coreSize 的大小來考察你對於線程池的理解。
首先針對於這個問題,我們必須要明確我們的需求是計算密集型還是IO密集型,只有了解了這一點,我們才能更好的去設置線程池的數量進行限制。

1、計算密集型:
顧名思義就是應用需要非常多的CPU計算資源,在多核CPU時代,我們要讓每一個CPU核心都參與計算,將CPU的性能充分利用起來,這樣才算是沒有浪費服務器配置,如果在非常好的服務器配置上還運行着單線程程序那將是多么重大的浪費。對於計算密集型的應用,完全是靠CPU的核數來工作,所以為了讓它的優勢完全發揮出來,避免過多的線程上下文切換,比較理想方案是:

線程數 = CPU核數+1,也可以設置成CPU核數*2,但還要看JDK的版本以及CPU配置(服務器的CPU有超線程)。

一般設置CPU * 2即可。

2、IO密集型
我們現在做的開發大部分都是WEB應用,涉及到大量的網絡傳輸,不僅如此,與數據庫,與緩存間的交互也涉及到IO,一旦發生IO,線程就會處於等待狀態,當IO結束,數據准備好后,線程才會繼續執行。因此從這里可以發現,對於IO密集型的應用,我們可以多設置一些線程池中線程的數量,這樣就能讓在等待IO的這段時間內,線程可以去做其它事,提高並發處理效率。那么這個線程池的數據量是不是可以隨便設置呢?當然不是的,請一定要記得,線程上下文切換是有代價的。目前總結了一套公式,對於IO密集型應用:
線程數 = CPU核心數/(1-阻塞系數) 這個阻塞系數一般為0.8~0.9之間,也可以取0.8或者0.9。
套用公式,對於雙核CPU來說,它比較理想的線程數就是20,當然這都不是絕對的,需要根據實際情況以及實際業務來調整:final int poolSize = (int)(cpuCore/(1-0.9))

針對於阻塞系數,《Programming Concurrency on the JVM Mastering》即《Java 虛擬機並發編程》中有提到一句話:

對於阻塞系數,我們可以先試着猜測,抑或采用一些性能分析工具或java.lang.management API 來確定線程花在系統/IO操作上的時間與CPU密集任務所耗的時間比值。

線程池相關參數配置

說到這一點,我們只需要謹記一點,一定不要選擇沒有上限限制的配置項
這也是為什么不建議使用Executors 中創建線程的方法。
比如,Executors.newCachedThreadPool的設置與無界隊列的設置因為某些不可預期的情況,線程池會出現系統異常,導致線程暴增的情況或者任務隊列不斷膨脹,內存耗盡導致系統崩潰和異常。 我們推薦使用自定義線程池來避免該問題,這也是在使用線程池規范的首要原則! 小心無大錯,千萬別過度自信!
可以看下Executors中四個創建線程池的方法:

//使用無界隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

//線程池數量是無限的
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

其他的就不再列舉了,大家可以自行查閱源碼。

第二,合理設置線程數量、和線程空閑回收時間,根據具體的任務執行周期和時間去設定,避免頻繁的回收和創建,雖然我們使用線程池的目的是為了提升系統性能和吞吐量,但是也要考慮下系統的穩定性,不然出現不可預期問題會很麻煩!
第三,根據實際場景,選擇適用於自己的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!盡量采用自定義的拒絕策略去進行兜底!
第四,線程池拒絕策略,自定義拒絕策略可以實現RejectedExecutionHandler接口。
JDK自帶的拒絕策略如下:
AbortPolicy:直接拋出異常阻止系統正常工作。
CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。
DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。
DiscardPolicy:丟棄無法處理的任務,不給予任何處理。

利用Hook

利用Hook,留下線程池執行軌跡:
ThreadPoolExecutor提供了protected類型可以被覆蓋的鈎子方法,允許用戶在任務執行之前會執行之后做一些事情。我們可以通過它來實現比如初始化ThreadLocal、收集統計信息、如記錄日志等操作。這類Hook如beforeExecute和afterExecute。另外還有一個Hook可以用來在任務被執行完的時候讓用戶插入邏輯,如rerminated 。
如果hook方法執行失敗,則內部的工作線程的執行將會失敗或被中斷。

我們可以使用beforeExecute和afterExecute來記錄線程之前前和后的一些運行情況,也可以直接把運行完成后的狀態記錄到ELK等日志系統。

關閉線程池

內容當線程池不在被引用並且工作線程數為0的時候,線程池將被終止。我們也可以調用shutdown來手動終止線程池。如果我們忘記調用shutdown,為了讓線程資源被釋放,我們還可以使用keepAliveTime和allowCoreThreadTimeOut來達到目的!
當然,穩妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調用線程池的關閉方法!

線程池使用實例

線程池核心代碼:

public class AsyncProcessQueue {

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * Task 包裝類<br>
  * 此類型的意義是記錄可能會被 Executor 吃掉的異常<br>
  */
 public static class TaskWrapper implements Runnable {
  private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class);

  private final Runnable gift;

  public TaskWrapper(final Runnable target) {
   this.gift = target;
  }

  @Override
  public void run() {

   // 捕獲異常,避免在 Executor 里面被吞掉了
   if (gift != null) {

    try {
     gift.run();
    } catch (Exception e) {
     _LOGGER.error("Wrapped target execute exception.", e);
    }
   }
  }
 }

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * 執行指定的任務
  * 
  * @param task
  * @return
  */
 public static boolean execute(final Runnable task) {
  return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
public class AsyncProcessor {
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);

 /**
  * 默認最大並發數<br>
  */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

 /**
  * 線程池名稱格式
  */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";

 /**
  * 線程工廠名稱
  */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
   .daemon(true).build();

 /**
  * 默認隊列大小
  */
 private static final int DEFAULT_SIZE = 500;

 /**
  * 默認線程存活時間
  */
 private static final long DEFAULT_KEEP_ALIVE = 60L;

 /**NewEntryServiceImpl.java:689
  * Executor
  */
 private static ExecutorService executor;

 /**
  * 執行隊列
  */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

 static {
  // 創建 Executor
  // 此處默認最大值改為處理器數量的 4 倍
  try {
   executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
     TimeUnit.SECONDS, executeQueue, FACTORY);

   // 關閉事件的掛鈎
   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

    @Override
    public void run() {
     AsyncProcessor.LOGGER.info("AsyncProcessor shutting down.");

     executor.shutdown();

     try {

      // 等待1秒執行關閉
      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
       AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
       executor.shutdownNow();
      }
     } catch (InterruptedException e) {
      AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
      executor.shutdownNow();
     }

     AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
    }
   }));
  } catch (Exception e) {
   LOGGER.error("AsyncProcessor init error.", e);
   throw new ExceptionInInitializerError(e);
  }
 }

 /**
  * 此類型無法實例化
  */
 private AsyncProcessor() {
 }

 /**
  * 執行任務,不管是否成功<br>
  * 其實也就是包裝以后的 {@link Executer} 方法
  * 
  * @param task
  * @return
  */
 public static boolean executeTask(Runnable task) {

  try {
   executor.execute(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   return false;
  }

  return true;
 }

 /**
  * 提交任務,並可以在稍后獲取其執行情況<br>
  * 當提交失敗時,會拋出 {@link }
  * 
  * @param task
  * @return
  */
 public static <T> Future<T> submitTask(Callable<T> task) {

  try {
   return executor.submit(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
  }
 }
}

使用方式:

AsyncProcessQueue.execute(new Runnable() {
          @Override
         public void run() {
               //do something
        }
});

可以根據自己的使用場景靈活變更,我這里並沒有用到beforeExecute和afterExecute以及拒絕策略。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM