解读基于dubbo 2.6.9版本。
前导问题
- dubbo的线程池怎么初始化的?
- dubbo的线程池是怎么驱动的?
- dubbo的consumer&provider的线程池有什么区别?
- 线程池有什么问题?
dubbo线程池的初始化
provider端的线程池,是在初始化server时,在包装channelHandler时生成的,默认走的fixed threadpool。
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; String componentKey; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { ...... } else { componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY; executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } }
consumer端的线程池,是在初始化client时,通过与指定的provider url建立链接时创建的,默认是cached threadpool。
如下所示:
protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler) { url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL); return ChannelHandlers.wrap(handler, url); }
在初始化时,会根据url中share.threadpool参数的值来选择动态生成,还是选择共享的线程池实例。
public WrappedChannelHandler(ChannelHandler handler, URL url) { this.handler = handler; this.url = url; String componentKey; if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) { componentKey = Constants.CONSUMER_SIDE; if (url.getParameter(SHARE_EXECUTOR_KEY, false)) { ExecutorService cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); if (cExecutor == null) { cExecutor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, SHARED_CONSUMER_EXECUTOR_PORT, cExecutor); cExecutor = (ExecutorService) dataStore.get(componentKey, SHARED_CONSUMER_EXECUTOR_PORT); } executor = cExecutor; } else { executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url); dataStore.put(componentKey, Integer.toString(url.getPort()), executor); } } else {
....... } }
线程池的驱动
无论是provider,还是consumer,其线程池主要用于交换层消息的处理。
这部分逻辑,主要在AllChannelHandler中。其接收所有类型的交换层事件,并将之转换为ChannelEventRunnable事件,提交到对应的线程池来执行。
@Override public void connected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED)); } catch (Throwable t) { throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t); } } @Override public void disconnected(Channel channel) throws RemotingException { ExecutorService cexecutor = getExecutorService(); try { cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED)); } catch (Throwable t) { throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t); } }
ChannelEventRunnable的处理逻辑如下:
@Override public void run() { if (state == ChannelState.RECEIVED) { try { handler.received(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } } else { switch (state) { case CONNECTED: try { handler.connected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case DISCONNECTED: try { handler.disconnected(channel); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel, e); } break; case SENT: try { handler.sent(channel, message); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is " + message, e); } case CAUGHT: try { handler.caught(channel, exception); } catch (Exception e) { logger.warn("ChannelEventRunnable handle " + state + " operation error, channel is " + channel + ", message is: " + message + ", exception is " + exception, e); } break; default: logger.warn("unknown state: " + state + ", message is " + message); } } }
根据类型委托给具体的handler。该handler的定义来自最外层DubboProtocol中实现的ExchangeHandler实现。
线程池的区别
类型上:
provider的线程池默认是fixed类型
consumer的线程池默认是cached类型。
数量上:
provider的线程池与实例发布服务url的地址相关,默认一个地址一个server实例,一个server实例对应一个线程池。
consumer的线程池则与实例引用服务的url数量有关,消费者会与每一个provider的url建立一定数量的链接,每一个链接中,根据share.threadpool的值决定是新建线程池还是使用共享线程池。故作为consumer的实例,其consumer的线程池数量可能非常可观。
线程池的问题
第一个问题:
consumer的线程池是cached类型,在单机qps比较高时,也会意味着该线程池会创建可观的线程数用于处理provider返回的响应。这种情况下,共享线程池并不一定是解决问题的好办法。独享线程池可以用于资源隔离,避免不同服务之间侵占调度资源。但共享势必会影响这一点。最好的办法,是水平扩容,避免单机负载过高。同时,针对吞吐量确定较高的服务,可以切换类型为fixed,降低创建销毁线程的成本。
第二个问题:
在spring环境下,线程池的初始化源头,都是来自于spring的事件触发。
在ServiceBean&ReferenceBean中,会监听spring的ContextRefreshEvent事件进而触发服务的发布&引用。
消息通知串行下,这个过程不会有问题,但是消息是可以并行通知的。此时多个服务的发布&引用过程可能并行,故线程池的初始化可能并行。而在初始化部分未做同步保护,这种情况下会导致并发问题。