Netty 是基於Java NIO 封裝的網絡通訊框架,只有充分理解了 Java NIO 才能理解好Netty的底層設計。Java NIO 由三個核心組件組件:
-
Buffer
-
Channel
-
Selector
緩沖區 Buffer
Buffer 是一個數據對象,我們可以把它理解為固定數量的數據的容器,它包含一些要寫入或者讀出的數據。
在 Java NIO 中,任何時候訪問 NIO 中的數據,都需要通過緩沖區(Buffer)進行操作。讀取數據時,直接從緩沖區中讀取,寫入數據時,寫入至緩沖區。NIO 最常用的緩沖區則是 ByteBuffer。下圖是 Buffer 繼承關系圖:
每一個 Java 基本類型都對應着一種 Buffer,他們都包含這相同的操作,只不過是所處理的數據類型不同而已。
通道 Channel
Channel 是一個通道,它就像自來水管一樣,網絡數據通過 Channel 這根水管讀取和寫入。傳統的 IO 是基於流進行操作的,Channle 和類似,但又有些不同:
區別 | 流 | 通過Channel |
---|---|---|
支持異步 | 不支持 | 支持 |
是否可雙向傳輸數據 | 不能,只能單向 | 可以,既可以從通道讀取數據,也可以向通道寫入數據 |
是否結合 Buffer 使用 | 不 | 必須結合 Buffer 使用 |
性能 | 較低 | 較高 |
正如上面說到的,Channel 必須要配合 Buffer 一起使用,我們永遠不可能將數據直接寫入到 Channel 中,同樣也不可能直接從 Channel 中讀取數據。都是通過從 Channel 讀取數據到 Buffer 中或者從 Buffer 寫入數據到 Channel 中,如下:
簡單點說,Channel 是數據的源頭或者數據的目的地,用於向 buffer 提供數據或者讀取 buffer 數據,並且對 I/O 提供異步支持。
下圖是 Channel 的類圖
Channel 為最頂層接口,所有子 Channel 都實現了該接口,它主要用於 I/O 操作的連接。定義如下:
public interface Channel extends Closeable {
/**
* 判斷此通道是否處於打開狀態。
*/
public boolean isOpen();
/**
*關閉此通道。
*/
public void close() throws IOException;
}
最為重要的Channel實現類為:
-
FileChannel:一個用來寫、讀、映射和操作文件的通道
-
DatagramChannel:能通過 UDP 讀寫網絡中的數據
-
SocketChannel: 能通過 TCP 讀寫網絡中的數據
-
ServerSocketChannel:可以監聽新進來的 TCP 連接,像 Web 服務器那樣。對每一個新進來的連接都會創建一個 SocketChannel
多路復用器 Selector
多路復用器 Selector,它是 Java NIO 編程的基礎,它提供了選擇已經就緒的任務的能力。從底層來看,Selector 提供了詢問通道是否已經准備好執行每個 I/O 操作的能力。簡單來講,Selector 會不斷地輪詢注冊在其上的 Channel,如果某個 Channel 上面發生了讀或者寫事件,這個 Channel 就處於就緒狀態,會被 Selector 輪詢出來,然后通過 SelectionKey 可以獲取就緒 Channel 的集合,進行后續的 I/O 操作。
Selector 允許一個線程處理多個 Channel ,也就是說只要一個線程復雜 Selector 的輪詢,就可以處理成千上萬個 Channel ,相比於多線程來處理勢必會減少線程的上下文切換問題。下圖是一個 Selector 連接三個 Channel :
實例
服務端
public class NIOServer {
/*接受數據緩沖區*/
private ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
/*發送數據緩沖區*/
private ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
private Selector selector;
public NIOServer(int port) throws IOException {
// 打開服務器套接字通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 服務器配置為非阻塞
serverSocketChannel.configureBlocking(false);
// 檢索與此通道關聯的服務器套接字
ServerSocket serverSocket = serverSocketChannel.socket();
// 進行服務的綁定
serverSocket.bind(new InetSocketAddress(port));
// 通過open()方法找到Selector
selector = Selector.open();
// 注冊到selector,等待連接
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("Server Start----:");
}
private void listen() throws IOException {
while (true) {
selector.select();
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
iterator.remove();
handleKey(selectionKey);
}
}
}
private void handleKey(SelectionKey selectionKey) throws IOException {
// 接受請求
ServerSocketChannel server = null;
SocketChannel client = null;
String receiveText;
String sendText;
int count=0;
// 測試此鍵的通道是否已准備好接受新的套接字連接。
if (selectionKey.isAcceptable()) {
// 返回為之創建此鍵的通道。
server = (ServerSocketChannel) selectionKey.channel();
// 接受到此通道套接字的連接。
// 此方法返回的套接字通道(如果有)將處於阻塞模式。
client = server.accept();
// 配置為非阻塞
client.configureBlocking(false);
// 注冊到selector,等待連接
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
// 返回為之創建此鍵的通道。
client = (SocketChannel) selectionKey.channel();
//將緩沖區清空以備下次讀取
receivebuffer.clear();
//讀取服務器發送來的數據到緩沖區中
count = client.read(receivebuffer);
if (count > 0) {
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("服務器端接受客戶端數據--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
//將緩沖區清空以備下次寫入
sendbuffer.clear();
// 返回為之創建此鍵的通道。
client = (SocketChannel) selectionKey.channel();
sendText="message from server--";
//向緩沖區中輸入數據
sendbuffer.put(sendText.getBytes());
//將緩沖區各標志復位,因為向里面put了數據標志被改變要想從中讀取數據發向服務器,就要復位
sendbuffer.flip();
//輸出到通道
client.write(sendbuffer);
System.out.println("服務器端向客戶端發送數據--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
public static void main(String[] args) throws IOException {
int port = 8080;
NIOServer server = new NIOServer(port);
server.listen();
}
}
客戶端
public class NIOClient {
/*接受數據緩沖區*/
private static ByteBuffer sendbuffer = ByteBuffer.allocate(1024);
/*發送數據緩沖區*/
private static ByteBuffer receivebuffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
// 打開socket通道
SocketChannel socketChannel = SocketChannel.open();
// 設置為非阻塞方式
socketChannel.configureBlocking(false);
// 打開選擇器
Selector selector = Selector.open();
// 注冊連接服務端socket動作
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// 連接
socketChannel.connect(new InetSocketAddress("127.0.0.1", 8080));
Set<SelectionKey> selectionKeys;
Iterator<SelectionKey> iterator;
SelectionKey selectionKey;
SocketChannel client;
String receiveText;
String sendText;
int count=0;
while (true) {
//選擇一組鍵,其相應的通道已為 I/O 操作准備就緒。
//此方法執行處於阻塞模式的選擇操作。
selector.select();
//返回此選擇器的已選擇鍵集。
selectionKeys = selector.selectedKeys();
//System.out.println(selectionKeys.size());
iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
System.out.println("client connect");
client = (SocketChannel) selectionKey.channel();
// 判斷此通道上是否正在進行連接操作。
// 完成套接字通道的連接過程。
if (client.isConnectionPending()) {
client.finishConnect();
System.out.println("完成連接!");
sendbuffer.clear();
sendbuffer.put("Hello,Server".getBytes());
sendbuffer.flip();
client.write(sendbuffer);
}
client.register(selector, SelectionKey.OP_READ);
} else if (selectionKey.isReadable()) {
client = (SocketChannel) selectionKey.channel();
//將緩沖區清空以備下次讀取
receivebuffer.clear();
//讀取服務器發送來的數據到緩沖區中
count=client.read(receivebuffer);
if(count>0){
receiveText = new String( receivebuffer.array(),0,count);
System.out.println("客戶端接受服務器端數據--:"+receiveText);
client.register(selector, SelectionKey.OP_WRITE);
}
} else if (selectionKey.isWritable()) {
sendbuffer.clear();
client = (SocketChannel) selectionKey.channel();
sendText = "message from client--";
sendbuffer.put(sendText.getBytes());
//將緩沖區各標志復位,因為向里面put了數據標志被改變要想從中讀取數據發向服務器,就要復位
sendbuffer.flip();
client.write(sendbuffer);
System.out.println("客戶端向服務器端發送數據--:"+sendText);
client.register(selector, SelectionKey.OP_READ);
}
}
selectionKeys.clear();
}
}
}
運行結果