一、Selector&Channel
1.1:各種channel
寫這個模型需要提前了解Selector以及Channel,之前記錄過FileChannel,除此之外還有以下幾種Channel:
ServerSocketChannel:用於監聽新的TCP連接的通道,負責讀取&響應,通常用於服務端的實現。
SocketChannel:用於發起TCP連接,讀寫網絡中的數據,通常用於客戶端的實現。
DatagramChannel:上述兩個通道基於TCP傳輸協議,而這個通道則基於UDP,用於讀寫網絡中的數據。
FileChannel:從文件讀取數據。
本篇重點放在ServerSocketChannel和SocketChannel上,大部分客戶端/服務端為了保證數據准確性,都是基於TCP傳輸協議實現的,由於使用Selector注冊必須要求被注冊的Channel是非阻塞模式的,因此FileChannel由於沒有非阻塞模式(無法設置configureBlocking(false)),沒辦法和注冊到selector。
1.2:selector
Selector是個通道注冊器(用法會在程序里標注),是實現Reactor模型的關鍵,多個通道均可以注冊到Selector,Selector負責監聽每個Channel的幾個事件:連接就緒、寫就緒、讀就緒,當某個channel注冊感興趣就緒事件到selector時,若發生興趣事件就緒,則Selector.select()方法不再阻塞,返回興趣事件集合(可能包含多個channel的),然后按照事件不同進行分發處理。
Selector返回對應的就緒事件,封裝為SelectionKey,每個Channel對應一個SelectionKey,這個對象還可以通過attach方法附着處理類(Handler、Acceptor等)。
1.3:一個簡單的例子
先來看個簡單使用Selector做處理的服務端實現,可以簡單對Selector和SelectionKey的用法做個了解:
public static void main(String[] args) throws IOException {
Selector selector = Selector.open(); //打開選擇器
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打開通道
serverSocketChannel.configureBlocking(false); //設置通道為非阻塞模式
serverSocketChannel.bind(new InetSocketAddress(2333)); //綁定端口
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //注冊channel到選擇器,指定監聽該Channel的哪些事件,初始化都是對連接事件監聽(因為是入口)
while (selector.select() > 0) { // 若收到就緒事件select返回“感興趣”事件集合,否則阻塞當前線程
Set keys = selector.selectedKeys(); //獲取本次拿到的事件集合
Iterator iterator = keys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
iterator.remove();
if (key.isAcceptable()) { //當前就緒事件為連接事件
ServerSocketChannel skc = (ServerSocketChannel) key.channel(); //連接就緒觸發,說明已經有客戶端通道連了過來,這里需要拿服務端通道去獲取客戶端通道
SocketChannel socketChannel = skc.accept(); //獲取客戶端通道(連接就緒,說明客戶端接下來可能還有別的動作,比如讀和寫)
socketChannel.configureBlocking(false); //同樣的需要設置非阻塞模式
System.out.println(String.format("收到來自 %s 的連接", socketChannel.getRemoteAddress()));
socketChannel.register(selector, SelectionKey.OP_READ); //將該客戶端注冊到選擇器,感興趣事件設置為讀(客戶端連接完畢,很肯能會往服務端寫數據,因此這里要注冊讀事件用以接收這些數據)
} else if (key.isReadable()) { //當前事件為讀就緒
SocketChannel socketChannel = (SocketChannel) key.channel(); //能觸發讀就緒,說明客戶端已經開始往服務端寫數據,通過SelectionKey拿到當前客戶端通道
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = socketChannel.read(buffer); //從通道讀入數據
if (count < 0) { //若本次讀就緒拿到-1,則認為客戶端主動斷開了連接
socketChannel.close(); //服務端關閉客戶端通道
key.cancel(); //斷連后就將該事件從選擇器的SelectionKey集合中移除(這里說一下,這里不是真正意義上的移除,這里是取消,會將該key放入取消隊列里,在下次select函數調用時才負責清空)
System.out.println("連接關閉");
continue;
}
System.out.println(String.format("收到來自 %s 的消息: %s",
socketChannel.getRemoteAddress(),
new String(buffer.array())));
}
keys.remove(key);
}
}
}
上面是一個簡單的例子,接下來,就利用選擇器、通道來實現Reactor單線程模型。
二、單Reactor單線程模型的服務端實現
實現服務端,服務端負責接收客戶端的連接,接收客戶端的請求數據以及響應客戶端。
把上一篇的結構圖再拿過來展示下,看看需要做的有哪些模塊:
圖1
通過上圖,我們需要實現的模塊有Reactor、Acceptor、Handler,下面來逐個編寫:
2.1:Reactor核心模塊
該模塊內部包含兩個核心方法,select和dispatch,該模塊負責監聽就緒事件和對事件的分發處理:
public class Reactor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
public Reactor(int port) throws IOException { //Reactor初始化
selector = Selector.open(); //打開一個Selector
serverSocketChannel = ServerSocketChannel.open(); //建立一個Server端通道
serverSocketChannel.socket().bind(new InetSocketAddress(port)); //綁定服務端口
serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必須是非阻塞的
//Reactor是入口,最初給一個channel注冊上去的事件都是accept
SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
//attach callback object, Acceptor
sk.attach(new Acceptor(serverSocketChannel, selector));
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); //就緒事件到達之前,阻塞
Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件
Iterator it = selected.iterator();
while (it.hasNext()) {
//這里進行任務分發
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment()); //這里很關鍵,拿到每次selectKey里面附帶的處理對象,然后調用其run,這個對象在具體的Handler里會進行創建,初始化的附帶對象為Acceptor(看上面構造器)
//調用之前注冊的callback對象
if (r != null) {
r.run();
}
}
}
細節已標注。
2.2:實現Acceptor模塊
這個模塊只負責處理連接就緒事件,有了這個事件就可以拿到客戶單的SocketChannel,就可以繼續完成接下來的讀寫任務了:
public class Acceptor implements Runnable {
private final Selector selector;
private final ServerSocketChannel serverSocketChannel;
Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
this.serverSocketChannel = serverSocketChannel;
this.selector = selector;
}
@Override
public void run() {
SocketChannel socketChannel;
try {
socketChannel = serverSocketChannel.accept();
if (socketChannel != null) {
System.out.println(String.format("收到來自 %s 的連接",
socketChannel.getRemoteAddress()));
new Handler(socketChannel, selector); //這里把客戶端通道傳給Handler,Handler負責接下來的事件處理(除了連接事件以外的事件均可)
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
細節已標注。
2.3:Handler模塊的實現
這個模塊負責接下來的讀寫操作:
public class Handler implements Runnable {
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);
private final static int READ = 0;
private final static int SEND = 1;
private int status = READ;
Handler(SocketChannel socketChannel, Selector selector) throws IOException {
this.socketChannel = socketChannel; //接收客戶端連接
this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式)
selectionKey = socketChannel.register(selector, 0); //將該客戶端注冊到selector,得到一個SelectionKey,以后的select到的就緒動作全都是由該對象進行封裝
selectionKey.attach(this); //附加處理對象,當前是Handler對象,run是對象處理業務的方法
selectionKey.interestOps(SelectionKey.OP_READ); //走到這里,說明之前Acceptor里的建連已完成,那么接下來就是讀取動作,因此這里首先將讀事件標記為“感興趣”事件
selector.wakeup(); //喚起select阻塞
}
@Override
public void run() {
try {
switch (status) {
case READ:
read();
break;
case SEND:
send();
break;
default:
}
} catch (IOException e) { //這里的異常處理是做了匯總,常出的異常就是server端還有未讀/寫完的客戶端消息,客戶端就主動斷開連接,這種情況下是不會觸發返回-1的,這樣下面read和write方法里的cancel和close就都無法觸發,這樣會導致死循環異常(read/write處理失敗,事件又未被cancel,因此會不斷的被select到,不斷的報異常)
System.err.println("read或send時發生異常!異常信息:" + e.getMessage());
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e2) {
System.err.println("關閉通道時發生異常!異常信息:" + e2.getMessage());
e2.printStackTrace();
}
}
}
private void read() throws IOException {
if (selectionKey.isValid()) {
readBuffer.clear();
int count = socketChannel.read(readBuffer); //read方法結束,意味着本次"讀就緒"變為"讀完畢",標記着一次就緒事件的結束
if (count > 0) {
System.out.println(String.format("收到來自 %s 的消息: %s",
socketChannel.getRemoteAddress(),
new String(readBuffer.array())));
status = SEND;
selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫方法
} else {
//讀模式下拿到的值是-1,說明客戶端已經斷開連接,那么將對應的selectKey從selector里清除,否則下次還會select到,因為斷開連接意味着讀就緒不會變成讀完畢,也不cancel,下次select會不停收到該事件
//所以在這種場景下,(服務器程序)你需要關閉socketChannel並且取消key,最好是退出當前函數。注意,這個時候服務端要是繼續使用該socketChannel進行讀操作的話,就會拋出“遠程主機強迫關閉一個現有的連接”的IO異常。
selectionKey.cancel();
socketChannel.close();
System.out.println("read時-------連接關閉");
}
}
}
void send() throws IOException {
if (selectionKey.isValid()) {
sendBuffer.clear();
sendBuffer.put(String.format("我收到來自%s的信息辣:%s, 200ok;",
socketChannel.getRemoteAddress(),
new String(readBuffer.array())).getBytes());
sendBuffer.flip();
int count = socketChannel.write(sendBuffer); //write方法結束,意味着本次寫就緒變為寫完畢,標記着一次事件的結束
if (count < 0) {
//同上,write場景下,取到-1,也意味着客戶端斷開連接
selectionKey.cancel();
socketChannel.close();
System.out.println("send時-------連接關閉");
}
//沒斷開連接,則再次切換到讀
status = READ;
selectionKey.interestOps(SelectionKey.OP_READ);
}
}
}
細節已標注。
關鍵模塊已實現,下面來啟動服務端:
new Thread(new Reactor(2333)).start();
三、客戶端的編寫
接下來同樣利用selector編寫客戶端,客戶端需要做的事情就是發送消息到服務端,等待服務端響應,然后再次發送消息,發夠10條消息斷開連接:
3.1:Client入口模塊
public class NIOClient implements Runnable {
private Selector selector;
private SocketChannel socketChannel;
NIOClient(String ip, int port) {
try {
selector = Selector.open(); //打開一個Selector
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false); //設置為非阻塞模式
socketChannel.connect(new InetSocketAddress(ip, port)); //連接服務
//入口,最初給一個客戶端channel注冊上去的事件都是連接事件
SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_CONNECT);
//附加處理類,第一次初始化放的是連接就緒處理類
sk.attach(new Connector(socketChannel, selector));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
while (!Thread.interrupted()) {
selector.select(); //就緒事件到達之前,阻塞
Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件
Iterator it = selected.iterator();
while (it.hasNext()) {
//這里進行任務分發
dispatch((SelectionKey) (it.next()));
}
selected.clear();
}
} catch (IOException e) {
e.printStackTrace();
}
}
void dispatch(SelectionKey k) {
Runnable r = (Runnable) (k.attachment()); //這里很關鍵,拿到每次selectKey里面附帶的處理對象,然后調用其run,這個對象在具體的Handler里會進行創建,初始化的附帶對象為Connector(看上面構造器)
//調用之前注冊的callback對象
if (r != null) {
r.run();
}
}
}
細節已標注。
3.2:Connector模塊(建連)
public class Connector implements Runnable {
private final Selector selector;
private final SocketChannel socketChannel;
Connector(SocketChannel socketChannel, Selector selector) {
this.socketChannel = socketChannel;
this.selector = selector;
}
@Override
public void run() {
try {
if (socketChannel.finishConnect()) { //這里連接完成(與服務端的三次握手完成)
System.out.println(String.format("已完成 %s 的連接",
socketChannel.getRemoteAddress()));
new Handler(socketChannel, selector); //連接建立完成后,接下來的動作交給Handler去處理(讀寫等)
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
細節已標注。
3.3:客戶端Handler模塊實現
public class Handler implements Runnable {
private final SelectionKey selectionKey;
private final SocketChannel socketChannel;
private ByteBuffer readBuffer = ByteBuffer.allocate(2048);
private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);
private final static int READ = 0;
private final static int SEND = 1;
private int status = SEND; //與服務端不同,默認最開始是發送數據
private AtomicInteger counter = new AtomicInteger();
Handler(SocketChannel socketChannel, Selector selector) throws IOException {
this.socketChannel = socketChannel; //接收客戶端連接
this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式)
selectionKey = socketChannel.register(selector, 0); //將該客戶端注冊到selector,得到一個SelectionKey,以后的select到的就緒動作全都是由該對象進行封裝
selectionKey.attach(this); //附加處理對象,當前是Handler對象,run是對象處理業務的方法
selectionKey.interestOps(SelectionKey.OP_WRITE); //走到這里,說明之前Connect已完成,那么接下來就是發送數據,因此這里首先將寫事件標記為“感興趣”事件
selector.wakeup(); //喚起select阻塞
}
@Override
public void run() {
try {
switch (status) {
case SEND:
send();
break;
case READ:
read();
break;
default:
}
} catch (IOException e) { //這里的異常處理是做了匯總,同樣的,客戶端也面臨着正在與服務端進行寫/讀數據時,突然因為網絡等原因,服務端直接斷掉連接,這個時候客戶端需要關閉自己並退出程序
System.err.println("send或read時發生異常!異常信息:" + e.getMessage());
selectionKey.cancel();
try {
socketChannel.close();
} catch (IOException e2) {
System.err.println("關閉通道時發生異常!異常信息:" + e2.getMessage());
e2.printStackTrace();
}
}
}
void send() throws IOException {
if (selectionKey.isValid()) {
sendBuffer.clear();
int count = counter.incrementAndGet();
if (count <= 10) {
sendBuffer.put(String.format("客戶端發送的第%s條消息", count).getBytes());
sendBuffer.flip(); //切換到讀模式,用於讓通道讀到buffer里的數據
socketChannel.write(sendBuffer);
//則再次切換到讀,用以接收服務端的響應
status = READ;
selectionKey.interestOps(SelectionKey.OP_READ);
} else {
selectionKey.cancel();
socketChannel.close();
}
}
}
private void read() throws IOException {
if (selectionKey.isValid()) {
readBuffer.clear(); //切換成buffer的寫模式,用於讓通道將自己的內容寫入到buffer里
socketChannel.read(readBuffer);
System.out.println(String.format("收到來自服務端的消息: %s", new String(readBuffer.array())));
//收到服務端的響應后,再繼續往服務端發送數據
status = SEND;
selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫事件
}
}
}
細節已標注。
下面啟動客戶端去連接之前的服務端:
new Thread(new NIOClient("127.0.0.1", 2333)).start();
new Thread(new NIOClient("127.0.0.1", 2333)).start();
上面模擬了兩個客戶端同時連到服務端,運行結果如下:
服務端運行結果:
圖2
客戶端運行結果如下:
圖3
單線程Reactor模型有個致命的缺點,通過上述例子可以看出,整個執行流程都是線性的,客戶端請求→服務端讀取→服務端響應→客戶端收到響應→客戶端再次發送請求,那么在這個鏈路中,如果handler中某個位置存在性能瓶頸,比如我們可以改造下服務端的send方法:
try {
Thread.sleep(2000L); //響應2s
} catch (InterruptedException e) {
e.printStackTrace();
}
int count = socketChannel.write(sendBuffer);
在響應客戶端之前睡眠2s,當做是性能瓶頸點,同樣的再次開兩個客戶端同時訪問服務端,每個客戶端發送10條消息,會發現,程序直接運行了40s,這是大多數情況下不願意看到的,因此,就有了多線程Reactor模式,跟BIO為了提高性能將讀操作放到一個獨立線程處理一樣,Reactor這樣做,也是為了解決上面提到的性能問題,只不過NIO比BIO做異步有個最大的優勢就是NIO不會阻塞一個線程,類似read這種操作狀態都是由selector負責監聽的,不像BIO里都是阻塞的,只要被異步出去,那么一定是非阻塞的業務代碼(除非是人為將代碼搞成阻塞),而BIO由於read本身阻塞,因此會阻塞掉整個線程,這也是同樣是異步為什么NIO可以更加高效的原因之一。
那么單線程Reactor適用於什么情況呢?適用於那種程序復雜度很低的系統,例如redis,其大部分操作都是非常高效的,很多命令的時間復雜度直接為O(1),這種情況下適合這種簡單的Reactor模型實現服務端。