rocketmq用netty實現的網絡連接,發現它多個線程掉用一個channel連接,所以這個是線程安全的?
使用Netty編程時,我們經常會從用戶線程,而不是Netty線程池發起write操作,因為我們不能在netty的事件回調中做大量耗時操作。那么問題來了 –
1, writeAndFlush是線程安全的嗎?
2, 是否使用了鎖,導致並發性能下降呢
我們來看代碼 – 在DefaultChannelHandlerContext中
@Override public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { DefaultChannelHandlerContext next; next = findContextOutbound(MASK_WRITE); ReferenceCountUtil.touch(msg, next); next.invoker.invokeWrite(next, msg, promise); next = findContextOutbound(MASK_FLUSH); next.invoker.invokeFlush(next); return promise; }
在DefaultChannelHandlerInvoker.java中
@Override public void invokeWrite(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (msg == null) { throw new NullPointerException("msg"); } if (!validatePromise(ctx, promise, true)) { // promise cancelled ReferenceCountUtil.release(msg); return; } if (executor.inEventLoop()) { invokeWriteNow(ctx, msg, promise); } else { AbstractChannel channel = (AbstractChannel) ctx.channel(); int size = channel.estimatorHandle().size(msg); if (size > 0) { ChannelOutboundBuffer buffer = channel.unsafe().outboundBuffer(); // Check for null as it may be set to null if the channel is closed already if (buffer != null) { buffer.incrementPendingOutboundBytes(size); } } safeExecuteOutbound(WriteTask.newInstance(ctx, msg, size, promise), promise, msg); } }
private void safeExecuteOutbound(Runnable task, ChannelPromise promise, Object msg) { try { executor.execute(task); } catch (Throwable cause) { try { promise.setFailure(cause); } finally { ReferenceCountUtil.release(msg); } } }
可見,writeAndFlush如果在Netty線程池內執行,則是直接write;否則,將作為一個task插入到Netty線程池執行。
《Netty權威指南》寫到
通過調用NioEventLoop的execute(Runnable task)方法實現,Netty有很多系統Task,創建他們的主要原因是:當I/O線程和用戶線程同時操作網絡資源時,為了防止並發操作導致的鎖競爭,將用戶線程的操作封裝成Task放入消息隊列中,由I/O線程負責執行,這樣就實現了局部無鎖化。