手動創建線程池,效果會更好哦。 建議使用ThreadPoolExecutor線程池創建


手動創建線程池,效果會更好哦。
Inspection info:
線程池不允許使用Executors去創建,而是通過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同學更加明確線程池的運行規則規避資源耗盡的風險

說明:Executors返回的線程池對象的弊端如下:
1)FixedThreadPool和SingleThreadPool:
  允許的請求隊列長度為Integer.MAX_VALUE,可能會堆積大量的請求,從而導致OOM
2)CachedThreadPool:
  允許的創建線程數量為Integer.MAX_VALUE,可能會創建大量的線程,從而導致OOM

Positive example 1//org.apache.commons.lang3.concurrent.BasicThreadFactory
ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1,
new BasicThreadFactory.Builder().namingPattern("example-schedule-pool-%d").daemon(true).build());



Positive example 2:
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("demo-pool-%d").build();

//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(5, 200,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

pool.execute(()-> System.out.println(Thread.currentThread().getName()));
pool.shutdown();//gracefully shutdown



Positive example 3<bean id="userThreadPool"
class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
<property name="corePoolSize" value="10" />
<property name="maxPoolSize" value="100" />
<property name="queueCapacity" value="2000" />

<property name="threadFactory" value= threadFactory />
<property name="rejectedExecutionHandler">
<ref local="rejectedExecutionHandler" />
</property>
</bean>
//in code
userThreadPool.execute(thread);

 

線程池執行任務邏輯和線程池參數的關系


執行邏輯說明:

  • 判斷核心線程數是否已滿,核心線程數大小和corePoolSize參數有關,未滿則創建線程執行任務
  • 若核心線程池已滿,判斷隊列是否滿,隊列是否滿和workQueue參數有關,若未滿則加入隊列中
  • 若隊列已滿,判斷線程池是否已滿,線程池是否已滿和maximumPoolSize參數有關,若未滿創建線程執行任務
  • 若線程池已滿,則采用拒絕策略處理無法執執行的任務,拒絕策略和handler參數有關
     

ThreadPoolExecutor是線程池中最核心的一個類,其繼承關系如下

 其中Executor是線程池的頂級接口,接口中只定義了一個方法  void execute(Runnable command);線程池的操作方法都是定義子在ExecutorService子接口中的,所以說ExecutorService是線程池真正的接口

 

Executors創建返回ThreadPoolExecutors對象


Executors創建返回 靜態ThreadPoolExecutor對象 的方法共有三種:

  • Executors#newCachedThreadPool => 創建可緩存的線程池
  • Executors#newSingleThreadExecutor => 創建單線程的線程池
  • Executors#newFixedThreadPool => 創建固定長度的線程池
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

 


ThreadPoolExecutor的構造函數共有四個,但最終調用的都是同一個:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

構造函數參數說明:

  • corePoolSize => 線程池核心線程數量
  • maximumPoolSize => 線程池最大數量
  • keepAliveTime => 空閑線程存活時間
  • unit => 時間單位
  • workQueue => 線程池所使用的緩沖隊列
  • threadFactory => 線程池創建線程使用的工廠
  • handler => 線程池對拒絕任務的處理策略

 
OOM異常測試


理論上會出現OOM異常,必須測試一波驗證之前的說法:

測試類:TaskTest.java

public class TaskTest {
    public static void main(String[] args) {
        ExecutorService es = Executors.newCachedThreadPool();
        int i = 0;
        while (true) {
            es.submit(new Task(i++));
        }
    }
}

使用Executors創建的CachedThreadPool,往線程池中無限添加線程

在啟動測試類之前先將JVM內存調整小一點,不然很容易將電腦跑出問題【別問我為什么知道,是鐵憨憨甜沒錯了!!!】,在idea里:Run -> Edit Configurations

JVM參數說明:

  • -Xms10M => Java Heap內存初始化值
  • -Xmx10M => Java Heap內存最大值

運行結果:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler 
in thread "main"Disconnected from the target VM, address: '127.0.0.1:60416',
 transport: 'socket'

創建到3w多個線程的時候開始報OOM錯誤

 

如何定義線程池參數

CPU密集型:線程池的大小推薦為CPU數量+1。CPU數量可以根據 Runtime.getRuntime().availableProcessors() 方法獲取
IO密集型:CPU數量 * CPU利用率 *(1 + 線程等待時間/線程CPU時間)
混合型 => 將任務分為CPU密集型和IO密集型,然后分別使用不同的線程池去處理,從而使每個線程池可以根據各自的工作負載來調整

阻塞隊列 => 推薦使用有界隊列,有界隊列有助於避免資源耗盡的情況發生

拒絕策略 => 默認采用的是AbortPolicy拒絕策略,直接在程序中拋出RejectedExecutionException異常【因為是運行時異常,不強制catch】,這種處理方式不夠優雅。處理拒絕策略有以下幾種比較推薦:

  • 在程序中捕獲RejectedExecutionException異常,在捕獲異常中對任務進行處理。針對默認拒絕策略
  • 使用CallerRunsPolicy拒絕策略,該策略會將任務交給調用execute的線程執行【一般為主線程】,此時主線程將在一段時間內不能提交任何任務,從而使工作線程處理正在執行的任務。此時提交的線程將被保存在TCP隊列中,TCP隊列滿將會影響客戶端,這是一種平緩的性能降低
  • 自定義拒絕策略,只需要實現RejectedExecutionHandler接口即可
  • 如果任務不是特別重要,使用DiscardPolicy和DiscardOldestPolicy拒絕策略將任務丟棄也是可以的

如果使用Executors的靜態方法創建ThreadPoolExecutor對象,可以通過使用Semaphore對任務的執行進行限流也可以避免出現OOM異常。


規避資源耗盡的風險,推薦寫法:

//獲取系統處理器個數,作為線程池數量
int nThreads = Runtime.getRuntime().availableProcessors();
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("demo-pool-%d").build();
 
//Common Thread Pool
ExecutorService pool = new ThreadPoolExecutor(nThreads , 200, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM