Netty如何處理連接事件
上文講了Netty如何綁定端口,現在我們來閱讀一下Netty如何處理連接事件。上文說過,NioEventLoop啟動後會不斷調用select輪詢就緒事件;當客戶端發起連接時,會觸發processSelectedKeys方法,然後調用processSelectedKey方法。
SelectionKey | 說明 |
---|---|
OP_READ | 讀,值為 1 |
OP_WRITE | 寫,值為 4 |
OP_CONNECT | 客戶端connect,值為 8 |
OP_ACCEPT | 服務端接受連接(accept),值為 16 |
/**
 * Handles the ready operations of one selected key for the given channel: finishes a pending
 * connect, flushes pending writes, then reads (the read branch also covers OP_ACCEPT).
 * The start of the method is elided in this excerpt; {@code unsafe} is presumably obtained from
 * {@code ch.unsafe()} in the omitted part -- TODO confirm against the full source.
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// ... omitted ...
try {
// Bit set of the operations ready on this key; 16 (OP_ACCEPT) means a new connection is pending
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// For an accept event on the server channel this ends up in NioMessageUnsafe.read() (per the article)
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// The key was cancelled while being processed; close the channel
unsafe.close(unsafe.voidPromise());
}
}
通過代碼我們知道調用的是NioMessageUnsafe的read方法。進入該方法的源碼,我們發現其調用了NioServerSocketChannel的doReadMessages方法:
// Excerpt from the body of NioMessageUnsafe.read(): collect messages into readBuf via
// doReadMessages(...), then fire a channelRead event for each collected message.
try {
do {
// Dispatches to NioServerSocketChannel.doReadMessages(...) (per the article)
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
// Nothing was read this round; stop looping
break;
}
if (localRead < 0) {
// A negative count flags the channel as closed
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);
} while (continueReading(allocHandle));
} catch (Throwable t) {
// Remember the failure; presumably it is handled after this excerpt -- TODO confirm
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// Fire a channelRead event through the pipeline for each buffered message
pipeline.fireChannelRead(readBuf.get(i));
}
/**
 * Accepts one pending connection from the underlying JDK server channel and, if there is one,
 * wraps it in a {@code NioSocketChannel} and appends it to {@code buf}.
 *
 * @param buf list that receives the newly created child channel
 * @return 1 when a channel was accepted and added to {@code buf}; 0 when nothing was accepted
 *         or wrapping the accepted socket failed
 * @throws Exception if accepting on the underlying channel fails
 */
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    // This is where the JDK-level accept happens.
    final SocketChannel accepted = SocketUtils.accept(javaChannel());
    if (accepted == null) {
        return 0;
    }

    try {
        // Queue the wrapped channel in buf; the caller then fires channelRead for each entry.
        buf.add(new NioSocketChannel(this, accepted));
        return 1;
    } catch (Throwable cause) {
        logger.warn("Failed to create a new channel from an accepted socket.", cause);
        try {
            accepted.close();
        } catch (Throwable closeFailure) {
            logger.warn("Failed to close a socket.", closeFailure);
        }
    }
    return 0;
}
之前的文章提到,我們在綁定端口的時候,往pipeline中添加了一個ServerBootstrapAcceptor類,我們看下其channelRead方法的實現:
/**
 * Configures a newly accepted child channel and registers it with the child event loop group.
 * The child is force-closed if registration fails or throws.
 *
 * @param ctx the handler context (not used here)
 * @param msg the accepted child channel; cast to {@code Channel}
 */
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    // msg is the NioSocketChannel created in doReadMessages.
    final Channel accepted = (Channel) msg;

    // Install the child handler configured at bootstrap time (the user's ChannelInitializer).
    accepted.pipeline().addLast(childHandler);
    // Apply the configured child options.
    setChannelOptions(accepted, childOptions, logger);
    // Apply the configured child attributes.
    setAttributes(accepted, childAttrs);

    try {
        // Register the child channel with the child group.
        final ChannelFuture registration = childGroup.register(accepted);
        registration.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                forceClose(accepted, future.cause());
            }
        });
    } catch (Throwable cause) {
        forceClose(accepted, cause);
    }
}
接下來我們繼續看下childGroup.register(child)是如何注冊channel的。邏輯就是調用EventLoopGroup的next方法分配一個SingleThreadEventLoop,然後調用其register方法:
/**
 * Registers the given channel by wrapping it in a new {@code DefaultChannelPromise} bound to
 * this event loop and delegating to {@code register(ChannelPromise)}.
 *
 * @param channel the channel to register
 * @return the future returned by the promise-based overload
 */
@Override
public ChannelFuture register(Channel channel) {
    final ChannelPromise registration = new DefaultChannelPromise(channel, this);
    return register(registration);
}
/**
 * Registers the channel carried by {@code promise} with this event loop.
 *
 * @param promise the promise whose channel is registered; must not be null
 * @return the same {@code promise}
 */
@Override
public ChannelFuture register(final ChannelPromise promise) {
    ObjectUtil.checkNotNull(promise, "promise");
    // Delegates to the unsafe of the channel (here the NioSocketChannel); per the article,
    // register(...) itself is implemented in AbstractChannel.AbstractUnsafe.
    final Channel target = promise.channel();
    target.unsafe().register(this, promise);
    return promise;
}
我們看下register方法的實現。可以發現這其實跟NioServerSocketChannel的注冊是一樣的:綁定EventLoop,並且開啟EventLoop,然後調用其register0方法:
/**
 * Binds the enclosing channel to {@code eventLoop} and runs {@code register0(promise)} on it.
 * Fails the promise if the channel is already registered or the event loop is incompatible.
 * If the registration task is rejected, the channel is force-closed and the promise is failed.
 *
 * @param eventLoop the event loop to bind to; must not be null
 * @param promise   the promise completed with the outcome of the registration
 */
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");

    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        final String loopType = eventLoop.getClass().getName();
        promise.setFailure(new IllegalStateException("incompatible event loop type: " + loopType));
        return;
    }

    // Bind this channel to the given EventLoop.
    AbstractChannel.this.eventLoop = eventLoop;

    if (eventLoop.inEventLoop()) {
        register0(promise);
        return;
    }

    try {
        // Per the article: the first submitted task starts the EventLoop thread
        // (covered in the earlier port-binding article).
        final Runnable registerTask = new Runnable() {
            @Override
            public void run() {
                register0(promise);
            }
        };
        eventLoop.execute(registerTask);
    } catch (Throwable cause) {
        logger.warn(
                "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                AbstractChannel.this, cause);
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, cause);
    }
}
繼續看下register0方法
/**
 * Performs the actual registration: calls doRegister(), marks the channel as registered,
 * completes the promise, and fires channelRegistered (plus channelActive on the first
 * registration of an active channel). On any failure the channel is force-closed and the
 * promise is failed.
 *
 * @param promise the registration promise to complete
 */
private void register0(ChannelPromise promise) {
try {
// check if the channel is still open as it could be closed in the mean time when the register
// call was outside of the eventLoop
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// Perform the registration; per the article this binds the channel's SelectionKey
doRegister();
neverRegistered = false;
registered = true;
// Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
// user may already fire events through the pipeline in the ChannelFutureListener.
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// Only fire a channelActive if the channel has never been registered. This prevents firing
// multiple channel actives if the channel is deregistered and re-registered.
if (isActive()) {
// Per the article: on the first registration this reaches HeadContext.channelActive,
// which completes the registration of the read event
if (firstRegistration) {
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
// This channel was registered before and autoRead() is set. This means we need to begin read
// again so that we process inbound data.
//
// See https://github.com/netty/netty/issues/4805
beginRead();
}
}
} catch (Throwable t) {
// Close the channel directly to avoid FD leak.
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
這裡的調用鏈路比較長,我以斷點截圖的形式給大家展示一下:

至此,Netty完成了對客戶端連接的處理:綁定EventLoop,並且開啟EventLoop,完成讀事件的注冊。
結束
❝識別下方二維碼!回復:「入群」,掃碼加入我們交流群!❞
