也談Reactor模式


何謂Reactor模式?它是實現高性能IO的一種設計模式。網上資料有很多,有些寫的也很好,但大多不知其所以然。這里博主按自己的思路簡單介紹下,有不對的地方敬請指正。


BIO

Java1.4(2002年)以前,IO都是Blocking的,也就是常說的BIO,它在等待請求、讀、寫(返回)三個環節都是阻塞的。在等待請求階段,系統無法知道請求何時到達,因此需要一個主線程一直守着,當有請求進來時,將請求分發給讀寫線程。如圖:

代碼如下:

    ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//線程池
    ServerSocket serverSocket = new ServerSocket();
    serverSocket.bind(8088); 
    while(!Thread.currentThread.isInturrupted()){//主線程死循環等待新連接到來        
        Socket socket = serverSocket.accept();
        executor.submit(new ConnectIOnHandler(socket));//為新的連接創建新的線程 
    }
class ConnectIOnHandler extends Thread{ private Socket socket; public ConnectIOnHandler(Socket socket){ this.socket = socket; } public void run(){ while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){//死循環處理讀寫事件 String someThing = socket.read()....//讀取數據 if(someThing!=null){
           ......//處理數據
           
socket.write()....//寫數據 } } }

需知,請求進來(accept),並不表示數據馬上達到了,可能隔一段時間才會傳進來,這個時候socket.read()也是一直阻塞的狀態。socket.write()也同理,當向磁盤或其它socket寫數據時,也要等對方准備好才能寫入,在對方准備階段,socket.write()也是阻塞的。這兩個環節可能的無效阻塞導致讀寫線程的低效。


NIO

Java1.4開始,引入了NIO。NIO有三個概念:Selector、Buffer、Channel。與BIO的區別是,請求進來后,並不會馬上分派IO線程,而是依靠操作系統底層的多路復用機制(select/poll/epoll等),在監聽到socket讀寫就緒之后,再分配IO線程(實際可由當前線程[使用Buffer和Channel]直接讀寫,因為讀寫本身的效率很高),這就避免了線程等待。且與BIO多線程方式相比,使用I/O多路復用技術,系統不必創建和維護龐大的線程池,從而大大減小了開銷。這部分工作是NIO的核心,由Selector負責,本質上是多路復用的Java封裝。而Buffer和Channel又封裝了一層socket的讀寫,應該為的是將IO與業務代碼徹底分離。以下圖示為本人理解:

如圖示,與BIO中監聽線程職責不同,Selector監聽的不只是連接請求,還有讀寫就緒事件,當某個事件發生時,即通知注冊了該事件的Channel,由Channel操作socket讀寫Buffer。虛線表示需要具體的NIO框架或業務代碼自己處理,比如Channel如何注冊以及注冊何種事件,Channel處理IO的方式(如在當前線程處理還是新開線程,若新開線程,則可看作是AIO模式)等。NIO只是提供了一套機制,具體使用還是需要編程實現(Reactor模式就是OO的一種實現)。

示例代碼(摘自Java NIO詳解

服務端:

 1 package cn.blog.test.NioTest;
 2 
 3 
 4 import java.io.IOException;
 5 import java.net.InetSocketAddress;
 6 import java.nio.ByteBuffer;
 7 import java.nio.channels.*;
 8 import java.nio.charset.Charset;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 
13 public class MyNioServer {
14     private Selector selector;          //創建一個選擇器
15     private final static int port = 8686;
16     private final static int BUF_SIZE = 10240;
17 
18     private void initServer() throws IOException {
19         //創建通道管理器對象selector
20         this.selector=Selector.open();
21 
22         //創建一個通道對象channel
23         ServerSocketChannel channel = ServerSocketChannel.open();
24         channel.configureBlocking(false);       //將通道設置為非阻塞
25         channel.socket().bind(new InetSocketAddress(port));       //將通道綁定在8686端口
26 
27         //將上述的通道管理器和通道綁定,並為該通道注冊OP_ACCEPT事件
28         //注冊事件后,當該事件到達時,selector.select()會返回(一個key),如果該事件沒到達selector.select()會一直阻塞
29         SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT);
30 
31         while (true){       //輪詢
32             selector.select();          //這是一個阻塞方法,一直等待直到有數據可讀,返回值是key的數量(可以有多個)
33             Set keys = selector.selectedKeys();         //如果channel有數據了,將生成的key訪入keys集合中
34             Iterator iterator = keys.iterator();        //得到這個keys集合的迭代器
35             while (iterator.hasNext()){             //使用迭代器遍歷集合
36                 SelectionKey key = (SelectionKey) iterator.next();       //得到集合中的一個key實例
37                 iterator.remove();          //拿到當前key實例之后記得在迭代器中將這個元素刪除,非常重要,否則會出錯
38                 if (key.isAcceptable()){         //判斷當前key所代表的channel是否在Acceptable狀態,如果是就進行接收
39                     doAccept(key);
40                 }else if (key.isReadable()){
41                     doRead(key);
42                 }else if (key.isWritable() && key.isValid()){
43                     doWrite(key);
44                 }else if (key.isConnectable()){
45                     System.out.println("連接成功!");
46                 }
47             }
48         }
49     }
50 
51     public void doAccept(SelectionKey key) throws IOException {
52         ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
53         System.out.println("ServerSocketChannel正在循環監聽");
54         SocketChannel clientChannel = serverChannel.accept();
55         clientChannel.configureBlocking(false);
56         clientChannel.register(key.selector(),SelectionKey.OP_READ);
57     }
58 
59     public void doRead(SelectionKey key) throws IOException {
60         SocketChannel clientChannel = (SocketChannel) key.channel();
61         ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
62         long bytesRead = clientChannel.read(byteBuffer);
63         while (bytesRead>0){
64             byteBuffer.flip();
65             byte[] data = byteBuffer.array();
66             String info = new String(data).trim();
67             System.out.println("從客戶端發送過來的消息是:"+info);
68             byteBuffer.clear();
69             bytesRead = clientChannel.read(byteBuffer);
70         }
71         if (bytesRead==-1){
72             clientChannel.close();
73         }
74     }
75 
76     public void doWrite(SelectionKey key) throws IOException {
77         ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
78         byteBuffer.flip();
79         SocketChannel clientChannel = (SocketChannel) key.channel();
80         while (byteBuffer.hasRemaining()){
81             clientChannel.write(byteBuffer);
82         }
83         byteBuffer.compact();
84     }
85 
86     public static void main(String[] args) throws IOException {
87         MyNioServer myNioServer = new MyNioServer();
88         myNioServer.initServer();
89     }
90 }
View Code

客戶端:

 1 package cn.blog.test.NioTest;
 2 
 3 
 4 import java.io.IOException;
 5 import java.net.InetSocketAddress;
 6 import java.nio.ByteBuffer;
 7 import java.nio.channels.SelectionKey;
 8 import java.nio.channels.Selector;
 9 import java.nio.channels.SocketChannel;
10 import java.util.Iterator;
11 
12 public class MyNioClient {
13     private Selector selector;          //創建一個選擇器
14     private final static int port = 8686;
15     private final static int BUF_SIZE = 10240;
16     private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
17 
18     private void  initClient() throws IOException {
19         this.selector = Selector.open();
20         SocketChannel clientChannel = SocketChannel.open();
21         clientChannel.configureBlocking(false);
22         clientChannel.connect(new InetSocketAddress(port));
23         clientChannel.register(selector, SelectionKey.OP_CONNECT);
24         while (true){
25             selector.select();
26             Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
27             while (iterator.hasNext()){
28                 SelectionKey key = iterator.next();
29                 iterator.remove();
30                 if (key.isConnectable()){
31                     doConnect(key);
32                 }else if (key.isReadable()){
33                     doRead(key);
34                 }
35             }
36         }
37     }
38 
39     public void doConnect(SelectionKey key) throws IOException {
40         SocketChannel clientChannel = (SocketChannel) key.channel();
41         if (clientChannel.isConnectionPending()){
42             clientChannel.finishConnect();
43         }
44         clientChannel.configureBlocking(false);
45         String info = "服務端你好!!";
46         byteBuffer.clear();
47         byteBuffer.put(info.getBytes("UTF-8"));
48         byteBuffer.flip();
49         clientChannel.write(byteBuffer);
50         //clientChannel.register(key.selector(),SelectionKey.OP_READ);
51         clientChannel.close();
52     }
53 
54     public void doRead(SelectionKey key) throws IOException {
55         SocketChannel clientChannel = (SocketChannel) key.channel();
56         clientChannel.read(byteBuffer);
57         byte[] data = byteBuffer.array();
58         String msg = new String(data).trim();
59         System.out.println("服務端發送消息:"+msg);
60         clientChannel.close();
61         key.selector().close();
62     }
63 
64     public static void main(String[] args) throws IOException {
65         MyNioClient myNioClient = new MyNioClient();
66         myNioClient.initClient();
67     }
68 }
View Code

在早期的JDK1.4和1.5 update10版本之前,Selector基於select/poll模型實現,是基於IO復用技術的非阻塞IO,不是異步IO。在JDK1.5 update10和linux core2.6以上版本,sun優化了Selctor的實現,底層使用epoll替換了select/poll。另據說Buffer指向的並非堆內內存,NIO使用 Native 函數庫直接分配堆外內存,然后通過一個存儲在 Java 堆的 DirectByteBuffer 對象作為這塊內存的引用進行操作,避免了在 Java 堆和 Native 堆中來回復制數據。

NIO的實現解析可參看:深入淺出NIO Socket實現機制


Reactor模式

NIO為實現Reactor模式提供了基礎,上面的NIO圖示其實就是Reactor模式的雛形,只是Reactor以OO的方式抽象出了幾個概念,使得職責划分更加明確。

  • Reactor:Reactor是IO事件的派發者,對應NIO的Selector;
  • Acceptor:Acceptor接受client連接,建立對應client的Handler,並向Reactor注冊此Handler,對應NIO中注冊Channel和事件觸發時的判斷分支(上述NIO服務端示例代碼的38-46行);
  • Handler:IO處理類,對應NIO中Channel[使用socket]操作Buffer的過程。

基於上述三個角色畫出Reactor模式圖如下:

如此,Reactor模式便非常清晰地展現在我們眼前。那么業務線程如何與Reactor交互呢?由前文所知,數據存取於Buffer,具體操作由Handler負責。socket.read()將數據讀入Buffer,需要一種機制將Buffer引用推送給業務線程;同樣,業務線程返回的數據需要寫入Buffer,按Reactor模式,寫入后還需要注冊write事件,socket可寫后write()。如果直接調用的話,至少Handler和業務代碼會耦合在一起,常見的解耦方式是定義接口,或使用消息中間件。


其它

話說回來,由於相對短暫的歷史以及相對封閉的環境,.Net社區缺少很多概念的演化、探究和討論,這也導致了.Neter們這些概念的缺失。雖然從語言層面上來說,C#和Java大同小異,前者甚至一定程度的有語法上的便利,然而只有認識到了其背后的思想和模式,才能真正用好這門語言,這就是.Neter需要了解Java及其歷史的原因,畢竟.Net一開始就是參照着Java來的。

比如.Net里的堆棧概念,就算一些經典書籍都沒有非常深入的說明,而Java方面的資料就很多了,參看深入理解JVM—JVM內存模型

 

其它參考資料:

NIO淺析

深入理解Java NIO

網絡通信socket連接數上限

 

轉載請注明本文出處:https://www.cnblogs.com/newton/p/9776821.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2026 CODEPRJ.COM