解讀基於dubbo 2.6.9版本。
前導問題
- dubbo的線程池怎么初始化的?
- dubbo的線程池是怎么驅動的?
- dubbo的consumer&provider的線程池有什么區別?
- 線程池有什么問題?
dubbo線程池的初始化
provider端的線程池,是在初始化server時,在包裝channelHandler時生成的,默認走的fixed threadpool。
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; String componentKey; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { ...... } else { componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } }
consumer端的線程池,是在初始化client時,通過與指定的provider url建立鏈接時創建的,默認是cached threadpool。
如下所示:
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); return ChannelHandlers.wrap(handler, url); }
在初始化時,會根據url中share.threadpool參數的值來選擇動態生成,還是選擇共享的線程池實例。
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; String componentKey; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; if (url.getParameter(SHARE_EXECUTOR_KEY, false)) { ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); if (cExecutor == null) { cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor); cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); } executor = cExecutor; } else { executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } } else {
....... } }
線程池的驅動
無論是provider,還是consumer,其線程池主要用於交換層消息的處理。
這部分邏輯,主要在AllChannelHandler中。其接收所有類型的交換層事件,並將之轉換為ChannelEventRunnable事件,提交到對應的線程池來執行。
@Override public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } }
ChannelEventRunnable的處理邏輯如下:
@Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
根據類型委托給具體的handler。該handler的定義來自最外層DubboProtocol中實現的ExchangeHandler實現。
線程池的區別
類型上:
provider的線程池默認是fixed類型
consumer的線程池默認是cached類型。
數量上:
provider的線程池與實例發布服務url的地址相關,默認一個地址一個server實例,一個server實例對應一個線程池。
consumer的線程池則與實例引用服務的url數量有關,消費者會與每一個provider的url建立一定數量的鏈接,每一個鏈接中,根據share.threadpool的值決定是新建線程池還是使用共享線程池。故作為consumer的實例,其consumer的線程池數量可能非常可觀。
線程池的問題
第一個問題:
consumer的線程池是cached類型,在單機qps比較高時,也會意味着該線程池會創建可觀的線程數用於處理provider返回的響應。這種情況下,共享線程池並不一定是解決問題的好辦法。獨享線程池可以用於資源隔離,避免不同服務之間侵占調度資源。但共享勢必會影響這一點。最好的辦法,是水平擴容,避免單機負載過高。同時,針對吞吐量確定較高的服務,可以切換類型為fixed,降低創建銷毀線程的成本。
第二個問題:
在spring環境下,線程池的初始化源頭,都是來自於spring的事件觸發。
在ServiceBean&ReferenceBean中,會監聽spring的ContextRefreshEvent事件進而觸發服務的發布&引用。
消息通知串行下,這個過程不會有問題,但是消息是可以並行通知的。此時多個服務的發布&引用過程可能並行,故線程池的初始化可能並行。而在初始化部分未做同步保護,這種情況下會導致並發問題。