一,五種IO模型:
一個IO操作可以分為兩個步驟:發起IO請求(判斷此時IO是否允許將網卡中的數據往內核緩沖區中讀或將內核中的往網卡中寫)(可寫)
實際的IO操作(從內核往應用程序讀或從應用程序往網卡中寫)(真正的寫 )
例如:
1、操作系統的一次寫操作分為兩步:第一步,將數據從用戶空間拷貝到系統空間;第二步,從系統空間往網卡寫。
2、一次讀操作也分為兩步:第一步,將數據從網卡拷貝到系統空間;第二步,將數據從系統空間拷貝到用戶空間。
對於讀操作來說,阻塞IO和非阻塞IO的區別在於第一步:發起IO請求是否會被阻塞,如果阻塞直到完成那么就是傳統的阻塞IO,如果不阻塞,那么就是非阻塞IO。
而同步IO和異步IO的區別就在於第二步:如果實際的IO讀寫阻塞請求進程,因為讀寫過程是阻塞的,那么就是同步IO,因此阻塞IO、非阻塞IO、IO復用、信號驅動IO都是同步IO,如果實際的IO讀寫過程不阻塞,也就是操作系統做完IO兩個階段的操作再將結果返回,那么就是異步IO。為了方便理解可結合下圖來看。
1.1 同步阻塞
模型特點 :在Linux中,對於一次讀取IO的操作,數據並不會直接拷貝到程序的程序緩沖區。通常包括兩個不同階段:
- 等待數據准備好,到達內核緩沖區;
- 從內核向進程復制數據。
對於一個套接字上的輸入操作,第一步通常涉及等待數據從網絡中到達。當所有等待分組到達時,它被復制到內核中的某個緩沖區。第二步就是把數據從內核緩沖區復制到應用程序緩沖區。
阻塞I/O(blocking I/O)模型,進程調用recvfrom,其系統調用直到數據報到達且被拷貝到應用進程的緩沖區中或者發生錯誤才返回。進程從調用recvfrom開始到它返回的整段時間內是被阻塞的。
故事描述:
小明從家里面先到演唱會現場問售票業務員買票,但是票還沒出來,三天以后才出來,小明直接打了個地鋪睡在舉辦商售票大廳,一直等票出來,然后買票。
1.2 同步非阻塞
模型特點 :
與阻塞式I/O不同的是,非阻塞的recvform系統調用之后,進程並沒有被阻塞,內核馬上返回給進程,如果數據還沒准備好,此時會返回一個error(EAGAIN 或 EWOULDBLOCK)。進程在返回error之后,可以處理其他的業務邏輯,過會兒再發起recvform系統調用。采用輪詢的方式檢查內核數據,直到數據准備好。再拷貝數據到進程,進行數據處理。 在linux下,可以通過設置socket套接字選項使其變為非阻塞。
當一個應用進程像這樣對一個非阻塞描述字循環調用recvfrom時,我們稱之為輪詢(polling)。應用進程持續輪詢內核,以查看某個操作是否就緒。
故事描述:
小明從家里面先到演唱會現場問售票業務員買票,但是票還沒出來,然后小明走了,辦理其他事情去了,然后過了2個小時,又去舉辦商售票大廳買票來了,如果票還沒有出來,小明又先去辦其他事情了,重復上面的操作,直到有票可以買。
1.3 I/O復用(事件驅動)
模型特點 :
IO 多路復用的好處就在於單個進程就可以同時處理多個網絡連接的IO。它的基本原理就是不再由應用程序自己監視連接,取而代之由內核替應用程序監視文件描述符。以select為例,當用戶進程調用了select,那么整個進程會被阻塞,而同時,kernel會“監視”所有select負責的socket,當任何一個socket中的數據准備好了,select就會返回。這個時候用戶進程再調用read操作,將數據從內核拷貝到用戶進程
從流程上來看,使用select函數進行IO請求和同步阻塞模型沒有太大的區別,甚至還多了添加監視socket,以及調用select函數的額外操作,效率更差。但是,使用select以后最大的優勢是用戶可以在一個線程內同時處理多個socket的IO請求。用戶可以注冊多個socket,然后不斷地調用select讀取被激活的socket,即可達到在同一個線程內同時處理多個IO請求的目的。而在同步阻塞模型中,必須通過多線程的方式才能達到這個目的。
IO多路復用方式允許單線程內處理多個IO請求,但是每個IO請求的過程還是阻塞的(在select函數上阻塞),平均時間甚至比同步阻塞IO模型還要長。如果用戶線程只注冊自己感興趣的socket或者IO請求,然后去做自己的事情,等到數據到來時再進行處理,則可以提高CPU的利用率。
由於select函數是阻塞的,因此多路IO復用模型也被稱為異步阻塞IO模型。注意,這里的所說的阻塞是指select函數執行時線程被阻塞,而不是指socket。一般在使用IO多路復用模型時,socket都是設置為NONBLOCK的,不過這並不會產生影響,因為用戶發起IO請求時,數據已經到達了,用戶線程一定不會被阻塞。IO多路復用是最常使用的IO模型,但是其異步程度還不夠“徹底”,因為它使用了會阻塞線程的select系統調用。因此IO多路復用只能稱為異步阻塞IO,而非真正的異步IO。
故事描述:
小明想買票看演唱會,都直接給黃牛(selector/epoll)打電話了,說幫我留意買個票,票買了通知我,我自己去取(當我接到黃牛的電話時,我需要花費整個路成的時間去讀這個數據,買拿這個票),那么票沒出來之前,小明完全可以做自己的事情。
1.4 信號I/O
模型特點 :
允許Socket進行信號驅動IO,並注冊一個信號處理函數,進程繼續運行並不阻塞。當數據准備好時,進程會收到一個SIGIO信號,可以在信號處理函數中調用I/O操作函數處理數據。
故事描述:
小明想買票看演唱會,給舉辦商售票業務員說,給給你留個電話,有票了請你給我打個電話通知一下(是看人家操作系統提不提供這種功能,Linux提供,windows沒有這種機制),我自己再來買票(小明完全可以做自己的事情,但是票還是需要小明自己去拿的)。
1.5 異步非阻塞:
模型特點:
上述四種IO模型都是同步的。相對於同步IO,異步IO不是順序執行。用戶進程進行aio_read系統調用之后,就可以去處理其他的邏輯了,無論內核數據是否准備好,都會直接返回給用戶進程,不會對進程造成阻塞。等到數據准備好了,內核直接復制數據到進程空間,然后從內核向進程發送通知,此時數據已經在用戶空間了,可以對數據進行處理了。
在 Linux 中,通知的方式是 “信號”,分為三種情況:
- 如果這個進程正在用戶態處理其他邏輯,那就強行打斷,調用事先注冊的信號處理函數,這個函數可以決定何時以及如何處理這個異步任務。由於信號處理函數是突然闖進來的,因此跟中斷處理程序一樣,有很多事情是不能做的,因此保險起見,一般是把事件 “登記” 一下放進隊列,然后返回該進程原來在做的事。
- 如果這個進程正在內核態處理,例如以同步阻塞方式讀寫磁盤,那就把這個通知掛起來了,等到內核態的事情忙完了,快要回到用戶態的時候,再觸發信號通知。
- 如果這個進程現在被掛起了,例如陷入睡眠,那就把這個進程喚醒,等待CPU調度,觸發信號通知。
故事描述:
小明想買票看演唱會,給舉辦商售票業務員說(異步非阻塞i/o)打電話了,給你留個地址,有票了請通知快遞員,把這張票送到這個地址來,當小明聽到敲門聲,看見快遞員,就知道票好了,而且指導票好了的時候,票已經到他手上了,票不用小明自己去取(應用不用自己再去read數據了)。
2、select(Java)、poll(c++)、epoll(c++)
Linux支持IO多路復用的系統調用有select、poll、epoll,這些調用都是內核級別的。但select、poll、epoll本質上都是同步I/O,先是block住等待就緒的socket,再是block住將數據從內核拷貝到用戶內存。
3、兩種I/O多路復用模式:Reactor和Proactor
在這兩種模式下的事件多路分離器反饋給程序的信息是不一樣的:
1.Reactor模式下說明你可以進行讀寫(收發)操作了。
2.Proactor模式下說明已經完成讀寫(收發)操作了,具體內容在給定緩沖區中,可以對這些內容進行其他操作了。
Reactor關注的是I/O操作的就緒事件,而Proactor關注的是I/O操作的完成事件
一般地,I/O多路復用機制都依賴於一個事件多路分離器(Event Demultiplexer)。分離器對象可將來自事件源的I/O事件分離出來,並分發到對應的read/write事件處理器(Event Handler)。
Reactor模式采用同步IO,而Proactor采用異步IO。
在Reactor中,事件分離器負責等待文件描述符或socket為讀寫操作准備就緒,然后將就緒事件傳遞給對應的處理器,最后由處理器負責完成實際的讀寫工作。
而在Proactor模式中,處理器或者兼任處理器的事件分離器,只負責發起異步讀寫操作。IO操作本身由操作系統來完成。傳遞給操作系統的參數需要包括用戶定義的數據緩沖區地址和數據大小,操作系統才能從中得到寫出操作所需數據,或寫入從socket讀到的數據。事件分離器捕獲IO操作完成事件,然后將事件傳遞給對應處理器。比如,在windows上,處理器發起一個異步IO操作,再由事件分離器等待IOCompletion事件。典型的異步模式實現,都建立在操作系統支持異步API的基礎之上,我們將這種實現稱為“系統級”異步或“真”異步,因為應用程序完全依賴操作系統執行真正的IO工作。
Reactor和Proactor模式的主要區別就是真正的讀取和寫入操作是有誰來完成的,Reactor中需要應用程序自己讀取或者寫入數據,而Proactor模式中,應用程序不需要進行實際的讀寫過程,它只需要從緩存區讀取或者寫入即可,操作系統會讀取緩存區或者寫入緩存區到真正的IO設備。
二, 常用網絡模型
2.1 BIO模型
BIO 全稱Block-IO 是一種同步阻塞的通信模式。我們常說的Stock IO 一般指的是BIO。是一個比較傳統的通信方式,模式簡單,使用方便。但並發處理能力低,通信耗時,依賴網速。
網絡編程的基本模型是C/S模型,即兩個進程間的通信。服務端提供IP和監聽端口,客戶端通過連接操作向服務端監聽的地址發起連接請求,通過三次握手連接,如果連接成功建立,雙方就可以通過套接字進行通信。而傳統的同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功后,雙方通過輸入和輸出流進行同步阻塞式通信。
簡單的描述一下BIO的服務端通信模型,即BIO 設計原理:采用BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶端的連接,它接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理,處理完成后,通過輸出流返回應答給客戶端,線程銷毀。即典型的一請求一應答通信模型。
主線程負責監聽當有新的連接的時候創建一個新的子線程處理任務。如下圖所示:
缺點:
該模型最大的問題就是缺乏彈性伸縮能力,當客戶端並發訪問量增加后,由於服務端的線程個數和客戶端並發訪問數呈1:1的正比關系,線程數量快速膨脹后,系統的性能將急劇下降,隨着訪問量的繼續增大,系統最終就死掉了。
BIO的編程流程
服務端:
1、創建ServerSocket實例
2、綁定端口
3、通過accept來監聽客戶端的連接,有客戶端連接會返回socket實例
4、進行讀寫操作
5、關閉資源
客戶端:
1、創建socket實例
2、通過connect並指定服務端的IP+端口連接服務端
3、進行讀寫操作
4、關閉資源
代碼展示:
【BIO單線程代碼】
public class BIOClent { public static void main(String[] args) { //創建Socket實例 Socket socket=new Socket(); OutputStream outputStream=null; BufferedReader bufferedReader=null; Scanner in=new Scanner(System.in); try { //連接服務器 socket.connect(new InetSocketAddress(9999)); System.out.println("客戶端連接服務器成功!"); while (true){ //發送數據給服務端,先得到socket的流,然后將數據寫到流中 outputStream = socket.getOutputStream(); System.out.println("請寫入數據:"); String str=in.nextLine(); outputStream.write((str+"\n").getBytes()); //接收服務端的消息 bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String msg=bufferedReader.readLine(); System.out.println("服務端發來消息:"+msg); } } catch (IOException e) { e.printStackTrace(); }finally { try { socket.close(); outputStream.close(); bufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } } } }
public class BIOServer { public static void main(String[] args) { ServerSocket serverSocket=null; Socket socket=null; BufferedReader reader=null; try{ //創建serversocket實例 serverSocket=new ServerSocket(); //綁定端口 serverSocket.bind(new InetSocketAddress(9999)); System.out.println("服務端啟動了..."); //進行監聽,等待客戶端連接,返回的是客戶端的socket實例 socket=serverSocket.accept(); System.out.println("客戶端:"+socket.getRemoteSocketAddress()+"連接上了");//獲取客戶端的ip和port //服務端來讀取消息,從socket的流中讀取數據 reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//獲取客戶端的讀取流 String msg=null; System.out.println("正在等待客戶端發送消息...."); while ((msg=reader.readLine())!=null){ System.out.println("客戶端發來消息:"+msg); //回復消息 OutputStream write=socket.getOutputStream();//獲取服務端的輸出流 write.write(("echo:"+msg+"\n").getBytes()); } }catch (IOException e){ e.printStackTrace(); }finally { try{ if(reader!=null)reader.close(); if(socket!=null)socket.close(); if(serverSocket!=null)serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
運行結果:
【BIO多線程代碼】
public class BIOClient_2 { public static void main(String[] args) { //創建Socket實例 Socket socket=new Socket(); OutputStream outputStream=null; BufferedReader bufferedReader=null; Scanner in=new Scanner(System.in); try { //連接服務器 socket.connect(new InetSocketAddress("127.0.0.1",8888)); System.out.println("客戶端連接服務器成功!"); while (true){ //發送數據給服務端,先得到socket的流,然后將數據寫到流中 outputStream = socket.getOutputStream(); System.out.println("請寫入數據:"); String str=in.nextLine(); outputStream.write((str+"\n").getBytes()); outputStream.flush(); //接收服務端的消息 bufferedReader=new BufferedReader(new InputStreamReader(socket.getInputStream())); String msg=bufferedReader.readLine(); System.out.println("服務端發來消息:"+msg); } } catch (IOException e) { e.printStackTrace(); }finally { try { socket.close(); outputStream.close(); bufferedReader.close(); } catch (IOException e) { e.printStackTrace(); } } } }
class BIOthread extends Thread{ private Socket socket=null;//客戶端的Socket public BIOthread(Socket socket){//構造函數 this.socket=socket; } @Override public void run(){ BufferedReader reader=null; try { System.out.println(Thread.currentThread().getName()+"已啟動..."); //服務端來讀取消息,從socket的流中讀取數據 reader=new BufferedReader(new InputStreamReader(socket.getInputStream()));//獲取客戶端的讀取流 String msg=null; System.out.println("正在等待客戶端發送消息...."); while ((msg=reader.readLine())!=null){ System.out.println("客戶端發來消息:"+msg); //回復消息 OutputStream outputStream=socket.getOutputStream();//獲取服務端的輸出流 outputStream.write(("echo:"+msg+"\n").getBytes()); } }catch (Exception e){ e.printStackTrace(); }finally { if(socket!=null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } } } public class BIOServer_2 { public static void main(String[] args) { ServerSocket serverSocket=null; //服務端的ServerSocket Socket socket=null;//客戶端的Socket try{ //創建serversocket實例 serverSocket=new ServerSocket(); //綁定端口 serverSocket.bind(new InetSocketAddress(8888)); System.out.println("服務端啟動了..."); while(true){ //進行監聽,等待客戶端連接,返回的是客戶端的socket實例 socket=serverSocket.accept(); System.out.println("客戶端:"+socket.getRemoteSocketAddress()+"連接上了");//獲取客戶端的ip和port //當有客戶端連接,就為其創建一個子線程 new BIOthread(socket).start(); } }catch (IOException e){ e.printStackTrace(); }finally { try{ if(serverSocket!=null)serverSocket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
運行結果:
2.2 NIO模型
NIO 全稱New IO,也叫Non-Block IO 是一種同步非阻塞的通信模式。是面向塊編程的。NIO相關的類都被放在java.nio包及其子包下,
NIO 設計原理:
NIO 相對於BIO來說一大進步。客戶端和服務器之間通過Channel通信。NIO可以在Channel中進行讀寫操作,但使用時要與Buffer結合。如果客戶有請求時,服務器會為每個Client分配一個Channel,這些Channel都會被注冊在Selector多路復用器上。如果某個Channel中沒有事件,線程不會一直阻塞到Channel中,此時Selector通過一個線程不停的輪詢這些Channel,找出已經准備就緒的Channel執行IO操作,如果注冊到Selector上的每一個Channel都沒有事件要讀取,則線程還可以去做其他的業務。所以說NIO 是通過一個線程輪詢,實現千萬個客戶端的請求,這也就是非阻塞NIO的特點。
1 .Selector(多路復用器)
Selector 稱為選擇器 ,當然你也可以翻譯為 多路復用器 。它是Java NIO核心組件中的一個,用於檢查一個或多個NIO Channel(通道)的狀態是否處於可讀、可寫。Selector提供選擇已經就緒任務的能力:就是Selector會不斷地輪詢注冊在其上的通道(Channel),如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以取得就緒的Channel集合,從而進行后續的IO操作。服務器端提供一個線程負責Selector的輪詢,就可以接入成千上萬個客戶端,這就是JDK NIO庫的巨大進步。在NIO中一個Selector對應一個線程,但一個線程可以對應多個Selector。注意:當客戶端過多時,為了提高效率,可以多分配給服務器幾個線程,如有100000個客戶端時,就可以分配給50個線程來執行,但在BIO模型中,如果有100000個客戶端,就必須分配100000線程。
Selector類是一個抽象類,常用方法如下: public abstract class Selector implements Closeable{ public static Selector open();//得到一個選擇器 public int select(Long timeout);//參數用來設置阻塞時間,即表示在一定時間內,監控所有注冊到的通道,當其中有IO操作時,將對應的SelectionKey加入到內部集合中並返回發生事件的通道個數 public int select();//不帶參數的select是阻塞型的監聽,即當其中有IO操作時,才會返回 public int selectNow();//表示的是沒有事件發生時,會立即返回 public Set<SelectionKey> selectedKeys();//從內部集合中得到所有發生事件的SelectionKey public Set<SelectionKey> keys();//返回被注冊到selector上的所有SelectionKey }
SelectionKey表示Selector和網絡通道的注冊關系,NIO所有監聽的事件都定義在SelectionKey下,而每一個被注冊的SelectionKey都是存儲到selector的Keys(hashSet<SelectionKey>)集合中的。
int OP_ACCEPT:有新的網絡可以連接,值為16.可以調用accpet()來生成socketChannel int OP_CONNECT:代表連接已經建立,值為8 int OP_READ:代表讀操作,值為1 int OP_write:代表寫操作,值為4 SelectionKey常用方法: public abstract Selector selector();//得到與之關聯的Selector對象 public abstract SelecttableChannel channel();//得到與之相關聯的Channel通道 public final Object attachment();//得到與之關聯的共享數據,如buffer public abstract SelectionKey interestOps(int ops);//設置或改變監聽的事件,如將OP_READ改為OP_WRITE public final boolean isAcceptable();//是否需要連接 public final boolean isReadable();//是否需要讀 public final boolean isWriteable();//是否可寫
2. Buffer(緩沖區)
它是NIO與BIO的一個重要區別。BIO是將數據直接寫入或讀取到Stream對象中。而NIO的數據操作都是在緩沖區Buffer中進行的。緩沖區可以理解為是一個數組,但它又與簡單數組不同,Buffer 類相比一個簡單數組的而言,它是將關於數據的數據內容和信息包含在一個單一的對象中存儲的,並且這個對象還提供了一組方法,用來操作緩沖區。
在NIO中,Buffer是一個頂級父類,它是一個抽象類,它的子類最常見的類型是ByteBuffer,另外還有CharBuffer,ShortBuffer,IntBuffer,LongBuffer,FloatBuffer,DoubleBuffer。
常用屬性:
position:下一個要讀或寫的元素的索引,每次讀或寫時都會改變這個值。在往Buffer中寫數據時會從Buffer數組中的position位置開始寫。從Buffer中讀數據時會從Buffer的position開始讀。(由於Buffer既可以寫也可以讀,為了區別,在寫完進行讀取時,必須要調用flip()方法反轉,一般是調用flip方法使limit=position;poslition=0。在寫的時候也要先調用clear()方法,使position=0,limit=capacity,否則在寫的時候會一直返回0)
limit:Buffer最多可操作的數據的位置。在往Buffer中寫數據時表示最多可寫到數據量為limit。從Buffer中讀數據時需要開啟Buffer的讀模式,讀從position到limit位置的數據。
capcity:Buffer數組的容量。即可以容納的最大的數據量,在緩沖區創建時被設定並且不能改變。
mark:備忘位置,調用mark()使得mark=position,調用reset(),恢復postion使position=mark。
補充:1.調用put()方法可以往buffer中添加數據,調用get()方法可以從buffer中獲取數據(該方法內部會自動移動指針)。
2.channel.read(buffer)也是從channel往buffer中寫數據,channel.write(buffer)是從buffer中讀取往channel中寫。
3.Channel.transferFrom(targetChannel,position,count)表示從目標通道中復制數據到當前通道,position表示起始位,count表示復制的長度。Channel.transferTo(position,count,targetChannel)表示將當前通道的數據復制到目標通道。
3.調用asReadOnlyBuffer()方法返回的buffer為只讀buffer,只能從中讀取數據,不能往里寫入數據。
4.MappedBuffer可以讓文件直接在內存中進行修改。
創建buffer的方法:
ByteBuffer allocate(int capacity):在堆上創建指定大小的緩沖
ByteBuffer allocate(int capacity):在堆外空間創建指定大小的緩沖
ByteBuffer wrap(byte[] array):通過byte數組實例創建一個實例
3. Channel(通道)
通道是雙向的,和流不同,流是單向的。NIO可以通過Channel進行數據的讀,寫和同時讀寫操作,但是不能直接訪問數據,需要和緩沖區Buffer進行交互。與Selector一起使用時,Channel必須處於非阻塞模式下。
通道分為兩大類:一類是網絡讀寫(SelectableChannel),一類是用於文件操作(FileChannel)。另外我們使用的SocketChannel和ServerSocketChannel都是SelectableChannel的子類,都是用於TCP的數據連接。在創建服務器的時候會先創建一個ServerSocketChannel(ServerSocketChannelimp),當有客戶端來請求連接時,服務器端會給客戶端生成一個SocketChannel,也就是我們所說的Channel通道。(DatagramChannel用於UDP的數據讀寫)
ServerSocketChannel的主要作用是在服務器端監聽新的客戶端的連接,然后調用accept()為客戶端生成一個SocketChannel。
SocketChannel,網絡IO通道,具體負責進行進行讀寫操作,負責把緩沖區的數據寫入通道,或者把通道里的數據讀到緩沖區。
channel與buffer之間的關系:
舉例說明: 山西有煤,我們想要,於是建立一條鐵路到山西,這條鐵路就是這里的"Channel",那么煤通過什么運過來呢?鐵路建好了,就差火車了,因此這里的火車就像是緩沖區"Buffer",火車把山西的煤運到這邊來,把我們這里的錢運過去。
【案例】:編寫一個NIO入門案例,實現服務器端和客戶端之間的數據簡單通訊(非阻塞)。
//服務端
import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class NIOserver { public static void main(String[] args) throws Exception{ //通過open靜態方法創建Selector多路復用器 Selector selector=Selector.open(); //通過open靜態方法創建ServerSocketChannel ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //綁定端口號6666,在服務端監聽 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //設置為非阻塞,在NIO編程中都需要將serverSocketChannel設置為false,非阻塞的 serverSocketChannel.configureBlocking(false); //然后將serverSocketChannel注冊到seletor,它關心事件為OP_ACCEPT serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); //循環等待客戶端進行連接 while (true){ //如果沒有事件發生,則一直會while循環。只有檢測到有事件發生才會走到下一步 if(selector.select(500)==0){//輪詢檢測是否有事件(連接事件)發生 System.out.println("服務器等待中,暫無事件發生。。。"); continue; } //通過selector.selectedKeys()返回發生事件的集合,然后通過selectionKeys反向獲取通道 Set<SelectionKey> selectionKeys=selector.selectedKeys(); //先遍歷Set<SelectionKey> Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()){ //獲取到SelectionKey SelectionKey key = keyIterator.next(); if(key.isAcceptable()){//如果是OP_ACCEPT,表示有新的客戶端連接 //給該客戶端生成一個SocketChannel SocketChannel socketChannel=serverSocketChannel.accept(); //將socketChannel注冊到selector,關注的事件為OP_READ,同時給socketChannel關聯一個buffer socketChannel.register(selector,SelectionKey.OP_READ,ByteBuffer.allocate(1024)); } if(key.isReadable()){//OP_READ 讀事件 //通過key反向獲取到對應的channel SocketChannel channel = (SocketChannel) key.channel(); //獲取到該channel關聯的buffer ByteBuffer buffer=(ByteBuffer)key.attachment(); //將channel中的事件讀取到buffer channel.read(buffer); System.out.println("from 客戶端"+new String(buffer.array())); } //手動從集合中移動當前selectionKey,防止重復操作 keyIterator.remove(); } } } }
//客戶端 public class NIOClient { public static void main(String[] args) throws Exception{ Scanner in=new Scanner(System.in); //得到一個網絡通道 SocketChannel socketChannel=SocketChannel.open(); //得到一個多路復用器 Selector selector=Selector.open(); //設置非阻塞 socketChannel.configureBlocking(false); //提供服務器端的ip和端口 InetSocketAddress inetSocketAddress=new InetSocketAddress("127.0.0.1",6666); //嘗試連接服務器 if(!socketChannel.connect(inetSocketAddress)){ //將socketChannel注冊到多路復用器上 socketChannel.register(selector,SelectionKey.OP_CONNECT); } while(true){ //等待連接完成 selector.select();//阻塞的 //通過selector返回事件集 Set<SelectionKey> selectionKeys = selector.selectedKeys(); //迭代遍歷 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //得到一個事件集 SelectionKey selectionKey = keyIterator.next(); //對事件類型進行判斷 if (selectionKey.isConnectable()) {//連接事件 //可連接事件 SocketChannel socketchannel_1 = (SocketChannel) selectionKey.channel(); //連接操作完成 socketchannel_1.finishConnect(); System.out.println("連接成功"); socketchannel_1.register(selector,SelectionKey.OP_READ); } if(selectionKey.isReadable()) { ByteBuffer buffer=ByteBuffer.allocate(100);//根據str的情況定義一個buffer //發送數據,將buffer數據寫入channel socketChannel.read(buffer); String Smsg=new String(buffer.array()); System.out.println("服務端返回的消息:"+Smsg); if("exit".equals(Smsg)){ System.out.println("退出命令"); break;//退出條件 } } keyIterator.remove(); } //如果連接成功,就發送數據 String Cmsg=in.nextLine(); ByteBuffer buffer=ByteBuffer.allocate(100);//根據str的情況定義一個buffer buffer.put((Cmsg+"\n").getBytes());//添加數據 //因為寫是從頭開始的,所以進行讀寫切換 buffer.flip(); //發送數據,將buffer數據寫入channel socketChannel.write(buffer); } } }
上面是單線程方式的,下面就演示下使用多線程如何編碼。(客戶端代碼不變)
public class NIOserver { public static void main(String[] args) { Selector selector; ServerSocketChannel serverSocketChannel; try{ //通過open靜態方法創建Selector多路復用器 selector=Selector.open(); //通過open靜態方法創建ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); //綁定端口號6666,在服務端監聽 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //設置為非阻塞,在NIO編程中都需要將serverSocketChannel設置為false,非阻塞的 serverSocketChannel.configureBlocking(false); //然后將serverSocketChannel注冊到seletor,它關心事件為OP_ACCEPT serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); // 創建線程池 ExecutorService newFixedThreadPool =Executors.newFixedThreadPool(4); //循環等待客戶端進行連接 while (true){ //如果沒有事件發生,則一直會while循環。只有檢測到有事件發生才會走到下一步 if(selector.select(500)==0){//輪詢檢測是否有事件(連接事件,讀事件)發生 // System.out.println("服務器等待中,暫無事件發生。。。"); continue; } //通過selector.selectedKeys()返回發生事件的集合,然后通過selectionKeys反向獲取通道 Set<SelectionKey> selectionKeys=selector.selectedKeys(); //先遍歷Set<SelectionKey> Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()){ //獲取到SelectionKey SelectionKey key = keyIterator.next(); //手動從集合中移動當前selectionKey,防止重復操作 keyIterator.remove(); if(key.isAcceptable()){//如果是OP_ACCEPT,表示有新的客戶端連接 //先獲取到服務器的SeverSocketChannel ServerSocketChannel serverSocketChannel_1=(ServerSocketChannel)key.channel(); //服務端進行等待連接,並返回客戶端的SocketChannel SocketChannel socketChannel=serverSocketChannel_1.accept(); System.out.println("客戶端連接成功,並生成了一個Channel"); //開啟一個子線程 newFixedThreadPool.execute(new ServersubThread(socketChannel)); } } } }catch (Exception e){ }finally { //if(selector!=null)selector.close(); //if(serverSocketChannel!=null)serverSocketChannel.close(); } } } class ServersubThread implements Runnable{ SocketChannel socketchannel; public ServersubThread(SocketChannel socketchannel){ this.socketchannel=socketchannel; } @Override public void run() { try{ //獲取多路復用器 Selector subSelector = Selector.open(); //將其設置為非阻塞 socketchannel.configureBlocking(false); //將socketChannel注冊到selector,關注的事件為OP_READ socketchannel.register(subSelector,SelectionKey.OP_READ); while (true){ if(subSelector.select()==0) continue;//沒有事件就一直監聽 //返回事件集 Set<SelectionKey> selectionsKeys = subSelector.selectedKeys(); //對事件集遍歷 Iterator<SelectionKey> keyIterator = selectionsKeys.iterator(); while (keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); if(key.isReadable()){//OP_READ 讀事件 //通過key反向獲取到對應的channel //為什么上面通過key.channel得到的是ServerSocketChannel,而這塊得到的確實 SocketChannel //因為連接事件是通過服務端的severSocket注冊的,而讀事件是通過客戶端的socket注冊的 SocketChannel socketchannel = (SocketChannel) key.channel(); //定義buffer ByteBuffer buffer=ByteBuffer.allocate(1024); //將channel中的事件讀取到buffer socketchannel.read(buffer); String msg=new String(buffer.array()); if("exit".equals(msg)){ System.out.println("退出命令"); break; } System.out.println(Thread.currentThread().getName()+"from 客戶端"+msg); buffer.flip(); //將內容返回給客戶端 socketchannel.write(buffer); } } } }catch (Exception e){ e.getMessage(); }finally { try { socketchannel.close(); } catch (IOException e) { e.printStackTrace(); } } } }
使用多線程相當於在服務端的主線程中只進行監聽客戶端的連接事件,如果連接成功就為每一個客戶端創建一個子線程用來監聽IO讀事件,這樣主線程與子線程配合使用,減少了服務端的業務,可更好的適應訪問高並發問題,大大提高服務端的執行效率。
【案例】:群聊系統
//客戶端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class GroupChatClient { //定義相關屬性 private Selector selector; private SocketChannel socketChannel; private String username; //構造器 public GroupChatClient(){ try { //獲取到多路選擇器 selector=Selector.open(); //獲取到Channel,並連接服務器 socketChannel=socketChannel.open(new InetSocketAddress("127.0.0.1",6667)); //設置為非阻塞的 socketChannel.configureBlocking(false); //將其注冊到selector socketChannel.register(selector,SelectionKey.OP_READ); username=socketChannel.getLocalAddress().toString().substring(1); System.out.println(username+"is ok!"); } catch (IOException e) { e.printStackTrace(); } } public void sendInfo(String info){ String msg=username+"說:"+info; try { //將消息寫入通道發送給服務端 socketChannel.write(ByteBuffer.wrap(msg.getBytes())); } catch (IOException e) { e.printStackTrace(); } } public void readInfo(){ try { int readChannels=selector.select(500); //有事件發生 if(readChannels>0){ //獲取有事件發生的selectionKey Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectionKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); //判斷事件的類型 if(key.isReadable()){ //得到相關的通道 SocketChannel channel = (SocketChannel) key.channel(); //創建一個buffer用來存儲從通道中讀取到達數據 ByteBuffer buffer = ByteBuffer.allocate(1024); channel.read(buffer);//從通道讀取數據 System.out.println(new String(buffer.array())); } } iterator.remove(); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { //啟動客戶端 GroupChatClient groupChatClient=new GroupChatClient(); //啟動一個線程,來讀取服務器端發來的數據 new Thread(){ @Override public void run() { while (true){//每隔一秒就去讀取一下 groupChatClient.readInfo(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } }.start(); //發送數據給服務器端 Scanner scanner=new Scanner(System.in); while (scanner.hasNext()){ groupChatClient.sendInfo(new String(scanner.nextLine())); } } }
//服務端
import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Set; public class GroupChatServer { //定義屬性 private Selector selector;//多路復用選擇器 private ServerSocketChannel listenChannel;//監聽器 private static final int PORT=6667;//端口號 //構造器 public GroupChatServer(){ try { //獲得選擇器 selector=Selector.open(); //獲取監聽器 listenChannel=ServerSocketChannel.open(); //綁定端口 listenChannel.socket().bind(new InetSocketAddress(PORT)); //將Channel設置為非阻塞的 listenChannel.configureBlocking(false); //將ServerSocketChannel注冊到selector上 listenChannel.register(selector,SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void listen(){ try { //輪詢監聽是否有事件發生 while (true){ if(selector.select(500)==0){//若沒有檢測到,則繼續檢測 //System.out.println("等待。。。。"); continue; } System.out.println("監測到事件"); //檢測到有事件發生,就獲取發生事件的selectionKey Set<SelectionKey> selectionKeys=selector.selectedKeys(); //進行遍歷 Iterator<SelectionKey> keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); //判斷是否有客戶端需要連接 if(key.isAcceptable()){ //給客戶端分配一個soketChannel SocketChannel socketChannel = listenChannel.accept(); //然后將其設置成非阻塞的 socketChannel.configureBlocking(false); //接着注冊給客戶端 socketChannel.register(selector,SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress()+"上線了"); } if(key.isReadable()){ readData(key); } keyIterator.remove();//刪除當前SelectionKey,防止重復操作 } } } catch (IOException e) { e.printStackTrace(); } } public void readData(SelectionKey key){ //先獲取Channel SocketChannel channel = (SocketChannel) key.channel(); //獲取buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //將channel中的數據讀到buffer中 try { channel.read(buffer); } catch (IOException e) { try { System.out.println(channel.getRemoteAddress()+"下線了"); //取消注冊 key.cancel(); // 關閉通道 channel.close(); } catch (IOException e1) { e1.printStackTrace(); } } String msg=new String(buffer.array()); System.out.println("from"+msg); //從客戶端接收到消息后,接着轉發給其他客戶 send(msg,channel); } //轉發消息給其他客戶 public void send(String msg,SocketChannel channel){ System.out.println("消息轉發中。。。。"); //遍歷所有注冊到selector上的channel for (SelectionKey key:selector.keys()) { //通過key獲取channel Channel targetchannel=key.channel(); if(targetchannel instanceof SocketChannel&&channel!=targetchannel){//排除自己 ByteBuffer buffer=ByteBuffer.wrap(msg.getBytes()); try { ((SocketChannel) targetchannel).write(buffer); } catch (IOException e) { e.printStackTrace(); } } } } public static void main(String[] args) { //創建服務器端對象 GroupChatServer groupChatServer=new GroupChatServer(); //開始一直監聽 groupChatServer.listen(); } }
2.3 AIO模型
AIO 也叫NIO2.0 是一種異步非阻塞的通信模式。在NIO的基礎上引入了新的異步通道的概念,並提供了異步文件通道和異步套接字通道的實現。
AIO 並沒有采用NIO的多路復用器,而是使用異步通道的概念。其read,write方法的返回類型都是Future對象。而Future模型是異步的,其核心思想是:去主函數等待時間。內核先把數據准備好,然后再把數據主動拷貝到程序空間,准備數據和拷貝數據程序都不參與,拷貝好了,系統通知程序可以執行程序了,但AIO現在沒有真正的實例,所以一般談得比較少。
小結:AIO模型中通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道的實現。非阻塞,異步。
常見面試題:
1BIO,NIO,AIO區別
BIO 阻塞同步通信模式,客戶端和服務器連接需要三次握手,使用簡單,但吞吐量小
NIO 非阻塞同步通信模式,客戶端與服務器通過Channel連接,采用多路復用器輪詢注冊的Channel。提高吞吐量和可靠性。
AIO 非阻塞異步通信模式,NIO的升級版,采用異步通道實現異步通信,其read和write方法均是異步方法。
BIO,NIO的區別
1.BIO以流的方式處理數據的。而NIO以塊的方式處理數據,塊I/O的效率比流I/O的效率高很多。
2.BIO是阻塞的,NIO是非阻塞的。
3.BIO是基於字節流和字符流進行操作的,要么輸入流,要么輸出流,不能雙向。而NIO是基於Channel(通道)和Buffer(緩沖區)進行操作,Buffer既可以讀也可以寫,數據總是從通道讀取到緩沖區,或者從緩沖區寫入到通道中,Channel也是雙向的。Selector(選擇器)用於監聽多個通道的事件(比如:連接請求,數據到達等),因此使用單線程就可以監聽多個客戶端通道。
2 Stock通信的偽代碼實現流程
服務器綁定端口:server = new ServerSocket(PORT)
服務器阻塞監聽:socket = server.accept()
服務器開啟線程:new Thread(Handle handle)
服務器讀寫數據:BufferedReader PrintWriter
客戶端綁定IP和PORT:new Socket(IP_ADDRESS, PORT)
客戶端傳輸接收數據:BufferedReader PrintWriter
3 TCP協議與UDP協議有什么區別
TCP : 傳輸控制協議是基於連接的協議,在正式收發數據前,必須和對方建立可靠的連接。速度慢,合適傳輸大量數據。
UDP : 用戶數據報協議是與TCP相對應的協議。面向非連接的協議,不與對方建立連接,而是直接就把數據包發送過去,速度快,適合傳輸少量數據。
4 什么是同步阻塞BIO,同步非阻塞NIO,異步非阻塞AIO
同步阻塞IO : 用戶進程發起一個IO操作以后,必須等待IO操作的真正完成后,才能繼續運行。
同步非阻塞IO: 用戶進程發起一個IO操作以后,可做其它事情,但用戶進程需要經常詢問IO操作是否完成,這樣造成不必要的CPU資源浪費。
異步非阻塞IO: 用戶進程發起一個IO操作然后,立即返回,等IO操作真正的完成以后,應用程序會得到IO操作完成的通知。類比Future模式。
總結:
BIO | NIO | AIO 以Java的角度,理解如下:
- BIO,同步阻塞式IO,簡單理解:一個線程處理一個連接,發起和處理IO請求都是同步的。通過Socket和ServerSocket完成套接字通道實現。阻塞,同步,連接耗時。
- NIO,同步非阻塞IO,簡單理解:一個線程處理多個連接,發起IO請求是非阻塞的但處理IO請求是同步的。通過SocketChannel和ServerSocketChannel完成套接字通道實現。非阻塞/阻塞,同步,避免TCP建立連接使用三次握手帶來的開銷。
- AIO,異步非阻塞IO,簡單理解:一個有效請求一個線程,發起和處理IO請求都是異步的。通過AsynchronousSocketChannel和AsynchronousServerSocketChannel完成套接字通道實現。非阻塞,異步。
借鑒:https://www.cnblogs.com/diegodu/p/6823855.html
https://www.cnblogs.com/itdragon/p/8337234.html
當一個應用進程像這樣對一個非阻塞描述字循環調用recvfrom時,我們稱之為輪詢(polling)。應用進程持續輪詢內核,以查看某個操作是否就緒