零拷貝介紹
- 零拷貝是網絡編程的關鍵, 很多性能優化都需要零拷貝。
- 在 Java程序中, 常用的零拷貝方式有m(memory)map[內存映射] 和 sendFile。它們在OS中又是怎樣的設計?
- NIO中如何使用零拷貝?
NIO 與 傳統IO對比
-
傳統IO流程示意圖
- user context: 用戶態
- kernel context: 內核態
- User space: 用戶空間
- Kernel space: 內核空間
- Syscall read: 系統調用讀取
- Syscall write: 系統調用寫入
- Hard drive: 硬件驅動
- kernel buffer: 內核態緩沖區
- user buffer: 用戶態緩沖區
- socket buffer: 套接字緩存
- protocol engine: 協議引擎
- DMA: Direct Memory Access: 直接內存拷貝(不使用CPU)
- 總結: 4次拷貝, 3次狀態切換, 效率不高
-
mmap優化流程示意圖
- mmap 通過內存映射, 將文件映射到內核緩沖區, 同時, 用戶空間可以共享內核空間的數據。
- 這樣, 在進行網絡傳輸時, 就可以減少內核空間到用戶空間的拷貝次數。
- 總結: 3次拷貝, 3次狀態切換, 不是真正意義上的零拷貝。
-
sendFile Linux2.1版本優化流程示意圖
- 數據根本不經過用戶態, 直接從內核緩沖區進入到Socket Buffer, 同時, 由於和用戶台完全無關, 就減少了一次上下文切換。
- 但是仍然有一次CPU拷貝, 不是真正的零拷貝(沒有CPU拷貝)。
- 總結: 3次拷貝, 2次切換
-
sendFile Linux
- 避免了從內核緩沖區拷貝到Socket buffer的操作, 直接拷貝到協議棧, 從而再一次減少了數據拷貝。
- 其實是有一次cpu拷貝的, kernel buffer -> socket buffer, 但是拷貝的信息很少, length, offset, 消耗低, 基本可以忽略。
- 總結: 2次拷貝(如果忽略消耗低的cpu拷貝的話), 2次切換, 基本可以認為是零拷貝了。
零拷貝理解
- 零拷貝是從操作系統的角度來看的。內核緩沖區之間, 沒有數據是重復的(只有kernel buffer有一份數據)。
- 零拷貝不僅僅帶來更少的數據復制, 還能帶來其他的性能優勢: 如更少的上下文切換, 更少的 CPU 緩存偽共享以及無CPU校驗和計算。
mmap 與 sendFile 總結
- mmap適合小數據兩讀寫, sendFile適合大文件傳輸
- mmap 需要3次上下文切換, 3次數據拷貝; sendFile 需要3次上下文切換, 最少2次數據拷貝。
- sendFile 可以利用 DMA 方式, 減少 CPU 拷貝, 而 mmap則不能(必須從內核拷貝到Socket緩沖區)。
NIO實現零拷貝
-
服務端
package com.ronnie.nio.zeroCopy; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; public class NewIOServer { public static void main(String[] args) throws IOException { InetSocketAddress address = new InetSocketAddress(8096); ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(address); // 創建Buffer ByteBuffer byteBuffer = ByteBuffer.allocate(4096); while (true){ SocketChannel socketChannel = serverSocketChannel.accept(); int readCount = 0; while (-1 != readCount){ try { readCount = socketChannel.read(byteBuffer); } catch (IOException e) { e.printStackTrace(); } // 倒帶, position設為0, mark重置為-1 byteBuffer.rewind(); } } } }
-
客戶端
package com.ronnie.nio.zeroCopy; import java.io.FileInputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.FileChannel; import java.nio.channels.SocketChannel; public class NewIOClient { public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.connect(new InetSocketAddress("localhost", 8096)); String filename = "flink-1.9.0-bin-scala_2.12.tgz"; // 得到一個文件channel FileChannel fileChannel = new FileInputStream(filename).getChannel(); // 准備發送 long startTime = System.currentTimeMillis(); // 在Linux下一次transferTo方法就可以完成傳輸 // 在Windows下一次調用transferTo 只能發送 8M, 就需要分段傳輸文件, 而且主要傳輸時的位置需要記錄 long transferCount = 0L; if (fileChannel.size() <= 8){ // transferTo() 參數1: 從什么位置開始, 參數2: 截多少, 參數3: 可寫的管道對象) transferCount = fileChannel.transferTo(0, fileChannel.size(), socketChannel); } else { int times = (int) (fileChannel.size() / 8 + 1); for (int i = 1; i < times; i++){ transferCount += fileChannel.transferTo(8 * i, 8 * i + 8, socketChannel); } } System.out.println("Total byte count: " + transferCount + " time consumed: " + (System.currentTimeMillis() - startTime)); // 關閉 fileChannel.close(); } }
核心方法transferTo()
-
代碼(這是fileChannelImpl中的反編譯代碼)
public long transferTo(long var1, long var3, WritableByteChannel var5) throws IOException { // 確認當前管道已經開啟, 檢查到未開啟會拋出異常 this.ensureOpen(); // 如果傳入的管道未開啟, 拋出異常 if (!var5.isOpen()) { throw new ClosedChannelException(); // 如果當前管道不可讀, 拋出異常 } else if (!this.readable) { throw new NonReadableChannelException(); // 如果傳入的管道是實現類 且 該管道不可寫, 拋出異常 } else if (var5 instanceof FileChannelImpl && !((FileChannelImpl)var5).writable) { throw new NonWritableChannelException(); // 如果 position >= 0 且 count >= 0 } else if (var1 >= 0L && var3 >= 0L) { // 獲取當前管道的長度 long var6 = this.size(); // 如果position已經超過當前管道末尾, 就返回0 if (var1 > var6) { return 0L; } else { // 將count數與2147483647L比較並獲取其中最小值, 再轉換成int, 傳給var8, 其實這里就是做了一個防止count越界的處理 int var8 = (int)Math.min(var3, 2147483647L); // 如果管道末尾到position之間的長度小於var8 if (var6 - var1 < (long)var8) { // 就把該值賦給var8 var8 = (int)(var6 - var1); } long var9; // transferToDirectly 直接傳輸 if ((var9 = this.transferToDirectly(var1, var8, var5)) >= 0L) { return var9; } else { // transferToTrustedChannel 傳輸到可靠的管道 // transferToArbitraryChannel 傳輸到任意的管道 // 其實就是先嘗試傳輸到可靠的管道, 如果傳輸失敗, 再用任意管道繼續傳輸 return (var9 = this.transferToTrustedChannel(var1, (long)var8, var5)) >= 0L ? var9 : this.transferToArbitraryChannel(var1, var8, var5); } } } else { throw new IllegalArgumentException(); } }