這么講線程池,徹底明白了!


大家好,我是老三,很高興又和大家見面,最近降溫,大家注意保暖。

這節分享Java線程池,接下來我們一步步把線程池扒個底朝天。

引言:老三取錢

有一個程序員,他的名字叫老三。

老三兜里沒有錢,匆匆銀行業務辦。

這天起了一大早,銀行姐姐說早安。

老三一看櫃台空,卡里五毛都取完。

直接辦理

老三這天起的晚,營業窗口都排滿。

只好進入排隊區,摸出手機等空閑。

老三排隊等待

老三睡到上三桿,窗口排隊都爆滿。

經理一看開新口,排隊同志趕緊辦。

排隊區滿

這天業務太火爆,櫃台排隊都用完。

老三一看急上火,經理你說怎么辦。

窗口,排隊都爆滿

經理揮手一笑間,這種場面已見慣。四種辦法來處理,你猜我會怎么辦。

  • 小小銀行不堪負,陳舊系統已癱瘓。
  • 我們廟小對不起,誰叫你來找誰辦。
  • 看你情況特別急,來去隊里加個塞。
  • 今天實在沒辦法,不行你看改一天。

四種策略

對,沒錯,其實這個流程就和JDK線程池ThreadPoolExecutor的工作流程類似,先賣個關子,后面結合線程池工作流程,保證你會豁然開朗。

實戰:線程池管理數據處理線程

光說不練假把式,show you code,我們來一個結合業務場景的線程池實戰。——很多同學面試的時候,線程池原理背的滾瓜爛熟,一問項目中怎么用的,歇菜。看完這個例子,趕緊琢磨琢磨,項目里有什么地方能套用的。

應用場景

應用場景非常簡單,我們的項目是一個審核類的系統,每年到了核算的時候,需要向第三方的核算系統提供數據,以供核算。

這里存在一個問題,由於歷史原因,核算系統提供的接口只支持單條推送,但是實際的數據量是三十萬條,如果一條條推送,那么起碼得一個星期。

所以就考慮使用多線程的方式來推送數據,那么,線程通過什么管理呢?線程池

為什么要用線程池管理線程呢?當然是為了線程復用。

線程池實際應用場景

思路也很簡單,開啟若干個線程,每個線程從數據庫中讀取取(start,count]區間未推送的數據進行推送。

數據分段推送

具體代碼實現

我把這個場景提取了出來,主要代碼:

主要代碼

代碼比較長,所以用了carbon美化,代碼看不清,沒關系,可運行的代碼我都上傳到了遠程倉庫,倉庫地址:https://gitee.com/fighter3/thread-demo.git ,這個例子比較簡單,沒有用過線程池的同學可以考慮你有沒有什么數據處理、清洗的場景可以套用,不妨借鑒、演繹一下。

本文主題是線程池,所以我們重點關注線程池的代碼:

線程池構造

//核心線程數:設置為操作系統CPU數乘以2
    private static final Integer CORE_POOL_SIZE = Runtime.getRuntime().availableProcessors() * 2;
    //最大線程數:設置為和核心線程數相同
    private static final Integer MAXIMUM_POOl_SIZE = CORE_POOL_SIZE;
    //創建線程池
    ThreadPoolExecutor pool = new ThreadPoolExecutor(CORE_POOL_SIZE, MAXIMUM_POOl_SIZE * 2, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100));

線程池直接采用ThreadPoolExecutor構造:

  • 核心線程數設置為CPU數×2
  • 因為需要分段數據,所以最大線程數設置為和核心線程數一樣
  • 阻塞隊列使用LinkedBlockingQueue
  • 拒絕策略使用默認

線程池提交任務

//提交線程,用數據起始位置標識線程
Future<Integer> future = pool.submit(new PushDataTask(start, LIMIT, start));
  • 因為需要返回值,所以使用submit()提交任務,如果使用execute()提交任務,沒有返回值。

代碼不負責,可以done下來跑一跑。

那么,線程池具體是怎么工作的呢?我們接着往下看。

原理:線程池實現原理

線程池工作流程

構造方法

我們在構造線程池的時候,使用了ThreadPoolExecutor的構造方法:

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }

先來看看幾個參數的含義:

  • corePoolSize: 核心線程數

  • maximumPoolSize:允許的最大線程數(核心線程數+非核心線程數)

  • workQueue:線程池任務隊列

    用來保存等待執行的任務的阻塞隊列,常見阻塞隊列有:

    • ArrayBlockingQueue:一個基於數組結構的有界阻塞隊列
    • LinkedBlockingQueue:基於鏈表結構的阻塞隊列
    • SynchronousQueue:不存儲元素的阻塞隊列
    • PriorityBlockingQueue:具有優先級的無限阻塞隊列
  • handler: 線程池飽和拒絕策略

    JDK線程池框架提供了四種策略:

    • AbortPolicy:直接拋出異常,默認策略。
    • CallerRunsPolicy:用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄任務隊列里最老的任務
    • DiscardPolicy:不處理,丟棄當前任務

    也可以根據自己的應用場景,實現RejectedExecutionHandler接口來自定義策略。

上面四個是和線程池工作流程息息相關的參數,我們再來看看剩下三個參數。

  • keepAliveTime:非核心線程閑置下來最多存活的時間
  • unit:線程池中非核心線程保持存活的時間
  • threadFactory:創建一個新線程時使用的工廠,可以用來設定線程名等

線程池工作流程

知道了幾個參數,那么這幾個參數是怎么應用的呢?

execute()方法提交任務為例,我們來看線程池的工作流程:

線程池工作流程

向線程池提交任務的時候:

  • 如果當前運行的線程少於核心線程數corePoolSize,則創建新線程來執行任務
  • 如果運行的線程等於或多於核心線程數corePoolSize,則將任務加入任務隊列workQueue
  • 如果任務隊列workQueue已滿,創建新的線程來處理任務
  • 如果創建新線程使當前總線程數超過最大線程數maximumPoolSize,任務將被拒絕,線程池拒絕策略handler執行

結合一下我們開頭的生活事例,是不是就對上了:

老三取錢和線程池工作流程

線程池工作源碼分析

上面的流程分析,讓我們直觀地了解了線程池的工作原理,我們再來通過源碼看看細節。

提交線程(execute)

線程池執行任務的方法如下:

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //獲取當前線程池的狀態+線程個數變量的組合值
        int c = ctl.get();
        //1.如果正在運行線程數少於核心線程數
        if (workerCountOf(c) < corePoolSize) {
            //開啟新線程運行
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //2. 判斷線程池是否處於運行狀態,是則添加任務到阻塞隊列
        if (isRunning(c) && workQueue.offer(command)) {
            //二次檢查
            int recheck = ctl.get();
            //如果當前線程池不是運行狀態,則從隊列中移除任務,並執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //如若當前線程池為空,則添加一個新線程
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //最后嘗試添加線程,如若添加失敗,執行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }

我們來看一下execute()的詳細流程圖:

execute()具體代碼執行

新增線程 (addWorker)

execute方法代碼里,有個關鍵的方法private boolean addWorker(Runnable firstTask, boolean core),這個方法主要完成兩部分工作:增加線程數添加任務,並執行

  • 我們先來看第一部分增加線程數:
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1.檢查隊列是否只在必要時為空(判斷線程狀態,且隊列不為空)
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;
            //2.循環CAS增加線程個數
            for (;;) {
                int wc = workerCountOf(c);
                //2.1 如果線程個數超限則返回 false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //2.2 CAS方式增加線程個數,同時只有一個線程成功,成功跳出循環
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //2.3 CAS失敗,看線程池狀態是否變化,變化則跳到外層,嘗試重新獲取線程池狀態,否則內層重新CAS
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        //3. 到這說明CAS成功了
        boolean workerStarted = false;
        boolean workerAdded = false;
  • 接着來看第二部分添加任務,並執行
       Worker w = null;
        try {
            //4.創建worker
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                //4.1、加獨占鎖 ,為了實現workers同步,因為可能多個線程調用了線程池的excute方法
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    //4.2、重新檢查線程池狀態,以避免在獲取鎖前調用了shutdown接口
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        //4.3添加任務
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //4.4、添加成功之后啟動任務
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;

我們來看一下整體的流程:

addWorker()流程

執行線程(runWorker)

用戶線程提交到線程池之后,由Worker執行,Worker是線程池內部一個繼承AQS、實現Runnable接口的自定義類,它是具體承載任務的對象。

Worker類圖

先看一下它的構造方法:

        Worker(Runnable firstTask) {
            setState(-1); // 在調用runWorker之前禁止中斷
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);  //創建一個線程
        }
  • 在構造函數內 首先設置 state=-1,現了簡單不可重入獨占鎖,state=0表示鎖未被獲取狀態,state=1表示鎖已被獲取狀態,設置狀態大小為-1,是為了避免線程在運行runWorker()方法之前被中斷
  • firstTask記錄該工作線程的第一個任務
  • thread是具體執行任務的線程

它的run方法直接調用runWorker,真正地執行線程就是在我們的runWorker 方法里:

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 允許中斷
        boolean completedAbruptly = true;
        try {
            //獲取當前任務,從隊列中獲取任務
            while (task != null || (task = getTask()) != null) {
                w.lock();
                …………    
                try {
                    //執行任務前做一些類似統計之類的事情
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //執行任務
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        // 執行任務完畢后干一些些事情
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    // 統計當前Worker 完成了多少個任務
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //執行清理工作
            processWorkerExit(w, completedAbruptly);
        }
    }

代碼看着多,其實砍掉枝蔓,最核心的點就是task.run() 讓線程跑起來。

獲取任務(getTask)

我們在上面的執行任務runWorker里看到,這么一句while (task != null || (task = getTask()) != null) ,執行的任務是要么當前傳入的firstTask,或者還可以通過getTask()獲取,這個getTask的核心目的就是從隊列中獲取任務

private Runnable getTask() {
        //poll()方法是否超時
        boolean timedOut = false; 
        //循環獲取
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 1.線程池未終止,且隊列為空,返回null
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }
            //工作線程數
            int wc = workerCountOf(c);

        
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            //2.判斷工作線程數是否超過最大線程數 && 超時判斷 && 工作線程數大於0或隊列為空
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //從任務隊列中獲取線程
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                //獲取成功
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

總結一下,Worker執行任務的模型如下[8]:

Worker執行任務模型

小結

到這,了解了executeworker的一些流程,可以說其實ThreadPoolExecutor 的實現就是一個生產消費模型。

當用戶添加任務到線程池時相當於生產者生產元素, workers 線程工作集中的線程直接執行任務或者從任務隊列里面獲取任務時則相當於消費者消費元素。

線程池生產消費模型

線程池生命周期

線程池狀態表示

ThreadPoolExecutor里定義了一些狀態,同時利用高低位的方式,讓ctl這個參數能夠保存狀態,又能保存線程數量,非常巧妙![6]

    //記錄線程池狀態和線程數量
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    //29
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // 線程池狀態
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

高3位表示狀態,低29位記錄線程數量:

高 3 位與低 29 位

線程池狀態流轉

線程池一共定義了五種狀態,來看看這些狀態是怎么流轉的[6]:

線程池狀態流轉

  • RUNNING:運行狀態,接受新的任務並且處理隊列中的任務。
  • SHUTDOWN:關閉狀態(調用了 shutdown 方法)。不接受新任務,,但是要處理隊列中的任務。
  • STOP:停止狀態(調用了 shutdownNow 方法)。不接受新任務,也不處理隊列中的任務,並且要中斷正在處理的任務。
  • TIDYING:所有的任務都已終止了,workerCount 為 0,線程池進入該狀態后會調terminated() 方法進入 TERMINATED 狀態。
  • TERMINATED:終止狀態,terminated() 方法調用結束后的狀態。

應用:打造健壯的線程池

合理地配置線程池

關於線程池的構造,我們需要注意兩個配置,線程池的大小任務隊列

線程池大小

關於線程池的大小,並沒有一個需要嚴格遵守的“金規鐵律”,按照任務性質,大概可以分為CPU密集型任務IO密集型任務混合型任務

  • CPU密集型任務:CPU密集型任務應配置盡可能小的線程,如配置Ncpu+1個線程的線程池。
  • IO密集型任務:IO密集型任務線程並不是一直在執行任務,則應配置盡可能多的線程,如2*Ncpu。
  • 混合型任務:混合型任務可以按需拆分成CPU密集型任務和IO密集型任務。

當然,這個只是建議,實際上具體怎么配置,還要結合事前評估和測試事中監控來確定一個大致的線程線程池大小。線程池大小也可以不用寫死,使用動態配置的方式,以便調整。

任務隊列

任務隊列一般建議使用有界隊列,無界隊列可能會出現隊列里任務無限堆積,導致內存溢出的異常。

線程池監控

[1]如果在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,可以根據線程池的使用狀況快速定位問題。

可以通過線程池提供的參數和方法來監控線程池:

  • getActiveCount() :線程池中正在執行任務的線程數量
  • getCompletedTaskCount() :線程池已完成的任務數量,該值小於等於 taskCount
  • getCorePoolSize() :線程池的核心線程數量
  • getLargestPoolSize():線程池曾經創建過的最大線程數量。通過這個數據可以知道線程池是否滿過,也就是達到了 maximumPoolSize
  • getMaximumPoolSize():線程池的最大線程數量
  • getPoolSize() :線程池當前的線程數量
  • getTaskCount() :線程池已經執行的和未執行的任務總數

還可以通過擴展線程池來進行監控:

  • 通過繼承線程池來自定義線程池,重寫線程池的beforeExecute、afterExecute和terminated方法,
  • 也可以在任務執行前、執行后和線程池關閉前執行一些代碼來進行監控。例如,監控任務的平均執行時間、最大執行時間和最小執行時間等。

End

這篇文章從一個生活場景入手,一步步從實戰到原理來深入了解線程池。

但是你發現沒有,我們平時常說的所謂四種線程池在文章里沒有提及——當然是因為篇幅原因,下篇就安排線程池創建工具類Executors

線程池也是面試的重點戰區,面試又會問到哪些問題呢?

這些內容,都已經在路上。點贊關注不迷路,下篇見!



參考:

[1]. 《Java並發編程的藝術》

[2]. 《Java發編程實戰》

[3]. 講真 這次絕對讓你輕松學習線程池

[4]. 面試必備:Java線程池解析

[5]. 面試官問:“在項目中用過多線程嗎?”你就把這個案例講給他聽!

[6]. 小傅哥 《Java面經手冊》

[7]. 《Java並發編程之美》

[8]. Java線程池實現原理及其在美團業務中的實踐


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM