前言
后面打算開始擼其他框架源碼,而Netty對Java NIO的一層封裝,提供了一套簡單易用的API,經常被其他框架拿來用,我先花了點時間研究了下。這里整理下對源碼的解讀,以及對幾個關鍵對象的介紹。分析了之前兩篇流水賬式的源碼分析的不足,這次嘗試聚焦幾個不同重點進行分析。
個人netty注釋版本:https://gitee.com/Nortyr/netty
原netty地址:https://github.com/netty/netty
看完能收獲什么
- Java網絡編程介紹
- 一個簡單的EchoServerDemo
- Bootstrap
- Channel
- ChannelPipeline & ChannelHandler
- EventLoopGroup & EventLoop
- ChannelFuture
Java網絡編程介紹
BIO模型。龐大的線程消耗,消費消息如果很漫長,這個服務就是個災難。
public void server(int port) throws IOException{
final ServerSocket socket=new ServerSocket(port);
for (;;){
//接受連接
final Socket clientSock = socket.accept();
System.out.println("Accept connection from"+ clientSock);
//創建一個線程來處理連接
new Thread(new Runnable() {
@Override
public void run() {
OutputStream out;
try {
out=clientSock.getOutputStream();
... doSomeThing...
//將消息寫給已連接的客戶端
out.write("Hello world".getBytes(Charset.forName("UTF-8")));
out.flush();
clientSock.close();
}
...略...
}
}).start();
}
}
故此Java設計出了NIO,這里找到個Doug Lea大神的一篇 NIOppt http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf 感興趣的可以看一下,十分精簡。下面這部分會結合這個ppt進行講解,看完這個ppt的可以直接略過到下一個部分。
public static void main(String[] args) throws IOException {
//創建ServerSocketChannel,處理接入連接
ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();
//創建Selector
Selector selector=Selector.open();
//設置是否為非阻塞
serverSocketChannel.configureBlocking(false);
//創建注冊channel進selector的創立連接時間
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//綁定端口號
serverSocketChannel.socket().bind(new InetSocketAddress(8080));
while (true){
if(serverSocketChannel.isOpen()){
// 通過 Selector 選擇 Channel
int selectNums = selector.select(1000L);
if (selectNums == 0) {
continue;
}
// 遍歷可選擇的 Channel 的 SelectionKey 集合
for (SelectionKey selectKey:selector.selectedKeys()) {
// 忽略無效的 SelectionKey
if (!selectKey.isValid()) {
continue;
}
//新建立的連接
if(selectKey.isAcceptable()){
//獲取新連接創建的channel
SocketChannel socketChannel= ((ServerSocketChannel) selectKey.channel()).accept();
if(socketChannel!=null){
//設置為非阻塞
socketChannel.configureBlocking(false);
//注冊進selector
socketChannel.register(selector,SelectionKey.OP_READ);
}
}
//處理讀時間
if(selectKey.isReadable()){
SocketChannel socketChannel= (SocketChannel) selectKey.channel();
if(socketChannel!=null){
//讀取數據
ByteBuffer buffer = ByteBuffer.allocate(1024);
int bytesRead = socketChannel.read(buffer);
if(bytesRead==-1){
socketChannel.register(selector,0);
socketChannel.close();
}else{
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
System.arraycopy(buffer.array(), buffer.position(), bytes, 0, buffer.remaining());
System.out.println(new String(bytes, "UTF-8"));
}
}
}
}
}
}
}
Java NIO的幾個核心api
- Channels
- 與支持非阻塞讀取的文件,socket等建立連接。
- Buffers
- 本質是一塊內存,用於和NIO通道進行交互。
- Selectors
- 把Channel和需要的事件注冊到Selector上面,告訴一組channel中的哪一個有IO事件。
- SelectionKeys
- 維護IO事件狀態和綁定
幾個核心api的關系
- Channel和Buffer
- 2個交互關系如圖所示
- 2個交互關系如圖所示
- Selector/Channel/SelectionKey
- 一個Selector可以監聽多個Channel
- 一個Selector和Channel的綁定關系為SelectionKey
Channel
這里我們以NioServerSocketChannel
為例,看一下Channel
public interface Channel extends AttributeMap, ChannelOutboundInvoker, Comparable<Channel> {
ChannelId id();
EventLoop eventLoop();
Channel parent();
ChannelConfig config();
boolean isOpen();
boolean isRegistered();
boolean isActive();
ChannelMetadata metadata();
SocketAddress localAddress();
SocketAddress remoteAddress();
ChannelFuture closeFuture();
boolean isWritable();
long bytesBeforeUnwritable();
long bytesBeforeWritable();
Unsafe unsafe();
ChannelPipeline pipeline();
ByteBufAllocator alloc();
@Override
Channel read();
@Override
Channel flush();
/**
* 調用Javanio方法封裝
*/
interface Unsafe {
RecvByteBufAllocator.Handle recvBufAllocHandle();
/**
* 返回地址
*/
SocketAddress localAddress();
/**
* 返回遠程地址
*/
SocketAddress remoteAddress();
/**
* 注冊Channel,注冊完成后通知ChannelPromise
*/
void register(EventLoop eventLoop, ChannelPromise promise);
/**
* 將ip地址綁定到Channel,完成后通知ChannelPromise
*/
void bind(SocketAddress localAddress, ChannelPromise promise);
/**
* 連接遠程ip地址
*/
void connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise);
/**
* 斷開連接,完成后通知ChannelPromise
*/
void disconnect(ChannelPromise promise);
/**
* 關閉channel,通知ChannelPromise
*/
void close(ChannelPromise promise);
/**
* 關閉,不處罰任何事件
*/
void closeForcibly();
/**
* 注銷channel,通知ChannelPromise
*/
void deregister(ChannelPromise promise);
/**
* 調用讀取操作
*/
void beginRead();
/**
* 調用寫操作
*/
void write(Object msg, ChannelPromise promise);
/**
* 清空所有通過ChannelPromise預定的寫操作
*/
void flush();
ChannelPromise voidPromise();
/**
* 返回存儲待處理寫入請求的Channel的ChannelOutboundBuffer。
*/
ChannelOutboundBuffer outboundBuffer();
}
}
public abstract class AbstractNioChannel extends AbstractChannel {
private final SelectableChannel ch;
protected final int readInterestOp;
volatile SelectionKey selectionKey;
boolean readPending;
private final Runnable clearReadPendingRunnable = new Runnable() {
@Override
public void run() {
clearReadPending0();
}
};
private ChannelPromise connectPromise;
private Future<?> connectTimeoutFuture;
private SocketAddress requestedRemoteAddress;
}
以上是Channel
接口和AbstractNioChannel
的抽象類,這里給大家精簡了下,從Channel
定義的各個方法可以看出,netty的Channel
是對原始Channel
的一層封裝。其中所有的nio的操作封裝在了Unsafe
中,並進行了一定的增強,例如回調之類的。從AbstractNioChannel
可以更加直觀的看出,netty對Channel
SelectionKey
的封裝,並添加了自己的回調ChannelPromise
從而使方法更加易於使用。
ChannelPipeline & ChannelHandler
基礎講解
ChannelPipeline
的初始化
public abstract class AbstractChannel extends DefaultAttributeMap implements Channel {
...其余略...
private final DefaultChannelPipeline pipeline;
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
}
ChannelPipeline
內部結構概述
public class DefaultChannelPipeline implements ChannelPipeline {
final AbstractChannelHandlerContext head;
final AbstractChannelHandlerContext tail;
private final Channel channel;
private final ChannelFuture succeededFuture;
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
...略
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
... 略
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, HeadContext.class);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
...略
}
}
上面列舉了ChannelPipeline
的創建,以及ChannelPipeline
的內部結構。可以看出它維護了一個雙向鏈表。我們在添加handler的時候就是往這個鏈表中添加的。
ChannelHandler
添加進ChannelPipeline
后會被封裝成ChannelHandlerContext
,會判斷是ChannelInboundHandler
還是ChannelOutboundHandler
的子類,對inbound
和outbound
這兩個屬性進行賦值,ChannelInboundHandler
的子類inbound
為true,outbound
為false,ChannelOutboundHandler
反之。ChannelPipeline
內部調用方法時,會使用fireXXXXX()
的方法,會利用責任鏈模式進行調用,這時候會用到這個屬性進行判斷,是否有對應方法,從而進行調用(后面會詳細講解下)。
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
private static boolean isInbound(ChannelHandler handler) {
return handler instanceof ChannelInboundHandler;
}
private static boolean isOutbound(ChannelHandler handler) {
return handler instanceof ChannelOutboundHandler;
}
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
}
方法調用
這里就用了責任鏈的方式調用方法,確定下一個調用哪一個節點,就是通過inbound
outbound
這兩個字段決定的。
public final void read() {
...省略掉部分無用代碼
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
pipeline.fireChannelReadComplete();
if (close) {
closeOnRead(pipeline);
}
}
}
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
//如果msg實現了ReferenceCounted,進行特殊操作
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
//調用下一個節點的channelRead方法
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
invokeExceptionCaught(t);
}
} else {
fireChannelRead(msg);
}
}
下面就是調用你自定義的ChannelInboundHandler
子類的覆蓋方法了,這里就不過多贅述。
EventLoopGroup & EventLoop
初始化
EventLoopGroup初始化創建的時候,會創建對應數量的EventLoop,如果沒有指定,默認創建cpu核心數量*2個EventLoop
public abstract class MultithreadEventLoopGroup extends MultithreadEventExecutorGroup implements EventLoopGroup {
//默認線程數是cpu核心的2倍
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
}
public abstract class MultithreadEventExecutorGroup extends AbstractEventExecutorGroup {
protected MultithreadEventExecutorGroup(int nThreads, Executor executor,
EventExecutorChooserFactory chooserFactory, Object... args) {
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
//創建對應數量的EventLoop
children[i] = newChild(executor, args);
success = true;
}
}
chooser = chooserFactory.newChooser(children);
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
}
將EventLoop封裝進EventExecutorChooser
@Override
public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
方法調用
此處借助EchoServer
啟動分析EventLoop方法執行過程(不感興趣的可以跳過)
如果你服務設置了主從線程,在啟動的時候,就會使用主線程啟動服務。
final ChannelFuture initAndRegister() {
...省略部分代碼
ChannelFuture regFuture = config().group().register(channel);
}
@Override
public ChannelFuture register(Channel channel) {
//從EventExecutorChooser獲取到EventLoop注冊Channel
return next().register(channel);
}
protected abstract class AbstractUnsafe implements Unsafe {
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
}
}
}
//將任務先添加進隊列,
private void execute(Runnable task, boolean immediate) {
boolean inEventLoop = inEventLoop();
addTask(task);
if (!inEventLoop) {
//主線程輪循,監聽事件
startThread();
}
...省略無用代碼
}
protected void addTask(Runnable task) {
ObjectUtil.checkNotNull(task, "task");
if (!offerTask(task)) {
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
startThread();
比較核心單獨說一下,他會啟動一個線程
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
...省略無用代碼
try {
SingleThreadEventExecutor.this.run();
success = true;
}
}
});
}
//execute沒啥好說的了,就是啟動線程
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}
protected void run() {
for (;;) {
else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
//執行前天添加的任務
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
}
}
processSelectedKeys();
就是正常執行讀取連接的操作入口,runAllTasks( );
就是上面添加的匿名內部類的執行入口
new Runnable() {
@Override
public void run() {
register0(promise);
}
}
ChannelFuture
這個從圖中可以看出它就是對java.util.concurrent.Future
的拓展。
這里我們主要看一下它擴展的回調機制
public Promise<V> await() throws InterruptedException {
//根據有無結果判斷當前任務是否完成
if (isDone()) {
return this;
}
if (Thread.interrupted()) {
throw new InterruptedException(toString());
}
checkDeadLock();
synchronized (this) {
while (!isDone()) {
incWaiters();
try {
//線程進入等待狀態
wait();
} finally {
decWaiters();
}
}
}
return this;
}
private static boolean isDone0(Object result) {
return result != null && result != UNCANCELLABLE;
}
與之對應的就是notify了
調用回調最終會調用
private void notifyListenersNow() {
Object listeners;
synchronized (this) {
// Only proceed if there are listeners to notify and we are not already notifying listeners.
if (notifyingListeners || this.listeners == null) {
return;
}
//用完就刪
notifyingListeners = true;
listeners = this.listeners;
this.listeners = null;
}
for (;;) {
if (listeners instanceof DefaultFutureListeners) {
notifyListeners0((DefaultFutureListeners) listeners);
} else {
notifyListener0(this, (GenericFutureListener<?>) listeners);
}
synchronized (this) {
if (this.listeners == null) {
// Nothing can throw from within this method, so setting notifyingListeners back to false does not
// need to be in a finally block.
notifyingListeners = false;
return;
}
listeners = this.listeners;
this.listeners = null;
}
}
}
private static void notifyListener0(Future future, GenericFutureListener l) {
try {
//調用自定義listener
l.operationComplete(future);
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("An exception was thrown by " + l.getClass().getName() + ".operationComplete()", t);
}
}
}
Bootstrap
最后我們再來看下Bootstrap,核心部分上面已經講完了,這里就不多贅述,這就簡述下
public abstract class AbstractBootstrap{
private static final Map.Entry<ChannelOption<?>, Object>[] EMPTY_OPTION_ARRAY = new Map.Entry[0];
private static final Map.Entry<AttributeKey<?>, Object>[] EMPTY_ATTRIBUTE_ARRAY = new Map.Entry[0];
volatile EventLoopGroup group;
private volatile ChannelFactory<? extends C> channelFactory;
private volatile SocketAddress localAddress;
private final Map<ChannelOption<?>, Object> options = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> attrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private volatile ChannelHandler handler;
}
public class ServerBootstrap extends AbstractBootstrap{
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
}
private ChannelFuture doBind(final SocketAddress localAddress) {
//初始化並注冊一個 Channel 對象,pipeline中添加ServerBootstrapAcceptor,處理連理連接事件,
// ChannelFuture regFuture = config().group().register(channel);啟動線程循環監聽事件
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
//因為是異步,不能保證是否完成
//綁定Channel端口,並注冊channel到selectionKey中
if (regFuture.isDone()) {
// 注冊完成
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// 注冊還未完成
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// EventLoop 上的注冊失敗,因此一旦我們嘗試訪問 Channel 的 EventLoop,就直接使 ChannelPromise 失敗,以免導致 IllegalStateException。
promise.setFailure(cause);
} else {
// 注冊成功,所以設置正確的執行器來使用。
// See https://github.com/netty/netty/issues/2586
promise.registered();
//綁定端口
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
至此,Netty源碼分析就結束了,大部分都已經講完,感興趣的朋友可以跟着
ServerBootstrap
的源碼跑一下,大部分都明白了,本來上一篇博客寫了ServerBootstrap
啟動過程分析,但是覺得又臭又長,就給刪了。就是跑一邊代碼,誰不會呢,這里就簡述下關鍵的部分。還有其他部分,后面看心情決定要不要寫博客了,反正也沒人看~~~