dubbo學習(一)線程池


 
 

解讀基於dubbo 2.6.9版本。

前導問題

  1. dubbo的線程池怎么初始化的?
  2. dubbo的線程池是怎么驅動的?
  3. dubbo的consumer&provider的線程池有什么區別?
  4. 線程池有什么問題?

dubbo線程池的初始化

provider端的線程池,是在初始化server時,在包裝channelHandler時生成的,默認走的fixed threadpool。

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;

        String componentKey;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
           ......
        } else {
            componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
            executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
            dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
        }
    }

consumer端的線程池,是在初始化client時,通過與指定的provider url建立鏈接時創建的,默認是cached threadpool。

如下所示:

    protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) {
        url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
        url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
        return ChannelHandlers.wrap(handler, url);
    }

在初始化時,會根據url中share.threadpool參數的值來選擇動態生成,還是選擇共享的線程池實例。

    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;

        String componentKey;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
            if (url.getParameter(SHARE_EXECUTOR_KEY, false)) {
                ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
                if (cExecutor == null) {
                    cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
                    dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor);
                    cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT);
                }
                executor = cExecutor;
            } else {
                executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
                dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
            }
        } else {
....... } }

 

線程池的驅動

無論是provider,還是consumer,其線程池主要用於交換層消息的處理。

這部分邏輯,主要在AllChannelHandler中。其接收所有類型的交換層事件,並將之轉換為ChannelEventRunnable事件,提交到對應的線程池來執行。

    @Override
    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

ChannelEventRunnable的處理邏輯如下:

    @Override
    public void run() {
        if (state == ChannelState.RECEIVED) {
            try {
                handler.received(channel, message);
            } catch (Exception e) {
                logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                        + ", message is " + message, e);
            }
        } else {
            switch (state) {
            case CONNECTED:
                try {
                    handler.connected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case DISCONNECTED:
                try {
                    handler.disconnected(channel);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e);
                }
                break;
            case SENT:
                try {
                    handler.sent(channel, message);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is " + message, e);
                }
            case CAUGHT:
                try {
                    handler.caught(channel, exception);
                } catch (Exception e) {
                    logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel
                            + ", message is: " + message + ", exception is " + exception, e);
                }
                break;
            default:
                logger.warn("unknown state: " + state + ", message is " + message);
            }
        }

    }

根據類型委托給具體的handler。該handler的定義來自最外層DubboProtocol中實現的ExchangeHandler實現。

 

線程池的區別

類型上:

provider的線程池默認是fixed類型

consumer的線程池默認是cached類型。

數量上:

provider的線程池與實例發布服務url的地址相關,默認一個地址一個server實例,一個server實例對應一個線程池。

consumer的線程池則與實例引用服務的url數量有關,消費者會與每一個provider的url建立一定數量的鏈接,每一個鏈接中,根據share.threadpool的值決定是新建線程池還是使用共享線程池。故作為consumer的實例,其consumer的線程池數量可能非常可觀。

 

線程池的問題

第一個問題:

consumer的線程池是cached類型,在單機qps比較高時,也會意味着該線程池會創建可觀的線程數用於處理provider返回的響應。這種情況下,共享線程池並不一定是解決問題的好辦法。獨享線程池可以用於資源隔離,避免不同服務之間侵占調度資源。但共享勢必會影響這一點。最好的辦法,是水平擴容,避免單機負載過高。同時,針對吞吐量確定較高的服務,可以切換類型為fixed,降低創建銷毀線程的成本。

第二個問題:

在spring環境下,線程池的初始化源頭,都是來自於spring的事件觸發。

在ServiceBean&ReferenceBean中,會監聽spring的ContextRefreshEvent事件進而觸發服務的發布&引用。

消息通知串行下,這個過程不會有問題,但是消息是可以並行通知的。此時多個服務的發布&引用過程可能並行,故線程池的初始化可能並行。而在初始化部分未做同步保護,這種情況下會導致並發問題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM