Netty核心概念(8)之Netty線程模型


1.前言

 第7節初步學習了一下Java原本的線程池是如何工作的,以及Future的為什么能夠達到其效果,這些知識對於理解本章有很大的幫助,不了解的可以先看上一節。

 Netty為什么會高效?回答就是良好的線程模型,和內存管理。在Java的NIO例子中就我將客戶端的操作單獨放在一個線程中處理了,這么做的原因在於如果將客戶端連接串起來,后來的連接就要等前一個處理完,當然這並不意味着多線程比單線程有優勢,而是在於每個客戶端都需要進行讀取准備好的緩存數據,再執行一些業務邏輯。如果業務邏輯耗時很久,那么順序執行的方式沒有多線程優勢大。另一個方面目前多核CPU很常見了,多線程是個不錯的選擇。這些在第一節就說明過,也提到過NIO並不是提升了IO操作的速度,而是減少了CPU的浪費時間,這些概念不能搞混。

 本節不涉及內存管理,只介紹相關的線程模型。

2.核心類

 上圖就是我們需要關注的體系內容了,主要從EventExecutorGroup開始往下看,再上層的父接口是JDK提供的並發包內的內容,基礎是線程池中可以執行周期任務的線程池服務。所以從這我們可以知道Netty可以實現周期任務,比如心跳檢測。接口定義下面將逐一介紹。

2.1 EventExecutorGroup

  isShuttingDown():是否正在關閉,或者是已經關閉。

  shutdownGracefully():優雅停機,等待所有執行中的任務執行完成,並不再接收新的任務。

  terminationFuture():返回一個該線程池管理的所有線程都terminated的時候觸發的future。

  shutdown():廢棄了的關閉方法,被shutdownGracefully取代。

  next():返回一個被該Group管理的EventExecutor。

  iterator():所有管理的EventExecutor的迭代器。

  submit():提交一個線程任務。

  schedule():周期執行一個任務。

 上述方法基本上是對周期線程池的一個封裝,但是擴展了EventExecuotr概念,即分了若干個小組,處理事件。另外一個比較實用的就是優雅停機了。

2.2 EventLoopGroup

 EventLoopGroup中的方法很少,其主要是和channel結合了,就多了一個將channel注冊到線程池中的方法。

2.3 EventExecutor

 EventExecutor繼承自EventExecutorGroup,這個之前也提到過該類,相當於Group中的一個子集。

  next():就是找group中下一個子集

  parent():就是所屬group

  inEventLoop():當前線程是否是在該子集中

  newXXX():這個是下一節內容,此處不介紹。

2.4 EventLoop

 該接口就一個方法,就是parent();EventLoop和EventLoopGroup與EventExecutor和EventExecutorGroup是一組相似的概念。了解這些就可以了。

3 實現細節

 EventLoop和EventLoopGroup的實現十分簡單,簡單看下就可以了,這里介紹幾個重要的實現類。

3.1 AbstractEventExecutor

 該類繼承自上節說過的AbstractExecutorService,其最重要的是execute方法未實現。該類是對AbstractExecutorService的一個進一步加工,添加了group的概念,和不同的Future創建方法。這里不要被之前的Java線程池模型所干擾,其不一定是線程池。回到上一節線程池的介紹,最終的樣子都是Execute方法決定的。

3.2 AbstractScheduledEventExecutor

 該類是對AbstractEventExecutor的一個進一步實現,其實現了周期任務的執行。原理是內部持有一個優先隊列ScheduledFutureTask。所有周期任務都添加到這個隊列中,也實現了取出周期任務的方法,但是該抽象類並沒有具體執行周期任務的實現。

