業務背景
項目是基於Netty實現的實時課堂項目,課堂中老師需要對試卷進行講解,則老師向服務器發送一個打開試卷信息的請求,服務器獲取試卷信息,將試卷信息發送給所有的客戶端(學生和老師)。
發送給學生的時候需要在試卷信息中加上本人得分的信息。
實現方式大致如下:
1 Paper paper = getPaper(paperId); // 根據試卷ID獲取試卷詳細信息 2 for(Client client : allClients){ 3 paper.setMyScore(getMyScore(client.getUserId())); //根據userId獲取本人得分 4 client.send(paper); //向客戶端發送數據 5 }
結果:學生A收到的得分是學生B的得分,也就是發送給clientA的paper數據被發送給clientB的paper數據給覆蓋了,因為paper對象是同一個
原因分析:
雖然發送給所有客戶端的信息都是paper對象,但是是在for循環里面執行的send方法,也就是說理論上應該是clientA的send方法執行完了之后才會執行clientB的send方法,也就是說理論上應該是學生A收到的paper信息之后學生B才會收到paper信息。
所以得出的結論猜想就是send方法不是同步執行的,而是異步的。追蹤代碼進行分析
第四行的代碼client.send(paper) 實際就是調用了Channel的writeAndFlush方法
追蹤到AbstractChannel的實現如下:
1 @Override 2 public ChannelFuture writeAndFlush(Object msg) { 3 return pipeline.writeAndFlush(msg); 4 }
執行了ChannelPipeline的writeAndFlush方法,跟蹤實現類DefaultChannelPipeline的實現如下:
1 @Override 2 public final ChannelFuture writeAndFlush(Object msg) { 3 return tail.writeAndFlush(msg); 4 }
執行的是ChannelHandlerContext的writeAndFlush方法,跟蹤實現類AbstractChannelHandlerContext實現如下:
1 @Override 2 public ChannelFuture writeAndFlush(Object msg) { 3 return writeAndFlush(msg, newPromise()); 4 }
執行了內部的writeAndFlush方法,繼續跟蹤如下:
1 @Override 2 public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { 3 if (msg == null) { 4 throw new NullPointerException("msg"); 5 } 6 7 if (isNotValidPromise(promise, true)) { 8 ReferenceCountUtil.release(msg); 9 // cancelled 10 return promise; 11 } 12 13 write(msg, true, promise); 14 15 return promise; 16 }
write方法如下:
1 private void write(Object msg, boolean flush, ChannelPromise promise) { 2 AbstractChannelHandlerContext next = findContextOutbound(); 3 final Object m = pipeline.touch(msg, next); 4 EventExecutor executor = next.executor(); 5 if (executor.inEventLoop()) { //判斷當前線程是否是EventLoop線程 6 if (flush) { 7 next.invokeWriteAndFlush(m, promise); 8 } else { 9 next.invokeWrite(m, promise); 10 } 11 } else { 12 AbstractWriteTask task; 13 if (flush) { 14 task = WriteAndFlushTask.newInstance(next, m, promise); 15 } else { 16 task = WriteTask.newInstance(next, m, promise); 17 } 18 safeExecute(executor, task, promise, m); 19 } 20 }
跟蹤到這里終於有所發現了,方法邏輯大致如下:
1.獲取channelPipeline中的head節點
2.獲取當前channel的eventLoop對象
3.判斷當前channel的eventLoop對象中的線程是否是當前線程
4.如果是EventLoop線程,則直接執行writeAndFlush方法,也就是執行寫入並且刷新到channelSocket中去
5.如果不是EventLoop線程,則會創建一個AbstractWriteTask,然后將這個task添加到這個channel的eventLoop中去
分析到這里就可以總結問題的所在了,如果執行channel的writeAndFlush的線程不是work線程池中的線程,那么就會先將這個發送消息封裝成一個task,然后添加到這個channel所屬的eventLoop中的阻塞隊列中去,
然后通過EventLoop的循環來從隊列中獲取任務來執行。一旦task添加到隊列中完成,write方法就會返回。那么當下一個客戶端再執行write方法時,由於msg內容是同一個對象,就會將前一個msg的內容給覆蓋了。
從而就會出現發送給多個客戶端的內容不同,但是接收到的內容是相同的內容。而本例中,執行channel的write方法的線程確實不是eventLoop線程,因為我們采用了線程池來處理業務,當channel發送數據給服務器之后,
服務器解析channel中發送來的請求,然后執行業務處理,而執行業務的操作是采用線程池的方式實現的,所以最終通過channel發送數據給客戶端的時候實際的線程是線程池中的線程,而並不是channel所屬的EventLoop中的線程。
總結:
Netty中的work線程池中的EventLoop並不是一個純粹的IO線程,除了有selector輪詢IO操作之外,還會處理系統的Task和定時任務。
系統的task是通過EventLoop的execute(Runnable task)方法實現,EventLoop內部有一個LinkedBlockingQueue阻塞隊列保存task,task一般都是由於用戶線程發起的IO操作。
每個客戶端有一個channel,每一個channel會綁定一個EventLoop,所以每個channel的所以IO操作默認都是由這個EventLoop中的線程來執行。然后用戶可以在自定義的線程中執行channel的方法。
當用戶線程執行channel的IO操作時,並不會立即執行,而是將IO操作封裝成一個Task,然后添加到這個channel對應的EventLoop的隊列中,然后由這個EventLoop中的線程來執行。所以channel的所有IO操作最終還是
由同一個EventLoop中的線程來執行的,只是發起channel的IO操作的線程可以不是任何線程。
采用將IO操作封裝成Task的原因主要是防止並發操作導致的鎖競爭,因為如果不用task的方式,那么用戶線程和IO線程就可以同時操作網絡資源,就存儲並發問題,所以采用task的方式實現了局部的無鎖化。
所以線程池固然好用,netty固然強大,但是如果沒有深入理解,稍有不慎就可能會出現意想不到的BUG。