前言
學習Netty編程,避免不了從了解Java 的NIO編程開始,這樣才能通過比較讓我們對Netty有更深的了解,才能知道Netty大大的好處。傳統的NIO編程code起來比較麻煩,甚至有遺留Bug,但其中最基本的思想是一致的。
參考資料《Netty In Action》、《Netty權威指南》(有需要的小伙伴可以評論或者私信我)
博文中所有的代碼都已上傳到Github,歡迎Star、Fork
一、NIO 核心組件
NIO,有人稱之為New I/O,這是官方叫法。但是由於之前老的I/O類庫是阻塞I/O,所以此時的NIO也可以是非阻塞I/O(Non-block I/O)。
與Socket類和ServerSocket類相對應,NIO提供了SocketChannel和ServerSocketChannel不同的套接字通道實現,可以支持阻塞和非阻塞兩種模式。
NIO庫是JDK 1.4中引入的,彌補了原來同步阻塞I/O的不足。這是因為提供了高速處理、面向塊的I/O,主要包括:緩沖區Buffer、通道Channel、多路復用器Selector。
1.緩沖區Buffer
在NIO庫中,所有的數據都是緩沖區處理的,讀取數據時直接讀取緩沖區;在寫入數據時,寫入到緩沖區。在任何時候訪問NIO中的數據,都是通過緩沖區進行操作。實際上緩沖區是一個數組,有不同類型的數組,通常是字節數組(ByteBuffer),但它不僅僅是一個數組,緩沖區提供對數據的結構化訪問以及維護讀寫位置(limit)等信息。
2.通道Channel
網絡數據通過Channel雙向讀取和寫入(全雙工),這點不同於Stream(InputStream/OutputStream或者其子類)一個方向上移動。
Channel可以分類兩個大類:用於網絡讀寫的SelectableChannel和用於文件操作的FileChannel。
ServerSocketChannel和SocketChannel都是SelectableChannel的子類。
3.多路復用器Selector
多路復用器提供選擇已經就緒的任務的能力,具體來說:Selector會不斷地輪詢注冊在其上的Channel,如果某個Channel上面發生讀寫事件,就表明這個Channel處於就緒狀態,會被Selector輪詢出來,通過SelectionKey可以獲取就緒的Channel的集合,進行后續的I/O操作。這樣就意味着只需要一個線程負責Selector輪詢,就可以接入成千上萬的客戶端。
多路復用器Selector是最核心的組件,在Netty編程中也是尤為重要的,但是這里不具體展開,到時候分析Netty源碼的時候會具體介紹。
二、NIO服務端
1.服務端序列圖
先放出如下的NIO服務端序列圖,結合序列圖給具體的步驟如下,之后的示例代碼中也會有詳細注釋
第一步:打開ServerSocketChannel,用於監聽客戶端的連接,是所有客戶端連接的父管道。
第二步:綁定監聽端口,設置連接為非阻塞模式
第三步:創建Reactor線程,創建多路復用器並啟動線程
第四步:將ServerSocketChannel注冊到Reactor線程的多路復用器Selector上,監聽ACCPET事件。
第五步:多路復用器在線程run方法在無線循環體內輪詢准備就緒的Key。
第六步:多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路。
第七步:設置客戶端鏈路為非阻塞模式
第八步:將新接入的客戶端注冊到Reactor線程的多路復用器上,監聽讀操作,讀取客戶端發送的網絡消息。
第九步:異步讀取客戶端請求消息到緩沖區
第十步:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取后續的報文,將解碼成功的消息封裝成Task,交給業務線程池中,進行業務處理
第十一步:將對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
2.服務端代碼示例
(1)多路復用服務MultiplexerTimeServer
public class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private volatile boolean stop; /** * 初始化多路復用器、綁定監聽端口 * * @param port */ public MultiplexerTimeServer(int port) { try { // 1. 打開ServerSocketChannel,監聽客戶端連接 servChannel = ServerSocketChannel.open(); // 2. 綁定監聽端口,設置連接為非阻塞模式 servChannel.socket().bind(new InetSocketAddress(port), 1024); servChannel.configureBlocking(false); // 3. 創建Reactor線程,創建多路復用並啟動線程 selector = Selector.open(); // 4. 將ServerSocketChannel注冊到Reactor線程的多路了復用器Selector,監聽ACCEPT事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The time server is start in port : " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; // 循環輪詢准備就緒的Key while (it.hasNext()) { key = it.next(); it.remove(); try { // deal with I/O event handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Throwable t) { t.printStackTrace(); } } // 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊並關閉,所以不需要重復釋放資源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 處理新接入的請求消息 if (key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); // 6. 監聽到新的客戶端接入,處理新的接入請求我,完成TCP三次握手-->建立鏈路 SocketChannel sc = ssc.accept(); // 7. 設置客戶端鏈路為非阻塞模式 sc.configureBlocking(false); sc.socket().setReuseAddress(true); // 8. 將新接入的客戶端連接注冊到Reactor線程的多路復用器上,監聽讀操作,讀取客戶端發送的消息 sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { // a channel is ready for reading SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 9. 異步讀取客戶端請求消息到緩沖區 int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); // 10. 讀取解碼報文 byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); String currentTime = "QUERY TIME ORDER" .equalsIgnoreCase(body) ? new java.util.Date( System.currentTimeMillis()).toString() : "BAD ORDER"; doWrite(sc, currentTime); } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { // 讀到0字節,忽略 } } } } private void doWrite(SocketChannel channel, String response) throws IOException { if (response != null && response.trim().length() > 0) { byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }
(2)NIO服務TimeServer
public class TimeServer { public static void main(String[] args) { int port = 8084; MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer, "NIO-TimeServer").start(); } }
(3)開啟服務端
運行TimeServer:
使用netstat命令查看是否對8084端口開啟監聽
三、NIO客戶端
1.客戶端序列圖
第一步:打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機會分配一個可用的本地地址)
第二步:設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數
第三步:異步連接服務端
第四步:判斷是否連接成功,如果連接成功則直接注冊讀狀態位到多路復用中。如果沒有當前沒有連接成功(異步連接,返回false,說明客戶端已經發送sync包,服務端沒有返回ack包,物理鏈路還沒建立)
第五步:向Reactor線程的多路復用OP_CONNECT狀態位,監聽服務端的TCP ACK應答
第六步:創建Reactor線程,創建多路復用器並啟動線程。
第七步:多路復用在線程run方法無線循環體內輪詢准備就緒的Key
第八步:接收connect事件進行處理
第九步:判斷連接結果,如果連接成功,注冊讀事件到多路復用器,
第十步:注冊讀事件到多路復用器
第十一步:異步讀客戶端請求消息到緩沖區
第十二步:對ByteBuffer進行編解碼
第十三步:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端。
2.客戶端示例代碼
(1)客戶端處理TimeClientHandle
public class TimeClientHandle implements Runnable { private String host; private int port; private Selector selector; private SocketChannel socketChannel; private volatile boolean stop; public TimeClientHandle(String host, int port) { this.host = host == null ? "127.0.0.1" : host; this.port = port; try { // 創建多路復用器並打開 selector = Selector.open(); // 1.打開SocketChannel, socketChannel = SocketChannel.open(); // 2.設置SocketChannel非阻塞模式, 這里不設置TCP參數 socketChannel.configureBlocking(false); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } @Override public void run() { try { // 連接服務端 doConnect(); } catch (IOException e) { e.printStackTrace(); System.exit(1); } while (!stop) { try { // 6. 多路復用器在線程run方法的無限循環體內輪詢准備就緒的Key selector.select(1000); Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectedKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (Exception e) { e.printStackTrace(); System.exit(1); } } // 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去注冊並關閉,所以不需要重復釋放資源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } /** * 處理客戶端輸入 * * @param key * @throws IOException */ private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { // 判斷是否連接成功 SocketChannel sc = (SocketChannel) key.channel(); // 7. 接收connect事件進行處理 if (key.isConnectable()) { // 8. 如果連接完成則注冊讀事件到多路復用器 if (sc.finishConnect()) { sc.register(selector, SelectionKey.OP_READ); doWrite(sc); } else { System.exit(1);// 連接失敗,進程退出 } } if (key.isReadable()) { ByteBuffer readBuffer = ByteBuffer.allocate(1024); // 9. 異步讀客戶端請求消息到緩沖區 int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); String body = new String(bytes, "UTF-8"); System.out.println("Now is : " + body); this.stop = true; } else if (readBytes < 0) { // 對端鏈路關閉 key.cancel(); sc.close(); } else { // 讀到0字節,忽略 } } } } private void doConnect() throws IOException { // 3. 異步連接客戶端 boolean connected = socketChannel.connect(new InetSocketAddress(host, port)); if (connected) { // 4. 返回true則直接連接成功,則注冊到多路復用器上,發送請求消息,讀應答 socketChannel.register(selector, SelectionKey.OP_READ); doWrite(socketChannel); } else { // 5. 如果返回false,則說明此時鏈路還沒有建立,則注冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答 socketChannel.register(selector, SelectionKey.OP_CONNECT); } } private void doWrite(SocketChannel sc) throws IOException { byte[] req = "QUERY TIME ORDER".getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); sc.write(writeBuffer); if (!writeBuffer.hasRemaining()) { System.out.println("Send order to server succeed."); } } }
(2)NIO客戶端TimeClient
public class TimeClient { public static void main(String[] args) { int port = 8084; new Thread(new TimeClientHandle("127.0.0.1", port), "NIO-TimeClient").start(); } }
(3)運行客戶端
運行TimeClient:
此時服務端Console:
四、NIO編程的優點
1.NIO編程的優勢與缺點
(1)客戶端發起的連接操作是異步的
可以通過在多路復用器注冊OP_CONNECT等待后續結果,不需要像之前的客戶端被同步阻塞。
(2)SocketChannel的讀寫操作都是異步的
如果沒有可讀寫數據不會等待直接返回,I/O通信線程就可以處理其他鏈路,不需要同步等待鏈路可用。
(3)線程模型的優化
Selector在Linux等主流系統上是通過epoll實現,沒有連接句柄的限制,意味着一個Selector可以處理成千上萬的客戶端連接,而且性能不會降低
(4)同步非阻塞通信
NIO需要開啟線程不斷循環去獲取操作結果,看起來不是很明智,真正有效的應該是基於異步回調獲取結果的,JDK 1.7以后就提供了異步非堵塞的IO操作方式,所以人們叫它 AIO(Asynchronous IO),異步 IO 是基於事件和回調機制實現的。
2.Selector基本工作原理
首先,需要將 Channel 注冊到 Selector 中,這樣 Selector 才知道哪些 Channel 是它需要管理的。之后,Selector 會不斷地輪詢注冊在其上的 Channel 。如果某個 Channel 上面發生了讀或者寫事件,這個 Channel 就處於就緒狀態,會被 Selector 輪詢出來,然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進行后續的 I/O 操作。
關於Selector操作的代碼示例模板:
// 創建 Selector Selector selector = Selector.open(); channel.configureBlocking(false); // 注冊 Channel 到 Selector 中 SelectionKey key = channel.register(selector, SelectionKey.OP_READ); while(true) { // 通過 Selector 選擇 Channel int readyChannels = selector.select(); if (readyChannels == 0) { continue; } // 獲得可操作的 Channel Set selectedKeys = selector.selectedKeys(); // 遍歷 SelectionKey 數組 Iterator<SelectionKey> keyIterator = selectedKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey key = keyIterator.next(); if (key.isAcceptable()) { // a connection was accepted by a ServerSocketChannel. } else if (key.isConnectable()) { // a connection was established with a remote server. } else if (key.isReadable()) { // a channel is ready for reading } else if (key.isWritable()) { // a channel is ready for writing } // 移除 keyIterator.remove(); } }