3.3 SingleThreadEventExecutor

 該類是對AbstractScheduledEventExecutor的一個實現,其基本上是我們最終的一個EventLoop的雛形了,很多不同協議的EventLoop都是基於它實現的。

 雖然名字叫做單線程執行器,但是其不一定是單個線程。Executor默認使用的是ThreadPerTaskExecutor,其executor會為每一個任務創建一個線程並執行,當然你也可以傳入自己的executor。Queue使用的是LinkedBlockingQueue,無容量限制的任務隊列。其提供了添加任務到任務隊列,從任務隊列中獲取任務的方法。

    protected boolean runAllTasks() {
        assert inEventLoop();
        boolean fetchedAll;
        boolean ranAtLeastOne = false;

        do {
            fetchedAll = fetchFromScheduledTaskQueue();
            if (runAllTasksFrom(taskQueue)) {
                ranAtLeastOne = true;
            }
        } while (!fetchedAll); // keep on processing until we fetched all scheduled tasks.

        if (ranAtLeastOne) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
        }
        afterRunningAllTasks();
        return ranAtLeastOne;
    }

    protected final boolean runAllTasksFrom(Queue<Runnable> taskQueue) {
        Runnable task = pollTaskFrom(taskQueue);
        if (task == null) {
            return false;
        }
        for (;;) {
            safeExecute(task);
            task = pollTaskFrom(taskQueue);
            if (task == null) {
                return true;
            }
        }
    }

 執行過程如上:1.先獲取所有的周期任務,放入taskQueue;2.不斷的執行taskQueue中的任務;3.afterRunningAllTasks就是一個自由發揮的方法。safeExecute就是直接執行run方法。

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
                    success = true;
                } catch (Throwable t) {
                    logger.warn("Unexpected exception from an event executor: ", t);
                } finally {
                    for (;;) {
                        int oldState = state;
                        if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                                SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                            break;
                        }
                    }

                    // Check if confirmShutdown() was called at the end of the loop.
                    if (success && gracefulShutdownStartTime == 0) {
                        logger.error("Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                                SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                                "before run() implementation terminates.");
                    }

                    try {
                        // Run all remaining tasks and shutdown hooks.
                        for (;;) {
                            if (confirmShutdown()) {
                                break;
                            }
                        }
                    } finally {
                        try {
                            cleanup();
                        } finally {
                            STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                            threadLock.release();
                            if (!taskQueue.isEmpty()) {
                                logger.warn(
                                        "An event executor terminated with " +
                                                "non-empty task queue (" + taskQueue.size() + ')');
                            }

                            terminationFuture.setSuccess(null);
                        }
                    }
                }
            }
        });
    }

 上面是該Executor初始化過程,run方法又是交給子類進行初始化了。

    public Future<?> shutdownGracefully(long quietPeriod, long timeout, TimeUnit unit) {
        if (quietPeriod < 0) {
            throw new IllegalArgumentException("quietPeriod: " + quietPeriod + " (expected >= 0)");
        }
        if (timeout < quietPeriod) {
            throw new IllegalArgumentException(
                    "timeout: " + timeout + " (expected >= quietPeriod (" + quietPeriod + "))");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        if (isShuttingDown()) {
            return terminationFuture();
        }

        boolean inEventLoop = inEventLoop();
        boolean wakeup;
        int oldState;
        for (;;) {
            if (isShuttingDown()) {
                return terminationFuture();
            }
            int newState;
            wakeup = true;
            oldState = state;
            if (inEventLoop) {
                newState = ST_SHUTTING_DOWN;
            } else {
                switch (oldState) {
                    case ST_NOT_STARTED:
                    case ST_STARTED:
                        newState = ST_SHUTTING_DOWN;
                        break;
                    default:
                        newState = oldState;
                        wakeup = false;
                }
            }
            if (STATE_UPDATER.compareAndSet(this, oldState, newState)) {
                break;
            }
        }
        gracefulShutdownQuietPeriod = unit.toNanos(quietPeriod);
        gracefulShutdownTimeout = unit.toNanos(timeout);

        if (oldState == ST_NOT_STARTED) {
            try {
                doStartThread();
            } catch (Throwable cause) {
                STATE_UPDATER.set(this, ST_TERMINATED);
                terminationFuture.tryFailure(cause);

                if (!(cause instanceof Exception)) {
                    // Also rethrow as it may be an OOME for example
                    PlatformDependent.throwException(cause);
                }
                return terminationFuture;
            }
        }

        if (wakeup) {
            wakeup(inEventLoop);
        }

        return terminationFuture();
    }

 上面是一個優雅停機的過程,改變該Executor的狀態成ST_SHUTTING_DOWN,這里要注意addTask的時候只有shutdown狀態才會拒絕,所以此時這里的邏輯還不會拒絕新任務添加,然后返回了一個terminationFuture,這里不做介紹。

3.4 SingleThreadEventLoop

 此類繼承自上面講解的SingleThreadEventExecutor,這里多了一個tailTask隊列,用於每次事件循環后置任務處理,暫且不管。重要的在於很早提到了register方法,將channel注冊到線程中。

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }

    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }

 實際上就是生成了一個DefaultChannelPromise,將channel和線程綁定,最后都放入了unsafe對象中。

3.5 NioEventLoop

 上面講了一些雜亂無章的內容,這里借助NioEventLoop好好梳理一下整個設計流程。NioEventLoop繼承自SingleThreadEventLoop,先對之前的相關研究進行總結:

  1.EventExecutorGroup接口是繼承自Java的周期任務接口,是一個事件處理器組的概念,其相關方法有:

    是否正在關閉;優雅停機;獲取一個事件處理器;提交一個任務;提交一個周期任務

  2.EventExecutor接口是事件處理器,其繼承自EventExecutorGroup,目的並不是說每一個事件處理器都是一個事件處理器組,而是為了復用接口定義的方法。每個處理器都應該具備優雅停機,提交任務,判斷是否關閉的方法,其它方法有:

    獲取處理器組;獲取下一個處理器(覆蓋了父接口的next方法);判斷是否在事件循環內;創建Promise、ProgressivePromise、SucceededFuture、FailedFuture

  3.EventLoopGroup繼承自EventExecutorGroup,更新了父接口的含義,EventExecutor的定位是處理單個事件,group就是處理事件組了。EventLoop的定位是處理一個連接的生命周期過程中的周期事件,group是多個EventLoop的集合了。這里又有一個尷尬的地方,group按照定義本不需要定義其它方法,但是由於Server端的設計(之前說過服務端的channel也是一個線程),使用的是group,所以group必須承擔單個EventLoop的職責。最終添加了額外的方法:

    獲取下一個EventLoop;注冊channel;

  4.EventLoop,事件循環,其也是一個處理器,最終繼承自EventExecutor和EventLoopGroup,方法只有一個:

    獲取父事件循環組EventLoopGroup。

 上述接口光看名稱很容易陷入誤解,實際上定義是想將單個loop和group分離,但是實現上由於Server端包含一個服務端監聽連接線程,一個客戶端連接線程,其Group承擔了單個的職責,所以定義了一些本該由單個執行器處理的方法,又為了復用方法,導致loop繼承了group,這樣看起來怪怪的,接口理解起來就混亂了。結合上面的描述,再看一遍繼承圖就更清楚了:

 理解了上面的設定,我們再來看看客戶端的事件處理是如何設計的,即總結上訴抽象類做了哪些事情。

  1. AbstractEventExecutor:入參就一個parent,該類完成了一個基本處理:

    a.將next設置成自己(上面說過繼承的group,這個操作就和group區分開了)。

    b.優雅停機調用的是帶有超時的停機方案,超時為15秒

    c.覆蓋了Java提供的newTask包裝成FutureTask的方法,使用了自己的PromiseTask

    d.提供安全執行方法:safeExecute,直接調用的run方法

   該類是最基礎的一個抽象類,基本作用就是與group在定義混亂上做了一個區分。提供了執行器與Future關聯方法和一個基本的執行任務的方法。

  2.AbstractScheduledEventExecutor:入參也是一個parent,該類對AbstractEventExecutor未處理的周期任務提供了具體的完成方法:

    a.提供計算當前距服務啟動的時間

    b.提供存儲ScheduledFutureTask的優先隊列

    c.提供了取消所有周期任務的方法

    d.提供了獲取一個符合的周期任務的方法,要滿足時間,並獲取后移除

    e.提供了獲取距最近一個周期任務的時間多久

    f.提供了移除一個周期任務的方法

    g.提供添加周期任務的方法

   該類提供了周期任務執行的一些基本方法,涉及添加周期任務,移除,獲取等方法。

  3.SingleThreadEventExecutor:入參包括parent,addTaskWakesUp標志,maxPendingTasks最大任務隊列數(16和io.netty.eventexecutor.maxPendingTasks(default Integer.MAX_VALUE)參數更大的那個值),executor執行器(默認是ThreadPerTaskExecutor,每個任務創建一個線程執行),taskQueue任務隊列(默認是LinkedBlockingQueue),rejectedExecutionHandler拒絕任務的處理類(默認直接拋出RejectedExecutionException)。該類主要完成了一個單線程的EventExecutor的基本操作:

    a.創建一個taskQueue

    b.中斷線程

    c.從任務隊列中獲取一個任務,takeTask連同周期任務也會獲取

    d.添加任務到任務隊列

    e.移除任務隊列中指定任務

    f.運行所有任務,會先將周期任務存入taskQueue,再使用safeExecute方法執行任務

    g.實現了execute方法,會添加任務到任務隊列,如果當前線程不是事件循環線程,開啟一個線程。通過的就是持有的executor來開啟的線程任務。execute方法調用了run方法,該類沒有實現run方法。任務的添加都不是通過execute直接執行了,而是走的添加任務到taskQueue,由未實現的run線程來處理這些事件。

    h.優雅停機

   這樣就有了一個基礎的單線程模型了,開啟線程,保存,取出任務的方法都有了,只有在開啟線程中執行任務的run()方法還未實現。

  4.SingleThreadEventLoop:入參和SingleThreadEventExecutor一致,不同的是多了一個tailTasks。該類主要是針對netty自身的事件循環的定義來實現方法了:

    a.注冊channel,實際上是生成了一個DefaultChannelPromise對象,持有了channel,和運行該channel的EventExecutor,然后將該對象交給最底層的unsafe處理。

    b.添加一個事件周期結束后執行的尾任務tailTasks

    c.執行尾任務

    d.刪除指定尾任務

   該類就很簡單,沒有過多的內容,只是增加了一個每個事件周期后執行的任務而已。

 回顧完了,上面4個父類構建了一個基本的帶定時任務,普通任務,事件循環后置任務的EventLoop,每個channel綁定了一個線程執行器,通過DefaultChannelPromise持有兩者,最終交給Unsafe操作。子類只需要實現run方法,處理任務隊列中的任務。下面就是重頭戲NioEventLoop這個客戶端的線程是如何設計的了:

    NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
                 SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
        super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
        if (selectorProvider == null) {
            throw new NullPointerException("selectorProvider");
        }
        if (strategy == null) {
            throw new NullPointerException("selectStrategy");
        }
        provider = selectorProvider;
        final SelectorTuple selectorTuple = openSelector();
        selector = selectorTuple.selector;
        unwrappedSelector = selectorTuple.unwrappedSelector;
        selectStrategy = strategy;
    }

 調用的就是父類方法,不過在創建EventLoop的時候創建了selector,這個是NIO中也提到過的。該EventLoop是在Group中newChild創建的。

    protected void run() {
        for (;;) {
            try {
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                    default:
                }
                cancelledKeys = 0;
                needsToSelectAgain = false;
                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
            try {
                if (isShuttingDown()) {
                    closeAll();
                    if (confirmShutdown()) {
                        return;
                    }
                }
            } catch (Throwable t) {
                handleLoopException(t);
            }
        }
    }

 上面是我們需要關注的run方法,該方法被單獨的線程執行。通過一個策略來判斷執行哪個,默認的策略是任務隊列中有任務,select執行一下,返回當前的事件數,沒任務返回SelectStrategy.SELECT。簡單的說就是有任務就補充新到來的事件執行所有的任務,沒任務就執行新到的事件。先處理IO讀寫任務,再處理其他任務,ioRation設置的耗時比例是IO任務占一個執行周期的百分比,默認50,意思是IO執行了50秒,其他任務也會得到50秒的執行時間。后續操作就是獲取所有select的key,執行所有的任務了。這里就有一個判斷如果是停機狀態,就會closeAll(),之前說優雅停機的時候就是設置了一個這個標志,最后是在執行任務之后判斷。processSelectedKey環節都交給unsafe類完成了,這里就掛上了handler相關觸發,handler的執行也就說明都在該線程內了。

 上面的描述雖然把整個過程都關聯上了,但是最主要的問題還是混亂的:如何做到一個channel創建一個線程的?上面只是說明了channel和EventExecutor是綁定在DefaultChannelPromise並交給了Unsafe類,並沒有看到是如何創建線程的。而且另一個問題在於,processSelectedKey是選擇了所有的key,這不是所有的channel共享了一個線程嗎?

 要解決該問題要回到Bootstrap的channel建立過程:initAndRegister()方法中,通過channelFactory創建了一個channel對象,而后ChannelFuture regFuture = config().group().register(channel);主要就是觀察register方法,該類是設置的線程池NioEventLoopGroup提供的方法,其繼承的是MultithreadEventLoopGroup,是調用了next方法獲取的EventLoop,最后接上上面channel和eventloop綁定的內容。next中獲取的EventLoop早在類初始化的時候就生成了,在構造方法中MultithreadEventExecutorGroup,children就是Eventloop,next不過是挑選了一個線程池而已,默認數量是CPU核數的2倍。這個也就是前面說的,線程數量不是越多越好。

 這樣我們明白了,客戶端注冊的時候是分配了一個線程給它。客戶端並不需要多線程,但是還是繼續看后面的內容:AbstractUnsafe的register方法給出了相關解答。channel持有了該EventLoop,此時線程還是未運行狀態,只是有了這么一個對象而已。

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }

 這里execute方法,這個是之前我們講過的一個方法,其會判斷當前線程是否是該channel的線程,目前都沒有初始化線程肯定不是,將任務放入任務隊列,開啟一個線程(線程處於未啟動狀態才會開啟,否則不執行),這個設計使得execute的時候只會開啟一次線程,而所有的任務都會被放入任務隊列,由這個線程執行。再回到run方法,這個是channel線程執行的方法,目前每個NioEventLoop都執行了Select等方法啊,這不是處理了所有的channel的工作嗎?並沒有達到一個channel生命周期控制在一個線程中啊。

 這里實際上是JAVA NIO的例子帶來的誤解,認為必須一個線程來使用select,然后遍歷事件分配線程給channel執行讀寫操作。實際上在Netty中不一樣,Netty所有線程都在執行select並獲取相關事件,但是實際上其並沒有執行所有的事件。

    private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                return;
            }
           
            if (eventLoop != this || eventLoop == null) {
                return;
            }
            
            unsafe.close(unsafe.voidPromise());
            return;
        }

        try {
            int readyOps = k.readyOps();
            
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                ch.unsafe().forceFlush();
            }

            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }

 看這里,if (eventLoop != this || eventLoop == null) ,如果channel的eventLoop不是當前的eventLoop就不執行,這個就一小段代碼,但是就直接決定了EventLoop只對自己所綁定的channel感興趣,最終達到了只處理自己相關的任務的目的。

3.6 NioEventLoopGroup

 該類沒有太多需要說明的內容,前面已經講解了很多。

  1.AbstractEventExecutorGroup,實現了基本的方法,所有方法都是調用next()即挑選出一個線程執行器完成的。

  2.MultithreadEventExecutorGroup,實現了一個基本的線程池,持有子線程,主要工作是初始化了子線程數組,提供了next方法。

  3.MultithreadEventLoopGroup,實現了Netty的channel線程池,提供了register方法,雖然也是調用next的register方法。

  4.NioEventLoopGroup,實現了創建子線程數組時newChild方法,所有的EventLoop都是這個方法創建的。

 group就是兩個重點,一個next()挑選事件執行器,一個newChild()創建線程執行器對象。

4.總結

 本節耗費了大量的篇幅講解了Netty的線程模型的設計思路,主要看點如下:

  1.解釋了EventLoop、EventLoopGroup、EventExecutor、EventExecutorGroup這四者之間的關系,和這么復雜混亂的繼承關系的原因。

  2.解釋了group是如何初始化線程,並綁定channel的,next().register()

  3.解釋了eventloop為什么和channel綁定了,execute()開啟線程,以及每個eventloop都在獲取IO事件,但是通過channel的eventloop是否等於當前過濾掉其它的事件,只處理自己綁定的channel事件。

 由於每個線程都在獲取IO事件,所以這段邏輯變的非常復雜,這也就是我之前說的寫好很IO很困難。

 最后附上一張對前面所有內容總結的一個圖,清醒一下頭腦,從復雜的代碼中脫身:

 這個圖就是一個基本的執行過程圖了,可能有遺漏的地方,但是大體情況如圖所示。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM