一次Java線程池誤用(newFixedThreadPool)引發的線上血案和總結


一次Java線程池誤用(newFixedThreadPool)引發的線上血案和總結


這是一個十分嚴重的線上問題

自從最近的某年某月某天起,線上服務開始變得不那么穩定(軟病)。在高峰期,時常有幾台機器的內存持續飆升,並且無法回收,導致服務不可用。

給出監控中GC的采樣曲線:

內存使用曲線如下:

如上兩張圖顯示:18:50-19:00的這10分鍾階段里,服務已經處於不可用的狀態了。這就導致了:上游服務的超時異常會增加,該台機器會觸發熔斷。

熔斷觸發后,這台機器的流量會打到其他機器,其他機器發生類似的情況的可能性會提高,極端情況會引起所有服務宕機,造成雪崩,曲線掉底。

問題分析和猜想

結合我們的業務情況,我們監控到在那段時間里,訪問量是最高的,屬於一個高峰情況,因此我們初步斷定,這個和流量高並發有密不可分個的關系。

1、因為線上內存過大,如果采用 jmap dump的方式,這個任務可能需要很久才可以執行完,同時把這么大的文件存放起來導入工具也是一件很難的事情

2、再看JVM啟動參數,也很久沒有變更過 Xms, Xmx, -XX:NewRatio, -XX:SurvivorRatio, 雖然沒有仔細分析程序使用內存情況,但看起來也無大礙。

3、於是開始找代碼,某年某天某月~ 嗯,注意到一段這樣的代碼提交:

private static ExecutorService executor = Executors.newFixedThreadPool(15); public static void push2Kafka(Object msg) { executor.execute(new WriteTask(msg, false)); }

這段代碼的功能是:每次線上調用,都會把計算結果的日志打到 Kafka,Kafka消費方再繼續后續的邏輯。

看這塊代碼的問題:咋一看,好像沒什么問題,但深入分析,問題就出現在 Executors.newFixedThreadPool(15)這段代碼上。

因為使用了 newFixedThreadPool 線程池,而它的工作機制是,固定了N個線程,而提交給線程池的任務隊列是不限制大小的,如果Kafka發消息被阻塞或者變慢,那么顯然隊列里面的內容會越來越多,也就會導致這樣的問題。

    public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

如上,采用的是LinkedBlockingQueue,而它默認是一個無界隊列。因此若使用不當,講很快導致內存被打滿,需要謹慎啊。

驗證猜想

為了驗證這個想法,做了個小實驗,把 newFixedThreadPool 線程池的線程個數調小一點,然后自己模擬壓測一下: 測試代碼如下:

/**
 * @author fangshixiang@vipkid.com.cn
 * @description
 * @date 2018-11-04 10:13
 */
public class Main { //創建一個固定線程池 private static ExecutorService executor = Executors.newFixedThreadPool(1); //向kafka里推送消費 public static void push2Kafka(Object msg) { executor.execute(() -> { try { //模擬 占用的內存大小 Byte[] bytes = new Byte[1024 * 1000 * 1000]; System.out.println(Thread.currentThread().getName() + "-->任務放到線程池:" + msg); TimeUnit.MINUTES.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); } public static void main(String[] args) { //模擬高並發環境下 一直向線程池里面不停的塞任務 for (int i = 0; i < Integer.MAX_VALUE; i++) { System.out.println("塞任務start..." + i); push2Kafka(i); System.out.println("塞任務end..." + i); } } }

打開JConsole查看JVM的CPU、內存相關使用情況:

內存情況逐漸攀升,最終可以看出程序近乎停止。最終拋出內存異常

Exception in thread "pool-1-thread-295" java.lang.OutOfMemoryError: Java heap space

然而,電腦本機的實體內存,也是幾乎會被占滿:

下面是程序啟用和停止的內存情況:

綜上所訴,我們的猜想是正確的。如果消費的速度小於生產的速度,內存隨着時間的堆積,很快就能被打滿了。

解決方案

問題根源找到了,解決的方法其實就非常的簡單了,采取了自定義線程池參數。

在我們的修復方案中,選擇的就是有界隊列,雖然會有部分任務被丟失,但是我們線上是排序日志搜集任務,所以對部分對丟失是可以容忍的。

Java提供的四種常用線程池解析 Executors

既然樓主踩坑就是使用了 JDK 的默認實現,那么再來看看這些默認實現到底干了什么,封裝了哪些參數。簡而言之 Executors 工廠方法Executors.newCachedThreadPool() 提供了無界線程池,可以進行自動線程回收;Executors.newFixedThreadPool(int) 提供了固定大小線程池,內部使用無界隊列;Executors.newSingleThreadExecutor() 提供了單個后台線程。

newCachedThreadPool:可緩存線程池
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } 

