Netty核心概念(7)之Java線程池


1.前言

 本章本來要講解Netty的線程模型的,但是由於其是基於Java線程池設計而封裝的,所以我們先詳細學習一下Java中的線程池的設計。之前也說過Netty5被放棄的原因之一就是forkjoin結構比較復雜,forkjoin也是JDK提供的一個基本線程模型,這里就不進行介紹。本節涉及知識點很多,可能有誤,請對照JDK源碼進行學習。

 本章涉及的概念有Callable,Future,ExecutorService等,所有的類都在java.util.concurrent包下。

2.相關概念

2.1 Callable

 學過Java的人都知道,在Java中運行線程任務有兩個方法,一個是繼承Thread類,覆寫run方法,另一個就是實現Runnable接口,實現run方法。但是這兩種方法都有一個尷尬的地方,其沒有返回值,而通常我們使用異步模型,需要知道異步的任務的執行結果,雖然方法有很多,但是最簡單的還是重新制造一個帶返回值的接口,這個接口就是Callable。此接口就一個call()方法,返回一個結果。此外Thread只接受Runnable類,所以該接口是專門給線程池模型使用的。

2.2 Future

 Future顧名思義,這個類的實例是從未來而來。異步模型,在主線程中提交任務到線程池中運行,得到執行后的結果,問題是主線程如何獲取這個結果呢?這個時候就要靠這個來自未來的實例了。提交任務給異步線程時,會立刻得到一個Future對象,這個對象可以獲得任務執行完成后的結果。明白了這個設定,就能很好的理解接口方法了。

  cancel():取消掉這個任務,true時意味着運行中中斷程序。實際上實現是辦不到的,這個要自己實現的call方法做相應的取消實現,所以其正在的含義是改變Thread的狀態成中斷狀態,具體中斷處理邏輯要自己寫在call或者run方法中。false則只是取消任務,不能對開始了的任務進行操作。

  isCancelled():這個任務是否被取消

  isDone():這個任務是否被完成

  get():獲取任務的執行結果,沒完成就阻塞獲取線程,在設置的時間內沒獲取到拋出TimeoutException。

2.3 ExecutorService

 Java設計了很多種類型的線程模型,開發者可能擴展更多,為了有個統一的標准,就有了這個接口。

  shutdown():關閉模型中所有的線程

  isShutdown():是否已經關閉

  isTerminated():所有的任務是否在shutdown之后終結了。

  awaitTermination():等待任務結束。

  submit():提交一個任務

  invokeAll():執行所有的任務

  invokeAny():執行所有的任務,其中一個完成就返回結果。

 ExecutorService繼承自Executor,其還有一個方法就是execute執行一個runnable任務。從這些接口方法,我們也可以清楚的認識到該類是對線程模型的一個基本使用的定義,提供了一個運用標准。

2.4 Executors

 該類是一個具體的服務層的類了,給出所有JDK實現的線程模型的實例,方便開發者調用。該類的所有方法都是靜態方法,靜態工廠模式。

 給出的方法有很多,但實際上只有三種類型的模型而已:1.線程池ThreadPoolExecutor;2.工作竊取算法實現的ForkJoinPool;3.周期任務線程池ScheduledThreadPoolExecutor;下面對這些方法進行簡要說明。這里先簡單介紹一下ThreadPoolExecutor的相關參數含義,才能更好的理解Executors提供的不同線程池的含義,具體參數如下:

  corePoolSize:核心線程池的大小,可以理解為線程池中最小的線程數量。

  maximumPoolSize:最大的線程數量

  keepAliveTime:線程池線程數量超過core的設置后,線程最大的空閑時間,超過這個值且數量仍大於core會被銷毀。

  unit:keepAliveTime的單位

  workQueue:任務的存儲隊列

  threadFactory:創建線程時的工廠方法

  handler:執行阻塞時處理類,當線程數達到最大,任務存儲隊列也滿了,新提交的任務會觸發該方法。

 現在我們來看看幾種和ThreadPoolExecutor相關的方法:

  1.newFixedThreadPool(nThreads):

return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());

  從參數可以看出,FixedThreadPool是一個可重用固定線程數的線程池,由於固定了線程數量,所以keepAliveTime參數無實際作用。其任務在LinkedBlockingQueue中,這是一個無邊界的隊列,所以handler無作用。

  2.newSingleThreadExecutor():

return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));

  可以看出,SingleThreadExecutor和FixedThreadPool參數基本一樣,就是線程數量為1,keepAliveTime和handler參數不起作用。

  3.newCachedThreadPool():

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());

  CachedThreadPool是一個無限制的線程池,其線程空閑1分鍾就會被回收,SynchronousQueue在之前的博客中也介紹過:這里。這是個無容量的阻塞隊列,有點像接力棒。由於線程池大小沒有限制,所以實際上handler對該類也沒有作用。

  4.newScheduledThreadPool(int corePoolSize):

return new ScheduledThreadPoolExecutor(corePoolSize);

super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());

  定時任務線程池是繼承自ThreadPoolExecutor的,其能做到定時執行,歸功於DelayedWorkQueue。這個隊列功能和DelayQueue類似:這里,設計實現上與PriorityBlockingQueue隊列相似:這里。這個線程池也是無邊界的。

  5.newSingleThreadScheduledExecutor():

return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));

  這個和newSingleThreadExecutor一樣,就是單線程的周期任務。

  6.newWorkStealingPool():

return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);

  ForkJoin的核心思想就是:將一個大任務切割成若干個小任務同時進行,最后等所有任務完成,合並任務結果。工作竊取算法是其的一種優化,比如有個線程執行完了其任務,另一個線程還有N個任務,這樣等待就比較耗時,所以空閑的線程會從忙碌的線程處理的任務鏈尾端拿任務進行執行。

 其它的不是線程模型相關的內容,值得一提的就只有RunnableAdapter了。

    static final class RunnableAdapter<T> implements Callable<T> {
        final Runnable task;
        final T result;
        RunnableAdapter(Runnable task, T result) {
            this.task = task;
            this.result = result;
        }
        public T call() {
            task.run();
            return result;
        }
    }

 實現簡單易懂,不再描述。

3. 實現細節

 上面講了線程池的一些基本概念,這里將對其實現過程進行解析,理解更深刻。上面的概念有兩個地方沒有說明清楚,一個是Future為什么能達到這種效果。另一個就是callable接口為什么能返回結果,我們知道Thread只接收Runnable接口實例,該方法即沒有入參,又沒有返回值,這是怎么做到的?帶着這兩個疑問,我們對ThreadPoolExecutor進行詳細的介紹。

3.1 FutureTask

 這個類是解決上訴問題的關鍵。其實現了Future和Runnable兩個接口,是線程池使用的具體任務對象,控制着任務的相關執行。

  state:任務的狀態。后面是其7種狀態定義。

  callable:需要執行的任務,如果是runnable會變成callable的適配器對象。

  outcome:執行結果

  runner:執行的線程

  waiters:等待任務執行完成的線程

    public boolean isCancelled() {
        return state >= CANCELLED;
    }

    public boolean isDone() {
        return state != NEW;
    }

 上面很清楚,狀態就是由該類控制的。

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }


    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                LockSupport.parkNanos(this, nanos);
            }
            else
                LockSupport.park(this);
        }
    }

 這里就get方法的邏輯:1.判斷線程是否中斷,中斷移除無效等待節點;2.如果完成了,返回;3.如果進行中,讓出時間片;4.如果無等待者,將此線程設置成waiter;5.如果沒添加到task的等待節點中,添加進去;6.如果有超時設置,超時了移除等待,否則等待。7.阻塞等待執行完成。

 簡要的說一下上述過程就是根據state判斷是立刻返回還是阻塞等待,等待后將其設置成等待節點。最后我們需要關心的就是run方法了。

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

 判斷狀態,執行callable的call方法,獲取返回值。這里可能會有個疑問,runnable接口是怎么返回結果的呢?答案是runnable方法不能返回結果,但是可以通過RunnableAdapter傳入一個默認的結果,在runnable任務結束時就能獲取這個結果了。上面的問題2實際上要說明的就是沒有其他方法對一個既沒有入參,又沒有出參的方法進行返回,最好的方法就是對其包裝一層,通過包裝的局部變量返回執行結果。FutureTask就是這么做的,最后看下任務完成后執行的方法。

    private void finishCompletion() {
        // assert state > COMPLETING;
        for (WaitNode q; (q = waiters) != null;) {
            if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
                for (;;) {
                    Thread t = q.thread;
                    if (t != null) {
                        q.thread = null;
                        LockSupport.unpark(t);
                    }
                    WaitNode next = q.next;
                    if (next == null)
                        break;
                    q.next = null; // unlink to help gc
                    q = next;
                }
                break;
            }
        }

        done();

        callable = null;        // to reduce footprint
    }

 這里就能夠看出,在get操作任務未完成時,get線程阻塞,並且將阻塞線程添加到任務的waiter隊列中。任務實際完成后,會釋放鎖,這就達到通知線程任務完成的效果了。

3.2 AbstractExecutorService

 線程池的任務定義,通知任務完成的實現原理等了解完了,現在再來看下線程池是如何實現的吧。該類為抽象父類,沒有做核心的操作,關鍵的execute方法沒有實現,實現了其它通用邏輯的方法。簡略看下就可以明白了:

    public <T> Future<T> submit(Callable<T> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<T> ftask = newTaskFor(task);
        execute(ftask);
        return ftask;
    }


    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)
                futures.get(i).cancel(true);
        }
    }

 這里就能看出提交的任務都被包裝成FutureTask任務了。invokeAll方法就是包裝所有的任務,遍歷調用execute方法。invokeAny最終也是相似的操作。

3.3 ThreadPoolExecutor

 抽象父類的作用不多,這里我們看ThreadPoolExecutor是如何管理線程池,進行調度的吧。ThreadPoolExecutor的策略如下:

  1、當有一個新任務提交時,只要當前線程數少於corePoolSize,哪怕有空閑的線程,其也會創建一個新的線程給新任務使用。

  2、corePoolSize<X<maxSize的時候,只有隊列滿了才會創建線程。

  3、只有有任務到來時才會創建線程,哪怕是corePoolSize也不是立刻初始化。

  4、創建線程失敗,執行器也會繼續下去,但是任務不會被執行

  5、根據上面的說明,無界的隊列會導致線程數達到corePoolSize就不再增長,因為隊列永遠不會滿

  6、有界隊列可以防止資源耗盡,但是更難正確和控制。

  workQueue:任務隊列

  mainLock:主要的鎖

  worker:工作者,運行中線程

  termination:終止信號

  其他的就不一一介紹了,都是之前提到過的參數。這里直接看最重要的execute方法實現。

    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

 源碼注釋說明了這段操作,一共有3個步驟:

  1.如果運行中的線程數小於corePoolSize的數量,創建一個新的線程,該任務作為其第一個任務。addWorker會檢查池的狀態和工作線程數量,不滿足條件就會返回false。

  2.如果一個任務成功的放入隊列,再次檢查狀態,如果池停止了,就會拒絕該任務,且移除該任務,否則觸發方法,請求分配worker。

  3.如果不能放入隊列,嘗試創建一個線程,如果失敗,就拒絕該任務。

 可以說思路其實是很清晰的,但是實際上操作卻不容易,具體細節就不說明了。接下來看下Worker是如何工作的:

        Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

    final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);
        }
    }

 worker也是一個runnable方法,不同的是其繼承了AbstractQueuedSynchronizer,該類是Java並發包的一個重點。創建worker對象的時候就初始化了線程設置,只等addWorker()方法調用start()。運行過程判斷有沒有任務,有任務鎖住線程,然后運行任務。這里都有不同的擴展點給開發者進行業務擴展。完成任務后,在運行中的線程集合中,移除它。這里可能會疑惑,運行完后線程被移除了,怎么繼續執行的任務。

  首先,runWorker中task為null的時候會getTask,從任務隊列中獲取任務。

  其次,execute方法判斷了當前池的狀態決定是否添加worker。

  然后,最重要的是runWorker方法要結束必須是沒有任務,那里有個while循環。

  最后,該worker被移除,檢測corePoolSize和線程池當前狀態決定是否添加線程。

 這里又可能會有疑惑了,不是說好的空閑超時才移除嗎?這段邏輯其實就在getTask中,如果getTask在指定時間內都沒有獲取任務,不就意味着線程空閑了這么久嗎,所以執行到最后直接移除就可以了。最后我們看下Java的線程池是如何關閉的,這對后面理解Netty的優雅停機有幫助。

    public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers();
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
    }

 先解決權限問題,再設置狀態為shutdown,這樣保證新任務不會再提交進線程池。接着對所有運行中的worker發起中斷信號,最后嘗試停機,喚醒awaitTermination的線程。所以說所謂的shutdown,並不保證任務執行完畢,只是阻止新任務進來,還需要任務自身配合中斷信號。這個是必然的,因為沒人知道你的任務干了什么,萬一是個死循環,不直接銷毀線程,如何能停止它?

 最后我們聊聊AbstractQueuedSynchronizer,每一個worker都是該類實例,這個類究竟達成了一個怎樣的目的?詳細介紹看這篇文章:這里。下面是Worker中實現的相關代碼:

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }    

 根據文章中所介紹的:

  • tryAcquire:獨占式獲取同步狀態。獲取時設置狀態為1。
  • isHeldExclusively: AQS是否被當前線程獨占。不為0獨占。
  • tryRelease:獨占式釋放同步狀態。設置狀態為0,釋放鎖。

 所以worker類實際上是一個排他鎖,只允許一個線程操作。

4 后記

 本章主要介紹了Java線程池的執行過程,Future的實現方式,整個過程可用下圖表示:

 再提一下FutureTask,其call執行完就會解鎖,get就能知道是否執行完畢了。大體的過程就是上圖,實現細節就不在此討論。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM