在上回《Dubbo源代碼實現六》中我們已經了解到,對於Dubbo集群中的Provider角色,有IO線程池(默認無界)和業務處理線程池(默認200)兩個線程池,所以當業務的並發比較高,或者某些業務處理變慢,業務線程池就很容易被“打滿”,拋出“RejectedExecutionException: Thread pool is EXHAUSTED! ”異常。當然,前提是我們每給Provider的線程池配置等待Queue。
既然Provider端已經拋出異常,表明自己已經受不了了,但線上我們卻發現,Consumer無動於衷,發出的那筆請求還在那里傻傻的候着,直到超時。這樣極其容易導致整個系統的“雪崩”,因為它違背了fail-fast原則。我們希望一旦Provider由於線程池被打滿而無法收到請求,Consumer應該立即感知然后快速失敗來釋放線程。后來發現,完全是Dispatcher配置得不對,默認是all,我們應該配置成message。
我們從源碼角度來看看到底是咋回事,這里先假設我們用的是Netty框架來實現IO操作,上回我們已經提到過,NettyHandler、NettyServer、MultiMessageHandler、HeartbeatHandler都實現了ChannelHandler接口,來實現接收、發送、連接斷開和異常處理等操作,目前上面提到的這四個Handler都是在IO線程池中按順序被調用,但HeartbeatHandler調用后下一個Handler是?這時候就要Dispatcher來上場了,Dispatcher是dubbo中的調度器,用來決定操作是在IO中執行還是業務線程池執行,來一張官方的圖(http://dubbo.io/user-guide/demos/線程模型.html):
上圖Dispatcher后面跟着的ThreadPool就是我們所說的業務線程池。Dispatcher分為5類,默認是all,解釋也直接參考官方截圖:
因為默認是all,所以包括請求、響應、連接、斷開和心跳都交給業務線程池來處理,則無疑加大了業務線程池的負擔,因為默認是200。每種Dispatcher,都有對應的ChannelHandler,ChannelHandler將Handler的調動形成調用鏈。如果配置的是all,那么接下來上場的就是AllChannelHandler;如果配置的是message,那么接下來上場的就是MessageOnlyChannelHandler,這些ChannelHandler都是WrappedChannelHandler的子類,WrappedChannelHandler默認把請求、響應、連接、斷開、心跳操作都交給Handler來處理:
protected final ChannelHandler handler;
public void connected(Channel channel) throws RemotingException {
handler.connected(channel);
}
public void disconnected(Channel channel) throws RemotingException {
handler.disconnected(channel);
}
public void sent(Channel channel, Object message) throws RemotingException {
handler.sent(channel, message);
}
public void received(Channel channel, Object message) throws RemotingException {
handler.received(channel, message);
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
handler.caught(channel, exception);
}
很顯然,如果直接使用WrappedChannelHandler的處理方式,那么Handler的調用會在當前的線程中完成(這里是IO線程),我們看看AllChannelHandler內部實現:
public void connected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CONNECTED));
}catch (Throwable t) {
throw new ExecutionException("connect event", channel, getClass()+" error when process connected event ." , t);
}
}
public void disconnected(Channel channel) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.DISCONNECTED));
}catch (Throwable t) {
throw new ExecutionException("disconnect event", channel, getClass()+" error when process disconnected event ." , t);
}
}
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
public void caught(Channel channel, Throwable exception) throws RemotingException {
ExecutorService cexecutor = getExecutorService();
try{
cexecutor.execute(new ChannelEventRunnable(channel, handler ,ChannelState.CAUGHT, exception));
}catch (Throwable t) {
throw new ExecutionException("caught event", channel, getClass()+" error when process caught event ." , t);
}
}
可以看出,AllChannelHandler覆蓋了WrappedChannelHandler所有的關鍵操作,都將其放進到ExecutorService(這里指的是業務線程池)中異步來處理,但唯一沒有異步操作的就是sent方法,該方法主要用於應答,但官方文檔卻說使用all時應答也是放到業務線程池的,寫錯了?這里,關鍵的地方來了,一旦業務線程池滿了,將拋出執行拒絕異常,將進入caught方法來處理,而該方法使用的仍然是業務線程池,所以很有可能這時業務線程池還是滿的,於是悲劇了,直接導致下游的一個HeaderExchangeHandler沒機會調用,而異常處理后的應答消息正是HeaderExchangeHandler#caught來完成的,所以最后NettyHandler#writeRequested也沒有被調用,Consumer只能死等到超時,無法收到Provider的線程池打滿異常。
從上面的分析得出結論,當Dispatcher使用all時,一旦Provider線程池被打滿,由於異常處理也需要用業務線程池,如果此時運氣好,業務線程池有空閑線程,那么Consumer將收到Provider發送的線程池打滿異常;但很可能此時業務線程池還是滿的,於是悲劇,異常處理和應答步驟也沒有線程可以跑,導致無法應答Consumer,這時候Consumer只能苦等到超時!
這也是為什么我們有時候能在Consumer看到線程池打滿異常,有時候看到的確是超時異常。
為啥我們設置Dispatcher為message可以規避此問題?直接看MessageOnlyChannelHandler的實現:
public void received(Channel channel, Object message) throws RemotingException {
ExecutorService cexecutor = executor;
if (cexecutor == null || cexecutor.isShutdown()) {
cexecutor = SHARED_EXECUTOR;
}
try {
cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
沒錯,MessageOnlyChannelHandler只覆蓋了WrappedChannelHandler的received方法,意味着只有請求處理會用到業務線程池,其他的非業務操作直接在IO線程池執行,這不正是我們想要的嗎?所以使用message的Dispatcher,不會存在Provider線程池滿了,Consumer卻還在傻等的情況,因為默認IO線程池是無界的,一定會有線程來處理異常和應答(如果你把它設置成有界,那我也沒啥好說的了)。
所以,為了減少在Provider線程池打滿時整個系統雪崩的風險,建議將Dispatcher設置成message:
<!--StartFragment--> <!--EndFragment-->
<dubbo:protocol name="dubbo" port="8888" threads="500" dispatcher="message" />