這種類型的線程池特點是:

  • 工作線程的創建數量幾乎沒有限制(其實也有限制的,數目為Interger. MAX_VALUE), 這樣可靈活的往線程池中添加線程。
  • 如果長時間沒有往線程池中提交任務,即如果工作線程空閑了指定的時間(默認為1分鍾),則該工作線程將自動終止。終止后,如果你又提交了新的任務,則線程池重新創建一個工作線程。
  • 在使用CachedThreadPool時,一定要注意控制任務的數量,否則,由於大量線程同時運行,很有會造成系統癱瘓。
public class Main { public static void main(String[] args) { ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 100); } catch (Exception e) { e.printStackTrace(); } cachedThreadPool.execute(() -> System.out.println(index + "當前線程" + Thread.currentThread().getName())); } } } 輸出: 0當前線程pool-1-thread-1 1當前線程pool-1-thread-1 2當前線程pool-1-thread-1 3當前線程pool-1-thread-1 4當前線程pool-1-thread-1 5當前線程pool-1-thread-1 6當前線程pool-1-thread-1 7當前線程pool-1-thread-1 8當前線程pool-1-thread-1 9當前線程pool-1-thread-1

發現10個線程都是使用的線程1,線程池為無限大,當執行第二個任務時第一個任務已經完成,會復用執行第一個任務的線程,而不用每次新建線程。

newFixedThreadPool
 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }

看代碼一目了然了,線程數量固定,使用無限大的隊列。再次強調,樓主就是踩的這個無限大隊列的坑。

newScheduledThreadPool

創建一個定長線程池,支持定時及周期性任務執行。

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); }

在來看看ScheduledThreadPoolExecutor()的構造函數:

 public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); } 

ScheduledThreadPoolExecutor的父類即ThreadPoolExecutor,因此這里各參數含義和上面一樣。值得關心的是DelayedWorkQueue這個阻塞對列。

它作為靜態內部類就在ScheduledThreadPoolExecutor中進行了實現。簡單的說,DelayedWorkQueue是一個無界隊列,它能按一定的順序對工作隊列中的元素進行排列。

newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }

注意:該靜態方法,禁止使用,因為里面有不少坑,這里不做過多解釋

關於線程池的阻塞隊列的各種用法,請參見博文: 【小家java】BlockingQueue阻塞隊列詳解以及5大實現(ArrayBlockingQueue、DelayQueue、LinkedBlockingQueue…)

結束語

雖然之前學習了不少相關知識,但是只有在實踐中踩坑才能印象深刻吧

可以通過Executors靜態工廠構建線程池,但一般不建議這樣使用。

附:ThreadFactory簡單介紹

ThreadFactory是一個線程工廠。用來創建線程。這里為什么要使用線程工廠呢?其實就是為了統一在創建線程時設置一些參數,如是否守護線程。線程一些特性等,如優先級。通過這個TreadFactory創建出來的線程能保證有相同的特性。它首先是一個接口類,而且方法只有一個。就是創建一個線程。

public interface ThreadFactory { Thread newThread(Runnable r); } 

所以我們可以自己實現這個工廠,然后定制屬於我們自己的一類線程

class MyThreadFactory implements ThreadFactory { private int counter; private String name; private List<String> stats; public MyThreadFactory(String name) { counter = 0; this.name = name; stats = new ArrayList<String>(); } @Override public Thread newThread(Runnable run) { Thread t = new Thread(run, name + "-Thread-" + counter); counter++; stats.add(String.format("Created thread %d with name %s on%s\n",t.getId(), t.getName(), new Date())); return t; } public String getStas() { StringBuffer buffer = new StringBuffer(); Iterator<String> it = stats.iterator(); while (it.hasNext()) { buffer.append(it.next()); buffer.append("\n"); } return buffer.toString(); } } //使用: MyThreadFactory factory = new MyThreadFactory("MyThreadFactory"); Thread thread = factory.newThread(new MyTask(i)); thread.start(); 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM