NIO(生活篇)


  今晚是個下雨天,寫完今天最后一行代碼,小魯班起身合上電腦,用滾燙的開水為自己泡制了一桶老壇酸菜牛肉面。這大概是苦逼程序猿給接下來繼續奮戰的自己最好的饋贈。年輕的程序猿更偏愛坐在窗前,在夜晚中靜靜的享受獨特的泡面香味。。。

  科班出身的小魯班雖然寫了N多復雜(CRUD)代碼,但仍口味清淡,他們往往不加或少加料包,由泡面熱騰騰的蒸汽熏蒸自己的臉頻,潤濕又干又澀的雙眼,撫慰受傷的心靈。然后,看着外邊依然還是熙熙攘攘的車流和不屬於自己的任何一個亮燈的窗口,卻思考着如何才能成為ー個名垂青史的程序猿。小魯班不迷茫。。。

  "我們一起學貓叫,一起喵喵喵"~~~小魯班放在書桌上的大哥大手機突然響了,打破了小魯班腦子里美好的yy

  小魯班心想:都這么晚了,誰TM還打電話過來,拿起電話一看,哦原來是他表哥魯班大師

  魯班大師:小老弟。晚上好嘛!

  小魯班:嚶嚶嚶,原來是大表哥呀,能和你通話真讓我難以置信呀。

  魯班大師:聽說你今天早上請兩老去館子喝早茶去了呀,有錢人,看來混的很不錯嘛。

  小魯班:哎,別提了,等了半天才通知有位置(接待阻塞),坐下之后又沒人來負責寫菜單(點餐阻塞),寫完菜單又沒有人負責上菜,我去~氣死老子

  魯班大師:哈哈哈哈哈,這館子的老板也太奧特曼(out)了,現在規模大點的飯館都采用NIO(同步非阻塞IO)模式啦。

  小魯班:額?,NIO是什么鬼,這和飯館有什么關系呢?

  魯班大師: emmmmm,故事得從一段很長很長的網絡編程模式歷史開始說起呢~

  S1.傳統的網絡編程模式(單線程下的通信)

  

  在單線程模式下,IO操作沒完成的時候,無法返回,造成服務器線程阻塞,其他客戶端不能連上服務端。

  在只有一個餐廳服務員的情況下,服務員接待了一位客人,客人到餐桌上坐下后,服務員等待客人點餐,此時又有一個客人來吃飯,但是已經沒有服務員去接待了,因為這個服務員在等待第一個客人點餐,直到第一個客人點完餐后,服務員把菜單交給廚房,然后才能去接待第二個進來的客人。。。(這樣的服務客人早就走了)

  那么我們來看看如何改進

  S2改良后網絡編程模式(多線程)

  在S1中我們發現了一些問題,當IO阻塞的時候,服務端無法接受請求,因此S2改用了多線程模式

  

  在多線程模式下,只要有客戶端連進來,我們都會為之創建一個線程專門去處理客戶端的IO操作。當完成之后,線程就會自動銷毀。但是這樣會帶來一個問題,就是線程的頻繁的創建和銷毀非常消耗服務器的資源。

  飯館里的老板面對這種情況,只好繼續請服務員去寫菜單了,來一個客人,就請一個服務員去負責客人的單子,問題是請服務員非常消耗老板的money呀,而且當寫完單子后又要計算工資,這個過程非常耗時間。

  PS:這里插入一些概念方便后文理解

  線程輪詢:

  只是將當前線程不停地執行循環體,不進行線程狀態的改變,所以響應速度更快。但當線程數不停增加時,性能下降明顯,因為每個線程都需要執行,占用CPU時間。

  線程阻塞:
  讓線程進入阻塞狀態進行等待,當獲得相應的信號(喚醒,時間) 時,才可以進入線程的准備就緒狀態,准備就緒狀態的所有線程,通過競爭CPU資源,進入運行狀態。 阻塞的線程不會占用 CPU 時間, 不會導致 CPU 占用率過高,但線程從阻塞狀態進入就緒狀態的切換時間要比輪詢略慢,消耗性能也多。

  S3繼續改良后的網絡編程模式(線程池)

  S2我們發現了這樣的問題就是線程的創建和銷毀非常損耗系統的性能,因此我們想到JDBC中連接池的解決方案,同樣的,這里我們可以創建線程池

  

  啟動服務后,事先創建100個線程,當有客戶端連進來的時候,不需要創建,就給他分配一個線程用於IO讀寫操作,當客戶完成IO操作完成之后就歸還線程到線程池中而不是銷毀,這樣做的好處就是解決了在運行的時候線程創建和銷毀對系統資源的損耗。同時也暴露了一些問題,其一在高並發的情況下,線程池中的線程不夠用了,此時會造成客戶端等待阻塞(當然也可以繼續創建線程來解決),其二高並發環境下由於普遍線程都存在讀寫阻塞,使得各個線程一起頻繁的進行上下文的切換,消耗的大量的資源,而這些資源本來是用來處理業務用的,現在則用來切換線程,這就大大的降低了系統有效的資源利用率。同時每個客戶端都為他開了一個線程,在很多時候,其實客戶端並不進行IO操作,就沒必要為他創建線程,因為系統的無IO操作線程數多了的話會也占用CPU資源的。

  老板覺得一直請人不划算,干脆就請30個人是在餐廳一個角落待命,當有客人坐下來的時候,就分配一個服務員去點餐。但是當有31客人同時來的時候,假設30個服務員都在等待寫單,那么第31個客人假如要點餐的時候就沒人為他服務了,同時點完餐時候的,突然客人想加餐,此時每個服務員都想着去搶到這個客戶,競爭過程消耗了時間,同時得知道剛剛的賬單都點了些什么還要相互交接未完成的任務,就更浪費人力物力。

  S4再次改良后的網絡編程模式(NIO)(非阻塞的IO多路復用機制)

  S3我們發現線程池不夠用,以及高並發情況下普遍線程都存在讀寫阻塞問題,使得各個線程一起頻繁的進行上下文的切換,消耗的大量的資源。主要原因都是

  1. 線程的IO阻塞導致線程狀態頻繁的切換消耗系統資源
  2. 無IO操作線程占用CUP資源導致的

  因此

  針對問題1我們可以通過建立無阻塞環境,這樣就不會因為阻塞導致線程狀態的切換。

  針對問題2我們可以通過改變線程的創建時機,不是Socket剛剛連上來的時候創建線程,而是等待需要進行IO操作的時候再去創建線程,從而減少無關線程的創建。

  

 

  這張圖對比上面的題我們發現多個三個陌生的面孔,下面介紹一下他們

  Channel表示為一個已經建立好的支持I/O操作的實體(如文件和網絡)的連接,在此連接上進行數據的讀寫操作,使用的是Buffer緩沖區來實現讀寫。

  ServerSocketChannel------->open() 獲得實例 ----------->register(selector,accept) 將通道管理器和該通道綁定,並為該通道注冊事件。

  通過socket.getChannel()的方法獲得通道inChannel

  通道的數據傳輸是這樣的

  將Buffer的數據讀入通道

  int bytesWritten =inChannel.write(buffer);

  從Channel讀取數據到Buffer

  int bytesRead =inChannel.read(buffer);

  Selector一個專門的選擇器來同時對多個Socket通道進行監聽(輪詢或阻塞),當其中的某些Socket通道上有它感興趣的事件發生時,這些通道就會變為可用狀態,當狀態是IO狀態的時候,就會為他分配一個線程處理業務,當不是IO狀態的時候只會為他注冊一個接收(一共4種接收,連接,讀,寫),不會分配線程,這樣的話就保證了,系統中存在的線程都是用來處理業務的而不是用來等待的,這樣就能夠減少線程,也就減少了線程上下文的切換損耗資源。利用 Selector可使一個單獨的線程管理多個 Channel。Selector(多路復用器) 是非阻塞 IO 的核心。

  Selector----->open()獲得實例------>select()監聽動作(讀還是寫還是連接,相當於之前的accept()方法),通過源碼發現SelectorProvider.provider().poll()依賴於操作系統創建

  Buffer緩沖區,就像一個數組,可以保存多個相同類型的數據(ByteBuffer,CharBuffer,.....DoubleBuffer)通過這個方法獲取static XxxBuffer allocate(int capacity) 

其中里邊有些方法例如clear、flip、rewind都是操作limit和position的值來實現重復讀寫,這樣的話IO就不會阻塞,不會出現客戶端在寫入的時候,服務端不能寫出造成線程的阻塞。簡而言之,Channel負責傳輸,Buffer負責存儲

  position(初始的位置,讀的時候,位置會移動)

  limit(當你讀取完成了,數據需要進行固定flip(),limit=position)

  capacity(數組大小的一個容量)

  clear()把position回歸到原位

   

 

  其實這里的Selector相當於一個接待主管,當有一個客人從大門(Channel)進來來吃飯的時候,先帶它到位置上,給他安排一個台號,然后一直監聽客人的需求,當客人需要點餐的時候,此時接待主管監聽到了,就立馬給他分配一個服務員去幫你它完成點餐,當客人需要加餐的時候,接待主管分配服務員到指定台號,然后只需要在賬單(Buffer)上添加即可!

  

 

 

  小魯班:哇塞,有點暈,但是我還是能看懂的,這些都是概念,表哥有代碼么?

  魯班大師:代碼嘛,我今天打排位的時候用魯班被噴沒皮膚,咳咳!

  小魯班:皮膚好說好說!

  1.OIO服務端代碼

public class OioServer {

    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {

        ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
        //創建socket服務,監聽10101端口
        ServerSocket server=new ServerSocket(10101);
        System.out.println("服務器啟動!");
        while(true){
            //獲取一個套接字(阻塞)
            final Socket socket = server.accept();
            System.out.println("來個一個新客戶端!");
            newCachedThreadPool.execute(new Runnable() {
                
                @Override
                public void run() {
                    //業務處理
                    handler(socket);
                }
            });
            
        }
    }
    
    /**
     * 讀取數據
     * @param socket
     * @throws Exception
     */
    public static void handler(Socket socket){
            try {
                byte[] bytes = new byte[1024];
                InputStream inputStream = socket.getInputStream();
                
                while(true){
                    //讀取數據(阻塞)
                    int read = inputStream.read(bytes);
                    if(read != -1){
                        System.out.println(new String(bytes, 0, read));
                    }else{
                        break;
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }finally{
                try {
                    System.out.println("socket關閉");
                    socket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    }
}

  2.NIO服務端代碼

public class NIOServer {
    // 通道管理器
    private Selector selector;

    /**
     * 獲得一個ServerSocket通道,並對該通道做一些初始化的工作
     * 
     * @param port
     *            綁定的端口號
     * @throws IOException
     */
    public void initServer(int port) throws IOException {
        // 獲得一個ServerSocket通道
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 設置通道為非阻塞
        serverChannel.configureBlocking(false);
        // 將該通道對應的ServerSocket綁定到port端口
        serverChannel.socket().bind(new InetSocketAddress(port));
        // 獲得一個通道管理器
        this.selector = Selector.open();
        // 將通道管理器和該通道綁定,並為該通道注冊SelectionKey.OP_ACCEPT事件,注冊該事件后,
        // 當該事件到達時,selector.select()會返回,如果該事件沒到達selector.select()會一直阻塞。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    }

    /**
     * 采用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則進行處理
     * 
     * @throws IOException
     */
    public void listen() throws IOException {
        System.out.println("服務端啟動成功!");
        // 輪詢訪問selector
        while (true) {
            // 當注冊的事件到達時,方法返回;否則,該方法會一直阻塞
            selector.select();
            // 獲得selector中選中的項的迭代器,選中的項為注冊的事件
            Iterator<?> ite = this.selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = (SelectionKey) ite.next();
                // 刪除已選的key,以防重復處理
                ite.remove();

                handler(key);
            }
        }
    }

    /**
     * 處理請求
     * 
     * @param key
     * @throws IOException
     */
    public void handler(SelectionKey key) throws IOException {
        
        // 客戶端請求連接事件
        if (key.isAcceptable()) {
            handlerAccept(key);
            // 獲得了可讀的事件
        } else if (key.isReadable()) {
            handelerRead(key);
        }
    }

    /**
     * 處理連接請求
     * 
     * @param key
     * @throws IOException
     */
    public void handlerAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        // 獲得和客戶端連接的通道
        SocketChannel channel = server.accept();
        // 設置成非阻塞
        channel.configureBlocking(false);

        // 在這里可以給客戶端發送信息哦
        System.out.println("新的客戶端連接");
        // 在和客戶端連接成功之后,為了可以接收到客戶端的信息,需要給通道設置讀的權限。
        channel.register(this.selector, SelectionKey.OP_READ);
    }

    /**
     * 處理讀的事件
     * 
     * @param key
     * @throws IOException
     */
    public void handelerRead(SelectionKey key) throws IOException {
        // 服務器可讀取消息:得到事件發生的Socket通道
        SocketChannel channel = (SocketChannel) key.channel();
        // 創建讀取的緩沖區
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = channel.read(buffer);
        if(read > 0){
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("服務端收到信息:" + msg);
            
            //回寫數據
            ByteBuffer outBuffer = ByteBuffer.wrap("好的".getBytes());
            channel.write(outBuffer);// 將消息回送給客戶端
        }else{
            System.out.println("客戶端關閉");
            key.cancel();
        }
    }

    /**
     * 啟動服務端測試
     * 
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8000);
        server.listen();
    }

}

 

   魯班大師:好好理解,有什么不懂的就留言,你表哥先撤了,皮膚可別忘了送,不早了晚安~

  小魯班:好滴!

   

  拓展一波,關於REDIS單線程為什么這么快?

  其實這里用到的核心依然是NIO,我們來看一下REDIS的工作流程

  

  解析一下,IO多路復用器復制監聽Socker連接的請求,把准備好的客戶端請求壓到一個隊列中,這樣避免了IO阻塞的等待,同時該線程只負責把請求壓到隊列中,而文件時間派發器則是從隊列中獲取請求,並處理請求,這里的處理方式是單線程的,避免了多線程頻繁的上下文切換帶來的資源耗費,同時該操作是存內存的操作,非常快

   總結有三點

       1)純內存操作

       2)核心是基於非阻塞的IO多路復用機制

       3)單線程反而避免了多線程的頻繁上下文切換問題


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM