1.前言
上章提到過Java的NIO采取的是多路IO復用模式,其衍生出來的模型就是Reactor模型。多路IO復用有兩種方式,一種是select/poll,另一種是epoll。在windows系統上使用的是select/poll方式,在linux上使用的是epoll方式,主要是由於DefaultSelectorProvider具體選擇的selector決定。epoll是在linux2.6之后才支持的,select的方式時間復雜度為O(N),最大fd限制是1024。epoll沒有數量限制,時間復雜度是O(1)。
再溫習一遍多路IO復用的基本思路,阻塞發生在select上面,select管理所有注冊在其上的socket請求,socket准備完全就會交由用戶程序處理。下面結合java的nio例子,來更細致的講解一下這種模式,強化理解一下。要寫出java的nio不難但要完全正確絕不容易,相關概念不清楚就會產生難以理解的bug,這里有一些相關的陷阱。
另外說明一下,這個例子不一定完全正確,用於演示足夠了。對於Java的NIO而言,有幾個概念比較重要,這里先提兩個channel和buffer。不管是客戶端發送服務端接收,還是服務端發送客戶端接收,基本的流程都是:發送方發送數據->buffer->發送方channel->接收方channel->buffer->接收方接收數據。
2.具體實現
2.1 服務端
對於服務端而言首先需要的就是確定監聽的端口,其次是與之對應的channel,而后就是selector,最后還需要一個線程池。為什么會需要線程池呢?道理很簡單,select模式獲取了所有channel的change,對於服務端而言,change的可能有非常多的客戶端channel,而用戶程序只有一個線程,如果這么多個channel一個個順序執行,如果有耗時嚴重的操作,那么后果是非常糟糕的,所有客戶端都會延時處理,這也是多路IO復用的一個糟糕點。線程池就是為每個客戶端分配一個線程去處理,減緩這種情況的后果。Server的基本四個內容就出來了:
private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService;
接下來就是初始化服務端。初始化的步驟也是一般化:1.初始化連接池;2.初始化Selector;3.初始化綁定端口;4.將socket注冊到select上。大致步驟就是這些,但是還有些額外的細節。具體代碼如下:
1. executorService = Executors.newCachedThreadPool(); 2. selector = Selector.open(); 3. serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); 4. serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
這里的一個細節就是socket必須是非阻塞模式。初始化完成之后就是正式的邏輯了,再來回憶一下多路IO復用的邏輯,管理多個IO的change事件,阻塞在select上,如果有change事件,select就能繼續執行下去,選出change了的IO,只對這部分IO進行操作。這段描述就下面這段簡單的代碼了:
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); } }
這里就是調用selector.select()方法進行阻塞,如果change事件不為0(這個判斷應該去掉好點),獲取當前所有change事件。遍歷處理,移除該事件。不移除,下次該事件依舊存在,相當於認為是沒處理,會出現多次觸發錯誤。
下面詳細介紹一下事件的類型,Java定義了4種類型:
1.針對服務端的ACCEPT事件,接收到客戶端的連接請求;
2.針對客戶端的CONNECT事件,發起對服務端的連接請求;
3.針對獲取對端發送的數據的READ事件;
4.針對請求發送數據給對端時准備好了緩沖區的WRITE事件。
其中WRITE事件一般不進行使用,因為大部分情況緩沖區都是空閑的,會立刻觸發該事件,這個浪費CPU的性能,還會造成bug。下面代碼就是server端處理的一個基本邏輯,也是有些要注意的點。
if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } else if(key.isValid() && key.isReadable()) { key.interestOps(0); executorService.execute(new Task(key)); }
服務端的事件就3個,write事件不用管,所以只需要關注accept和read事件。有請求進來,就接收這個請求,設置成非阻塞式,再注冊到selector中,監聽該請求的讀事件。讀事件到來,先將監聽的時間改成無,這里是因為異步執行,可能沒有讀完數據,再次觸發了該channel的讀事件,重復讀取,造成問題。Task就是一個runnabel任務,處理讀取,發送應答,這里還需要重新將監聽的事件改成讀事件,即處理完了本次內容,等待下次內容。
Task的具體內容如下:
SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); } else { // 協議解析 String msg = new String(baos.toByteArray(), "UTF-8"); // 返回該數據 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 處理完畢后后設置成讀狀態,繼續獲取相關數據 key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); } } catch (Exception e) { key.cancel(); // 異常連接中斷 }
這里的邏輯就是使用buffer將數據取出來了。取出為0,或者拋出異常,意味着客戶端斷開了連接,直接取消掉該channel的管理。回寫了一個數據。之后就是將事件監聽設置回監聽讀取事件,最后一步需要wakeup一下。wakeup是為了喚醒一下select,原因如下:這個是由於前面先將監聽的事件改成了0,后面才改回了read事件。不管是怎么修改,都不是立刻生效的,需要下次select事件觸發才能生效,問題也只會出在多線程中。試想一下下面這個過程:
1.A通道有數據了,A先置為0了,開始讀取數據,因為是異步的,所以又走到了select阻塞了;
2.B連接進來,觸發的select方法,這時A的0才正式生效,這也是我們想要的,因為A之前的數據還在處理,並不是新的數據到來,不需要再次觸發讀操作。這里先置為0的動作是正確的。
3.此時主線程又走到了select方法阻塞了,注意,此時A生效的是0,A結束此次讀操作,等待下次讀事件。問題就出在這里,如果不觸發一下select方法,此時A即使有新的讀事件,其也不會觸發,因為重置為read並沒有生效,要等select觸發才能生效。這就相當於A沒接到消息了,如果B有讀事件,觸發了select方法,則A才能接到消息。wakeup在這里必須添加的目的就是強制觸發一下select,使A更新回read事件,而不是不關系任何事件。
實際上觸發沒有這么麻煩,在客戶端還會說到這個問題,有更簡單的觸發方法。
上面的代碼也可以看出nio都是基於buffer操作的。buffer也有很多陷阱,使用正確不容易。下面給出一個我的完整例子,可以運行試試,不保證沒bug。了解了上面的知識,測出bug調試應該也不難。
import java.io.ByteArrayOutputStream; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class NioServer { private int port; private Selector selector; private ServerSocketChannel serverSocketChannel; private ExecutorService executorService; public NioServer(int port) { this.port = port; } public void open() { this.executorService = Executors.newCachedThreadPool(); try { this.selector = Selector.open(); this.serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); serverSocketChannel.socket().bind(new InetSocketAddress(port)); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("server端啟動..."); for(;;) { System.out.println("======>select的keys數量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); // System.out.println("======>真實未處理的change事件數量:" + keys.size()); while(it.hasNext()) { SelectionKey key = it.next(); it.remove(); // 移除這個key if(key.isValid() && key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel client = server.accept(); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); System.out.println("===>獲取client連接,准備讀取數據:"+ client.socket().getRemoteSocketAddress()); } else if(key.isValid() && key.isReadable()) { // 先置為0,防止異步線程未處理完該事件又被select // key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); key.interestOps(0); executorService.execute(new Task(key)); } else { System.out.println("其它事件:" + key.interestOps()); } } } } } catch (Exception e) { e.printStackTrace(); } } private class Task implements Runnable { private SelectionKey key; public Task(SelectionKey key) { this.key = key; } @Override public void run() { SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; try { // System.out.println("===>開始讀取數據"); while((size = channel.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } if(baos.size() == 0) { key.cancel(); System.out.println("======<client斷開連接:"+ channel.socket().getRemoteSocketAddress()); } else { // 協議解析 String msg = new String(baos.toByteArray(), "UTF-8"); System.out.println("===>獲取client數據: " + msg); // 返回該數據 String reply = "get client msg: " + msg; ByteBuffer re = ByteBuffer.wrap(reply.getBytes()); while(re.hasRemaining()) { channel.write(re); } // 處理完畢后后設置成讀狀態,繼續獲取相關數據 // key.interestOps((key.interestOps() | SelectionKey.OP_READ)); key.interestOps(SelectionKey.OP_READ); key.selector().wakeup(); System.out.println("===<返回server的獲取結果"); } } catch (Exception e) { key.cancel(); // 異常連接中斷 System.out.println("======<異常client斷開連接:"+ channel.socket().getRemoteSocketAddress()); } } } public static void main(String[] args) { NioServer nioServer = new NioServer(7777); nioServer.open(); } }
2.2 客戶端
第一節說過,在單個連接的時候,多路IO復用方式甚至沒有阻塞式IO性能好,多路IO復用是針對了多個IO操作。這里還是給出客戶端的NIO寫法。同樣的,客戶端需要上面的內容,不包括線程池,我們只處理一個客戶端連接。需要增加的一個字段就是服務端地址,所以總共也是4個內容:服務端地址、端口、連接通道、select。
private String host; private int port; private SocketChannel socketChannel; private Selector selector;
初始化也是基本操作:1.獲取select;2.建立連接;3.注冊到select
1. selector = Selector.open(); 2. socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); 3. socketChannel.register(selector, SelectionKey.OP_CONNECT);
這里要注意的也就是要以非阻塞式的方式進行。后面的步驟也一樣,進行select,獲取change事件,根據不同的事件處理不同。write事件不使用,客戶端關注的就connect和read事件了。
int event = selector.select(); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); } else { sk.cancel(); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } } } }
這里要注意的是connect並沒有真正連上,要觸發了connect事件,執行finishConnect才會連接成功。連接成功后更新成read事件。這里會有一個疑惑,server端的時候intersetOps設置成0或者read不是直接生效,要select執行后才能生效,為什么這邊connect設置成read事件就能直接改過來???...這是一個思維陷阱:不是要執行后才能改變狀態,而是select認准的狀態是select操作之前一瞬間的狀態。server端的例子,哪怕不需要兩個線程,單個線程也能觸發,只要是異步操作。主線程先接收到A的讀取操作,設置A成0,然后又進行select了,此一瞬間A的狀態是0,后面A處理完后,再來一條消息就沒用了,因為此時select阻塞時檢測的狀態是0,后續改過來也沒用,所以才需要wakeup一下,讓其認識到其狀態應該修改后的read。而上述例子為什么不需要,就是因為這是一個同步的過程,此次connect事件,下次再select的時候一定變成了read。
其他的也沒有什么值得一提的了,下面是客戶端的完整代碼。
import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class NioClient { private String host; private int port; private SocketChannel socketChannel; private Selector selector; public NioClient(String host, int port) { this.host = host; this.port = port; } public void open() { try { selector = Selector.open(); socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.socket().setTcpNoDelay(true); socketChannel.connect(new InetSocketAddress(host, port)); socketChannel.register(selector, SelectionKey.OP_CONNECT); System.out.println("client端啟動..."); for(;;) { System.out.println("======>select的keys數量:" + selector.keys().size()); int event = selector.select(); System.out.println("======>select的keys數量:" + selector.keys().size() + ", change事件數量:" + event); if(event != 0) { Set<SelectionKey> keys = selector.selectedKeys(); Iterator<SelectionKey> it = keys.iterator(); while(it.hasNext()) { SelectionKey sk = it.next(); it.remove(); if(sk.isValid() && sk.isConnectable()) { if(socketChannel.isConnectionPending()) { if(socketChannel.finishConnect()) { sk.interestOps(SelectionKey.OP_READ); System.out.println("連接上遠程服務器:" + socketChannel.getRemoteAddress()); } else { sk.cancel(); System.out.println("連接未建立..."); } } } else if(sk.isValid() && sk.isReadable()) { SocketChannel sc = (SocketChannel) sk.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int size = -1; while((size = sc.read(buffer)) > 0) { buffer.flip(); baos.write(buffer.array(), 0, size); buffer.clear(); } System.out.println("接收服務器消息:" + new String(baos.toByteArray(), "UTF-8")); } else { System.out.println("其它事件:" + sk.interestOps()); } } } } } catch (IOException e) { e.printStackTrace(); } } public void close() { try { socketChannel.close(); } catch (IOException e) { e.printStackTrace(); } } public void send(String msg) { byte[] b = msg.getBytes(); ByteBuffer buffer = ByteBuffer.wrap(b); try { while (buffer.hasRemaining()) { socketChannel.write(buffer); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) { NioClient client = new NioClient("127.0.0.1", 7777); Thread thread = new Thread(new Runnable() { @Override public void run() { client.open(); } }); thread.setDaemon(true); thread.start(); Scanner scanner = new Scanner(System.in); while(scanner.hasNext()) { String msg = scanner.nextLine(); if("close".equals(msg)) { client.close(); System.out.println("退出成功"); break; } else { client.send(msg); } } } }
3.總結
此章結合java nio的實際demo加強一下對多路IO復用的理解,理解Java的nio基本流程,對於理解后面的netty設計的結構有很大的幫助。