詳細講解線程池的使用


      前言:說起threadpoolexector應該大家多少都接觸過,現在我詳細的講解下其的用法

一:解析參數

為了更好地理解threadpoolexecutor,我先講一個例子,話說一個工作多年的高T,一天突然決定自己要單干組織一個團隊,經過仔細的考慮他做出了如下的決定

1、團隊的核心人員為10個

2、如果一旦出現項目過多人員不足的時候,則會聘請5個外包人員

3、接的項目單子最多堆積100

4、如果項目做完了團隊比較空閑,則裁掉外包人員,空閑的時間為一個月

5、如果接的單子超過100個,則后續考慮一些兜底策略(比如拒絕多余的單子,或者把多出100個以外的單子直接交付第三方公司做)

6、同時他還考慮了如果效益一直不好,那么就裁掉所有人,宣布公司直接倒閉

上面的例子恰恰和我們的線程池非常的像,我們來看下threadpoolexecutor的定義。

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler);

corePoolSize:核心線程數(就是我們上面說的一個公司核心人員)

maximumPoolSize:最大線程數(就是我們說的一旦公司接收的單子過多則聘請外包,此時也是公司最大的人員了,因為人多了辦公地方不夠了)

keepAliveTime:超多核心線程數之外線程的存活時間(就是如果公司一旦活不多要多久進行裁掉外包人員)

unit:上面時間的單元(可以年月日時分秒等)

workQueue:任務隊列(就是如果公司最大能存的單子)

handler:拒絕策略(就是一旦任務滿了應該如果處理多余的單子)

allowCoreThreadTimeOut:設置是否清理核心線程(如果設置true,如果任務少於實際執行的線程則會清理核心線程,默認為false)

二:實際演練

先驗證核心線程數

public class ThreadPoolExecutorTest {
    public static void main(String[] args) throws InterruptedException {
        RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(9));
        for (int i = 0; i < 11; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
    }

    static class AppTask implements Runnable {
        private int taskNum;

        public AppTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName()+"_task_" + this.taskNum + ":執行完畢");
        }
    }

    static LinkedBlockingQueue<Runnable> xs = new LinkedBlockingQueue(10000);
    static int i = 0;

    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                xs.put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
代碼塊1

那么我們來看看運行的結果

我們可以看出線程池此時只有2個線程,足矣說明當隊列的數+核心線程數<=任務數,則線程池中只會有核心線程工作。

如果把上述代碼中for循環中最大值改為14呢,那么我們在來看運行的結果

我們可以看的出線程池中的線程已經是5個了,說明當任務>隊列最大值+核心線程數的時候線程池則會生成新的線程來處理任務。

上面我們基本弄明白了核心線程數,最大線程數這些概念,現在我們再來看2個比較重要的參數隊列和拒絕策略

隊列

1、直接提交(SynchronousQueue,不會保存任何任務)

2、無界隊列(LinkedBlockingQueue,對於新加來的任務全部存入隊列中,量大可能會導致oom)

3、有界隊列(ArrayBlockingQueue,隊列有一個最大值,超過最大值的任務交給拒絕策略處理)

拒絕策略

1、當線程池中的數量等於最大線程數時對於新傳入的任務則拋異常(AbortPolicy)

2、當線程池中的數量等於最大線程數時拋棄消息,不做任務處理(DiscardPolicy)

3、當線程池中的數量等於最大線程數時主線程處理新傳入的任務消息(CallerRunsPolicy)

4、當線程池中的數量等於最大線程數時 、拋棄線程池中最后一個要執行的任務,並執行新傳入的任務(DiscardOldestPolicy)

隊列這里就不多說了大家可以自己去實踐,這里我們主要說拒絕策略,現在線程池默認的是第一種拒絕策略,直接拋異常,第二種不做處理的這種我們也不提了,我們主要來看下第三種和第四種拒絕策略,先看下第三種拒絕策略的源碼

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
代碼塊2

從中我們可以看出新傳入的任務並沒有交接線程池來處理,直接交給主線程來處理的,看我下面這塊代碼以及執行結果

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),new ThreadPoolExecutor.CallerRunsPolicy());
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
代碼塊3

運行結果

從結果顯示對於多出隊列的任務則由主線程來執行,主線程執行完畢后,由於隊列被釋放了一些任務,新來的任務又會交給線程池來處理。

第四種情況我們可以看下源碼

 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
代碼塊4

我們可以看出先執行新進入的任務,然后將隊列頭的任務拋棄,這樣會導致一部分消息丟失

 三:使用場景

1、異步處理日志,這個是比較場景的采用線程池來解決的

2、定時任務,定時對數據庫備份、定時更新redis配置等,定時發送郵件

3、數據遷移

這些常見的一些場景我們就應該優先來考慮線程池來解決

四:堵塞線程池和非堵塞線程池

這里我說下通過日志處理方式來講解堵塞線程池和非堵塞線程池.日志的作用便於我們分析問題,但是對於服務本身而言卻不是必須的,這也是為什么我們一般都會異步來處理日志的情況。

1、堵塞線程池

在上面幾種幾種隊列中,如果我們選取有有界隊列來,拒絕策略可以采用CallerRunsPolicy,這樣一來就不會出現消息丟失、內存溢出等問題,當然我們也可以重寫拒絕策略,我們來看下面一段代碼

 RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5),handler);
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }

 static int count=0;
    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r);
                count++;
                System.out.println("阻塞隊列中個數:"+count);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
代碼塊5

運行結果如下

我們可以看出所有的任務都執行完畢了,因為拒絕策略中我們把新進入的任務在次放入隊列中,我們用put這個方法,這個是java提供給我們的阻塞隊列,如果滿了就會一直等待直到隊列中其他任務被釋放。

優點:

1、不會造成內存溢出

2、不會出現消息丟失

3、對消費者來說並不需要非常復雜的處理就能滿足業務需求

缺點:

對生成者來說就要變得復雜的多,如果消息量qps過高消費者消費能力不足(如果消費者不足以處理生產者的任務則會堵塞等待,那么生產者肯定得不到響應,會報出超時異常),如果不采用一些措施將會導致消息丟失,所以對生成者來說必須要持久化的記錄消息,而且設置最大量,如果出現超過最大量的80%則報警,所以在設計生產者的時候必須考慮這樣的場景,

總結:

堵塞線程池其實就是把消費者消費能力不足的壓力交給生產者來處理

2、非堵塞線程池

非堵塞線程池說白就是消息來了就處理,處理不足則進行存儲,或者記錄日志,我們看如下的代碼

RejectedExecutionHandler handler = new RejectedExecutionHandlerImpl();
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5, 20L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5), handler);
        for (int i = 0; i < 12; i++) {
            AppTask appTask = new AppTask(i);
            poolExecutor.execute(appTask);
            System.out.println("線程池中線程的數目:" + poolExecutor.getPoolSize() + ",線程池中等待的隊列數目:" + poolExecutor.getQueue().size() + ";線程池中已執行完畢的任務數據:" + poolExecutor.getCompletedTaskCount());
        }
        MainThread.exec();
        poolExecutor.allowCoreThreadTimeOut(true);
        if (!poolExecutor.isShutdown()) {
            poolExecutor.shutdown();
        }
 static LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue(10000);
    static class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                blockingQueue.put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    static int count=0;
    static class MainThread {
        public static void exec() throws InterruptedException {
            while (true) {
                if (blockingQueue.size() >= 8000) {
                    System.out.println("報警");
                }
                Runnable r = blockingQueue.poll();
                if (r == null) {
                    Thread.sleep(1000);
                    continue;
                }
                count++;
                r.run();
                System.out.println("非堵塞線程池執行的個數:"+count);
            }
        }
    }
代碼塊6

執行結果

 有人問和上面有什么區別呢,其實區別就是這個不會出現堵塞,這里我雖然采用堵塞隊列來存儲,為了更好的展示,其實這里你可以打印日志,或者存入redis中然后進行處理。

優點:

1、也不會造成內存溢出

2、消費不會出現堵塞

缺點:

這樣設計明顯會復雜的很多,而且獲取消息值不易,就我目前來看我更傾向於采用堵塞線程池

五:線程池的設計

線程池的設計最重要的一點就是計算出生產者最大的qps量和單個線程消費的能力,比如我們生產者qps是1萬,但是我們單個線程處理每個任務的時間是2毫秒,如果我們cpu是4核,那么我們核心線程是4*2=8個,所以我們每秒處理的任務數是1000/2*8=4000,很顯然我們的消費能力遠遠不足,這個時候我們應該考慮采用多台機器處理,有人不是可以堵塞隊列么,其實那是一種兜底策略,避免消息丟失,但這並不是我們設計的核心。如果我們能計算出單條消息的大小(如1k)我們分配這個消息服務的內存是300M,那么我們可以做個折中150M來存儲多余的消息,那么可以存儲的量是1百萬,如果我們的流量高峰是30分鍾,每秒處理剩余的消息是200,那么這半小時之內總共剩余的消息總量是30*60*200=360000,這樣一來完全可以滿足我們的業務需求。

六:線程池的有點

1、減少頻繁的創建和銷毀線程(由於線程創建和銷毀都會耗用一定的內存)

2、線程池也是多線程,充分利用CPU,提高系統的效率

七:Executors下的4種創建線程池的方式

1、newSingleThreadExecutor();

這個線程池的特點是核心線程數只有一個,最大線程數也只有一個,采用的隊列是無界隊列,可能會導致內存溢出

2、newFixedThreadPool(5)

這個線程池特點是核心線程數由自己設置並且一旦設置就是固定的不在改變,采用的隊列是無界隊列,可能會導致內存溢出

3、newCachedThreadPool()

這個線程池特點是核心線程數為0,最大線程數無界,這樣也會造成內存溢出

4、newScheduledThreadPool(5)

這個線程池特點是可以周期性執行任務,核心線程數需要自己進行初始化,最大線程數無界,這樣也會造成內存溢出

我們在實際操作中,盡量避免使用Executors來創建多線程,因為如果消息量過大會導致內存溢出,消息丟失

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM