Java NIO學習與記錄(七): Reactor單線程模型的實現


一、Selector&Channel

1.1:各種channel

寫這個模型需要提前了解Selector以及Channel,之前記錄過FileChannel,除此之外還有以下幾種Channel:

ServerSocketChannel:用於監聽新的TCP連接的通道,負責讀取&響應,通常用於服務端的實現。

SocketChannel:用於發起TCP連接,讀寫網絡中的數據,通常用於客戶端的實現。

DatagramChannel:上述兩個通道基於TCP傳輸協議,而這個通道則基於UDP,用於讀寫網絡中的數據。

FileChannel:從文件讀取數據。

本篇重點放在ServerSocketChannel和SocketChannel上,大部分客戶端/服務端為了保證數據准確性,都是基於TCP傳輸協議實現的,由於使用Selector注冊必須要求被注冊的Channel是非阻塞模式的,因此FileChannel由於沒有非阻塞模式(無法設置configureBlocking(false)),沒辦法和注冊到selector。

1.2:selector

Selector是個通道注冊器(用法會在程序里標注),是實現Reactor模型的關鍵,多個通道均可以注冊到Selector,Selector負責監聽每個Channel的幾個事件:連接就緒、寫就緒、讀就緒,當某個channel注冊感興趣就緒事件到selector時,若發生興趣事件就緒,則Selector.select()方法不再阻塞,返回興趣事件集合(可能包含多個channel的),然后按照事件不同進行分發處理。

Selector返回對應的就緒事件,封裝為SelectionKey,每個Channel對應一個SelectionKey,這個對象還可以通過attach方法附着處理類(Handler、Acceptor等)。

1.3:一個簡單的例子

先來看個簡單使用Selector做處理的服務端實現,可以簡單對Selector和SelectionKey的用法做個了解:


 public static void main(String[] args) throws IOException {

        Selector selector = Selector.open(); //打開選擇器

        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //打開通道
        serverSocketChannel.configureBlocking(false); //設置通道為非阻塞模式
        serverSocketChannel.bind(new InetSocketAddress(2333)); //綁定端口
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //注冊channel到選擇器,指定監聽該Channel的哪些事件,初始化都是對連接事件監聽(因為是入口)

        while (selector.select() > 0) { // 若收到就緒事件select返回“感興趣”事件集合,否則阻塞當前線程
            Set keys = selector.selectedKeys(); //獲取本次拿到的事件集合
            Iterator iterator = keys.iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove();
                if (key.isAcceptable()) { //當前就緒事件為連接事件
                    ServerSocketChannel skc = (ServerSocketChannel) key.channel(); //連接就緒觸發,說明已經有客戶端通道連了過來,這里需要拿服務端通道去獲取客戶端通道
                    SocketChannel socketChannel = skc.accept(); //獲取客戶端通道(連接就緒,說明客戶端接下來可能還有別的動作,比如讀和寫)
                    socketChannel.configureBlocking(false); //同樣的需要設置非阻塞模式
                    System.out.println(String.format("收到來自 %s 的連接", socketChannel.getRemoteAddress()));
                    socketChannel.register(selector, SelectionKey.OP_READ); //將該客戶端注冊到選擇器,感興趣事件設置為讀(客戶端連接完畢,很肯能會往服務端寫數據,因此這里要注冊讀事件用以接收這些數據)
                } else if (key.isReadable()) { //當前事件為讀就緒
                    SocketChannel socketChannel = (SocketChannel) key.channel(); //能觸發讀就緒,說明客戶端已經開始往服務端寫數據,通過SelectionKey拿到當前客戶端通道
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int count = socketChannel.read(buffer); //從通道讀入數據
                    if (count < 0) { //若本次讀就緒拿到-1,則認為客戶端主動斷開了連接
                        socketChannel.close(); //服務端關閉客戶端通道
                        key.cancel(); //斷連后就將該事件從選擇器的SelectionKey集合中移除(這里說一下,這里不是真正意義上的移除,這里是取消,會將該key放入取消隊列里,在下次select函數調用時才負責清空)
                        System.out.println("連接關閉");
                        continue;
                    }
                    System.out.println(String.format("收到來自 %s 的消息: %s",
                            socketChannel.getRemoteAddress(),
                            new String(buffer.array())));
                }
                keys.remove(key);
            }
        }
    }

上面是一個簡單的例子,接下來,就利用選擇器、通道來實現Reactor單線程模型。

二、單Reactor單線程模型的服務端實現

實現服務端,服務端負責接收客戶端的連接,接收客戶端的請求數據以及響應客戶端。

把上一篇的結構圖再拿過來展示下,看看需要做的有哪些模塊:

圖1

通過上圖,我們需要實現的模塊有Reactor、Acceptor、Handler,下面來逐個編寫:

2.1:Reactor核心模塊

該模塊內部包含兩個核心方法,select和dispatch,該模塊負責監聽就緒事件和對事件的分發處理:

 


public class Reactor implements Runnable {

    private final Selector selector;
    private final ServerSocketChannel serverSocketChannel;

    public Reactor(int port) throws IOException { //Reactor初始化
        selector = Selector.open(); //打開一個Selector
        serverSocketChannel = ServerSocketChannel.open(); //建立一個Server端通道
        serverSocketChannel.socket().bind(new InetSocketAddress(port)); //綁定服務端口
        serverSocketChannel.configureBlocking(false); //selector模式下,所有通道必須是非阻塞的
        //Reactor是入口,最初給一個channel注冊上去的事件都是accept
        SelectionKey sk = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        //attach callback object, Acceptor
        sk.attach(new Acceptor(serverSocketChannel, selector));
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select(); //就緒事件到達之前,阻塞
                Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //這里進行任務分發
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment()); //這里很關鍵,拿到每次selectKey里面附帶的處理對象,然后調用其run,這個對象在具體的Handler里會進行創建,初始化的附帶對象為Acceptor(看上面構造器)
        //調用之前注冊的callback對象
        if (r != null) {
            r.run();
        }
    }
}

細節已標注。

2.2:實現Acceptor模塊

這個模塊只負責處理連接就緒事件,有了這個事件就可以拿到客戶單的SocketChannel,就可以繼續完成接下來的讀寫任務了:


public class Acceptor implements Runnable {

    private final Selector selector;

    private final ServerSocketChannel serverSocketChannel;

    Acceptor(ServerSocketChannel serverSocketChannel, Selector selector) {
        this.serverSocketChannel = serverSocketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        SocketChannel socketChannel;
        try {
            socketChannel = serverSocketChannel.accept();
            if (socketChannel != null) {
                System.out.println(String.format("收到來自 %s 的連接",
                        socketChannel.getRemoteAddress()));
                new Handler(socketChannel, selector); //這里把客戶端通道傳給Handler,Handler負責接下來的事件處理(除了連接事件以外的事件均可)
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

細節已標注。

2.3:Handler模塊的實現

這個模塊負責接下來的讀寫操作:


public class Handler implements Runnable {

    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;

    private ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(2048);

    private final static int READ = 0;
    private final static int SEND = 1;

    private int status = READ;

    Handler(SocketChannel socketChannel, Selector selector) throws IOException {
        this.socketChannel = socketChannel; //接收客戶端連接
        this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式)
        selectionKey = socketChannel.register(selector, 0); //將該客戶端注冊到selector,得到一個SelectionKey,以后的select到的就緒動作全都是由該對象進行封裝
        selectionKey.attach(this); //附加處理對象,當前是Handler對象,run是對象處理業務的方法
        selectionKey.interestOps(SelectionKey.OP_READ); //走到這里,說明之前Acceptor里的建連已完成,那么接下來就是讀取動作,因此這里首先將讀事件標記為“感興趣”事件
        selector.wakeup(); //喚起select阻塞
    }

    @Override
    public void run() {
        try {
            switch (status) {
                case READ:
                    read();
                    break;
                case SEND:
                    send();
                    break;
                default:
            }
        } catch (IOException e) { //這里的異常處理是做了匯總,常出的異常就是server端還有未讀/寫完的客戶端消息,客戶端就主動斷開連接,這種情況下是不會觸發返回-1的,這樣下面read和write方法里的cancel和close就都無法觸發,這樣會導致死循環異常(read/write處理失敗,事件又未被cancel,因此會不斷的被select到,不斷的報異常)
            System.err.println("read或send時發生異常!異常信息:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e2) {
                System.err.println("關閉通道時發生異常!異常信息:" + e2.getMessage());
                e2.printStackTrace();
            }
        }
    }

    private void read() throws IOException {
        if (selectionKey.isValid()) {
            readBuffer.clear();
            int count = socketChannel.read(readBuffer); //read方法結束,意味着本次"讀就緒"變為"讀完畢",標記着一次就緒事件的結束
            if (count > 0) {
                System.out.println(String.format("收到來自 %s 的消息: %s",
                        socketChannel.getRemoteAddress(),
                        new String(readBuffer.array())));
                status = SEND;
                selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫方法
            } else {
                //讀模式下拿到的值是-1,說明客戶端已經斷開連接,那么將對應的selectKey從selector里清除,否則下次還會select到,因為斷開連接意味着讀就緒不會變成讀完畢,也不cancel,下次select會不停收到該事件
                //所以在這種場景下,(服務器程序)你需要關閉socketChannel並且取消key,最好是退出當前函數。注意,這個時候服務端要是繼續使用該socketChannel進行讀操作的話,就會拋出“遠程主機強迫關閉一個現有的連接”的IO異常。
                selectionKey.cancel();
                socketChannel.close();
                System.out.println("read時-------連接關閉");
            }
        }
    }

    void send() throws IOException {
        if (selectionKey.isValid()) {
            sendBuffer.clear();
            sendBuffer.put(String.format("我收到來自%s的信息辣:%s,  200ok;",
                    socketChannel.getRemoteAddress(),
                    new String(readBuffer.array())).getBytes());
            sendBuffer.flip();
            int count = socketChannel.write(sendBuffer); //write方法結束,意味着本次寫就緒變為寫完畢,標記着一次事件的結束

            if (count < 0) {
                //同上,write場景下,取到-1,也意味着客戶端斷開連接
                selectionKey.cancel();
                socketChannel.close();
                System.out.println("send時-------連接關閉");
            }

            //沒斷開連接,則再次切換到讀
            status = READ;
            selectionKey.interestOps(SelectionKey.OP_READ);
        }
    }
}

細節已標注。

關鍵模塊已實現,下面來啟動服務端:


new Thread(new Reactor(2333)).start();

三、客戶端的編寫

接下來同樣利用selector編寫客戶端,客戶端需要做的事情就是發送消息到服務端,等待服務端響應,然后再次發送消息,發夠10條消息斷開連接:

3.1:Client入口模塊


public class NIOClient implements Runnable {

    private Selector selector;

    private SocketChannel socketChannel;

    NIOClient(String ip, int port) {
        try {
            selector = Selector.open(); //打開一個Selector
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false); //設置為非阻塞模式
            socketChannel.connect(new InetSocketAddress(ip, port)); //連接服務
            //入口,最初給一個客戶端channel注冊上去的事件都是連接事件
            SelectionKey sk = socketChannel.register(selector, SelectionKey.OP_CONNECT);
            //附加處理類,第一次初始化放的是連接就緒處理類
            sk.attach(new Connector(socketChannel, selector));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void run() {
        try {
            while (!Thread.interrupted()) {
                selector.select(); //就緒事件到達之前,阻塞
                Set selected = selector.selectedKeys(); //拿到本次select獲取的就緒事件
                Iterator it = selected.iterator();
                while (it.hasNext()) {
                    //這里進行任務分發
                    dispatch((SelectionKey) (it.next()));
                }
                selected.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    void dispatch(SelectionKey k) {
        Runnable r = (Runnable) (k.attachment()); //這里很關鍵,拿到每次selectKey里面附帶的處理對象,然后調用其run,這個對象在具體的Handler里會進行創建,初始化的附帶對象為Connector(看上面構造器)
        //調用之前注冊的callback對象
        if (r != null) {
            r.run();
        }
    }
}

細節已標注。

3.2:Connector模塊(建連)


public class Connector implements Runnable {

    private final Selector selector;

    private final SocketChannel socketChannel;

    Connector(SocketChannel socketChannel, Selector selector) {
        this.socketChannel = socketChannel;
        this.selector = selector;
    }

    @Override
    public void run() {
        try {
            if (socketChannel.finishConnect()) { //這里連接完成(與服務端的三次握手完成)
                System.out.println(String.format("已完成 %s 的連接",
                        socketChannel.getRemoteAddress()));
                new Handler(socketChannel, selector); //連接建立完成后,接下來的動作交給Handler去處理(讀寫等)
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

細節已標注。

3.3:客戶端Handler模塊實現

public class Handler implements Runnable {

    private final SelectionKey selectionKey;
    private final SocketChannel socketChannel;

    private ByteBuffer readBuffer = ByteBuffer.allocate(2048);
    private ByteBuffer sendBuffer = ByteBuffer.allocate(1024);

    private final static int READ = 0;
    private final static int SEND = 1;

    private int status = SEND; //與服務端不同,默認最開始是發送數據

    private AtomicInteger counter = new AtomicInteger();

    Handler(SocketChannel socketChannel, Selector selector) throws IOException {
        this.socketChannel = socketChannel; //接收客戶端連接
        this.socketChannel.configureBlocking(false); //置為非阻塞模式(selector僅允非阻塞模式)
        selectionKey = socketChannel.register(selector, 0); //將該客戶端注冊到selector,得到一個SelectionKey,以后的select到的就緒動作全都是由該對象進行封裝
        selectionKey.attach(this); //附加處理對象,當前是Handler對象,run是對象處理業務的方法
        selectionKey.interestOps(SelectionKey.OP_WRITE); //走到這里,說明之前Connect已完成,那么接下來就是發送數據,因此這里首先將寫事件標記為“感興趣”事件
        selector.wakeup(); //喚起select阻塞
    }

    @Override
    public void run() {
        try {
            switch (status) {
                case SEND:
                    send();
                    break;
                case READ:
                    read();
                    break;
                default:
            }
        } catch (IOException e) { //這里的異常處理是做了匯總,同樣的,客戶端也面臨着正在與服務端進行寫/讀數據時,突然因為網絡等原因,服務端直接斷掉連接,這個時候客戶端需要關閉自己並退出程序
            System.err.println("send或read時發生異常!異常信息:" + e.getMessage());
            selectionKey.cancel();
            try {
                socketChannel.close();
            } catch (IOException e2) {
                System.err.println("關閉通道時發生異常!異常信息:" + e2.getMessage());
                e2.printStackTrace();
            }
        }
    }

    void send() throws IOException {
        if (selectionKey.isValid()) {
            sendBuffer.clear();
            int count = counter.incrementAndGet();
            if (count <= 10) {
                sendBuffer.put(String.format("客戶端發送的第%s條消息", count).getBytes());
                sendBuffer.flip(); //切換到讀模式,用於讓通道讀到buffer里的數據
                socketChannel.write(sendBuffer);

                //則再次切換到讀,用以接收服務端的響應
                status = READ;
                selectionKey.interestOps(SelectionKey.OP_READ);
            } else {
                selectionKey.cancel();
                socketChannel.close();
            }
        }
    }

    private void read() throws IOException {
        if (selectionKey.isValid()) {
            readBuffer.clear(); //切換成buffer的寫模式,用於讓通道將自己的內容寫入到buffer里
            socketChannel.read(readBuffer);
            System.out.println(String.format("收到來自服務端的消息: %s", new String(readBuffer.array())));

            //收到服務端的響應后,再繼續往服務端發送數據
            status = SEND;
            selectionKey.interestOps(SelectionKey.OP_WRITE); //注冊寫事件
        }
    }
}

細節已標注。

 

下面啟動客戶端去連接之前的服務端:


new Thread(new NIOClient("127.0.0.1", 2333)).start();
new Thread(new NIOClient("127.0.0.1", 2333)).start();

上面模擬了兩個客戶端同時連到服務端,運行結果如下:

服務端運行結果:

圖2

 

客戶端運行結果如下:

圖3

單線程Reactor模型有個致命的缺點,通過上述例子可以看出,整個執行流程都是線性的,客戶端請求→服務端讀取→服務端響應→客戶端收到響應→客戶端再次發送請求,那么在這個鏈路中,如果handler中某個位置存在性能瓶頸,比如我們可以改造下服務端的send方法:


try {
   Thread.sleep(2000L); //響應2s
} catch (InterruptedException e) {
   e.printStackTrace();
}

int count = socketChannel.write(sendBuffer);

在響應客戶端之前睡眠2s,當做是性能瓶頸點,同樣的再次開兩個客戶端同時訪問服務端,每個客戶端發送10條消息,會發現,程序直接運行了40s,這是大多數情況下不願意看到的,因此,就有了多線程Reactor模式,跟BIO為了提高性能將讀操作放到一個獨立線程處理一樣,Reactor這樣做,也是為了解決上面提到的性能問題,只不過NIO比BIO做異步有個最大的優勢就是NIO不會阻塞一個線程,類似read這種操作狀態都是由selector負責監聽的,不像BIO里都是阻塞的,只要被異步出去,那么一定是非阻塞的業務代碼(除非是人為將代碼搞成阻塞),而BIO由於read本身阻塞,因此會阻塞掉整個線程,這也是同樣是異步為什么NIO可以更加高效的原因之一。

那么單線程Reactor適用於什么情況呢?適用於那種程序復雜度很低的系統,例如redis,其大部分操作都是非常高效的,很多命令的時間復雜度直接為O(1),這種情況下適合這種簡單的Reactor模型實現服務端。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM