掌握NIO,程序人生


就像新IO為java帶來的革新那樣,讓我們也開啟一段新的程序人生。

關鍵字:NIO,BIO,偽IO,AIO,多路復用選擇器,通道,緩沖區,jdk研究,回調函數,高並發

java.nio 概述

歷史背景

在java nio出現之前,java網絡IO是只有輸入輸出流操作的基於同步阻塞的Socket編程,這在實際應用中效率低下,因此當時高性能服務器開發領域一直被擁有更接近UNIX操作系統的Channel概念的C++和C長期占據。我們知道現代操作系統的根源都是來自於UNIX系統,它代表了操作系統層面底層的平台支撐,在UNIX網絡編程中,它的IO模型包括阻塞IO,非阻塞IO,IO復用,信號驅動IO以及異步IO,對於他們的具體解釋這里不便展開,未來如果有機會介紹UNIX網絡編程再來詳敘。總之,平台底層是支持多種IO模型的,而當時的java只有阻塞IO這么一種,這也是編碼最容易實現的一種,但卻極大的限制了java在服務端的發展。java為了獲得更高的性能表現,在jdk1.4版本增加了對異步IO模型的支持,提高了java的高性能IO處理能力,今天java已經逐漸取代C++成為了企業服務端應用開發的首選語言。java新增的這部分類庫就是java nio。

jdk1.6的nio源碼結構

java.nio.*包中引入了新的java io類庫,旨在提高速度,實際上舊的IO已經使用nio重新實現過,以便充分利用這種速度的提高,因此,即使我們不顯式地用nio編寫代碼,也能獲得受益效果。速度的提高在文件IO和網絡IO中都有可能發生,概述部分結束以后我們會分別介紹。首先我們來看jdk 1.6的源碼,分析一下java.nio相關的結構。

java.nio 
java.nio.channels 
~~java.nio.channels.spi ~~
java.nio.charset 
~~java.nio.charset.spi ~~

spi

那兩個spi結尾的都是相關的提供者服務類,可以參考前面講解提供者模式的文章,簡單來說就是,java.nio.channels.spi包是用於服務/決定/改變java.nio.channels對象的,同樣的,java.nio.charset.spi包是用於服務/決定/改變java.nio.charset對象的。我們在官方文檔上也能夠看到,這兩個spi包都是

java.nio.channels.spi: 只有那些定義新的選擇器提供者的開發人員才應直接使用此包。

java.nio.charset.spi: 只有定義新charset的開發人員才需要直接使用此包。

我們沒有開發新的charset和selector的需求,因此spi包可以忽略掉。

nio包

①緩沖區

nio包定義了作為數據容器的緩沖區,以及作為其他nio包的父類。主要是java.nio.Buffer類,以及它的子類。再加上一個java.nio.ByteOrder,它是一個字節順序的類型安全枚舉。(TODO:接下來會有專門介紹枚舉的文章)

Buffer是一個對象,它包含一些寫入或要讀出的數據,這與舊IO最大的不同在於舊IO將數據直接讀寫到流對象中,少了這一個中間層。

在NIO中所有數據讀寫都用Buffer處理。

緩沖區實質上是一個數組,通常它是一個字節數組——ByteBuffer,也可以是其他數據類型的數組。同時緩沖區還提供了對數據結構化訪問以及維護讀寫位置等信息。對於這些不同數據類型的Buffer類,他們都有相同的操作,只有ByteBuffer提供了更多的一些直接操作通道的方法。

這些Buffer類(Buffer類以及它的子類)定義了一個用於通過IO發送特定基本類型具體數據的有序線性表結構的容器,除了boolean,覆蓋了:byte, short, int, long, float, double 和 char。

特別的是,ByteBuffer類存在一個單獨的子類:MappedByteBuffer類。此類用特定於內存映射文件區域的操作擴展了ByteBuffer類,可用於內存映射文件的I/O 操作。

channels包

該包定義了各種通道,這些通道表示到能夠執行I/O操作的實體(如文件和套接字)的連接;定義了用於多路復用的、非阻塞I/O操作的選擇器。

①通道

NIO所有網絡IO都是通過Channel。舉例來講,如果通道是一條傳送帶,那上面的不同形狀的箱子就是緩沖區Buffer,而箱子內部裝有的才是真正的數據。

基本上,所有的NIO都是從一個Channel開始,數據可以從緩沖區傳輸到通道,也可以從通道傳輸到緩沖區。

全雙工:通道與流的不同之處在於流是定向的,半雙工的,InputStream只能是輸入流,只可以讀取數據;OutputStream是輸出流,只能寫入數據。沒有一種既能寫又能讀的,但是通道可以,通道是雙向的。Channels的設計更接近底層操作系統,因為操作系統的通道就是全雙工的。

下面是包中的通道相關類:

  • Channels:針對信道和流的實用工具方法。
  • DatagramChannel:基於UDP數據報的網絡IO通道
  • FileChannel:讀取、寫入、映射和操作文件IO通道
  • FileChannel.MapMode:文件映射模式的類型安全的枚舉。
  • FileLock:表示文件區域鎖定的標記。
  • Pipe:實現單向管道傳送的通道對,一個可寫入的sink通道和一個可讀取的source通道。
  • Pipe.SinkChannel:一個可寫入的sink通道。
  • Pipe.SourceChannel:一個可讀取的source通道。
  • ServerSocketChannel:類似於ServerSocket的基於TCP流的網絡IO編程的服務端通道
  • SocketChannel:類似於Socket的基於TCP流的網絡IO編程的客戶端通道

通道總體上來講,可以分為兩大類:

  • 網絡IO:SelectableChannel
  • 文件IO:FileChannel

根據上一篇介紹TCP和UDP的socket編程,我們可以看出這些通道有着自己的一套基於TCP和UDP的Socket編程。其實Socket雖然沒有在IO包中,但它是網絡IO,也屬於IO總范疇,通道中還有一個在IO中介紹過的用於文件IO的通道FileChannel。所以,以上內容除了管道以外,我們在之前的IO或Socket中都有過研究,可以作為我們在nio包中繼續研究他們的基礎。

至於管道,這是我們在IO中留的坑,本篇文章來填。

我們之前說管道處理的是進程間的通信,實際上網絡編程中客戶端和服務端也是通過端口進行端到端通信,也屬於兩個進程間的通信。

②選擇器

選擇器Selector是NIO編程的基礎,它是多路復用的、支持非阻塞IO操作的。

多路復用器提供選擇已就緒任務的能力。

  • Selector會不斷地監聽(通過輪詢的手段)注冊在其上的Channel,如果某個Channel上面發生事件(比如:連接打開,數據到達),這個Channel就處於就緒狀態,會被Selector輪詢出來,然后通過SelectionKey可獲取就緒Channel的集合,進行后續IO操作。一個Selector可以同時輪詢多個Channel,只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端。

下面是包中的選擇器相關類:

  • SelectableChannel:可通過Selector實現多路復用的通道。
  • SelectionKey:表示SelectableChannel在Selector中的注冊的標記。
  • Selector:SelectableChannel對象的多路復用器。

下面我們來分別認識一下多路復用和非阻塞IO:

  • 多路復用:在IO編程過程中,當需要同時處理多個客戶端接入請求時,可以利用多線程或者IO多路復用技術進行處理。簡單來說多路復用技術是與多線程同級的,解決差不多問題的技術,它通過把多個IO的阻塞復用到同一個select的阻塞上,使得系統在單線程的情況下可同時處理多個客戶端請求。與傳統的多線程/多進程相比,IO多路復用的優勢是系統開銷小,不需要創建和維護新的線程。
  • 非阻塞IO:nio之前,io操作都是同步阻塞IO(BIO),上面也講過了,存在性能和可靠性的瓶頸。
    • BIO最主要的問題在於每當有一個新客戶端請求接入時,服務端必須創建一個新的線程處理新接入的客戶端鏈路,線程與客戶端連接是一對一的關系,當有線程處於阻塞狀態,cpu就要頻繁使用上下文切換去更換到有效線程,然而上下文切換是一個比較消耗資源的操作,它要將當前線程的所有的上下文環境基礎復制到另外一個有效的線程上去繼續執行新的IO請求,而當多個線程阻塞時,cpu會無意義的在多個線程直接做上下文切換,在高性能服務器應用領域,往往要面向成千上萬個客戶端的並發連接,這種模型顯然無法滿足高性能、高並發接入的場景。
    • 偽異步IO,我們在IO中介紹的線程池雖然限制了線程的最大連接數,但是底層仍舊是基於BIO實現的,所以被稱為偽異步IO。
    • BIO源碼的阻塞,我們知道ServerSocket的accept方法會阻塞一個線程直到新的客戶端連入,除此之外,流操作中的InputStream的read方法和OutputStream的write方法在讀取操作過程中都會引發同步阻塞問題,阻塞的時間取決於對方IO線程的處理速度和網絡IO的處理速度,本質上講,這些對方的環境問題我們並沒辦法去保證。
    • 非阻塞IO(Non-block IO),它的縮寫也為NIO,這與New IO其實並無區別,因為New IO就是增加的對Non-block IO的支持,因此我們說NIO可以是New IO,也可以是Non-block IO,不要太在意這方面的困惑。NIO通過對通道的阻塞行為的配置,可以實現非阻塞的通道,使得線程在等待時(舊IO中的阻塞)可以同時做其他事情,實現了線程的異步操作。
      • NIO相對於BIO來講,雖然提供了高性能,高並發的支持,但是它也帶來了較高的編程復雜度。
      • 如果是低負載、低並發的應用程序大可使用BIO,而不要一味趕NIO的時髦。
      • 由一個專門的線程來負責處理分發所有的IO事件。
      • 把整個IO任務切換成多個小任務,通過任務間協作完成。
      • 事務驅動:只有IO事件到的時候才去處理,而不是時刻保持監視事件。
      • 線程通訊:通過notify和wait等方式通信,保證每次的上下文切換是有意義的,減少無意義的切換。
      • 通過增加Pipe、Channel、Buffer和Selector等很多UNIX網絡編程的概念,NIO實現了非阻塞IO,所以這是一個遞歸,在這里引用回了我們本篇博文的主題——NIO。

nio概述部分就算結束了,繼續多說就太浮於表面了,下面針對NIO的幾個主要特性進行代碼實例的演示。

基於TCP的NIO實例

下面我們先分別介紹一下基於TCP的NIO的服務端和客戶端的編寫流程。

基於TCP的NIO服務端的編寫流程

直接通過代碼來展示:

    /**
     * nio服務端與io的ServerSocket的編寫流程極為相似,nio包已經封裝好相關方法。
     */
    public void seqNio() throws IOException {
        // 1,open服務端Socket通道。
        ServerSocketChannel ssc = ServerSocketChannel.open();
        // 2,服務端通道綁定端口。
        ssc.bind(new InetSocketAddress(ipAddress, port));
        ssc.configureBlocking(false);// 設置通道為非阻塞
        // 3,open一個多路復用選擇器
        Selector selector = Selector.open();
        new Thread().start();// 開啟一個線程用於維護選擇器
        // 4,將服務端通道注冊到選擇器,監聽接入操作
        ssc.register(selector, SelectionKey.OP_ACCEPT);
        // 5,選擇器輪詢它身上的keys
        Set<?> selectKeys = selector.selectedKeys();
        Iterator<?> it = selectKeys.iterator();
        while (it.hasNext()) {
            SelectionKey key = (SelectionKey) it.next();
            System.out.println(key);
            // deal with the I/O event.
        }
        // 6,服務端通道accept處理新客戶端請求
        SocketChannel channel = ssc.accept();
        // 7,設置客戶端通道屬性
        channel.configureBlocking(false);
        channel.socket().setReuseAddress(true);
        // 8,注冊客戶端通道到選擇器,監聽讀操作。
        channel.register(selector, SelectionKey.OP_READ);
        // 9,直接操作客戶端通多進行異步讀操作
        ByteBuffer bb = ByteBuffer.allocateDirect(1024);
        channel.read(bb);
        // 10,解碼decode讀取的信息
        // 11,異步寫響應信息回客戶端通道
        channel.write(bb);
    }

基於TCP的NIO客戶端的編寫流程

同樣直接通過代碼來展示:

    /**
     * 列出大致的TCPNIO客戶端的執行順序
     * 
     * @throws IOException
     */
    public void seqClient() throws IOException {
        // 1,open創建一個客戶端通道實例
        SocketChannel sc = SocketChannel.open();
        // 2,設置通道關於TCP的屬性
        sc.configureBlocking(false);
        sc.socket().setReuseAddress(true);// 關閉TCP連接時,該連接可能在關閉后的一段時間內保持超時狀態
        sc.socket().setSendBufferSize(1024);// 將此Socket的SO_SNDBUF選項設置為指定的值。
        sc.socket().setReceiveBufferSize(1024);// 將此Socket的SO_RCVBUF選項設置為指定的值。
        // 3,open創建一個選擇器的實例
        Selector selector = Selector.open();
        // 4,客戶端連接服務端(遠程主機端口)
        boolean connected = sc.connect(new InetSocketAddress(ipAddress, port));
        if (connected) {
            // 5, 判斷連接如果成功,則注冊到選擇器的讀操作位
            sc.register(selector, SelectionKey.OP_READ);
        } else {
            // 6,如果連接不成功,則注冊到選擇器的連接操作位,監聽服務端的TCP的ACK應答
            sc.register(selector, SelectionKey.OP_CONNECT);
        }
        // 7,輪詢選擇器的keys
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> it = set.iterator();
        SelectionKey key = null;
        while (it.hasNext()) {
            key = it.next();
            // 處理io event
            // 8,處理連接
            if (key.isConnectable()) {
                sc.register(selector, SelectionKey.OP_READ);// 連接成功以后就將讀操作注冊到多路復用選擇器上
            } else if (key.isReadable()) {
                // 9,處理數據
                ByteBuffer bb = ByteBuffer.allocate(1024);// 分配1MB緩沖區。
                SocketChannel socketChannel = (SocketChannel) key.channel();
                int readBytes = socketChannel.read(bb);
                if (readBytes > 0) {
                    // 10,解碼
                }
                // 11, 將客戶端請求寫回通道。
                socketChannel.write(bb);
            }
        }
    }

重新構建基於TCP的NIO實例

上面給出了大致的基於TCP的NIO編程的流程,大家不必過於研究他們,下面我們構建實例來說明,會得到更好的學習效果。本打算在之前寫過的java TCP socket實例的基礎上進行改造,然而由於客戶端等待標准輸入就是一個阻塞的過程,這與異步非阻塞的NIO是相悖的,我確實做過嘗試,發現一旦客戶端阻塞在等待用戶輸入的位置,整個線程都被迫等待,那么依賴於線程的Selector就不能更好的發揮作用,服務端的響應消息也就永遠無法給出。

換句話說,服務端的響應是無法像socket編程那樣寫在客戶端發送消息以后,同步接收服務端數據,然后再阻塞等待用戶新的輸入,NIO是通過對key的輪詢,這是異步的行為,也就是說客戶端發送請求消息與服務端返回響應消息是完全解耦的,不存在同步的順序執行的關系,所以當我們的線程被客戶端的標准輸入阻塞,key的輪詢異步操作也就完全run不起來了。

因此,我們采用去掉客戶端阻塞代碼的方式,重新構建基於TCP的NIO實例:指定客戶端請求消息,服務端接收以后返回當前時間作為響應。下面看代碼:

我們在Base里面定義了一些常量。

public class Base {
  protected final static Logger logger = LogManager.getLogger();
  protected static String ipAddress = "127.0.0.1";
  protected static int port = 23451;
  protected static String TIMEQUERY = "query time";
  protected final static int BUFFER_SIZE = 1024;
}

客戶端和服務端的入口都非常簡單。

public class NIOTCPClient extends Base {
  public static void main(String[] args) {
    new Thread(new ReactorClientHandler(), "nio-client-reactor-001").start();
  }
}
public class NIOTCPServer extends Base {
  public static void main(String[] args) {
    new Thread(new ReactorServerHandler(), "nio-server-reactor-001").start();
  }
}

下面分別介紹ReactorClientHandler和ReactorServerHandler。

package javaS.IO.nioS;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import javaS.IO.socketS.Base;

public class ReactorClientHandler extends Base implements Runnable {
  private Selector selector;
  private SocketChannel sc;// 定義一個客戶端通道
  private volatile boolean stop;

  /**
   * 構造期間初始化對象,客戶端構造函數不做事務操作
   * 
   * @param ip
   * @param port
   */
  public ReactorClientHandler() {
    try {
      selector = Selector.open();
      sc = SocketChannel.open();
      // 設置客戶端通道屬性以及TCP參數
      sc.configureBlocking(false);
      sc.socket().setSendBufferSize(BUFFER_SIZE);
      sc.socket().setReceiveBufferSize(BUFFER_SIZE);
    } catch (IOException e) {
      e.printStackTrace();
      System.exit(1);
    }

  }

  @Override
  public void run() {
    try {
      // TODO: 連接成功,未考慮重連操作
      doConnect();
    } catch (IOException e) {
      e.printStackTrace();
    }
    while (!stop) {
      try {
        selector.select(1000);
        Set<SelectionKey> set = selector.selectedKeys();
        Iterator<SelectionKey> it = set.iterator();
        SelectionKey key = null;
        // 異步非阻塞單線程輪詢多路復用器選擇器keys
        while (it.hasNext()) {
          key = it.next();
          it.remove();// 處理一個Key就移除一個,然而在handleInput中還會注冊進來讀操作
          try {
            handleInput(key);
          } catch (Exception e) {
            if (key != null) {
              // 關閉key
              key.cancel();
              // 關閉在handleInput方法中打開的key的通道
              if (key.channel() != null) {
                key.channel().close();
              }
            }
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
        System.exit(1);
      }
    }

    /**
     * 多路復用器關閉后,所有注冊在上面的Channel和Pipe等資源都會被自動去關閉,不需要重復釋放資源。
     */
    if (selector != null) {
      try {
        selector.close();
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * 通過輪詢key來處理通道的數據(包括發送請求和接收響應)
   * 
   * @param key
   * @throws IOException
   */
  private void handleInput(SelectionKey key) throws IOException {
    if (key.isValid()) {// 首先判斷key是否有效
      SocketChannel sc = (SocketChannel) key.channel();// 用一個客戶端通道來接收key的通道
      if (key.isConnectable()) {// 判斷是否是連接狀態,說明服務端已返回ACK應答消息,但是連接還沒有最終建立
        if (sc.finishConnect()) {// 判斷是否連接成功,建立成功
          // 注冊該客戶端通道到多路復用器上,注冊讀操作位,監聽網絡讀操作
          sc.register(selector, SelectionKey.OP_READ);
          // 發送請求消息給服務端。
          sendReq(sc, TIMEQUERY);
        } else {// 連接失敗
          System.exit(1);
        }
      } // 格外注意這里的判斷終止符,key的狀態如果是Read就不是Connect了,要分開判斷。
      if (key.isReadable()) {
        // 讀取服務器的應答響應消息,如果客戶端接收到了服務端的響應消息,則SocketChannel是可讀的。
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);// 無法事先判斷碼流大小,開辟一個1MB的緩沖區
        int readBytes = sc.read(readBuffer);// 此時read為異步非阻塞的,因為我們已經為該通道設置為非阻塞。
        /**
         * 只要涉及異步讀,就要判斷讀取的結果
         */
        if (readBytes > 0) {// 讀到了字節,對字節進行編解碼。
          readBuffer.flip();// 將緩沖區當前limit設置為position,position設置為0,用於后續對緩沖區的讀取操作。
          byte[] bytes = new byte[readBuffer.remaining()];
          readBuffer.get(bytes);
          String body = new String(bytes, "UTF-8");// 使用UTF-8解碼
          logger.info("服務端的響應消息:" + body);
          this.stop = true;// 得到服務端響應則斷開連接線程(線程斷了,jvm上面所有的通道,key等資源都自動沒了,所以不必去重復釋放資源)。
        } else if (readBytes < 0) {// 返回值為-1,鏈路已關閉,需要手動關閉SocketChannel
          key.cancel();
          sc.close();
        } else {// 沒有讀到字節,多數情況,忽略。
          ;
        }
      }
    }
  }

  /**
   * 客戶端發起連接的操作
   * 
   * @throws IOException
   */
  private void doConnect() throws IOException {
    if (sc.connect(new InetSocketAddress(ipAddress, port))) {
      // 如果已連接,則注冊客戶端通道的讀操作到選擇器
      sc.register(selector, SelectionKey.OP_READ);
    } else {
      /**
       * 如果未連接,不代表連接失敗,可能還在等待服務端返回TCP握手應答消息, 所以此時注冊客戶端通道的連接操作到選擇器,等候輪詢執行連接操作, 當服務端返回TCP
       * syn-ack消息后,Selector就能夠輪詢到這個SocketChannel處於連接就緒狀態
       */
      sc.register(selector, SelectionKey.OP_CONNECT);
    }
  }

  /**
   * 客戶端通道發送消息
   * 
   * @param sc
   *          客戶端通道
   * @param strReq
   *          待發送消息
   * @throws IOException
   */
  private void sendReq(SocketChannel sc, String strReq) throws IOException {
    byte strBytes[] = strReq.getBytes();
    // 構造請求消息體ByteBuffer,只有通過ByteBuffer才能操作通道發送消息
    ByteBuffer reqBuffer = ByteBuffer.allocate(strBytes.length);// 由於已知碼流大小為strBytes.length,所以建立一個一樣大的緩沖區
    reqBuffer.put(strBytes);// 將請求的數據放入發送緩沖區(以字節數組的形式)
    reqBuffer.flip();
    sc.write(reqBuffer);
    /**
     * 由於發送請求是異步的,不會一次性全部發送成功,會存在“寫半包”的問題, 所以要通過hasRemaining方法對發送結果進行判斷,如果緩沖區中消息全部發送完成,則打印發送成字樣提示用戶。
     */
    if (!reqBuffer.hasRemaining()) {
      logger.info("客戶端請求發送成功!");
    }
  }
}

package javaS.IO.nioS;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

import javaS.IO.socketS.Base;

/**
 * 基於TCP的NIO服務端多路復用類(多路是基於單線程實現的"多線程")
 * 
 * 該類實現了Runnable接口,是一個獨立的線程,負責輪詢多路復用器Selector,可以處理多個客戶端的並發接入。
 * 
 * @author Evsward
 *
 */
public class ReactorServerHandler extends Base implements Runnable {
  private Selector selector;
  private ServerSocketChannel servChannel;// 定義一個服務端通道
  private volatile boolean stop;

  /**
   * 初始化多路復用選擇器,綁定監聽端口。
   * 
   * @param port
   *          監聽的端口
   */
  public ReactorServerHandler() {
    try {
      // 初始化對象
      selector = Selector.open();// 通過靜態方法open創建一個Selector實例。
      servChannel = ServerSocketChannel.open();// 通過靜態方法open創建一個ServerSocketChannel實例,
      servChannel.configureBlocking(false);// 設置ServerSocketChannel通道為非阻塞。
      // 開始事務操作
      servChannel.socket().bind(new InetSocketAddress(ipAddress, port), BUFFER_SIZE);// 通道綁定並監聽IP和端口,允許接入最多1024個連接。
      servChannel.register(selector, SelectionKey.OP_ACCEPT);// 服務器通道注冊到多路復用選擇器上。
      logger.info("server is listening in port: " + port);
    } catch (IOException e) {
      e.printStackTrace();
      System.exit(1);
    }
  }

  // 向外部提供一個停止監聽的方法。
  public void stop() {
    this.stop = true;
  }

  @Override
  public void run() {
    while (!stop) {
      try {
        selector.select(1000);// 設置休眠時間1s,每隔1s運行一次,也可以無參,當有就緒Channel時觸發執行,從而實現網絡的異步讀寫操作
        // 多路復用器輪詢Keys
        Set<SelectionKey> selectedKeys = selector.selectedKeys();
        Iterator<SelectionKey> it = selectedKeys.iterator();
        SelectionKey key = null;
        while (it.hasNext()) {
          key = it.next();
          it.remove();// 處理key以后,移除該key
          try {
            handleInput(key);
          } catch (IOException e) {
            if (key != null) {
              key.cancel();
              if (key.channel() != null) {
                key.channel().close();
              }
            }
          }
        }
      } catch (IOException e) {
        e.printStackTrace();
      }
    }
  }

  /**
   * 通過SelectionKey來處理客戶端請求以及響應。
   * 
   * @param key
   *          通道注冊在選擇器上的key
   */
  private void handleInput(SelectionKey key) throws IOException {
    if (key.isValid()) {
      // 處理新接入的請求消息
      if (key.isAcceptable()) {
        // 用一個服務端通道來接收key的通道
        ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
        // accept 新連接(創建新通道相當於TCP三次握手,建立TCP物理鏈路,但並不創建新線程)
        SocketChannel sc = ssc.accept();
        sc.configureBlocking(false);
        // 增加客戶端通道到選擇器,注意:服務端通道都是OP_ACCEPT操作位,客戶端通道都是OP_READ操作位。
        sc.register(selector, SelectionKey.OP_READ);
      }
      if (key.isReadable()) {
        // 讀取客戶端的請求消息
        SocketChannel sc = (SocketChannel) key.channel();
        ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);// 開辟一個1MB的緩沖區
        int readBytes = sc.read(readBuffer);// 此時read為非阻塞的,因為我們已經為該通道設置為非阻塞。
        if (readBytes > 0) {// 讀到了字節,對字節進行編解碼。
          readBuffer.flip();// 將緩沖區當前limit設置為position,position設置為0,用於后續對緩沖區的讀取操作。
          byte[] bytes = new byte[readBuffer.remaining()];
          readBuffer.get(bytes);
          String body = new String(bytes, "UTF-8");// 使用UTF-8解碼
          logger.info("客戶端請求信息:" + body);
          // TODO: 簡單處理請求,直接返回當前時間作為響應消息。
          doWrite(sc, new Date().toString());
        } else if (readBytes < 0) {// 返回值為-1,鏈路已關閉,需要手動關閉SocketChannel
          key.cancel();
          sc.close();
        } else {// 沒有讀到字節,多數情況,忽略。
          ;
        }
      }
    }
  }

  /**
   * 將響應消息異步發送回客戶端
   * 
   * @param channel
   *          客戶端通道
   * @param response
   *          響應消息的內容
   * @throws IOException
   */
  private void doWrite(SocketChannel channel, String response) throws IOException {
    if (response != null && response.trim().length() > 0) {
      byte[] bytes = response.getBytes();
      ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
      writeBuffer.put(bytes);
      writeBuffer.flip();
      channel.write(writeBuffer);// 將緩沖區內容寫入通道,發送出去
      /**
       * TODO 由於SocketChannel是異步非阻塞的,所以寫消息發送時不會一下子全部發送完畢,所以會出現“寫半包”的問題。
       * 我們需要注冊寫操作,不斷輪詢Selector,將沒有發送完的ByteBuffer發送完畢。 然后可以通過ByteBuffer的hasRemain方法判斷消息是否完整發送完畢。
       */
      if (!writeBuffer.hasRemaining()) {
        logger.info("服務端響應發送成功!");
      }
    }
  }
}

ByteBuffer的flip操作解析

上面代碼中總會出現關於ByteBuffer對象調用的flip方法,它的解釋是

將緩沖區當前limit設置為position,position設置為0,用於后續對緩沖區的讀取操作。

然而仍是一頭霧水,下面從ByteBuffer的屬性來解析這個flip方法的含義。首先來看ByteBuffer的幾個屬性:

  • capacity:定義了ByteBuffer對象的容量。
  • limit:定義了ByteBuffer在讀寫操作中的界限,讀操作中limit代表有效數據的長度(肯定是小於等於capacity),寫操作中等於capacity。
  • position:讀寫操作的當前下標。
  • mark:一個臨時存放的下標,用來在ByteBuffer對象的中間進行讀寫操作。

以上屬性可以通過以下方法進行設定:

  • clear():把position設為0,把limit設為capacity,一般在把數據寫入Buffer前調用。
  • flip():把limit設為當前position,把position設為0,一般在從Buffer讀出數據前調用。意思是將當前position即有效數據長度賦值給limit,然后將當前position調整到0,從0開始讀取,一直讀到limit。
  • rewind():把position設為0,limit不變,一般在把數據重寫入Buffer前調用。

總結

以上關於該實例的所有代碼都已經完整給出,至於內部的具體執行方式,我直接在代碼行間做了充足的注釋說明,我想比在這里用文字長篇累牘的效果要好得多。不過下面我還是要對該實例展現出的NIO特性以及該實例的局限性進行一個分析總結。

  • 首先是該實例展現的NIO特性,無論服務端還是客戶端,同一時間只需要唯一一個線程啟動,由它維持着多路復用器的輪詢工作,而實際上原來的多線程工作都轉交給了這個多路復用器,通過多路復用器將通道上的每個IO操作注冊進來,然后多路復用器有個休眠時間,selector.select(1000);每隔1s就會輪詢一遍。實際上,整個selector的輪詢工作本身是對當前線程的阻塞,但這是線程層面的阻塞,是為了保持多路復用器的輪詢工作得以持續開展,而涉及到具體業務io操作的工作不會被阻塞,原Socket的工作方式是將這些業務操作都做了阻塞同步操作,而NIO將線程與多路復用器做了分層,在多路復用器層面,我們達到了對業務IO操作的異步非阻塞的目標。
  • 接下來是分析該實例的局限性,就本例而言,最主要的問題還是異步請求的結果問題,正如上面我們分析過的,無法用該實例實現標准輸入返回回聲的基本業務需求,由於無法外部更新客戶端的請求,客戶端無法保持與服務端的連接,只能是發送請求以后則關閉,否則該通道就會完全阻塞廢棄在那里。該實例只是為了展示NIO的使用方式,總結下來來看,相較於原Socket編程,即使是“簡配版本”的NIO操作,也稱不上方便。

新NIO

我們都知道NIO本就是新IO了,怎么又蹦出來一個新NIO。其實是這樣的,上面的實例中我們也體會到了NIO對異步操作處理的力不從心,所以針對以上的問題,JDK1.7升級了NIO的類庫,我們可以叫它為新NIO,也可以是NIO 2.0,java正式提供了對異步IO的支持,解決了我們上面實例中提到的關於異步結果無法讀取的問題,客戶端與服務端的通道可以隨時不斷地發送請求和返回響應,實現真正的客戶端和服務端的長連接通道。對於新NIO中這部分支持異步通信的,我們稱他們為AIO(Asynchronous IO)。

AIO主要是通過回調函數解決了對異步操作結果的處理。該回調函數是通過實現CompletionHandler接口的completed方法。

jdk1.7的nio源碼結構

下面我們來看一下jdk1.7的nio源碼架構:

java.nio
java.nio.channels
java.nio.channels.spi
java.nio.charset
java.nio.charset.spi
java.nio.file
java.nio.file.attribute
java.nio.file.spi

跟上面介紹過的jdk1.6版本的相同,spi包的可以去掉。那么增加的是java.nio.file類,於此同時,channels包中也發生了變化,

*AsynchronousChannelGroup
*AsynchronousFileChannel
*AsynchronousServerSocketChannel:
*AsynchronousSocketChannel
Channels
DatagramChannel
FileChannel
FileChannel.MapMode
FileLock
*MembershipKey
Pipe
Pipe.SinkChannel
Pipe.SourceChannel
SelectableChannel
SelectionKey
Selector
ServerSocketChannel
SocketChannel

以上開頭帶*的類是我標示出來的jdk7新增加的類。其中MembershipKey是代表互聯網協議中多路廣播組的一員,我們暫且不管它,重點研究Asynchrounous開頭的異步支持類。先看他們的定義,

  • AsynchronousChannelGroup:對異步通道進行分組,達到資源共享的目的。
  • AsynchronousFileChannel:一個可以對文件讀寫操作的異步通道。
  • AsynchronousServerSocketChannel:一個異步通道用作流導向的監聽套接字,說白了就是服務端Socket通道。
  • AsynchronousSocketChannel:一個異步通道用作流導向的連接套接字,就是客戶端Socket通道。

這里我想對新nio中Client-Server架構進行一下理解。

其實在NIO中,因為全雙工的緣故,服務端客戶端的定義界限沒有原始Socket那么嚴格,服務端在NIO中的體現在它是監聽,而客戶端是連接,通過這條通道,他們都可以給對方發送消息,我們通常稱服務端發送給客戶端的消息為響應,而客戶端發送給服務端的消息為請求。

接着說回我們的新NIO,除了新增的AIO部分,其他內容都是微調整,下面我們主要針對AIO部分進行代碼實例的學習。

AIO編程

AIO編程中最大的不同就是取消了多路復用器,它不再使用多路復用器的“多線程”的實現方式,而是完全通過對一條線程的非阻塞高效使用來實現多任務並發,這就歸功於它對操作結果的異步處理。

因為異步操作的回調函數本身就是一個額外的jvm底層的線程池啟動的新線程負責回調並驅動讀寫操作返回結果,當結果處理完畢,它也就自動銷毀了。

所以沒有了多路復用器,又增加了真正異步的實現,AIO無論從編碼上還是功能上都比舊的NIO要好很多。下面閑言少敘,先看代碼:

服務端啟動一個線程,Handler改為新增加的AsyncServerHandler類。

public class NIOTCPServer extends Base {
  public static void main(String[] args) {
    new Thread(new AsyncServerHandler(), "nio-server-reactor-001").start();
  }
}

下面是AsyncServerHandler類。

package javaS.IO.nioS.aioS;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.util.concurrent.CountDownLatch;

import javaS.IO.socketS.Base;

/**
 * 異步非阻塞服務器處理類
 * 
 * @author Evsward
 *
 */
public class AsyncServerHandler extends Base implements Runnable {

  AsynchronousServerSocketChannel asyncServerChannel;// 服務端異步套接字通道
  CountDownLatch latch;// 倒計時門閂

  /**
   * 構造器對象初始化
   */
  public AsyncServerHandler() {
    try {
      // 與NIO相同的操作,通過open靜態方法創建一個AsynchronousServerSocketChannel的實例。
      asyncServerChannel = AsynchronousServerSocketChannel.open();
      asyncServerChannel.bind(new InetSocketAddress(ipAddress, port), 1024);// 一樣的操作,綁定IP端口。
      logger.info("server is listening in address -> " + ipAddress + ":" + port);
    } catch (IOException e) {
      e.printStackTrace();
      System.exit(1);
    }
  }

  @Override
  public void run() {
    latch = new CountDownLatch(1);// 初始化倒計時次數為1
    doAccept();
    try {
      latch.await();// 倒計時門閂開始阻塞,知道倒計時為0,如果在這期間線程中斷,則拋異常:InterruptedException。
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

  public void doAccept() {
    /**
     * 開始接收客戶端連接,將當前服務端異步套接字通道對象作為附件傳入保存,
     * 
     * 同時傳入一個 CompletionHandler<AsynchronousSocketChannel, ? super A>的實現類對象接收accept成功的消息。
     */
    asyncServerChannel.accept(this, new AcceptCompletionHandler());
  }

}

它依賴一個AcceptCompletionHandler類,用來回調處理結果。

package javaS.IO.nioS.aioS;

import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

import javaS.IO.socketS.Base;

/**
 * accept方法的回調操作
 * 
 * 泛型參數1:IO操作的回調結果的類型 泛型參數2:IO操作的附件對象的類型
 * 
 * 這兩個參數對應的也就是回調函數completed的參數類型
 * 
 * @author Evsward
 *
 */
public class AcceptCompletionHandler extends Base
    implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {

  @Override
  /**
   * 傳入一個客戶端異步套接字通道作為accept操作結果的接收者,一個異步非阻塞服務器處理類對象作為附件存儲
   */
  public void completed(AsynchronousSocketChannel result, AsyncServerHandler attachment) {
    /**
     * 為什么要再次執行相同的accept方法,他們甚至參數都一樣?
     * 
     * 因為上一個類中asyncServerChannel的accept方法執行以后,新的客戶端連接結果會調用當前completed方法。
     * 但是服務端是支持多個客戶端連接的,不能只有一個客戶端連接成功以后,調用回調函數completed就結束了。
     * 因此我們要在第一個客戶端連接結果的回調函數中再次開啟一個accept方法以接收第二個客戶端連接,遞歸調用,就可以支持accept無數個客戶端連接了。
     */
    attachment.asyncServerChannel.accept(attachment, this);
    // 開辟一個1MB的臨時緩沖區,將用於從異步套接字通道中讀取數據包
    ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
    /**
     * 回調函數對accept結果進行異步讀操作,讀取客戶端請求,放入buffer容器中
     * 
     * 其中attachment依然作為其回調時的入參:讀數據的時候,是通過ByteBuffer容器,無論是數據源還是結果存放,因此attachment也應該傳入一個ByteBuffer對象
     * 
     * 最后一個參數為異步讀操作的回調函數。
     */
    result.read(buffer, buffer, new ReadCompletionHandler(result));
  }

  @Override
  public void failed(Throwable exc, AsyncServerHandler attachment) {
    exc.printStackTrace();
    attachment.latch.countDown();// 倒計時一次,由於我們定義的初始化次數為1,所以當前線程直接往下運行,脫離阻塞狀態。
  }

}

這個類又依賴着ReadCompletionHandler類,用來做讀操作的回調處理結果。

package javaS.IO.nioS.aioS;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Date;

import javaS.IO.socketS.Base;

public class ReadCompletionHandler extends Base implements CompletionHandler<Integer, ByteBuffer> {
  private AsynchronousSocketChannel channel;

  /**
   * 通過構造器傳入關聯的AsynchronousSocketChannel實例作為成員變量,我們在回調要始終針對這個通道進行IO操作。
   * 
   * 其實從
   * 
   * @param channel
   */
  public ReadCompletionHandler(AsynchronousSocketChannel channel) {
    this.channel = channel;
  }

  @Override
  /**
   * attachment什么時候被賦的值?
   * 
   **** 答:回調的時候數據被填充到了attachment,返回結果是一個狀態碼存儲與Integer result對象中。
   */
  public void completed(Integer result, ByteBuffer attachment) {
    attachment.flip();// 為讀取數據做准備
    byte[] body = new byte[attachment.remaining()];// 創建一個與附件緩沖區大小相同的字節數組。
    attachment.get(body);// 將緩沖區內數據讀到字節數組中去。
    try {
      String req = new String(body, "UTF-8");
      logger.info("客戶端請求信息:" + req);
      if (TIMEQUERY.equals(req)) {
        doWrite(new Date().toString());// 將當前時間作為響應消息返回客戶端
      } else {
        doWrite(req);
      }
    } catch (UnsupportedEncodingException e) {
      e.printStackTrace();
    }
  }

  /**
   * 服務端寫響應信息
   * 
   * @param rep
   */
  private void doWrite(String rep) {
    if (rep != null && rep.trim().length() > 0) {
      byte[] bytes = rep.getBytes();
      ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
      writeBuffer.put(bytes);
      writeBuffer.flip();
      /**
       * 開始向服務端異步套接字通道中寫入數據,同樣的任何出現異步IO操作的都要有CompletionHandler的實現來來做回調的處理。
       * 直接采用匿名內部類實現CompletionHandler接口
       */
      channel.write(writeBuffer, writeBuffer, new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
          if (attachment.hasRemaining()) {
            channel.write(writeBuffer, attachment, this);// 遞歸調用自己直到數據全部寫入通道。
            /**
             * 這里的數據全部寫入通道的控制與“寫半包”並不相同,寫半包是由於傳輸容器本身的大小限制正好對數據進行了分割導致,
             * 
             * 處理起來會更加復雜一些,這部分研究現在並不准備展開討論。
             */
          }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {// 對異常的處理
          try {
            channel.close();
          } catch (IOException e) {
            e.printStackTrace();
          }
        }

      });
    }
  }

  @Override
  public void failed(Throwable exc, ByteBuffer attachment) {
    try {
      // 失敗則關閉當前通道
      this.channel.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

}

服務端我們就寫完了,比較復雜,但其實原理都一樣,就是做回調的結果處理類,下面我們來寫客戶端,同樣的,客戶端啟動一個線程,Handler改為新增加的AsyncClientHandler類。

public class NIOTCPClient extends Base {
  public static void main(String[] args) {
    new Thread(new AsyncClientHandler(), "nio-client-reactor-001").start();
  }
}

然后,繼續寫AsyncClientHandler類,

package javaS.IO.nioS.aioS;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

import javaS.IO.socketS.Base;

/**
 * 異步非阻塞客戶端處理類
 * 
 * 本類大量使用匿名內部類來處理回調,好處是客戶端處理類只有本類一個,並無生成如服務端處理類相關的那么多類文件,
 * 
 * 壞處是本類層次結構較復雜,可讀性差。
 * 
 * 注意:Void作為類型要首字母大寫,就好像只有Integer可以作為泛型類型而不是int一樣,但Void不是引用類型,這里只是一種表示void類型的情況。
 * 
 * @author Evsward
 *
 */
public class AsyncClientHandler extends Base
    implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
  private AsynchronousSocketChannel asyncClientChannel;// 客戶端異步套接字通道,成員變量
  private CountDownLatch latch;// 用倒計時門閂來控制線程阻塞等待的狀態,而不是讓線程自己中斷退出

  public AsyncClientHandler() {
    try {
      asyncClientChannel = AsynchronousSocketChannel.open();
    } catch (IOException e) {
      e.printStackTrace();
    }

  }

  @Override
  public void run() {
    latch = new CountDownLatch(1);
    // 對象是本類,回調函數也是本類
    asyncClientChannel.connect(new InetSocketAddress(ipAddress, port), this, this);
    try {
      latch.await();
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
    try {
      asyncClientChannel.close();// 客戶端在請求完畢以后要關閉掉
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  @Override
  public void completed(Void result, AsyncClientHandler attachment) {
    // 客戶端輸入
    BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
    boolean flag = true;
    String str = null;
    while (flag) {
      logger.info("客戶端>輸入信息:");
      try {
        str = input.readLine();
      } catch (IOException e1) {
        e1.printStackTrace();
      }
      if (str == null || "".equals(str)) {
        logger.info("請不要輸入空字符!");
        continue;
      }
      /**
       * 不fail的情況下,只有客戶端輸入bye,才會主動斷開連接。
       */
      if ("bye".equals(str)) {
        logger.info("客戶端終止請求,斷開連接。");
        try {
          asyncClientChannel.close();
          latch.countDown();// 客戶端通道寫異常,釋放線程,執行完畢。
        } catch (IOException e) {
          e.printStackTrace();
        }
        break;
      }
      // 客戶端發起請求
      byte[] req = str.getBytes();
      ByteBuffer reqBuffer = ByteBuffer.allocate(req.length);
      reqBuffer.put(req);
      reqBuffer.flip();
      // 通道讀取到緩沖區成功以后開始寫請求
      asyncClientChannel.write(reqBuffer, reqBuffer, new CompletionHandler<Integer, ByteBuffer>() {

        @Override
        public void completed(Integer result, ByteBuffer attachment) {
          if (reqBuffer.hasRemaining()) {
            asyncClientChannel.write(reqBuffer, reqBuffer, this);
          } else {
            // 寫完請求以后,開始接收響應消息,並對進行結果回調處理
            ByteBuffer readBuffer = ByteBuffer.allocate(BUFFER_SIZE);
            asyncClientChannel.read(readBuffer, readBuffer,
                new CompletionHandler<Integer, ByteBuffer>() {

                  @Override
                  public void completed(Integer result, ByteBuffer attachment) {
                    attachment.flip();
                    byte[] bytes = new byte[attachment.remaining()];
                    attachment.get(bytes);// 將緩沖區讀到字節數組
                    try {
                      String body = new String(bytes, "UTF-8");
                      logger.info("服務端的響應消息:" + body);
                      logger.info("---------------------");
                      // 由於無法保持長連接通信,一次請求響應以后就無法再繼續通信,所以接收服務端響應以后,就斷開連接。
                      asyncClientChannel.close();
                      latch.countDown();
                    } catch (UnsupportedEncodingException e) {
                      e.printStackTrace();
                    } catch (IOException e) {
                      e.printStackTrace();
                    }
                  }

                  @Override
                  public void failed(Throwable exc, ByteBuffer attachment) {
                    try {
                      asyncClientChannel.close();
                      latch.countDown();// 客戶端通道讀異常,釋放線程,執行完畢。
                    } catch (IOException e) {
                      e.printStackTrace();
                    }
                  }

                });
          }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
          try {
            asyncClientChannel.close();
            latch.countDown();// 客戶端通道寫異常,釋放線程,執行完畢。
          } catch (IOException e) {
            e.printStackTrace();
          }
        }

      });
    }
  }

  @Override
  public void failed(Throwable exc, AsyncClientHandler attachment) {
    try {
      asyncClientChannel.close();
      latch.countDown();// 客戶端通道IO操作異常,釋放線程,執行完畢。
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

}

這個類很長,因為我們沒有單獨創建回調類,而是直接采用匿名內部類的方式實現了CompletionHandler接口。下面我們來看輸出結果,然后再來分析與總結。

首先,讓我們先啟動NIOTCPServer,

13:11:11[<init>][main]: server is listening in address -> 127.0.0.1:23451

然后,我們先啟動一個NIOTCPClient,就將它編號為1吧,

13:11:16[completed][Thread-10]: 客戶端>輸入信息:

我們在客戶端1中輸入a回車,客戶端控制台輸出為:

13:11:16[completed][Thread-10]: 客戶端>輸入信息:
a
13:11:18[completed][Thread-10]: 客戶端>輸入信息:
13:11:18[completed][Thread-2]: 服務端的響應消息:a
13:11:18[completed][Thread-2]: ---------------------

接着我們切換到服務端的控制台,發現也發生了變化:

13:11:11[<init>][main]: server is listening in address -> 127.0.0.1:23451
13:11:18[completed][Thread-2]: 客戶端請求信息:a

保持長連接的方法

這里,我曾設法繼續在客戶端1中輸入字符,發送請求,但是我們可以看到“客戶端>輸入信息”這一行已經被異步讀取的響應信息攔住了,此時在Thread-10上繼續輸入信息並沒有響應信息傳回,再次輸入信息回車會發生報錯。我設法去修改ReadCompletionHandler,讓它在發送完響應信息以后,能夠繼續調用AsynchronousSocketChannel的read方法,然后繼續讀取客戶端的請求信息,因為ReadCompletionHandler類本身就是read方法的回調處理類,讓它在處理完相應信息以后相當於在內部調用外部的read方法,再用自己來處理。按照這個思想,我對ReadCompletionHandler的響應寫入部分增加了一段代碼。

      // 開辟一個1MB的臨時緩沖區,將用於從異步套接字通道中讀取數據包
      ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
      channel.read(buffer, buffer, this);

我們仍舊要再開辟一個臨時緩沖區,用來給這條通道上下一個請求數據的處理做容器,然后用我們通過構造方法保存的AsynchronousSocketChannel對象channel,繼續調用它的read方法,並將自身作為回調處理類的入參。重新測試,與上面相同的內容我不再寫,重啟服務端和客戶端以后,接着上面的操作,我們繼續在客戶端1中輸入字符回車,控制台輸出為:

13:30:23[completed][Thread-10]: 客戶端>輸入信息:
a
13:30:25[completed][Thread-10]: 客戶端>輸入信息:
13:30:25[completed][Thread-3]: 服務端的響應消息:a
13:30:25[completed][Thread-3]: ---------------------
a
13:30:26[completed][Thread-10]: 客戶端>輸入信息:
13:30:26[completed][Thread-5]: 服務端的響應消息:a
13:30:26[completed][Thread-5]: ---------------------

我們再繼續輸入關鍵字“query time”用來請求服務端返回當前時間:

...
13:30:31[completed][Thread-7]: ---------------------
query time
13:30:34[completed][Thread-10]: 客戶端>輸入信息:
13:30:34[completed][Thread-8]: 服務端的響應消息:Thu Dec 14 13:30:34 CST 2017
13:30:34[completed][Thread-8]: ---------------------

目前為止,我們想要的功能均實現了,下面我們再啟動一個NIOTCPClient,繼續測試,仍舊成功。到現在為止,一個服務端已經連接了兩個客戶端,均可以正常工作,下面我又連着啟動了5個客戶端繼續測試仍舊穩定運行。回到服務端的控制台,

13:30:18[<init>][main]: server is listening in address -> 127.0.0.1:23451
13:30:25[completed][Thread-2]: 客戶端請求信息:a
13:30:26[completed][Thread-3]: 客戶端請求信息:a
13:30:28[completed][Thread-4]: 客戶端請求信息:afd
13:30:29[completed][Thread-5]: 客戶端請求信息:ads
13:30:30[completed][Thread-6]: 客戶端請求信息:adf
13:30:31[completed][Thread-7]: 客戶端請求信息:adf
13:30:34[completed][Thread-8]: 客戶端請求信息:query time
13:30:43[completed][Thread-10]: 客戶端請求信息:hey
13:30:46[completed][Thread-2]: 客戶端請求信息:heyyou
13:30:49[completed][Thread-3]: 客戶端請求信息:query time

我們使用AIO編程成功實現了回聲加時間訪問的服務器客戶端模型!

繼續優化

我能夠改造成功,是源自AcceptCompletionHandler類的重寫completed方法中的

attachment.asyncServerChannel.accept(attachment, this);

這行代碼我在以上粘貼的源碼部分已經寫下了詳實的注釋介紹了它出現第二次的理由。因為我們的服務端要想繼續支持其他的客戶端連入,就必須在第一個客戶端連入成功以后的回調函數里繼續為其他客戶端開啟accept通道。相似的,我們的服務端在返回響應消息以后,如果想繼續處理客戶端的請求,此時它與該客戶端仍舊保持連接狀態,只是失去了繼續處理該客戶端請求的能力,因此,我們將這個能力賦予給它就可以了。有些朋友說那只是支持了第二個客戶端連接,或者只是支持了當前連接的客戶端第二次請求而已,那第三個客戶端或者第三個請求呢?(這個問題的兩個主語因為是使用相同的手段實現的,所以我把他們放在一起來解釋,希望大家不要困擾。)原因就是我們新加的用於繼續處理新客戶端或者新請求的代碼中,調用的回調處理類是當前類本身,這就是遞歸的調用,無論再連入多少個客戶端,或者當前客戶端發送多少次請求,都可以穩定處理。

如果我們保持程序這樣,客戶端與服務端的通道是沒有主動斷開機制的,除非發生異常(例如你把整個jvm關掉了)。這是程序比較大的bug,如果用CheckStyle等源碼檢查工具來檢查的話會給你標示出來。那么我現在對它進行進一步修改。

我想的是直接在客戶端輸入時判斷輸入信息是否為關鍵字“bye”,如果是的話直接關閉通道,

      /**
       * 不fail的情況下,只有客戶端輸入bye,才會主動斷開連接。
       */
      if ("bye".equals(str)) {
        logger.info("客戶端終止請求,斷開連接。");
        try {
          asyncClientChannel.close();
          latch.countDown();// 客戶端通道寫異常,釋放線程,執行完畢。
        } catch (IOException e) {
          e.printStackTrace();
        }
        break;
      }

但經過測試,我發現客戶端控制台輸入“bye”以后,客戶端可以正常關閉,但是服務端發生異常:不斷的循環客戶端的輸入為空的情況。

好,下面我們對服務端進行調整:我們在服務端ReadCompletionHandler類讀取客戶端請求以后在輸出請求字符前首先判斷:

      // 如果接收到客戶端空字符的情況,說明客戶端已斷開連接,那么服務端也自動斷開通道即可。
      if (req == null || "".equals(req)) {
        channel.close();
        return;
      } 

經過仔細測試,這個方法可行,我們的程序距離健壯性又近了一步。

關於回調

在測試過程中,我們發現每一個請求或者響應的異步回調消息都是通過一個新的線程打印出來的,我們先來看服務端:

14:01:27[<init>][main]: server is listening in address -> 127.0.0.1:23451
14:01:33[completed][Thread-3]: 客戶端請求信息:1
14:01:33[completed][Thread-2]: 客戶端請求信息:2
14:01:34[completed][Thread-5]: 客戶端請求信息:3
14:01:35[completed][Thread-6]: 客戶端請求信息:4
14:01:35[completed][Thread-4]: 客戶端請求信息:5

然后,再來看客戶端:

14:01:31[completed][Thread-10]: 客戶端>輸入信息:
1
14:01:33[completed][Thread-10]: 客戶端>輸入信息:
14:01:33[completed][Thread-2]: 服務端的響應消息:1
14:01:33[completed][Thread-2]: ---------------------
2
14:01:33[completed][Thread-10]: 客戶端>輸入信息:
14:01:33[completed][Thread-4]: 服務端的響應消息:2
14:01:33[completed][Thread-4]: ---------------------
3
14:01:34[completed][Thread-10]: 客戶端>輸入信息:
14:01:34[completed][Thread-5]: 服務端的響應消息:3
14:01:34[completed][Thread-5]: ---------------------
4
14:01:35[completed][Thread-10]: 客戶端>輸入信息:
14:01:35[completed][Thread-6]: 服務端的響應消息:4
14:01:35[completed][Thread-6]: ---------------------
5
14:01:35[completed][Thread-10]: 客戶端>輸入信息:
14:01:35[completed][Thread-7]: 服務端的響應消息:5
14:01:35[completed][Thread-7]: ---------------------

我們發現了,服務端處理請求和客戶端處理響應的新線程並不具備任何關系,例如服務端打印請求為2的線程為Thread-2,然而客戶端返回處理請求2的響應線程為Thread-4,它們並不想等,也就是說這個線程的編號是獨立的。因為這些回調線程是由jdk實現的。

總結

我們終於完成了AIO實例的編程與測試和結果分析,下面我來總結一下。關於網絡編程,

  • 基礎是最普通的IO操作
  • 然后涉及到網絡IO,有了Socket來幫我們做這一層的工作
  • 我們不滿足於它阻塞的表現,增加了NIO,這部分的研究在本篇第一大部分進行了詳細的介紹,我們主要依賴對多路復用器Selector的輪詢來在單線程中實現“多線程”
  • 我們又不滿足與NIO的“假異步”的實現,增加了AIO,形成NIO 2.0,我們上面剛剛完成它的研究,我們是通過異步處理結果以后繼續接收新任務的方式來在單線程中實現“多線程”

其實本篇文章的內容不是真正意義的多線程知識,這個“多線程”是假的,是通過技術手段來合理的分配單一線程處理不同工作的方法,或者是依賴jdk實現過的穩定的回調線程的方式,但這種方式恰恰符合計算機系統中對線程的定義,我們知道cpu只有通過真的多核處理才是“真並發”,而線程多是通過合理分配資源的方式來實現並發的,然而我們也知道,有些cpu廠商也在做“假多核”,實際上這里面的思想是一致的。

本篇我們做的這些研究的工作都是針對TCP的,也就是基於流的,基於長連接的,長連接有個重要的特性就是,不僅可以處理客戶端的請求,它還可以主動給客戶端發送消息,這是長連接最大的優勢。

最后,我們的研究之路是隨着jdk的不斷發展來的,所以最新的AIO的方式肯定是超越舊版的,我們在未來的實際應用中可以選擇使用。接下來,我要趁熱打鐵,介紹多線程的知識,以及NIO開源框架Netty的知識,還有JVM,總之,知識是越研究越多,因為你的視野被逐漸打開了。

參考資料

  • 《netty權威指南》
  • 《java編程思想》
  • jdk 1.6 document api
  • jdk 1.7 document api

源碼位置

其他更多內容請轉到醒者呆的博客園


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM