通過 socket 實現一個 TCP 服務端與客戶端,實現通過 TCP 協議進行消息收發。
關鍵在 socket 的使用的理解上。
socket 是對操作系統提供的協議棧的封裝,底層調用的是操作系統提供的協議棧。
當我們調用 ServerSocket 的 accept 方法時,線程阻塞。以 TCP 協議為例,直到網卡接收到一個三次握手的連接請求,網卡向 CPU 發送中斷信號,CPU 調用中斷處理程序喚醒我們阻塞在 accept 方法上的線程,進行連接處理。
三次握手的過程是由協議棧完成的,我們在應用層編程無法感知。直到三次握手完成,協議棧將客戶端信息與服務端信息封裝在一個 Socket 對象中返回,我們通過該Socket 對象完成數據的收發。
值得注意的是,在連接建立完成前,操作系統會為本次連接在內核空間開辟兩個數據緩沖區:發送緩沖區與接收緩沖區。
我們要做的是監聽發送緩沖區是否有數據到達,以及將需要發送的數據寫入發送緩沖區。
至於網卡接收到的數據何時由操作系統拆包並寫入接收緩沖區,以及我們寫入發送緩沖區的數據何時會被封裝為 TCP 報文發送給網卡是操作系統 OS 控制的,這對我們來說是透明的。不同的 OS 對此會有不同的實現,我們不需要關注這些細節(或者說想要關注也沒辦法介入)。
本次實現是通過傳統的 ServerSocket 建立服務端,並沒有使用通道技術。也就是說是 BIO 的實現,當並發量比較大時可以采用 NIO 多路復用技術進行優化。這可以幫助我們節約線程數。
對 TCP 報文進行分包有多種方式,本次實現使用的是最普適的方式,通過報文頭添加報文長度字段進行分包,也就是與 HTTP 協議 Header 中的 Content-Length 相同的方式。
測試時客戶端發送的數據是一個序列化的對象,服務端對其進行反序列化並檢查結果。
由於牽扯到線程的切換,本次實現並沒有對代碼結構進行提前設計,僅僅是簡單的實現了數據收發功能。經過設計優化的代碼將在下篇博客發出。
服務端:
/** * @Author Nxy * @Date 2020/3/21 17:16 * @Description socket 服務端 */ public class BasicSeverDemo { public static void main(String[] args) { ServerSocket server = null; try { server = new ServerSocket(80); System.out.println("server start!"); } catch (IOException e) { e.printStackTrace(); return; } while (!Thread.currentThread().isInterrupted()) { Socket socket; BufferedInputStream in; BufferedOutputStream out; try { //阻塞等待連接請求 socket = server.accept(); System.out.println("建立連接:" + socket.getInetAddress()); in = new BufferedInputStream(socket.getInputStream()); out = new BufferedOutputStream(socket.getOutputStream()); } catch (IOException e) { e.printStackTrace(); System.out.println("連接建立失敗!"); continue; } byte[] result; try { //阻塞等待接收請求數據 byte[] lengthByte = IOUtil.readBytesFromInputStream(in, 4); //本次請求的長度 int length = ByteBuffer.wrap(lengthByte).getInt(); System.out.println("from server:" + length); //讀取指定長度字節 result = IOUtil.readBytesFromInputStream(in, length); } catch (Exception e) { e.printStackTrace(); break; } //反序列化對象 Invocation obj = null; try { ByteArrayInputStream bis = new ByteArrayInputStream(result); ObjectInputStream ois = new ObjectInputStream(bis); obj = (Invocation) ois.readObject(); ois.close(); bis.close(); } catch (IOException ex) { ex.printStackTrace(); } catch (ClassNotFoundException ex) { ex.printStackTrace(); } System.out.println(obj.getInterfaceName() + ":" + obj.getMethodName()); } }
客戶端:
/** * @Author Nxy * @Date 2020/3/21 17:54 * @Description socket 客戶端 */ public class BasicClientDemo { public static void main(String[] args) { Socket socket; BufferedOutputStream out; BufferedInputStream in; try { socket = new Socket("127.0.0.1", 80); out = new BufferedOutputStream(socket.getOutputStream()); in = new BufferedInputStream(socket.getInputStream()); } catch (IOException e) { e.printStackTrace(); return; } Object[] params = new Object[2]; Class[] paramTypes = new Class[2]; Invocation invocation = new Invocation(BasicClientDemo.class.getName(), "main", paramTypes, params); byte[] invocationBytes = toByteArray(invocation); int length = invocationBytes.length; try { System.out.println("from client:" + length); out.write(ByteBuffer.allocate(4).putInt(length).array()); out.flush(); out.write(invocationBytes); out.flush(); } catch (IOException e) { e.printStackTrace(); } finally { try { out.flush(); out.close(); } catch (IOException e) { e.printStackTrace(); } } }
工具類:
public class IOUtil { /** * @Author Nxy * @Date 2020/3/21 20:21 * @Param in:輸入流,length:讀取字節數 * @Return * @Exception * @Description 從輸入流讀取指定長度字節的數據 */ public static byte[] readBytesFromInputStream(BufferedInputStream in, int length) throws IOException { int readSize; byte[] bytes = null; bytes = new byte[length]; long length_tmp = length; long index = 0;// start from zero while ((readSize = in.read(bytes, (int) index, (int) length_tmp)) != -1) { length_tmp -= readSize; if (length_tmp == 0) { break; } index = index + readSize; } return bytes; }
public static byte[] toByteArray(Object obj) { byte[] bytes = null; ByteArrayOutputStream bos = new ByteArrayOutputStream(); try { ObjectOutputStream oos = new ObjectOutputStream(bos); oos.writeObject(obj); oos.flush(); bytes = bos.toByteArray(); oos.close(); bos.close(); } catch (IOException ex) { ex.printStackTrace(); } return bytes; }
}
執行效果,服務端正常接收到數據並成功反序列化為對象: