轉載:https://mp.weixin.qq.com/s/YIcXaH7AWLJbPjnTUwnlyQ
首先我們分別畫圖來看看,BIO、NIO、AIO,分別是什么?
BIO:傳統的網絡通訊模型,就是BIO,同步阻塞IO
它其實就是服務端創建一個ServerSocket, 然后就是客戶端用一個Socket去連接服務端的那個ServerSocket, ServerSocket接收到了一個的連接請求就創建一個Socket和一個線程去跟那個Socket進行通訊。
接着客戶端和服務端就進行阻塞式的通信,客戶端發送一個請求,服務端Socket進行處理后返回響應。
在響應返回前,客戶端那邊就阻塞等待,上門事情也做不了。
這種方式的缺點:每次一個客戶端接入,都需要在服務端創建一個線程來服務這個客戶端
這樣大量客戶端來的時候,就會造成服務端的線程數量可能達到了幾千甚至幾萬,這樣就可能會造成服務端過載過高,最后崩潰死掉。
BIO模型圖:
Acceptor:
傳統的IO模型的網絡服務的設計模式中有倆種比較經典的設計模式:一個是多線程, 一種是依靠線程池來進行處理。
如果是基於多線程的模式來的話,就是這樣的模式,這種也是Acceptor線程模型。
NIO:
NIO是一種同步非阻塞IO, 基於Reactor模型來實現的。
其實相當於就是一個線程處理大量的客戶端的請求,通過一個線程輪詢大量的channel,每次就獲取一批有事件的channel,然后對每個請求啟動一個線程處理即可。
這里的核心就是非阻塞,就那個selector一個線程就可以不停輪詢channel,所有客戶端請求都不會阻塞,直接就會進來,大不了就是等待一下排着隊而已。
這里面優化BIO的核心就是,一個客戶端並不是時時刻刻都有數據進行交互,沒有必要死耗着一個線程不放,所以客戶端選擇了讓線程歇一歇,只有客戶端有相應的操作的時候才發起通知,創建一個線程來處理請求。
NIO:模型圖
Reactor模型:
AIO
AIO:異步非阻塞IO,基於Proactor模型實現。
每個連接發送過來的請求,都會綁定一個Buffer,然后通知操作系統去完成異步的讀,這個時間你就可以去做其他的事情
等到操作系統完成讀之后,就會調用你的接口,給你操作系統異步讀完的數據。這個時候你就可以拿到數據進行處理,將數據往回寫
在往回寫的過程,同樣是給操作系統一個Buffer,讓操作系統去完成寫,寫完了來通知你。
這倆個過程都有buffer存在,數據都是通過buffer來完成讀寫。
這里面的主要的區別在於將數據寫入的緩沖區后,就不去管它,剩下的去交給操作系統去完成。
操作系統寫回數據也是一樣,寫到Buffer里面,寫完后通知客戶端來進行讀取數據。
AIO:模型圖
聊完了BIO,NIO,AIO的區別之后,現在我們再結合這三個模型來說下同步和阻塞的一些問題。
同步阻塞
為什么說BIO是同步阻塞的呢?
其實這里說的不是針對網絡通訊模型而言,而是針對磁盤文件讀寫IO操作來說的。
因為用BIO的流讀寫文件,例如FileInputStrem,是說你發起個IO請求直接hang死,卡在那里,必須等着搞完了這次IO才能返回。
同步非阻塞:
為什么說NIO為啥是同步非阻塞?
因為無論多少客戶端都可以接入服務端,客戶端接入並不會耗費一個線程,只會創建一個連接然后注冊到selector上去,這樣你就可以去干其他你想干的其他事情了
一個selector線程不斷的輪詢所有的socket連接,發現有事件了就通知你,然后你就啟動一個線程處理一個請求即可,這個過程的話就是非阻塞的。
但是這個處理的過程中,你還是要先讀取數據,處理,再返回的,這是個同步的過程。
異步非阻塞
為什么說AIO是異步非阻塞?
通過AIO發起個文件IO操作之后,你立馬就返回可以干別的事兒了,接下來你也不用管了,操作系統自己干完了IO之后,告訴你說ok了
當你基於AIO的api去讀寫文件時, 當你發起一個請求之后,剩下的事情就是交給了操作系統
當讀寫完成后, 操作系統會來回調你的接口, 告訴你操作完成
在這期間不需要等待, 也不需要去輪詢判斷操作系統完成的狀態,你可以去干其他的事情。
同步就是自己還得主動去輪詢操作系統,異步就是操作系統反過來通知你。所以來說, AIO就是異步非阻塞的。
NIO核心組件詳細講解
學習NIO先來搞清楚一些相關的概念,NIO通訊有哪些相關組件,對應的作用都是什么,之間有哪些聯系?
多路復用機制實現Selector
首先我們來了解下傳統的Socket網絡通訊模型。
傳統Socket通訊原理圖
為什么傳統的socket不支持海量連接?
每次一個客戶端接入,都是要在服務端創建一個線程來服務這個客戶端的
這會導致大量的客戶端的時候,服務端的線程數量可能達到幾千甚至幾萬,幾十萬,這會導致服務器端程序負載過高,不堪重負,最終系統崩潰死掉。
接着來看下NIO是如何基於Selector實現多路復用機制支持的海量連接。
NIO原理圖
多路復用機制是如何支持海量連接?
NIO的線程模型對Socket發起的連接不需要每個都創建一個線程,完全可以使用一個Selector來多路復用監聽N多個Channel是否有請求,該請求是對應的連接請求,還是發送數據的請求
這里面是基於操作系統底層的Select通知機制的,一個Selector不斷的輪詢多個Channel,這樣避免了創建多個線程
只有當莫個Channel有對應的請求的時候才會創建線程,可能說1000個請求, 只有100個請求是有數據交互的
這個時候可能server端就提供10個線程就能夠處理這些請求。這樣的話就可以避免了創建大量的線程。
NIO如何通過Buffer來緩沖數據的
NIO中的Buffer是個什么東西 ?
學習NIO,首當其沖就是要了解所謂的Buffer緩沖區,這個東西是NIO里比較核心的一個部分
一般來說,如果你要通過NIO寫數據到文件或者網絡,或者是從文件和網絡讀取數據出來此時就需要通過Buffer緩沖區來進行。Buffer的使用一般有如下幾個步驟:
寫入數據到Buffer,調用flip()方法,從Buffer中讀取數據,調用clear()方法或者compact()方法。
Buffer中對應的Position, Mark, Capacity,Limit都啥?
-
capacity:緩沖區容量的大小,就是里面包含的數據大小。
-
limit:對buffer緩沖區使用的一個限制,從這個index開始就不能讀取數據了。
-
position:代表着數組中可以開始讀寫的index, 不能大於limit。
-
mark:是類似路標的東西,在某個position的時候,設置一下mark,此時就可以設置一個標記
后續調用reset()方法可以把position復位到當時設置的那個mark上。去把position或limit調整為小於mark的值時,就丟棄這個mark
如果使用的是Direct模式創建的Buffer的話,就會減少中間緩沖直接使用DirectorBuffer來進行數據的存儲。
如何通過Channel和FileChannel讀取Buffer數據寫入磁盤的
NIO中,Channel是什么?
Channel是NIO中的數據通道,類似流,但是又有些不同
Channel既可從中讀取數據,又可以從寫數據到通道中,但是流的讀寫通常是單向的。
Channel可以異步的讀寫。Channel中的數據總是要先讀到一個Buffer中,或者從緩沖區中將數據寫到通道中。
FileChannel的作用是什么?
Buffer有不同的類型,同樣Channel也有好幾個類型。
-
FileChannel
-
DatagramChannel
-
SocketChannel
-
ServerSocketChannel
這些通道涵蓋了UDP 和 TCP 網絡IO,以及文件IO。而FileChannel就是文件IO對應的管道, 在讀取文件的時候會用到這個管道。
下面給一個簡單的NIO實現讀取文件的Demo代碼
NIOServer端和Client端代碼案例
最后,給大家一個NIO客戶端和服務端示例代碼,簡單感受下NIO通訊的方式。
-
NIO通訊Client端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; public class NIOClient { public static void main(String[] args) { for(int i = 0; i < 10; i++){ new Worker().start(); } } static class Worker extends Thread { @Override public void run() { SocketChannel channel = null; Selector selector = null; try { // SocketChannel,一看底層就是封裝了一個Socket channel = SocketChannel.open(); // SocketChannel是連接到底層的Socket網絡 // 數據通道就是負責基於網絡讀寫數據的 channel.configureBlocking(false); channel.connect(new InetSocketAddress("localhost", 9000)); // 后台一定是tcp三次握手建立網絡連接 selector = Selector.open(); // 監聽Connect這個行為 channel.register(selector, SelectionKey.OP_CONNECT); while(true){ // selector多路復用機制的實現 循環去遍歷各個注冊的Channel selector.select(); Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator(); while(keysIterator.hasNext()){ SelectionKey key = (SelectionKey) keysIterator.next(); keysIterator.remove(); // 如果發現返回的時候一個可連接的消息 走到下面去接受數據 if(key.isConnectable()){ channel = (SocketChannel) key.channel(); if(channel.isConnectionPending()){ channel.finishConnect(); // 接下來對這個SocketChannel感興趣的就是人家server給你發送過來的數據了 // READ事件,就是可以讀數據的事件 // 一旦建立連接成功了以后,此時就可以給server發送一個請求了 ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("你好".getBytes()); buffer.flip(); channel.write(buffer); } channel.register(selector, SelectionKey.OP_READ); } // 這里的話就時候名服務器端返回了一條數據可以讀了 else if(key.isReadable()){ channel = (SocketChannel) key.channel(); // 構建一個緩沖區 ByteBuffer buffer = ByteBuffer.allocate(1024); // 把數據寫入buffer,position推進到讀取的字節數數字 int len = channel.read(buffer); if(len > 0) { System.out.println("[" + Thread.currentThread().getName() + "]收到響應:" + new String(buffer.array(), 0, len)); Thread.sleep(5000); channel.register(selector, SelectionKey.OP_WRITE); } } else if(key.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("你好".getBytes()); buffer.flip(); channel = (SocketChannel) key.channel(); channel.write(buffer); channel.register(selector, SelectionKey.OP_READ); } } } } catch (Exception e) { e.printStackTrace(); } finally{ if(channel != null){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } } }
-
NIO通訊Server端
} } } NIO通訊Server端 import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; public class NIOServer { private static Selector selector; private static LinkedBlockingQueue<SelectionKey> requestQueue; private static ExecutorService threadPool; public static void main(String[] args) { init(); listen(); } private static void init(){ ServerSocketChannel serverSocketChannel = null; try { selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); // 將Channel設置為非阻塞的 NIO就是支持非阻塞的 serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100); // ServerSocket,就是負責去跟各個客戶端連接連接請求的 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); // 就是僅僅關注這個ServerSocketChannel接收到的TCP連接的請求 } catch (IOException e) { e.printStackTrace(); } requestQueue = new LinkedBlockingQueue<SelectionKey>(500); threadPool = Executors.newFixedThreadPool(10); for(int i = 0; i < 10; i++) { threadPool.submit(new Worker()); } } private static void listen() { while(true){ try{ selector.select(); Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator(); while(keysIterator.hasNext()){ SelectionKey key = (SelectionKey) keysIterator.next(); // 可以認為一個SelectionKey是代表了一個請求 keysIterator.remove(); handleRequest(key); } } catch(Throwable t){ t.printStackTrace(); } } } private static void handleRequest(SelectionKey key) throws IOException, ClosedChannelException { // 后台的線程池中的線程處理下面的代碼邏輯 SocketChannel channel = null; try{ // 如果說這個Key是一個acceptable,也就是一個連接請求 if(key.isAcceptable()){ ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 調用accept這個方法 就可以進行TCP三次握手了 channel = serverSocketChannel.accept(); // 握手成功的話就可以獲取到一個TCP連接好的SocketChannel channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); // 僅僅關注這個READ請求,就是人家發送數據過來的請求 } // 如果說這個key是readable,是個發送了數據過來的話,此時需要讀取客戶端發送過來的數據 else if(key.isReadable()){ channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); // 通過底層的socket讀取數據,寫buffer中,position可能就會變成21之類的 // 你讀取到了多少個字節,此時buffer的position就會變成多少 if(count > 0){ // 准備讀取剛寫入的數據,就是將limit設置為當前position,將position設置為0,丟棄mark。一般就是先寫入數據,接着准備從0開始讀這段數據,就可以用flip // position = 0,limit = 21,僅僅讀取buffer中,0~21這段剛剛寫入進去的數據 buffer.flip(); System.out.println("服務端接收請求:" + new String(buffer.array(), 0, count)); channel.register(selector, SelectionKey.OP_WRITE); } } else if(key.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("收到".getBytes()); buffer.flip(); channel = (SocketChannel) key.channel(); channel.write(buffer); channel.register(selector, SelectionKey.OP_READ); } } catch(Throwable t){ t.printStackTrace(); if(channel != null){ channel.close(); } } } // 創建一個線程任務來執行 static class Worker implements Runnable { @Override public void run() { while(true) { try { SelectionKey key = requestQueue.take(); handleRequest(key); } catch (Exception e) { e.printStackTrace(); } } } private void handleRequest(SelectionKey key) throws IOException, ClosedChannelException { // 假設想象一下,后台有個線程池獲取到了請求 // 下面的代碼,都是在后台線程池的工作線程里在處理和執行 SocketChannel channel = null; try{ // 如果說這個key是個acceptable,是個連接請求的話 if(key.isAcceptable()){ System.out.println("[" + Thread.currentThread().getName() + "]接收到連接請求"); ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); // 調用accept方法 和客戶端進行三次握手 channel = serverSocketChannel.accept(); System.out.println("[" + Thread.currentThread().getName() + "]建立連接時獲取到的channel=" + channel); // 如果三次握手成功了之后,就可以獲取到一個建立好TCP連接的SocketChannel // 這個SocketChannel大概可以理解為,底層有一個Socket,是跟客戶端進行連接的 // 你的SocketChannel就是聯通到那個Socket上去,負責進行網絡數據的讀寫的 // 設置為非阻塞的 channel.configureBlocking(false); // 關注的是Reade請求 channel.register(selector, SelectionKey.OP_READ); } // 如果說這個key是readable,是個發送了數據過來的話,此時需要讀取客戶端發送過來的數據 else if(key.isReadable()){ channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); // 通過底層的socket讀取數據,寫入buffer中,position可能就會變成21之類的 // 你讀取到了多少個字節,此時buffer的position就會變成多少 System.out.println("[" + Thread.currentThread().getName() + "]接收到請求"); if(count > 0){ buffer.flip(); // position = 0,limit = 21,僅僅讀取buffer中,0~21這段剛剛寫入進去的數據 System.out.println("服務端接收請求:" + new String(buffer.array(), 0, count)); channel.register(selector, SelectionKey.OP_WRITE); } } else if(key.isWritable()) { ByteBuffer buffer = ByteBuffer.allocate(1024); buffer.put("收到".getBytes()); buffer.flip(); channel = (SocketChannel) key.channel(); channel.write(buffer); channel.register(selector, SelectionKey.OP_READ); } } catch(Throwable t){ t.printStackTrace(); if(channel != null){ channel.close(); } } } } }
總結:
通過本篇文章,主要是分析了常見的NIO的一些問題:
-
BIO, NIO, AIO各自的特點
-
什么同步阻塞,同步非阻塞,異步非阻塞
-
為什么NIO能夠應對支持海量的請求
-
NIO相關組件的原理
-
NIO通訊的簡單案例
本文僅僅是介紹了一下網絡通訊的一些原理,應對面試來講解
NIO通訊其實有很多的的東西,在中間件的研發過程中使用的頻率還是非常高的,后續有機會再和大家分享交流。