IO模型之NIO代碼及其實踐詳解


一、簡介

  NIO我們一般認為是New I/O(也是官方的叫法),因為它是相對於老的I/O類庫新增的( JDK 1.4中的java.nio.*包中引入新的Java I/O庫)。但現在都稱之為Non-blocking I/O,即非阻塞I/O,因為這樣叫,更能體現它的特點。而下文中的NIO,不是指整個新的I/O庫,而是非阻塞I/O。

  NIO提供了與傳統BIO模型中的Socket和ServerSocket相對應的SocketChannel和ServerSocketChannel兩種不同的套接字通道實現。

  新增的着兩種通道都支持阻塞和非阻塞兩種模式。

  阻塞模式使用就像傳統中的支持一樣,比較簡單,但是性能和可靠性都不好;非阻塞模式正好與之相反。

  對於低負載、低並發的應用程序,可以使用同步阻塞I/O來提升開發速率和更好的維護性;對於高負載、高並發的(網絡)應用,應使用NIO的非阻塞模式來開發。

二、NIO實現基礎

  NIO對應linux IO 模型的IO多路復用模型,netty、tomcat(tomcat 6及以后版本,tomcat 6之前是基於BIO)的實現也是基於NIO。

             

 

  I/O復用模型,是同步非阻塞,這里的非阻塞是指I/O讀寫,對應的是recvfrom操作,因為數據報文已經准備好,無需阻塞。說它是同步,是因為,這個執行是在一個線程里面執行的。有時候,還會說它又是阻塞的,實際上是指阻塞在select上面,必須等到讀就緒、寫就緒等網絡事件。有時候我們又說I/O復用是多路復用,這里的多路是指N個連接,每一個連接對應一個channel,或者說多路就是多個channel。復用,是指多個連接復用了一個線程或者少量線程(在Tomcat中是Math.min(2,Runtime.getRuntime().availableProcessors()))。

  NIO的注冊、輪詢等待、讀寫操作協作關系如下圖:

          

  1、多路復用器 Selector

  Selector的英文含義是“選擇器”,Selector是Java  NIO 編程的基礎,也可以稱為為“輪詢代理器”、“事件訂閱器”、“channel容器管理機”都行。

  Selector中也會維護一個“已經注冊的Channel”的容器(select使用數組,poll使用鏈表,epoll是紅黑樹+雙向鏈表,因為JDK使用了epoll()代替傳統的select實現,所以沒有最大連接句柄1024/2048(32*32/32*64)的限制,具體原理請轉到7層網絡以及5種Linux IO模型以及相應IO基礎文章末尾部分),應用程序將向Selector對象注冊需要它關注的Channel,以及具體的某一個Channel會對哪些IO事件感興趣。

  Selector提供選擇已經就緒的任務的能力:Selector會不斷輪詢注冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可以獲取就緒Channel的集合,進行后續的I/O操作。所以,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

  2、通道 Channel

  通道表示打開到 IO 設備(例如:文件、套接字,所以Channel有這兩種實現:SelectableChannel 用戶網絡讀寫;FileChannel 用於文件操作)的連接。

        

  其中SelectableChannel有以下幾種實現:

  • 所有被Selector(選擇器)注冊的通道,只能是繼承了SelectableChannel類的子類。
  • ServerSocketChannel:應用服務器程序的監聽通道。只有通過這個通道,應用程序才能向操作系統注冊支持“多路復用IO”的端口監聽。同時支持UDP協議和TCP協議。
  • ScoketChannel:TCP Socket套接字的監聽通道,一個Socket套接字對應了一個客戶端IP:端口 到 服務器IP:端口的通信連接。
  • DatagramChannel:UDP 數據報文的監聽通道。

  Channel相比IO中的Stream流更加高效(底層的操作系統的通道一般都是全雙工的,可以異步雙向傳輸,所以全雙工的Channel比流能更好的映射底層操作系統的API),但是必須和Buffer一起使用(若需要使用 NIO 系統,需要獲取用於連接 IO 設備的Channel通道以及用於容納數據的緩沖區Buffer。然后操作緩沖區,對數據進行處理。即通道中的數據總是要先讀到一個Buffer,或者總是要從一個Buffer中寫入)。也可以通過Channel通道向操作系統寫數據。

  3、緩沖區Buffer

  Buffer是Channel操作讀寫的組件,包含一些要寫入或者讀出的數據。在NIO庫中,所有數據都是用緩沖區處理的。在讀取數據時,它是直接讀到緩沖區中的;在寫入數據時,也是寫入到緩沖區中。任何時候訪問NIO中的數據,都是通過緩沖區進行操作。緩沖區實際上是一個數組,並提供了對數據結構化訪問以及維護讀寫位置等信息。

    具體的緩存區有這些:ByteBuffe、CharBuffer、 ShortBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。他們實現了相同的接口:Buffer。但是在我們網絡傳輸中都是使用byte數據類型。如圖:

      

  Buffer中有三個重要參數:

  位置(position):當前緩沖區(Buffer)的位置,將從該位置往后讀或寫數據。

  容量(capacity):緩沖區的總容量上限。

  上限(limit):緩沖區的實際容量大小。

  寫模式下Position為寫入多少數據的位數標識,Limit等於Capacity;讀模式下從Position讀到limit,Position為0。寫讀模式切換使用flip()方法。

  flip方法將Buffer從寫模式切換到讀模式。調用flip()方法會將position設回0,並將limit設置成之前position的值。

public Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

          

三、代碼實現

  1、服務端代碼實現

public class MultiplexerNioServer implements Runnable {

    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private volatile boolean stop = false;

    /**
     * 初始化多路復用器 綁定監聽端口
     *
     * @param port
     */
    public MultiplexerNioServer(int port) {
        try {
            serverSocketChannel = ServerSocketChannel.open();//獲得一個serverChannel
            selector = Selector.open();////創建選擇器  獲得一個多路復用器
            serverSocketChannel.configureBlocking(false);//設置為非阻塞模式 如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);//綁定一個端口和等待隊列長度
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//把selector注冊到channel,關注鏈接事件
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;    // 優雅停機
    }

    public void run() {
        while (!stop) {
            try {
                //無論是否有讀寫事件發生,selector每隔1s被喚醒一次。如果一定時間內沒有事件,就需要做些其他的事情,就可以使用帶超時的
                int client = selector.select(1000);
                System.out.println("1:"+client);
                // 阻塞,只有當至少一個注冊的事件發生的時候才會繼續.
          // int client = selector.select(); 不設置超時時間為線程阻塞,但是IO上支持多個文件描述符就緒
                if (client == 0) {
                    continue;
                }
                System.out.println("2:"+client);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        //處理事件
 handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable e) {
                e.printStackTrace();
            }finally {

            }
        }

        if (selector != null) {
            // selector關閉后會自動釋放里面管理的資源
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void handle(SelectionKey key) throws IOException {
        if (key.isValid()) {
            //連接事件
            if (key.isAcceptable()) {
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                // 通過ServerSocketChannel的accept創建SocketChannel實例
                // 完成該操作意味着完成TCP三次握手,TCP物理鏈路正式建立
                SocketChannel sc = ssc.accept();//3次握手
                sc.configureBlocking(false);
                sc.register(selector, SelectionKey.OP_READ);//連接建立后關注讀事件
            }

            //讀事件
            if (key.isReadable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                ByteBuffer readbuffer = ByteBuffer.allocate(1024);//寫 0 1024  1024
//                ByteBuffer readbuffer = ByteBuffer.allocateDirect(1024); //申請直接內存,也就是堆外內存
                // 讀取請求碼流,返回讀取到的字節數
                int readBytes = socketChannel.read(readbuffer);
                // 讀取到字節,對字節進行編解碼
                if (readBytes > 0) {
                    // 將緩沖區當前的limit設置為position=0,用於后續對緩沖區的讀取操作
                    readbuffer.flip();//讀寫模式反轉
                    // 將緩沖區可讀字節數組復制到新建的數組中
                    byte[] bytes = new byte[readbuffer.remaining()];
                    readbuffer.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("input is:" + body);

                    res(socketChannel, body);
                }else if(readBytes < 0){
                    // 鏈路已經關閉 釋放資源
                    key.cancel();
                    socketChannel.close();
                }else{
                    // 沒有讀到字節忽略
                }
            }

        }
    }

    private void res(SocketChannel channel, String response) throws IOException {
        if (response != null && response.length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            channel.write(writeBuffer);
            System.out.println("res end");
        }
    }
}

  可以看到,創建NIO服務端的主要步驟如下:

    1、打開ServerSocketChannel,監聽客戶端連接;

    2、綁定監聽端口,設置連接為非阻塞模式;

    3、創建Reactor線程,創建多路復用器並啟動線程。

    4、將ServerSocketChannel注冊到Reactor線程中的Selector上,監聽ACCEPT事件

    5、Selector輪詢准備就緒的key

    6、Selector監聽到新的客戶端接入,處理新的接入請求,完成TCP三次握手,簡歷物理鏈路

    7、設置客戶端鏈路為非阻塞模式

    8、將新接入的客戶端連接注冊到Reactor線程的Selector上,監聽讀操作,讀取客戶端發送的網絡消息

    9、異步讀取客戶端消息到緩沖區

    10、對Buffer編解碼,處理半包消息,將解碼成功的消息封裝成Task

    11、將應答消息編碼為Buffer,調用SocketChannel的write將消息異步發送給客戶端

  2、服務端啟動

public class NioServer {
    public static void main(String[] args) {
        int port=8080;
        MultiplexerNioServer nioServer=new MultiplexerNioServer(port);
        new Thread(nioServer,"nioserver-001").start();
    }
}

  啟動服務端后,打印堆棧信息可以看到,會一直阻塞在 select() 操作,等待請求的到來:

        

  通過控制台手動建立連接,測試如下:

        

  3、客戶端代碼實現

public class NioClientHandler implements Runnable {
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public NioClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            // 創建選擇器
            selector = Selector.open();
            // 打開監聽通道
            socketChannel = SocketChannel.open();
            // 如果為 true,則此通道將被置於阻塞模式;如果為 false,則此通道將被置於非阻塞模式
            socketChannel.configureBlocking(false); // 開啟非阻塞模式
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                int wait=selector.select(1000);
                if(wait==0){
                    continue;
                }
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;

                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    try {
                        handle(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void doConnect() throws IOException {
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        }else{
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void handle(SelectionKey key) throws IOException {
        if (key.isValid()) {
            SocketChannel sc = (SocketChannel) key.channel();
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else {
                    System.exit(1);
                }
            }
            if (key.isReadable()) {
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    readBuffer.flip();
                    byte[] bytes=new byte[readBuffer.remaining()];
                    readBuffer.get(bytes);
                    String body=new String(bytes,"UTF-8");
                    System.out.println("res"+body);
                    this.stop=true;
                }else if(readBytes<0){
                    key.cancel();
                    sc.close();
                }

            }
        }
    }
    
    private void doWrite(SocketChannel sc) throws IOException {
        // 將消息編碼為字節數組
        byte[] request = "0123456789".getBytes();
        // 根據數組容量創建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(request.length);
        // 將字節數組復制到緩沖區
        writeBuffer.put(request);
        // flip讀寫切換操作
        writeBuffer.flip();
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("寫入完成");
        }
    }
}

  4、啟動客戶端

public class NioClient {
    public static void main(String[] args) {
        new Thread(new NioClientHandler("localhost", 8080), "nioClient-001").start();
    }
}

  啟動客戶端后,會向服務端寫一些數據,然后再從服務端接收數據,如下圖所示:

             

   客戶端獲得服務端的回復后會自動斷開同服務端的連接,客戶端的一次請求就此完成。

四、BIO、NIO、AIO對比

         


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM