EndPoint提供基礎的網絡IO服務,用來實現網絡連接和控制,它是服務器對外I/O操作的接入點。主要任務是管理對外的socket連接,同時將建立好的socket連接交到合適的工作線程中去。
里面兩個主要的屬性類是Acceptor和Poller、SocketProcessor
Acceptor
Acceptor類實現了Runnable接口,主要用於接收網絡請求,建立連接,連接建立之后,將一個SocketChannel對象包裝成一個NioChannel,並注冊到Poller中。由Poller來負責執行數據的讀取和業務執行。
我們看一下Acceptor的run方法:
public void run() { SocketChannel socket = serverSock.accept();//從監聽的serversocket中獲取新的連接 setSocketOptions(socket);//設置通道的屬性 …… } protected boolean setSocketOptions(SocketChannel socket) { NioChannel = channel = new NioChannel(socket, bufhandler);//將通道包裝成NioChannel getPoller0().register(channel);//從poller數組中選擇一個poller,將channel注冊到poller中 …… }
Poller
Poller實現了Runnable接口,在NioEndpoint的時候,會初始化pollers數組,同時啟動pollers數組中的線程,讓pollers開始工作。
封裝后socketchannel的放入Poller線程內部維護的一個PollerEvent隊列中,然后在Poller線程運行時處理隊列,將socketchannel注冊到這個Poller的Selector上。
當事件到來的時候,Selector發現要處理的事件,通過selector.select系列方法來獲取數據,然后經由processKey到processSocket方法,封裝成一個SocketProcessor對象后,放在EndPoint的線程池中執行。
SocketChannel是如何注冊到Poller中的?
protected ConcurrentLinkedQueue<Runnable> events = new ConcurrentLinkedQueue<Runnable>();//內部維護的事件隊列 public void register(final NioChannel socket) { socket.setPoller(this); KeyAttachment key = keyCache.poll(); final KeyAttachment ka = key!=null?key:new KeyAttachment(); ka.reset(this,socket,getSocketProperties().getSoTimeout()); PollerEvent r = eventCache.poll(); ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r);//socketchannel、key一同加入到了events隊列 }
SocketChannel是如何注冊到每個Poller的selector中的?答案在event()方法中,在該方法中遍歷events隊列,依次執行run方法
public boolean events() { while ( (r = (Runnable)events.poll()) != null ) { result = true; r.run(); } …… } public void run() { if ( interestOps == OP_REGISTER ) { socket.getIOChannel().register(socket.getPoller().getSelector(), SelectionKey.OP_READ, key);//注冊到selector中 } else { final SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); int ops = key.interestOps() | interestOps; att.interestOps(ops); key.interestOps(ops); att.setCometOps(ops);//另外一種注冊方法? }}
……
Poller的執行在其run方法中,主要是將請求封裝成SocketProcessor對象,交給線程池處理。
public void run() { if ( keyCount == 0 ) hasEvents = (hasEvents | events());//通過事件機制監控感興趣的網絡事件,見上面的events()分析 //遍歷渠道上到來的key,交給processor去處理 Iterator iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; for() { SelectionKey sk = (SelectionKey) iterator.next(); KeyAttachment attachment = (KeyAttachment)sk.attachment(); processKey(sk, attachment); } }
//processKey(sk, attachment)調用processSocket(channel, SocketStatus.OPEN),最終使用Endpoint的線程池執行請求
protected boolean processSocket(NioChannel socket, SocketStatus status, boolean dispatch) { KeyAttachment attachment = (KeyAttachment)socket.getAttachment(false); attachment.setCometNotify(false); //will get reset upon next reg sc = new SocketProcessor(socket,status); if ( dispatch ) executor.execute(sc);=====>任務被封裝成SocketProcessor對象,在成功獲取線程池后,則通過線程池來進行socket數據數據的讀寫操作。 else sc.run(); …… }
SocketProcessor
請求到達socketProcess之后,首先執行其run方法,請求被轉移到handler.process,根據上下文,我們知道這了的hander指的是Http11ConnectionHandler
public void run() { (status==null)?(handler.process(socket)==Handler.SocketState.CLOSED) : (handler.event(socket,status)==Handler.SocketState.CLOSED); }

參考文獻:
http://blog.csdn.net/yanlinwang/
