我們首先需要澄清一個概念:NIO到底是什么的簡稱?有人稱之為New I/O,因為它相對於之前的I/O類庫是新增的,所以被稱為New I/O,這是它的官方叫法。但是,由於之前老的I/O類庫是阻塞I/O,New I/O類庫的目標就是要讓Java支持非阻塞I/O,所以,更多的人喜歡稱之為非阻塞I/O(Non-block I/O),由於非阻塞I/O更能夠體現NIO的特點。
與Socket類和ServerSocket類相對應,NIO也提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。這兩種新增的通道支持阻塞和非阻塞兩種模式。阻塞模式使用非常簡單,但是性能和可靠性都不好,非阻塞模式則正好相反。開發人員一般可以根據自己的需要來選擇合適的模式,一般來說,低負載、低並發的應用程序可以選擇同步阻塞I/O以降低編程復雜度,但是對於高負載、高並發的網絡應用,需要使用NIO的非阻塞模式進行開發。
NIO類庫簡介
新的輸入/輸出(NIO)庫是在JDK 1.4中引入的。NIO彌補了原來同步阻塞I/O的不足,它在標准Java代碼中提供了高速的、面向塊的I/O。通過定義包含數據的類,以及通過以塊的形式處理這些數據,NIO不用使用本機代碼就可以利用低級優化,這是原來的I/O包所無法做到的。
1.緩沖區Buffer
我們首先介紹緩沖區(Buffer)的概念,Buffer是一個對象,它包含一些要寫入或者要讀出的數據。在NIO類庫中加入Buffer對象,體現了新庫與原I/O的一個重要區別。在面向流的I/O中,可以將數據直接寫入或者將數據直接讀到Stream對象中。
在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的;在寫入數據時,寫入到緩沖區中。任何時候訪問NIO中的數據,都是通過緩沖區進行操作。
緩沖區實質上是一個數組。通常它是一個字節數組(ByteBuffer),也可以使用其他種類的數組。但是一個緩沖區不僅僅是一個數組,緩沖區提供了對數據的結構化訪問以及維護讀寫位置(limit)等信息。
最常用的緩沖區是ByteBuffer,一個ByteBuffer提供了一組功能用於操作byte數組。除了ByteBuffer,還有其他的一些緩沖區,事實上,每一種Java基本類型(除了Boolean類型)都對應有一種緩沖區,具體如下:
ByteBuffer:字節緩沖區
CharBuffer:字符緩沖區
ShortBuffer:短整型緩沖區
IntBuffer:整形緩沖區
LongBuffer:長整形緩沖區
FloatBuffer:浮點型緩沖區
DoubleBuffer:雙精度浮點型緩沖區
每一個Buffer類都是Buffer接口的一個子實例。除了ByteBuffer,每一個 Buffer類都有完全一樣的操作,只是它們所處理的數據類型不一樣。因為大多數標准I/O操作都使用ByteBuffer,所以它除了具有一般緩沖區的操作之外還提供一些特有的操作,方便網絡讀寫。
2.通道Channel
Channel是一個通道,可以通過它讀取和寫入數據,它就像自來水管一樣,網絡數據通過Channel讀取和寫入。通道與流的不同之處在於通道是雙向的,流只是在一個方向上移動(一個流必須是InputStream或者OutputStream的子類),而且通道可以用於讀、寫或者同時用於讀寫。
因為Channel是全雙工的,所以它可以比流更好地映射底層操作系統的API。特別是在UNIX網絡編程模型中,底層操作系統的通道都是全雙工的,同時支持讀寫操作。
自頂向下看,前三層主要是Channel接口,用於定義它的功能,后面是一些具體的功能類(抽象類),從類圖可以看出,實際上Channel可以分為兩大類:分別是用於網絡讀寫的SelectableChannel和用於文件操作的FileChannel。
3.多路復用器Selector
多路復用器Selector,它是Java NIO編程的基礎,熟練地掌握Selector對於掌握NIO編程至關重要。多路復用器提供選擇已經就緒的任務的能力。簡單來講,Selector會不斷地輪詢注冊在其上的Channel,如果某個Channel上面有新的TCP連接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進行后續的I/O操作。
一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連接句柄1024/2048的限制。這也就意味着只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端,這確實是個非常巨大的進步。
NIO服務端序列圖
下面,我們對NIO服務端的主要創建過程進行講解和說明,作為NIO的基礎入門,我們將忽略掉一些在生產環境中部署所需要的一些特性和功能。
步驟一:打開ServerSocketChannel,用於監聽客戶端的連接,它是所有客戶端連接的父管道,代碼示例如下。
ServerSocketChannel acceptorSvr = ServerSocketChannel.open();
步驟二:綁定監聽端口,設置連接為非阻塞模式,示例代碼如下。
acceptorSvr.socket().bind(new InetSocketAddress(InetAddress.getByName(“IP”), port));
acceptorSvr.configureBlocking(false);
步驟三:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟四:將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監聽ACCEPT事件,代碼如下。
SelectionKey key = acceptorSvr.register( selector, SelectionKey.OP_ACCEPT, ioHandler);
步驟五:多路復用器在線程run方法的無限循環體內輪詢准備就緒的Key,代碼如下。
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
步驟六:多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路,代碼示例如下。
SocketChannel channel = svrChannel.accept();
步驟七:設置客戶端鏈路為非阻塞模式,示例代碼如下。
channel.configureBlocking(false);
channel.socket().setReuseAddress(true);
......
步驟八:將新接入的客戶端連接注冊到Reactor線程的多路復用器上,監聽讀操作,用來讀取客戶端發送的網絡消息,代碼如下。
SelectionKey key = socketChannel.register( selector, SelectionKey.OP_READ, ioHandler);
步驟九:異步讀取客戶端請求消息到緩沖區,示例代碼如下。
int readNumber = channel.read(receivedBuffer);
步驟十:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取后續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排。
Object message = null;
while(buffer.hasRemain())
{
byteBuffer.mark();
Object message = decode(byteBuffer);
if (message == null)
{
byteBuffer.reset();
break;
}
messageList.add(message );
}
if (!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if (messageList != null & !messageList.isEmpty())
{
for(Object messageE : messageList)
handlerTask(messageE);
}
步驟十一:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。
socketChannel.write(buffer);
注意:如果發送區TCP緩沖區滿,會導致寫半包,此時,需要注冊監聽寫操作位,循環寫,直到整包消息寫入TCP緩沖區。
服務端代碼示例:
import java.io.IOException; public class TimeServer { public static void main(String[] args) throws IOException { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默認值 } } //MultiplexerTimeServer的多路復用類,它是個一個獨立的線程, //負責輪詢多路復用器Selector,可以處理多個客戶端的並發接入。 MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread (timeServer, "NIO-MultiplexerTimeServer-001").start(); } } import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; //在構造方法中進行資源初始化,創建多路復用器Selector、ServerSocketChannel,對Channel和TCP參數進行配置。 //例如,將ServerSocketChannel設置為異步非阻塞模式,它的backlog設置為1024。 //系統資源初始化成功后,將ServerSocket Channel注冊到Selector,監聽SelectionKey.OP_ACCEPT操作位;如果資源初始化失敗(例如端口被占用),則退出。 public MultiplexerTimeServer(int port) { try { selector = Selector.open(); servChannel = ServerSocketChannel.open(); servChannel.configureBlocking(false); servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { //在線程的run方法的while循環體中循環遍歷selector,它的休眠時間為1s, //無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次,selector也提供了一個無參的select方法。 //當有處於就緒狀態的Channel時,selector將返回就緒狀態的Channel的SelectionKey集合, //通過對就緒狀態的Channel集合進行迭代,可以進行網絡的異步讀寫操作。 selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = (SelectionKey) it.next(); it.remove(); try { handleInput(key);//這里可以用線程池啟線程去單獨處理客戶端的請求業務 } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊並關閉,所以不需要重復釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { //根據SelectionKey的操作位進行判斷即可獲知網絡事件的類型, if (key.isAcceptable()) { //通過ServerSocketChannel的accept接收客戶端的連接請求並創建SocketChannel實例, //完成上述操作后,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。 //注意,我們需要將新創建的SocketChannel設置為異步非阻塞,同時也可以對其TCP參數進行設置, //例如TCP接收和發送緩沖區的大小等,作為入門的例子,沒有進行額外的參數設置。 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); // Add the new connection to the selector sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { //首先創建一個ByteBuffer,由於我們事先無法得知客戶端發送的碼流大小, //作為例程,我們開辟一個1M的緩沖區。然后調用SocketChannel的read方法讀取請求碼流。 //注意,由於我們已經將SocketChannel設置為異步非阻塞模式,因此它的read是非阻塞的。 //使用返回值進行判斷,看讀取到的字節數 SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); //返回值有以下三種可能的結果 //返回值大於0:讀到了字節,對字節進行編解碼; //返回值等於0:沒有讀取到字節,屬於正常場景,忽略; //返回值為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源。 if (readBytes > 0) { //當讀取到碼流以后,我們進行解碼,首先對readBuffer進行flip操作, //它的作用是將緩沖區當前的limit設置為position,position設置為0,用於后續對緩沖區的讀取操作。 //然后根據緩沖區可讀的字節個數創建字節數組, //調用ByteBuffer的get操作將緩沖區可讀的字節數組復制到新創建的字節數組中, //最后調用字符串的構造函數創建請求消息體並打印。 //如果請求指令是"QUERY TIME ORDER"則把服務器的當前時間編碼后返回給客戶端 readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; //異步發送應答消息給客戶端 doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0字節,忽略 } } } private void doWrite(SocketChannel channel, String response) throws IOException { //首先將字符串編碼成字節數組,根據字節數組的容量創建ByteBuffer, //調用ByteBuffer的put操作將字節數組復制到緩沖區中,然后對緩沖區進行flip操作, //最后調用SocketChannel的write方法將緩沖區中的字節數組發送出去。 //需要指出的是,由於SocketChannel是異步非阻塞的,它並不保證一次能夠把需要發送的字節數組發送完, //此時會出現“寫半包”問題,我們需要注冊寫操作,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢, //可以通過ByteBuffer的hasRemain()方法判斷消息是否發送完成。 //此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景。 if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
NIO客戶端序列圖
步驟一:打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機分配一個可用的本地地址),示例代碼如下。
SocketChannel clientChannel = SocketChannel.open();
步驟二:設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數,示例代碼如下。
clientChannel.configureBlocking(false);
socket.setReuseAddress(true);
socket.setReceiveBufferSize(BUFFER_SIZE);
socket.setSendBufferSize(BUFFER_SIZE);
步驟三:異步連接服務端,示例代碼如下。
boolean connected=clientChannel.connect(new InetSocketAddress(“ip”,port));
步驟四:判斷是否連接成功,如果連接成功,則直接注冊讀狀態位到多路復用器中,如果當前沒有連接成功(異步連接,返回false,說明客戶端已經發送sync包,服務端沒有返回ack包,物理鏈路還沒有建立),示例代碼如下。
if (connected)
{
clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);
}
else
{
clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);
}
步驟五:向Reactor線程的多路復用器注冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答,示例代碼如下。
clientChannel.register( selector, SelectionKey.OP_CONNECT, ioHandler);
步驟六:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。
Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟七:多路復用器在線程run方法的無限循環體內輪詢准備就緒的Key,代碼如下。
int num = selector.select();
Set selectedKeys = selector.selectedKeys();
Iterator it = selectedKeys.iterator();
while (it.hasNext()) {
SelectionKey key = (SelectionKey)it.next();
// ... deal with I/O event ...
}
步驟八:接收connect事件進行處理,示例代碼如下。
if (key.isConnectable())
handlerConnect();
步驟九:判斷連接結果,如果連接成功,注冊讀事件到多路復用器,示例代碼如下。
if (channel.finishConnect())
registerRead();
步驟十:注冊讀事件到多路復用器,示例代碼如下。
clientChannel.register( selector, SelectionKey.OP_READ, ioHandler);
步驟十一:異步讀客戶端請求消息到緩沖區,示例代碼如下。
int readNumber = channel.read(receivedBuffer);
步驟十二:對ByteBuffer進行編解碼,如果有半包消息接收緩沖區Reset,繼續讀取后續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼如下。
Object message = null;
while(buffer.hasRemain())
{
byteBuffer.mark();
Object message = decode(byteBuffer);
if (message == null)
{
byteBuffer.reset();
break;
}
messageList.add(message );
}
if (!byteBuffer.hasRemain())
byteBuffer.clear();
else
byteBuffer.compact();
if (messageList != null & !messageList.isEmpty())
{
for(Object messageE : messageList)
handlerTask(messageE);
}
步驟十三:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。
socketChannel.write(buffer);
客戶端代碼示例:
public class TimeClient { public static void main(String[] args) { int port = 8080; new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient- 001").start(); } } import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { //構造函數用於初始化NIO的多路復用器和SocketChannel對象。 //需要注意的是,創建SocketChannel之后,需要將其設置為異步非阻塞模式。 //我們可以設置SocketChannel的TCP參數,例如接收和發送的TCP緩沖區大小。 this.host = host == null ? "127.0.0.1" : host; this.port = port; try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { //作為示例,連接是成功的,所以不需要做重連操作,因此將其放到循環之前。 doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { //在循環體中輪詢多路復用器Selector,當有就緒的Channel時,執行handleInput(key)方法 selector.select(1000); Set selectedKeys = selector.selectedKeys(); Iterator it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = (SelectionKey) it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) key.channel().close(); } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } //線程退出循環后,我們需要對連接資源進行釋放,以實現“優雅退出”. //由於多路復用器上可能注冊成千上萬的Channel或者pipe,如果一一對這些資源進行釋放顯然不合適。 //因此,JDK底層會自動釋放所有跟此多路復用器關聯的資源。 //多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊並關閉,所以不需要重復釋放資源 if (selector != null) try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } private void handleInput(SelectionKey key) throws IOException { //我們首先對SelectionKey進行判斷,看它處於什么狀態。 if (key.isValid()) { // 判斷是否連接成功 SocketChannel sc = (SocketChannel) key.channel(); //如果是處於連接狀態,說明服務端已經返回ACK應答消息。 //這時我們需要對連接結果進行判斷,調用SocketChannel的finishConnect()方法, //如果返回值為true,說明客戶端連接成功;如果返回值為false或者直接拋出IOException,說明連接失敗。 //在本例程中,返回值為true,說明連接成功。 if (key.isConnectable()) { if (sc.finishConnect()) { //將SocketChannel注冊到多路復用器上,注冊SelectionKey.OP_READ操作位, //監聽網絡讀操作,然后發送請求消息給服務端。 sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else System.exit(1);// 連接失敗,進程退出 } //客戶端是如何讀取時間服務器應答消息的。 if (key.isReadable()) { //如果客戶端接收到了服務端的應答消息,則SocketChannel是可讀的, //由於無法事先判斷應答碼流的大小,我們就預分配1M的接收緩沖區用於讀取應答消息, //調用SocketChannel的read()方法進行異步讀取操作。由於是異步操作,所以必須對讀取的結果進行判斷。 ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { //如果讀取到了消息,則對消息進行解碼,最后打印結果。執行完成后將stop置為true,線程退出循環。 readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else ; // 讀到0字節,忽略 } } } //首先對SocketChannel的connect()操作進行判斷,如果連接成功, //則將SocketChannel注冊到多路復用器Selector上,注冊SelectionKey.OP_READ, //如果沒有直接連接成功,則說明服務端沒有返回TCP握手應答消息, //但這並不代表連接失敗,我們需要將SocketChannel注冊到多路復用器Selector上, //注冊SelectionKey.OP_CONNECT,當服務端返回TCP syn-ack消息后, //Selector就能夠輪詢到這個SocketChannel處於連接就緒狀態。 private void doConnect() throws IOException { // 如果直接連接成功,則注冊到多路復用器上,發送請求消息,讀應答 if (socketChannel.connect(new InetSocketAddress(host, port))) { socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { socketChannel.register(selector, SelectionKey.OP_CONNECT); } } //構造請求消息體,然后對其編碼,寫入到發送緩沖區中,最后調用SocketChannel的write方法進行發送。 //由於發送是異步的,所以會存在“半包寫”問題。最后通過hasRemaining()方法對發送結果進行判斷, //如果緩沖區中的消息全部發送完成,打印"Send order 2 server succeed." private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) System.out.println("Send order 2 server succeed."); } }
我們發現NIO編程難度確實比同步阻塞BIO大很多,我們的NIO例程並沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼將會更加復雜。NIO代碼既然這么復雜,為什么它的應用卻越來越廣泛呢,使用NIO編程的優點總結如下。
(1)客戶端發起的連接操作是異步的,可以通過在多路復用器注冊OP_CONNECT等待后續結果,不需要像之前的客戶端那樣被同步阻塞。
(2)SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O通信線程就可以處理其他的鏈路,不需要同步等待這個鏈路可用。
(3)線程模型的優化:由於JDK的Selector在Linux等主流操作系統上通過epoll實現,它沒有連接句柄數的限制(只受限於操作系統的最大句柄數或者對單個進程的句柄限制),這意味着一個Selector線程可以同時處理成千上萬個客戶端連接,而且性能不會隨着客戶端的增加而線性下降,因此,它非常適合做高性能、高負載的網絡服務器。
JDK1.7升級了NIO類庫,升級后的NIO類庫被稱為NIO2.0,引人注目的是,Java正式提供了異步文件I/O操作,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO。