前言
之前看過Dubbo源碼,Nacos等源碼都涉及到了Netty,雖然遇到的時候查查資料,后面自己也有私下學習Netty並實踐,但始終沒有形成良好的知識體系,Netty對想要在Java開發上不斷深入是十分重要的。所以借此博客平台記錄下自己的學習思考的過程,形成自己的知識體系,以后學習深入源碼更加得心應手!
參考資料《Netty In Action》、《Netty權威指南》(有需要的小伙伴可以評論或者私信我)
博文中所有的代碼都已上傳到Github,歡迎Star、Fork
一、Linux中常見的I/O模型
1.5種I/O模型
根據UNIX網絡編程對I/O模型的分類,UNIX提供了5種I/O模型
(1)阻塞I/O模型
如果還未獲取數據報則一直等待,直到數據報准備好了再進行復制數據報等接下來的操作。
(2)非阻塞I/O模型
輪詢檢查是否有數據准備好的狀態,如果數據准備好就進行接下來的操作
(3)I/O復用模型
Linux提供select/poll,進程通過將一個或多個fd傳遞給select或poll系統調用,這樣select/poll可以幫我們偵測多個fd是否處於就緒狀態。select/poll順序掃描fd是否就緒,但是select支持的fd數量有限(1024*8個)。Linux提供了epoll系統調用,epoll使用基於事件驅動方式代替順序掃描,因此性能更高。Java NIO的Selector基於epoll的多路復用技術實現。
(4)信號驅動I/O模型
首先開啟套接字信號驅動I/O功能,並通過系統調用sigaction執行一個信號處理函數。當數據准備就緒時,就為進程生成一個SIGIO信號,通過信號回調通知應用程序調用來讀取數據。
(5)異步I/O
告知內核啟動某個操作,並讓內核在整個操作完成后通知我們,這種模型與信號驅動模型的主要區別是:信號驅動I/O由內核通知我們何時可以開始一個I/O操作;異步I/O模型由內核通知我們I/O操作何時已經完成。
2.I/O多路復用技術
(1)I/O多路復用技術應用場景
在處理多客戶端連接I/O請求時,往往有兩種方式:一種是傳統的多線程處理,另一種就是I/O多路復用技術進行處理,但是與傳統的多線程處理比較,I/O多路復用最大的優勢就是在於系統開銷小,不需要創建額外的線程或進程處理客戶端連接,節省了系統資源,I/O多路復用技術主要的應用場景如下:
- 服務器需要同時處理多個處於監聽狀態或多個連接狀態的套接字
- 服務器需要同時處理多種網絡協議的套接字,比如又要處理UDP、又要處理TCP
(2)epoll的優勢
在Linux系統中,采用了I/O多路復用技術調用有select、poll、epoll,在Linux網絡編程的過程中。select、poll、epoll介紹這里不再講解,我也是參考了很多資料才把三者關系弄清楚,大家可以自行Google,起初使用select做輪詢,但是select有一些缺陷,不得不選擇epoll替代了select。epoll在select的基礎上做了如下的改進:
1)支持一個進程打開的socket描述符(FD)不受限制
select最大缺陷就是單個進程打開FD是有限制的,epoll是沒有的,而poll除了沒有限制(基於鏈表存儲,所有理論上沒有限制)以外跟select沒啥區別
2)I/O效率不會隨着FD數目的增加而線性下降
傳統的select/poll的有一個致命缺點:當有一個很大的socket集合時,由於網絡延遲或鏈路空閑,任一時刻只有少部分的socket是“活躍”的,但是select/poll仍然會線性掃描全部的集合,導致效率降低。而epoll只會針對“活躍”的socket進行操作,這是因為epoll根據每個fd上的回調函數callback函數實現的,只有“活躍”的socket才會主動調用該函數,其它則不會
3)使用mmap加速內核與用戶空間的消息傳遞
無論是select、poll還是epoll都需要內核把FD消息通知給用戶空間,如何避免不必要的內存復制顯得十分重要,epoll通過內核把用戶空間mmap同一塊內存來實現的。
4)epoll使用的API更加簡單
二、傳統BIO編程
在基於傳統同步阻塞模型開發中,ServerSocket負責綁定IP地址,啟動監聽端口;Socket負責發起連接操作。連接成功后,雙方通過輸入和輸出流進行同步阻塞式通信。
很明顯,這種模型缺乏彈性伸縮能力,當客戶端並發量增加后,線程數也隨着增加,可能會造成線程堆棧溢出、創建新線程失敗等問題。
1.通信模型
BIO通信模型的服務端,通常由一個獨立的Acceptor線程負責監聽客戶客戶端的連接,接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理,處理完之后通過輸出流返回應答給客戶端,最后線程銷毀,這是典型的一請求一應答通信模型。
2.代碼實踐
(1)服務端代碼(代碼已上傳到Github)
/** * BIO通信服務端: * 由一個獨立的Acceptor線程負責監聽客戶客戶端的連接, * 接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理, * 處理完之后通過輸出流返回應答給客戶端,最后線程銷毀,這是典型的一請求一應答通信模型 */ public class BioServer { public static void main(String[] args) { bioServer(8082); } /** * @param port */ public static void bioServer(int port) { ServerSocket server = null; try { // ServerSocket負責綁定IP地址,啟動監聽端口 server = new ServerSocket(port); System.out.println("The bio server is start in port : " + port); // Socket負責發起連接操作 Socket socket = null; // 無限循環監聽客戶端的連接,若沒有則主線程阻塞在ServerSocket的accept操作上 while (true) { socket = server.accept(); new Thread(new BioServerHandler(socket)).start(); } } catch (IOException e) { e.printStackTrace(); } finally { if (server != null) { System.out.println("The bio server close"); } try { server.close(); } catch (IOException e) { e.printStackTrace(); } server = null; } } }
(2)客戶端代碼(代碼已上傳到Github)
/** * BIO通信客戶端: * 由一個獨立的Acceptor線程負責監聽客戶客戶端的連接, * 接收到客戶端連接請求之后為每個客戶端創建一個新的線程進行鏈路處理, * 處理完之后通過輸出流返回應答給客戶端,最后線程銷毀,這是典型的一請求一應答通信模型 */ public class BioClient { public static void main(String[] args) throws InterruptedException { String host = "127.0.0.1"; int port = 8082; bioClient(host, port); } public static void bioClient(String host, int port) { Socket socket = null; BufferedReader in = null; PrintWriter out = null; try { socket = new Socket(host, port); in = new BufferedReader(new InputStreamReader(socket.getInputStream())); out = new PrintWriter(socket.getOutputStream(), true); // 發送查詢當前時間指令 out.println("QUERY CURRENT TIME ORDER"); System.out.println("Client send order to server succeed."); // 返回應答 String resp = in.readLine(); System.out.println("Now is : " + resp); } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); out = null; } // 釋放socket套接字句柄資源 if (socket != null) { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } } }
(3)處理器(代碼已上傳到Github)
public class BioServerHandler implements Runnable { private Socket socket; public BioServerHandler(Socket socket) { this.socket = socket; } @Override public void run() { BufferedReader in = null; PrintWriter out = null; try { in = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); out = new PrintWriter(this.socket.getOutputStream(), true); String currentTime = null; String body = null; while (true) { body = in.readLine(); if (body == null) { break; } System.out.println("The server receive order: " + body); currentTime = "QUERY CURRENT TIME ORDER".equalsIgnoreCase(body) ? new Date(System.currentTimeMillis()).toString() : "BAD ORDER"; out.println(currentTime); } } catch (IOException e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } if (out != null) { out.close(); out = null; } // 釋放socket套接字句柄資源 if (this.socket != null) { try { this.socket.close(); } catch (IOException e) { e.printStackTrace(); } this.socket = null; } } } }
(4)測試結果
運行服務端,再運行客戶端:
服務端Console:
客戶端Console:
同時netstat命令查看TCP監聽端口8082
此外,通過dump thread查看,發現服務端線程一直阻塞在accept方法上,處於RUNNABLE狀態
為了解決同步阻塞I/O的缺點(處理鏈路:功能線程=1:1),后端通過一個線程池處理多個客戶端的請求接入,形成客戶端個數M:線程池最大線程數N的比例關系,其中M可以遠遠大於N。這就是通過Java線程池處理任務,而不是每次生成一個Thread。在《Netty權威指南》中稱之為“偽異步I/O”。