上文《Netty框架入門》說到:如果業務處理handler耗時長,將嚴重影響可支持的並發數。
針對這一問題,經過學習,發現了可以使用ExecutionHandler來優化。
先來回顧一下沒有使用ExecutionHandler優化的流程:
1)Boss線程(接收到客戶端連接)->生成Channel->交給Worker線程池處理。
2)某個被分配到任務的Worker線程->讀完已接收的數據到ChannelBuffer->觸發ChannelPipeline中的ChannelHandler鏈來處理業務邏輯。
注意:執行ChannelHandler鏈的整個過程是同步的,如果業務邏輯的耗時較長,會將導致Work線程長時間被占用得不到釋放,從而影響了整個服務器的並發處理能力。
一、引入ExecutionHandler優化
//HttpServerPipelineFactory.java private final ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)); public class HttpServerPipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new HttpRequestDecoder()); pipeline.addLast("encoder", new HttpResponseEncoder()); pipeline.addLast("execution", executionHandler); pipeline.addLast("handler", new HttpServerHandler()); return pipeline; } }
當我們引入ExecutionHandler后,原本同步的ChannelHandler鏈在經過 ExecutionHandler后就結束了,它會被ChannelFactory的worker線程池所回收,而剩下的ChannelHandler鏈將由ExecutionHandler的線程池接手處理。
對於ExecutionHandler需要的線程池模型,Netty提供了兩種可選:
1) MemoryAwareThreadPoolExecutor 通過對線程池內存的使用控制,可控制Executor中待處理任務的上限(超過上限時,后續進來的任務將被阻塞),並可控制單個Channel待處理任務的上限,防止內存溢出錯誤。但是它不維持同一Channel的ChannelEvents秩序,當經過ExecutionHandler后的ChannelHandler鏈中有不止一個Handler時,這些事件驅動存在混亂的可能。例如:
----------------------------------------> Timeline -------------------------------------> Thread X: --- Channel A (Event 2) --- Channel A (Event 1) -----------------------------> Thread Y: --- Channel A (Event 3) --- Channel B (Event 2) --- Channel B (Event 3) ---> Thread Z: --- Channel B (Event 1) --- Channel B (Event 4) --- Channel A (Event 4) --->
2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子類。除了MemoryAwareThreadPoolExecutor 的功能之外,它還可以保證同一Channel中處理的事件流的順序性(不同Channel使用不同的key保持事件順序),這主要是控制事件在異步處理模式下可能出現的錯誤事件順序,但它並不保證同一 Channel中的事件都在一個線程中執行(通常也沒必要)。例如:
----------------------------------------> Timeline ----------------------------------------> Thread X: --- Channel A (Event 1) --. .-- Channel B (Event 2) --- Channel B (Event 3) ---> \ / X / \ Thread Y: --- Channel B (Event 1) --' '-- Channel A (Event 2) --- Channel A (Event 3) --->
二、具有可伸縮性的OrderedMemoryAwareThreadPoolExecutor
使用策略
在大多數情況下,我們會使用OrderedMemoryAwareThreadPoolExecutor,它的構造函數要求我們提供線程池的大小,在上面的代碼中,我們使用了16這個具體的值,是一種很不好的寫法,通常情況下,我們會使用配置文件使之可變,但是在實際部署時,並不能保證實施人員能很好的去調整,故提供如下的一種寫法:
double coefficient = 0.8; //系數 int numberOfCores = Runtime.getRuntime().availableProcessors(); int poolSize = (int)(numberOfCores / (1 - coefficient));
我們可以使用poolSize取代OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576)中的那個16,因為當一個系統被開發出來后,它是CPU密集型還是IO密集型是可評估的,通過評估其密集型,調整系數即可:CPU密集型接近0,IO密集型接近1。