我不生產知識,我只是知識的搬運工。努力通過實踐與各位博友交流一些自己的見解。
引文:
由於cpu和磁盤等存儲設備的處理速度的差異,巧妙的io設計能夠極大的提升工作效率。從硬件設計角度包括 SPOOLING(假脫機)技術(實現獨占設備的共享),DMA(通過中斷的方式實現內存到磁盤的傳輸通道)大大降低了io傳輸到cpu的調用和阻塞,通道IO(有自己的指令和程序,相比DMA有更強的獨立處理數據能力。並且可以控制多台同類或不同類的設備)。————來自王道考研操作系統
總結:硬件實現cpu與磁盤的盡可能獨立運行,磁盤讀取盡可能少的通過中斷程序來獲取cpu的執行權。
解決了單個IO的CPU和磁盤獨立運行,我們來看下多個IO連接時,操作系統如何優化? 也就是多個io鏈接如何管理的問題。IO作為計算機的核心功能,用戶只能通過系統調用實現用戶態到內核態的切換來讀寫磁盤數據。傳統io 每建立一個io鏈接就要新建一個線程阻塞在當前操作中,在IO密集型任務中會大大降低CPU的利用率。通過IO復用監聽多個文件描述符來提升程序的性能。
注意:IO復用雖然能同時監聽多個文件描述符,但它本身是阻塞的。並且當多個文件描述符同時就緒時,如果不采取額外的措施,程序就只能按順序依次處理其中的每一個文件描述符,這使得程序看起來就像是串行工作。如果要實現並發,只能使用多線程和多進程等編程手段。————來自Linux高性能服務器編程
理論知識:
系統調用 | select | poll | epoll |
事件集合 | 用戶通過3個參數分別傳入感興趣的可讀,可寫和異常等事件,內核通過對這些參數的在線修改來反饋其中的就緒事件。這使得用戶每次調用select都要重置這3個參數 | 統一處理所有事件類型,因此只需一個事件集參數。用戶通過pollfd.events傳入感興趣的事件,內核通過修改polld.revents反饋其中就緒的事件 | 內核通過一個時間表直接管理用戶感興趣的所有事件。因此每次調用epoll_wait時,無需反復傳入用戶感興趣的事件。epoll_wait系統調用的參數events僅用來反饋就緒的事件 |
應用程序索引就緒文件描述符的事件復雜度 | O(n) | O(n) | O(1) |
最大支持文件描述符數 | 有最大值限制(內核默認值為1024) | 65535 | 65535 |
工作模式 | LT | LT | 支持ET高效模式 |
內核實現和工作效率 | 采用輪詢方法來檢測就緒事件,算法事件復雜度為O(n) | 采用輪詢方法來檢測就緒事件,算法事件復雜度為O(n) | 采用回調方法來檢測就緒事件,算法事件復雜度為O(n) |
系統調用API的演進路線:select————》poll(事件處理統一,編程接口更簡潔。不需要重置參數)————》epoll(使用回調替換輪詢機制,降低事件復雜度為O(1))
注意:當活動鏈接比較多的時候,epoll_wait的效率未必比selelct和poll高,因為此時回調函數出發得過於頻繁。所有epoll_wait適用於連接數量多,但活動鏈接少的情況。
實踐代碼:
聊天室程序:
服務端:

/** * @author: ljf * @date: 2020/12/29 19:25 * @description: 聊天室服務端 * 功能: * 1.數據轉發broadcast * 功能推導類屬性: * 1.userNames * 2.userThreads * 3.port 監聽端口 * <p> * accept阻塞監聽客戶端的鏈接,將鏈接添加到userNames,userThreads並啟動用戶線程接收數據 * 服務端的userThread只用來轉發數據,read and write * TODO:多個客戶端同時退出的並發問題, 客戶端正常段開時給服務器能夠檢測到。同時給其他客戶端發信息 * socket全雙工通信 * @modified By: * @version: $ 1.0 */ public class ChatServer { private final int port; private final HashSet<String> userNames = new HashSet<>(); private final HashSet<UserThread> userThreads = new HashSet<>(); public AtomicInteger connectedCount = new AtomicInteger(0); public ChatServer(int port) { this.port = port; } public void execute() { //監聽端口 try (ServerSocket serverSocket = new ServerSocket(port)) { System.out.println("chat server listening on port:" + port); while (true) { //創建服務端進程,accept阻塞監聽 Socket clientSocket = serverSocket.accept(); UserThread newUser = new UserThread(clientSocket, this); userThreads.add(newUser); newUser.start(); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { int port = 12345; ChatServer chatServer = new ChatServer(port); chatServer.execute(); } public void broadMessage(String serverMessage, UserThread userThread) { for (UserThread user : userThreads) { if (user != null && user != userThread) { //除當前用戶 user.sendMessage(serverMessage); } } } public Set<String> getUserNames() { return this.userNames; } /** * 刪除用戶,刪除userName和userThread */ public void removeUser(String user, UserThread userThread) { boolean removed = userNames.remove(user); if (removed) { userThreads.remove(userThread); System.out.println(user + " quit group chat"); } } public boolean hasUsers() { return !this.userNames.isEmpty(); } public void addUserName(String userName) { this.userNames.add(userName); } }
服務端每創建一個socket鏈接,就創建一個用戶線程:

import java.io.*; import java.net.Socket; /** * @author: ljf * @date: 2020/12/29 19:34 * @description: 服務端socket線程,用來向客戶端轉發消息 * @modified By: * @version: $ 1.0 */ public class UserThread extends Thread { private final Socket clientSocket; private PrintWriter printWriter; private String userName; //客戶端創建時輸入用戶名,網絡傳輸獲取 private final ChatServer chatServer; public UserThread(Socket clientSocket, ChatServer chatServer) { this.clientSocket = clientSocket; this.chatServer = chatServer; } public String getUserName() { return this.userName; } @Override public void run() { String serverMessage; try { OutputStream outputStream = clientSocket.getOutputStream(); printWriter = new PrintWriter(outputStream,true); //阻塞監聽客戶端發來的消息,然后轉發給其他客戶端 InputStream inputStream = clientSocket.getInputStream(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); printUsers(); //首條消息是 客戶端姓名 userName = bufferedReader.readLine(); chatServer.addUserName(userName); System.out.println("connectedCount: " + chatServer.connectedCount.getAndIncrement() + " new user connected: " + userName); serverMessage = "new user " + userName + " connected"; chatServer.broadMessage(serverMessage, this); //read 阻塞,直到客戶端發來"bye"消息,斷開連接 String clientMessage; while (!(clientMessage = bufferedReader.readLine()).equals("bye")) { //拼接上當前socket的用戶,轉發給其他用戶 serverMessage = "[" + userName + "]: " + clientMessage; chatServer.broadMessage(serverMessage, this); } } catch (IOException e) { // e.printStackTrace(); System.err.println(e.getMessage()); } finally { //與客戶端socket斷開連接 chatServer.removeUser(userName, this); if(clientSocket!=null && clientSocket.isConnected()){ try { clientSocket.shutdownOutput();//立即關閉輸出流 clientSocket.close(); } catch (IOException e) { e.printStackTrace(); } } //轉發我離開的消息 serverMessage = userName + " has quited"; chatServer.broadMessage(serverMessage, this); } } /** * 向新鏈接的客戶端發送當前服務器的用戶列表 */ public void printUsers() { if (chatServer.hasUsers()) { printWriter.println("connected users: " + chatServer.getUserNames()); } else { printWriter.println("no other users connected"); } } /** * 向客戶端發送消息 * * @param message:消息內容 */ public void sendMessage(String message) { printWriter.println(message); } }
客戶端:
public class ChatClient { private volatile String userName; private final String hostName; private final int port; private volatile boolean closed = false; public ChatClient(String hostName,int port){ this.hostName = hostName; this.port = port; } public static void main(String[] args) { String hostName = "localhost"; int port = 12345; ChatClient chatClient = new ChatClient(hostName, port); chatClient.execute(); } /** * 與服務端建立連接 */ public void execute(){ try { //必須輸入用戶名字后才能創建socket Scanner scanner = new Scanner(System.in); System.out.print("\nEnter your name: "); userName = scanner.nextLine(); Socket clientSocket = new Socket(hostName, port); System.out.println("connected to chat server"); new ReadThread(clientSocket,this).start(); new WriteThread(clientSocket,this).start(); } catch (IOException e) { e.printStackTrace(); } } public void setClosed(){ closed = true; } public boolean isClosed(){ return closed; } public String getUserName() { return userName; } }
客戶端socket讀線程
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.net.Socket; import java.net.SocketException; /** * @author: ljf * @date: 2020/12/29 20:55 * @description: * @modified By: * @version: $ 1.0 */ public class ReadThread extends Thread{ private final ChatClient chatClient; private final Socket clientSocket; public ReadThread(Socket clientSocket,ChatClient chatClient){ this.chatClient = chatClient; this.clientSocket = clientSocket; } @Override public void run() { while (!chatClient.isClosed()) { try { InputStream inputStream = clientSocket.getInputStream(); BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); String response = bufferedReader.readLine(); System.out.println("\n" + response); // prints the username after displaying the server's message if (chatClient.getUserName() != null) { System.out.print("[" + chatClient.getUserName() + "]: "); } } catch (SocketException se){ //TODO:正常退出替代這里 System.out.println("quit"); System.err.println(se.getMessage()); break; } catch (IOException ex) { System.out.println("Error reading from server: " + ex.getMessage()); ex.printStackTrace(); break; } } } }
客戶端socket寫線程
import java.io.IOException; import java.io.OutputStream; import java.io.PrintWriter; import java.net.Socket; import java.util.Scanner; /** * @author: ljf * @date: 2020/12/29 20:59 * @description: * @modified By: * @version: $ 1.0 */ public class WriteThread extends Thread { private final ChatClient chatClient; private final Socket clientSocket; private PrintWriter printWriter; public WriteThread(Socket clientSocket, ChatClient chatClient) { this.chatClient = chatClient; this.clientSocket = clientSocket; try { OutputStream outputStream = clientSocket.getOutputStream(); printWriter = new PrintWriter(outputStream,true); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { Scanner scanner = new Scanner(System.in); String userName = chatClient.getUserName(); printWriter.println(userName); String text; // System.out.print("[" + userName + "]: "); while (!(text = scanner.nextLine()).equals("bye")) { printWriter.println(text); System.out.print("[" + userName + "]: "); } try { printWriter.println(text); clientSocket.close(); } catch (IOException ex) { System.out.println("Error writing to server: " + ex.getMessage()); } } }
TODO:當前是用傳統io流實現,后期加入nio,零拷貝。