一、簡介
NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的( JDK 1.4中的java.nio.*包中引入新的Java I/O庫)。但現在都稱之為Non-blocking I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。
NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。
新增的着兩種通道都支持阻塞和非阻塞兩種模式。
阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。
對於低負載、低並發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高並發的(網絡)應用,應使用NIO的非阻塞模式來開發。
二、NIO實現基礎
NIO對應linux IO 模型的IO多路復用模型,netty、tomcat(tomcat 6及以后版本,tomcat 6之前是基於BIO)的實現也是基於NIO。
I/O復用模型,是同步非阻塞,這里的非阻塞是指I/O讀寫,對應的是recvfrom操作,因為數據報文已經准備好,無需阻塞。說它是同步,是因為,這個執行是在一個線程里面執行的。有時候,還會說它又是阻塞的,實際上是指阻塞在select上面,必須等到讀就緒、寫就緒等網絡事件。有時候我們又說I/O復用是多路復用,這里的多路是指N個連接,每一個連接對應一個channel,或者說多路就是多個channel。復用,是指多個連接復用了一個線程或者少量線程(在Tomcat中是Math.min(2,Runtime.getRuntime().availableProcessors()))。
NIO的注冊、輪詢等待、讀寫操作協作關系如下圖:
1、多路復用器 Selector
Selector的英文含義是“選擇器”,Selector是Java NIO 編程的基礎,也可以稱為為“輪詢代理器”、“事件訂閱器”、“channel容器管理機”都行。
Selector中也會維護一個“已經注冊的Channel”的容器(select使用數組,poll使用鏈表,epoll是紅黑樹+雙向鏈表,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048(32*32/32*64)的限制,具體原理請轉到7層網絡以及5種Linux IO模型以及相應IO基礎文章末尾部分),應用程序將向Selector對象注冊需要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。
Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進行后續的I/O操作。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。
2、通道 Channel
通道表示打開到 IO 設備(例如:文件、套接字,所以Channel有這兩種實現:SelectableChannel 用戶網絡讀寫;FileChannel 用於文件操作)的連接。
其中SelectableChannel有以下幾種實現:
- 所有被Selector(選擇器)注冊的通道,只能是繼承了SelectableChannel類的子類。
- ServerSocketChannel:應用服務器程序的監聽通道。只有通過這個通道,應用程序才能向操作系統注冊支持“多路復用IO”的端口監聽。同時支持UDP協議和TCP協議。
- ScoketChannel:TCP Socket套接字的監聽通道,一個Socket套接字對應了一個客戶端IP:端口 到 服務器IP:端口的通信連接。
- DatagramChannel:UDP 數據報文的監聽通道。
Channel相比IO中的Stream流更加高效(底層的操作系統的通道一般都是全雙工的,可以異步雙向傳輸,所以全雙工的Channel比流能更好的映射底層操作系統的API),但是必須和Buffer一起使用(若需要使用 NIO 系統,需要獲取用於連接 IO 設備的Channel通道以及用於容納數據的緩沖區Buffer。然后操作緩沖區,對數據進行處理。即通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入)。也可以通過Channel通道向操作系統寫數據。
3、緩沖區Buffer
Buffer是Channel操作讀寫的組件,包含一些要寫入或者讀出的數據。在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的;在寫入數據時,也是寫入到緩沖區中。任何時候訪問NIO中的數據,都是通過緩沖區進行操作。緩沖區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。
具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。但是在我們網絡傳輸中都是使用byte數據類型。如圖:
Buffer中有三個重要參數:
位置(position):當前緩沖區(Buffer)的位置,將從該位置往后讀或寫數據。
容量(capacity):緩沖區的總容量上限。
上限(limit):緩沖區的實際容量大小。
寫模式下Position為寫入多少數據的位數標識,Limit等於Capacity;讀模式下從Position讀到limit,Position為0。寫讀模式切換使用flip()方法。
flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成之前position的值。
public Buffer flip() { limit = position; position = 0; mark = -1; return this; }
三、代碼實現
1、服務端代碼實現
public class MultiplexerNioServer implements Runnable { private ServerSocketChannel serverSocketChannel; private Selector selector; private volatile boolean stop = false; /** * 初始化多路復用器 綁定監聽端口 * * @param port */ public MultiplexerNioServer(int port) { try { serverSocketChannel = ServerSocketChannel.open();//獲得一個serverChannel selector = Selector.open();////創建選擇器 獲得一個多路復用器 serverSocketChannel.configureBlocking(false);//設置為非阻塞模式 如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式 serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);//綁定一個端口和等待隊列長度 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//把selector注冊到channel,關注鏈接事件 } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; // 優雅停機 } public void run() { while (!stop) { try { //無論是否有讀寫事件發生,selector每隔1s被喚醒一次。如果一定時間內沒有事件,就需要做些其他的事情,就可以使用帶超時的 int client = selector.select(1000); System.out.println("1:"+client); // 阻塞,只有當至少一個注冊的事件發生的時候才會繼續. // int client = selector.select(); 不設置超時時間為線程阻塞,但是IO上支持多個文件描述符就緒 if (client == 0) { continue; } System.out.println("2:"+client); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { //處理事件 handle(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable e) { e.printStackTrace(); }finally { } } if (selector != null) { // selector關閉后會自動釋放里面管理的資源 try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } public void handle(SelectionKey key) throws IOException { if (key.isValid()) { //連接事件 if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 通過ServerSocketChannel的accept創建SocketChannel實例 // 完成該操作意味着完成TCP三次握手,TCP物理鏈路正式建立 SocketChannel sc = ssc.accept();//3次握手 sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ);//連接建立后關注讀事件 } //讀事件 if (key.isReadable()) { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer readbuffer = ByteBuffer.allocate(1024);//寫 0 1024 1024 // ByteBuffer readbuffer = ByteBuffer.allocateDirect(1024); //申請直接內存,也就是堆外內存 // 讀取請求碼流,返回讀取到的字節數 int readBytes = socketChannel.read(readbuffer); // 讀取到字節,對字節進行編解碼 if (readBytes > 0) { // 將緩沖區當前的limit設置為position=0,用於后續對緩沖區的讀取操作 readbuffer.flip();//讀寫模式反轉 // 將緩沖區可讀字節數組復制到新建的數組中 byte[] bytes = new byte[readbuffer.remaining()]; readbuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("input is:" + body); res(socketChannel, body); }else if(readBytes < 0){ // 鏈路已經關閉 釋放資源 key.cancel(); socketChannel.close(); }else{ // 沒有讀到字節忽略 } } } } private void res(SocketChannel channel, String response) throws IOException { if (response != null && response.length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); System.out.println("res end"); } } }
可以看到,創建NIO服務端的主要步驟如下:
1、打開ServerSocketChannel,監聽客戶端連接;
2、綁定監聽端口,設置連接為非阻塞模式;
3、創建Reactor線程,創建多路復用器並啟動線程。
4、將ServerSocketChannel注冊到Reactor線程中的Selector上,監聽ACCEPT事件
5、Selector輪詢准備就緒的key
6、Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路
7、設置客戶端鏈路為非阻塞模式
8、將新接入的客戶端連接注冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息
9、異步讀取客戶端消息到緩沖區
10、對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task
11、將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端
2、服務端啟動
public class NioServer { public static void main(String[] args) { int port=8080; MultiplexerNioServer nioServer=new MultiplexerNioServer(port); new Thread(nioServer,"nioserver-001").start(); } }
啟動服務端后,打印堆棧信息可以看到,會一直阻塞在 select() 操作,等待請求的到來:
通過控制台手動建立連接,測試如下:
3、客戶端代碼實現
public class NioClientHandler implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public NioClientHandler(String host, int port) { this.host = host; this.port = port; try { // 創建選擇器 selector = Selector.open(); // 打開監聽通道 socketChannel = SocketChannel.open(); // 如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式 socketChannel.configureBlocking(false); // 開啟非阻塞模式 } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void run() { try { doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { int wait=selector.select(1000); if(wait==0){ continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handle(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void doConnect() throws IOException { if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); }else{ socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void handle(SelectionKey key) throws IOException { if (key.isValid()) { SocketChannel sc = (SocketChannel) key.channel(); if (key.isConnectable()) { if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else { System.exit(1); } } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes=new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body=new String(bytes,"UTF-8"); System.out.println("res"+body); this.stop=true; }else if(readBytes<0){ key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel sc) throws IOException { // 將消息編碼為字節數組 byte[] request = "0123456789".getBytes(); // 根據數組容量創建ByteBuffer ByteBuffer writeBuffer = ByteBuffer.allocate(request.length); // 將字節數組復制到緩沖區 writeBuffer.put(request); // flip讀寫切換操作 writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("寫入完成"); } } }
4、啟動客戶端
public class NioClient { public static void main(String[] args) { new Thread(new NioClientHandler("localhost", 8080), "nioClient-001").start(); } }
啟動客戶端后,會向服務端寫一些數據,然后再從服務端接收數據,如下圖所示:
客戶端獲得服務端的回復后會自動斷開同服務端的連接,客戶端的一次請求就此完成。
四、BIO、NIO、AIO對比