Java並發編程-Executor框架集


Executor框架集對線程調度進行了封裝,將任務提交和任務執行解耦。

它提供了線程生命周期調度的所有方法,大大簡化了線程調度和同步的門檻。

Executor框架集的核心類圖如下:


從上往下,可以很清晰的看出框架集的各個類,以及它們之間的關系:
Executor,是一個可以提交可執行(Runnable)任務的Object,這個接口解耦了任務提交和執行細節(線程使用、調度等),Executor主要用來替代顯示的創建和運行線程;
ExecutorService提供了異步的管理一個或多個線程終止、執行過程(Future)的方法;
AbstractExecutorService提供了ExecutorService的一個默認實現,這個類通過RunnableFuture(實現類FutureTask)實現了submit, invokeAny, invokeAll幾個方法;
ThreadPoolExecutor是ExecutorService的一個可配置的線程池實現,它的兩個重要的配置參數:corePoolSize(線程池的基本大小,即在沒有任務需要執行的時候線程池的大小,並且只有在工作隊列滿了的情況下才會創建超出這個數量的線程。這里需要注意的是:在剛剛創建ThreadPoolExecutor的時候,線程並不會立即啟動,而是要等到有任務提交時才會啟動,除非調用了prestartCoreThread/prestartAllCoreThreads事先啟動核心線程。再考慮到keepAliveTime和allowCoreThreadTimeOut超時參數的影響,所以沒有任務需要執行的時候,線程池的大小不一定是corePoolSize。), maximumPoolSize(線程池中允許的最大線程數,線程池中的當前線程數目不會超過該值。如果隊列中任務已滿,並且當前線程個數小於maximumPoolSize,那么會創建新的線程來執行任務。這里值得一提的是largestPoolSize,該變量記錄了線程池在整個生命周期中曾經出現的最大線程個數。為什么說是曾經呢?因為線程池創建之后,可以調用setMaximumPoolSize() 改變運行的最大線程的數目。);
ScheduledExecutorService 是添加了調度特性(延遲或者定時執行)的ExecutorService;
ScheduledThreadPoolExecutor是具有調度特性的ExecutorService的池化實現;
Executors是一個Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, Callable的工具類,它能滿足大部分的日常應用場景。使用它創建線程池:

接下來,分析下ThreadPoolExecutor的實現。

ThreadPoolExecutor的作者Doug Lea,他將workerCount(線程池當前有效線程數)和runState(線程池當前所處狀態)放置到一個原子變量ctl(AtomicInteger)上,原子變量高三位保存runStatus,低29位保存workerCount。因此,ThreadPoolExecutor(JDK8)支持的最大線程數為2^29-1。線程池狀態有以下五中:

   RUNNING(正常運行,-1):  Accept new tasks and process queued tasks
   SHUTDOWN(關閉,0): Don't accept new tasks, but process queued tasks
   STOP(停止,1):     Don't accept new tasks, don't process queued tasks, and interrupt in-progress tasks
   TIDYING(整理中,2):  All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
   TERMINATED(終結,3): terminated() has completed

線程池狀態的變遷,並不嚴格按照數字增加變化:

    RUNNING -> SHUTDOWN
        On invocation of shutdown(), perhaps implicitly in finalize()
     (RUNNING or SHUTDOWN) -> STOP
        On invocation of shutdownNow()
    SHUTDOWN -> TIDYING
        When both queue and pool are empty
    STOP -> TIDYING
        When pool is empty
    TIDYING -> TERMINATED
        When the terminated() hook method has completed
     Threads waiting in awaitTermination() will return when the
     state reaches TERMINATED.

當前工作線程計數以及線程池的狀態變遷,通過ctl原子變量的CAS操作完成。

ThreadPoolExecutor會將所有提交的任務放置到workQueue中,它是一個BlockingQueue.

所有的工作線程集(workers,HashSet<Worker>)的獲取和預定,使用一個final的ReentrantLock(mainLock)控制,還有mainLock上的等待條件termination(Condition)。

largestPoolSize(最大池容量),completedTaskCount(已完成線程計數)等私有變量,通過mainLock控制訪問。

threadFactory(volatile,線程工廠,工廠模式的典型運用),所有的線程通過addWorker方法,間接調用這個工廠創建,以下為Executors中的DefaultThreadFactory類的默認構造方法(namePrefix非常熟悉)。

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }

keepAliveTime,線程等待工作的空閑時間(當allowCoreThreadTimeOut設置或者工作線程workerCount大於corePoolSize時,會超時退出,否則線程講一直運行)

allowCoreThreadTimeOut,允許核心線程超時退出(默認為false)

corePoolSize,核心線程數目(如果沒有設置allowCoreThreadTimeOut,它將是線程池中,最少活躍的線程數)

 類Worker主要維護線程執行任務時的狀態打斷和其它功能預定,它通過繼承AbstractQueuedSynchronizer來簡化任務執行時鎖的獲取和釋放,Worker沒有使用可重入鎖,而是實現了一個互斥鎖,因為我們不想工作線程訪問線程池控制變量時再次獲得鎖(如setCorePoolSize)。

接下來,我們看看addWorker方法,通過指定參數,它允許以核心線程運行任務。addWorker會首先檢查當前的線程池狀態,當前運行的線程數是否允許(添加新worker),前兩項檢查通過后,會嘗試設置ctl中的線程計數(因為活躍工作線程數存儲在ctl的低位,因此,直接自增ctl便可)。線程池計數器設置后,剩下的就是添並啟動Worker,Worker集合由mainLock控制,所有workers集的修改都是由mainLock控制的。只有當集合添加成功並且新添加的線程啟動成功時,線程池計數器的設置生效,否則,計數器將回退(由addWorkerFailed方法執行)。

 private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

 只有當新添加的worker線程啟動成功時,addWorker返回成功(此時worker線程啟動start(),它的run方法中調用了runWorker方法),其它情況返回失敗。

 最后看一個方法,runWorker 方法:worker線程,不斷從BlockQueu中取出任務,執行它並處理執行過程中的各種情況(如線程池的狀態變化,已執行計數)。

  final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

 runWorker方法中,直接調用了task的run()方法,大致交互過程。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM