(轉)Netty : writeAndFlush的線程安全及並發問題


rocketmq用netty實現的網絡連接,發現它多個線程掉用一個channel連接,所以這個是線程安全的?

使用Netty編程時,我們經常會從用戶線程,而不是Netty線程池發起write操作,因為我們不能在netty的事件回調中做大量耗時操作。那么問題來了 –

1, writeAndFlush是線程安全的嗎?

2, 是否使用了鎖,導致並發性能下降呢

 

我們來看代碼 – 在DefaultChannelHandlerContext中

@Override
    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
        DefaultChannelHandlerContext next;
        next = findContextOutbound(MASK_WRITE);
        ReferenceCountUtil.touch(msg, next);
        next.invoker.invokeWrite(next, msg, promise);
        next = findContextOutbound(MASK_FLUSH);
        next.invoker.invokeFlush(next);
        return promise;
}

在DefaultChannelHandlerInvoker.java中

@Override
     public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
         if (msg == null) {
             throw new NullPointerException("msg");
         }
         if (!validatePromise(ctx, promise, true)) {
             // promise cancelled
             ReferenceCountUtil.release(msg);
             return;
         }

         if (executor.inEventLoop()) {
             invokeWriteNow(ctx, msg, promise);
         } else {
             AbstractChannel channel = (AbstractChannel) ctx.channel();
             int size = channel.estimatorHandle().size(msg);
             if (size > 0) {
                 ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer();
                 // Check for null as it may be set to null if the channel is closed already
                 if (buffer != null) {
                     buffer.incrementPendingOutboundBytes(size);
                 }
             }
             safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg);
         }
     }
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) {
         try {
             executor.execute(task);
         } catch (Throwable cause) {
             try {
                 promise.setFailure(cause);
             } finally {
                 ReferenceCountUtil.release(msg);
             }
         }
     }

可見,writeAndFlush如果在Netty線程池內執行,則是直接write;否則,將作為一個task插入到Netty線程池執行。

 

《Netty權威指南》寫到
通過調用NioEventLoop的execute(Runnable task)方法實現,Netty有很多系統Task,創建他們的主要原因是:當I/O線程和用戶線程同時操作網絡資源時,為了防止並發操作導致的鎖競爭,將用戶線程的操作封裝成Task放入消息隊列中,由I/O線程負責執行,這樣就實現了局部無鎖化。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM