Netty框架問題記錄1--多線程下批量發送消息導致消息被覆蓋


業務背景

項目是基於Netty實現的實時課堂項目,課堂中老師需要對試卷進行講解,則老師向服務器發送一個打開試卷信息的請求,服務器獲取試卷信息,將試卷信息發送給所有的客戶端(學生和老師)。

發送給學生的時候需要在試卷信息中加上本人得分的信息。

 實現方式大致如下:

1 Paper paper = getPaper(paperId); // 根據試卷ID獲取試卷詳細信息
2 for(Client client : allClients){
3    paper.setMyScore(getMyScore(client.getUserId())); //根據userId獲取本人得分
4   client.send(paper); //向客戶端發送數據
5 }

 

結果:學生A收到的得分是學生B的得分,也就是發送給clientA的paper數據被發送給clientB的paper數據給覆蓋了,因為paper對象是同一個

 

原因分析:

雖然發送給所有客戶端的信息都是paper對象,但是是在for循環里面執行的send方法,也就是說理論上應該是clientA的send方法執行完了之后才會執行clientB的send方法,也就是說理論上應該是學生A收到的paper信息之后學生B才會收到paper信息。

所以得出的結論猜想就是send方法不是同步執行的,而是異步的。追蹤代碼進行分析

第四行的代碼client.send(paper) 實際就是調用了Channel的writeAndFlush方法

追蹤到AbstractChannel的實現如下: 

1 @Override
2     public ChannelFuture writeAndFlush(Object msg) {
3         return pipeline.writeAndFlush(msg);
4     }

 

 執行了ChannelPipeline的writeAndFlush方法,跟蹤實現類DefaultChannelPipeline的實現如下:

1 @Override
2     public final ChannelFuture writeAndFlush(Object msg) {
3         return tail.writeAndFlush(msg);
4     }

 

執行的是ChannelHandlerContext的writeAndFlush方法,跟蹤實現類AbstractChannelHandlerContext實現如下:

1 @Override
2     public ChannelFuture writeAndFlush(Object msg) {
3         return writeAndFlush(msg, newPromise());
4     }

 

執行了內部的writeAndFlush方法,繼續跟蹤如下:

 1 @Override
 2     public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
 3         if (msg == null) {
 4             throw new NullPointerException("msg");
 5         }
 6 
 7         if (isNotValidPromise(promise, true)) {
 8             ReferenceCountUtil.release(msg);
 9             // cancelled
10             return promise;
11         }
12 
13         write(msg, true, promise);
14 
15         return promise;
16     }

 

write方法如下:

 1 private void write(Object msg, boolean flush, ChannelPromise promise) {
 2         AbstractChannelHandlerContext next = findContextOutbound();
 3         final Object m = pipeline.touch(msg, next);
 4         EventExecutor executor = next.executor();
 5         if (executor.inEventLoop()) { //判斷當前線程是否是EventLoop線程
 6             if (flush) {
 7                 next.invokeWriteAndFlush(m, promise);
 8             } else {
 9                 next.invokeWrite(m, promise);
10             }
11         } else {
12             AbstractWriteTask task;
13             if (flush) {
14                 task = WriteAndFlushTask.newInstance(next, m, promise);
15             }  else {
16                 task = WriteTask.newInstance(next, m, promise);
17             }
18             safeExecute(executor, task, promise, m);
19         }
20     }

 

跟蹤到這里終於有所發現了,方法邏輯大致如下:

1.獲取channelPipeline中的head節點

2.獲取當前channel的eventLoop對象

3.判斷當前channel的eventLoop對象中的線程是否是當前線程

4.如果是EventLoop線程,則直接執行writeAndFlush方法,也就是執行寫入並且刷新到channelSocket中去

5.如果不是EventLoop線程,則會創建一個AbstractWriteTask,然后將這個task添加到這個channel的eventLoop中去 

 

分析到這里就可以總結問題的所在了,如果執行channel的writeAndFlush的線程不是work線程池中的線程,那么就會先將這個發送消息封裝成一個task,然后添加到這個channel所屬的eventLoop中的阻塞隊列中去,

然后通過EventLoop的循環來從隊列中獲取任務來執行。一旦task添加到隊列中完成,write方法就會返回。那么當下一個客戶端再執行write方法時,由於msg內容是同一個對象,就會將前一個msg的內容給覆蓋了。

從而就會出現發送給多個客戶端的內容不同,但是接收到的內容是相同的內容。而本例中,執行channel的write方法的線程確實不是eventLoop線程,因為我們采用了線程池來處理業務,當channel發送數據給服務器之后,

服務器解析channel中發送來的請求,然后執行業務處理,而執行業務的操作是采用線程池的方式實現的,所以最終通過channel發送數據給客戶端的時候實際的線程是線程池中的線程,而並不是channel所屬的EventLoop中的線程。

 

總結:

Netty中的work線程池中的EventLoop並不是一個純粹的IO線程,除了有selector輪詢IO操作之外,還會處理系統的Task和定時任務。

系統的task是通過EventLoop的execute(Runnable task)方法實現,EventLoop內部有一個LinkedBlockingQueue阻塞隊列保存task,task一般都是由於用戶線程發起的IO操作。

每個客戶端有一個channel,每一個channel會綁定一個EventLoop,所以每個channel的所以IO操作默認都是由這個EventLoop中的線程來執行。然后用戶可以在自定義的線程中執行channel的方法。

當用戶線程執行channel的IO操作時,並不會立即執行,而是將IO操作封裝成一個Task,然后添加到這個channel對應的EventLoop的隊列中,然后由這個EventLoop中的線程來執行。所以channel的所有IO操作最終還是

由同一個EventLoop中的線程來執行的,只是發起channel的IO操作的線程可以不是任何線程。

 

采用將IO操作封裝成Task的原因主要是防止並發操作導致的鎖競爭,因為如果不用task的方式,那么用戶線程和IO線程就可以同時操作網絡資源,就存儲並發問題,所以采用task的方式實現了局部的無鎖化。

所以線程池固然好用,netty固然強大,但是如果沒有深入理解,稍有不慎就可能會出現意想不到的BUG。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM