多路復用IO與NIO


最近在學習NIO相關知識,發現需要掌握的知識點非常多,當做筆記記錄就下。

在學NIO之前得先去了解IO模型

(1)同步阻塞IO(Blocking IO):即傳統的IO模型。

(2)同步非阻塞IO(Non-blocking IO):默認創建的socket都是阻塞的,非阻塞IO要求socket被設置為NONBLOCK。注意這里所說的NIO並非Java的NIO(New IO)庫

(3)多路復用IO(IO Multiplexing):即經典的Reactor設計模式,有時也稱為異步阻塞IO,Java中的Selector和Linux中的epoll都是這種模型

(4)異步IO(Asynchronous IO):即經典的Proactor設計模式,也稱為異步非阻塞IO。

這里重點介紹多路復用IO模型JAVA NIO就是采用此模式

  在多路復用IO模型中,會有一個線程(Java中的Selector)不斷去輪詢多個socket的狀態,只有當socket真正有讀寫事件時,才真正調用實際的IO讀寫操作。因為在多路復用IO模型中,只需要使用一個線程就可以管理多個socket,系統不需要建立新的進程或者線程,也不必維護這些線程和進程,並且只有在真正有socket讀寫事件進行時,才會使用IO資源,所以它大大減少了資源占用。

IO多路復用模型使用了Reactor設計模式實現了這一機制。Reactor模式有三種實現方式:

Reactor單線程

 

 

 

 

 

每個客戶端發起連接請求都會交給acceptor,acceptor根據事件類型交給線程handler處理,注意acceptor 處理和 handler 處理都在一個線程中處理,所以其中某個 handler 阻塞時, 會導致其他所有的 client 的 handler 都得不到執行, 並且更嚴重的是, handler 的阻塞也會導致整個服務不能接收新的 client 請求(因為 acceptor 也被阻塞了). 因為有這么多的缺陷, 因此單線程Reactor 模型用的比較少.

Reactor多線程模式

 

 

有專門一個線程, 即 Acceptor 線程用於監聽客戶端的TCP連接請求.

 

 

客戶端連接的 IO 操作都是由一個特定的 NIO 線程池負責. 每個客戶端連接都與一個特定的 NIO 線程綁定, 因此在這個客戶端連接中的所有 IO 操作都是在同一個線程中完成的.

 

 

客戶端連接有很多, 但是 NIO 線程數是比較少的, 因此一個 NIO 線程可以同時綁定到多個客戶端連接中.

 

 

缺點:如果我們的服務器需要同時處理大量的客戶端連接請求或我們需要在客戶端連接時, 進行一些權限的檢查, 那么單線程的 Acceptor 很有可能就處理不過來, 造成了大量的客戶端不能連接到服務器.

 

Reactor主從模式

Reactor 的主從多線程模型和 Reactor 多線程模型很類似, 只不過 Reactor 的主從多線程模型的 acceptor 使用了線程池來處理大量的客戶端請求.

NIO代碼層面是如何實現這三種模式呢?

acceptor :也可以理解為一個Handler,這個Handler只負責創建具體處理IO請求的Handler(負責所有client的連接請求),如果Reactor廣播時SelectionKey創建一個Handler負責綁定相應的SocketChannel到Selector中。下次再次有IO事件時會調用對應的Handler去處理

/**
 * 單獨一個線程去處理鏈接請求
 * Created by zhangwentao on 2018/4/12.
 */
public class Acceptor implements Runnable{
    Reactor reactor;
    public Acceptor(Reactor reactor){
        this.reactor=reactor;
    }

    public void run() {
        try {
            //監聽TCP鏈接請求
            SocketChannel socketChannel=reactor.serverSocketChannel.accept();
            if(socketChannel!=null)//調用Handler來處理channel
                new SocketReadHandler(reactor.selector, socketChannel);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Reactor 的作用 :給ServerSocketChannel設置一個Acceptor,接收請求,給每一個一個SocketChannel(代表一個Client)關聯一個Handler 要注意其實Acceptor也是一個Handler(只是與它關聯的channel是ServerSocketChannel而不是SocketChannel) 

代碼如下

/**
 * Reactor模式有助於理解netty
 * Created by zhangwentao on 2018/4/12.
 */
public class Reactor implements Runnable {
    public final Selector selector;
    public final ServerSocketChannel serverSocketChannel;


    public Reactor(int port) throws IOException {
        //用於監控fds
        selector=Selector.open();
        //socket服務器的chanel
        serverSocketChannel=ServerSocketChannel.open();

        InetSocketAddress inetSocketAddress=new InetSocketAddress(InetAddress.getLocalHost(),port);
        //
        serverSocketChannel.socket().bind(inetSocketAddress);
        //不設置阻塞隊列
        serverSocketChannel.configureBlocking(false);

        //向selector注冊該channel 返回selectionKey
        SelectionKey selectionKey=serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        //利用selectionKey的attache功能綁定Acceptor 如果有事情,觸發Acceptor
        selectionKey.attach(new Acceptor(this));
    }


    public void run() {
        try {
            while(!Thread.interrupted()){
                selector.select();//selector 阻塞
                Set<SelectionKey> selectionKeys= selector.selectedKeys();
                Iterator<SelectionKey> it=selectionKeys.iterator();
                //Selector如果發現channel有OP_ACCEPT或READ事件發生,下列遍歷就會進行。
                while(it.hasNext()){
                    //來一個事件 第一次觸發一個accepter線程
                    //以后觸發SocketReadHandler
                    SelectionKey selectionKey=it.next();
                    dispatch(selectionKey);
                    selectionKeys.clear();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    /**
     * 運行Acceptor或SocketReadHandler
     * @param key
     */
    void dispatch(SelectionKey key) {
        Runnable r = (Runnable)(key.attachment());
        if (r != null){
            r.run();
        }
    }
}

hanler線程是具體的事件處理者,例如ReadHandler、SendHandler,ReadHandler負責讀取緩存中的數據,然后再調用一個工作處理線程去處理讀取到的數據。具體為一個SocketChannel,Acceptor初始化該Handler時會將SocketChannel注冊到Reactor的Selector中,同時將SelectionKey綁定該Handler,這樣下次就會調用本Handler。代碼如下

**
 * Created by zhangwentao on 2018/4/12.
 */
public class SocketReadHandler implements Runnable {
    private SocketChannel socketChannel;
    public SocketReadHandler(Selector selector, SocketChannel socketChannel) throws  IOException{
        this.socketChannel=socketChannel;
        socketChannel.configureBlocking(false);

        SelectionKey selectionKey=socketChannel.register(selector, 0);

        //將SelectionKey綁定為本Handler 下一步有事件觸發時,將調用本類的run方法。
        //參看dispatch(SelectionKey key)
        selectionKey.attach(this);

        //同時將SelectionKey標記為可讀,以便讀取。
        selectionKey.interestOps(SelectionKey.OP_READ);
        selector.wakeup();
    }

    /**
     * 處理讀取數據
     */
    public void run() {
        ByteBuffer inputBuffer=ByteBuffer.allocate(1024);
        inputBuffer.clear();
        try {
            socketChannel.read(inputBuffer);
            //激活線程池 處理這些request
            //requestHandle(new Request(socket,btt));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

上述就是用原生的NIO實現reactor模式,我們發現還是有些繁瑣的(多線程都沒有寫進去)

用netty可以很方便的實現三種方式,單線程模式:

 Bootstrap b = new Bootstrap();
        EventLoopGroup eventLoopGroup=new NioEventLoopGroup(1);
        b.group(eventLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .remoteAddress(serverAddress);

我們觀察EventLoopGroup的構造方法   EventLoopGroup的參數表示線程池大小(1表示只有一個線程),Bootstrap.group

多線程模式

        Bootstrap b = new Bootstrap();
        EventLoopGroup acceptorLoopGroup=new NioEventLoopGroup(1);
        EventLoopGroup handlerLoopGroup=new NioEventLoopGroup();
        b.group(eventLoopGroup,handlerLoopGroup)
                .channel(NioSocketChannel.class)
                .option(ChannelOption.SO_KEEPALIVE, true)
                .remoteAddress(serverAddress);
EventLoopGroup acceptorLoopGroup=new NioEventLoopGroup(1);說明acceptor還是單線程的。EventLoopGroup handlerLoopGroup=new EventLoopGrooup();設置線程數量是多核數量的2倍

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM