【Java NIO】一文了解NIO


Java NIO

1 背景介紹

上一篇文章中我們介紹了Java基本IO,也就是阻塞式IO(BIO),在JDK1.4版本后推出了新的IO系統(NIO),也可以理解為非阻塞IO(Non-Blocking IO)。引用《Java NIO》中的一段話來解釋一下NIO出現的原因:

操作系統與 Java 基於流的 I/O模型有些不匹配。操作系統要移動的是大塊數據(緩沖區),這往往是在硬件直接存儲器存取( DMA)的協助下完成的。而 JVM 的 I/O 類喜歡操作小塊數據——單個字節、幾行文本。結果,操作系統送來整緩沖區的數據, java.io 的流數據類再花大量時間把它們拆成小塊,往往拷貝一個小塊就要往返於幾層對象。操作系統喜歡整卡車地運來數據, java.io 類則喜歡一鏟子一鏟子地加工數據。有了 NIO,就可以輕松地把一卡車數據備份到您能直接使用的地方( ByteBuffer 對象)。但是Java里的RandomAccessFile類是比較接近操作系統的方式。

可以看出Java原生的IO模型之所以慢,是因為與操作系統的操作方式不匹配造成的,那么NIO之所以比BIO快主要就是用到了緩沖區相關的技術,接下來慢慢介紹這些技術點。

1.1 緩沖區操作

下圖描述了操作系統中數據是如何從外部存儲向運行中的進程內存區域移動的過程:進程使用read()系統調用要求緩沖區被填充滿。內核隨即向磁盤控制器發出指令,要求其從磁盤讀取數據。磁盤控制器通過DMA直接把磁盤上的數據寫入緩沖區,這一步不需要CPU的參與。當緩沖區填滿時,內核將數據從臨時緩沖區拷貝到進程執行read()調用時指定的緩沖區。

這里需要主要為什么要執行系統調用這樣一個中間步驟而不是直接DMA到進程的緩沖區,是因為用戶空間是無法直接操作硬件的,另外磁盤這種塊存儲設備操作的是固定大小的數據塊,而用戶請求的則是非規則大小的數據,內核空間在這里的作用就是分解、重組的作用。

2 基本組件

Java NIO主要依賴的組件有三個:緩沖區Buffer、通道Channel和選擇器Selector。

2.1 緩沖區(Buffer)

Buffer家族主要有這么些個成員,根據類名也大概能猜到它們的用處,用的最多的是ByteBuffer,在下面的例子中也會主要用到它。

在這里就不仔細講Buffer類的API了,因為需要用的時候可以去查Java Doc,而以幾個常用的操作來講述一下怎么使用Buffer。

2.1.1 緩沖區屬性

容量(capacity):緩沖區的最大大小

上界(limit):緩沖區當前的大小

位置(position):下一個要讀寫的位置,由get()和put()更新

標記(mark):備忘位置,由mark()來指定mark = position,由reset()來指定position=mark

它們之間的大小關系:

0 <= mark <= position <= limit <= capacity

2.1.2 創建緩沖區

一種最常用的方式是:

ByteBuffer buffer = ByteBuffer.allocate(1024);

這種方法是創建一個1024字節大小的緩沖區。也可以用下面這種方式來包裝自己創建的字節數組。
byte[] bytes = new byte[1024];
ByteBuffer buffer = ByteBuffer.wrap(bytes);

2.1.3 緩沖區翻轉

Buffer在填充完畢后需要傳遞到一個通道中,這時如果直接讀取Buffer,其實是什么都讀不到的。因為Buffer的設計中是有一個指針概念的,指向當前的位置,當一個Buffer填充完畢時指針是指向末尾的,因此在讀取時應該將指針指向Buffer的頭部,簡單的方法就是使用下面這個方法:

Buffer.flip();

flip的實現如下:

public final Buffer flip() {
    limit = position;
    position = 0;
    mark = -1;
    return this;
}

可以看出flip其實是把當前的limit從capacity變成了position,又把position放到了緩沖區的起點,並取消了mark。

2.1.4 緩沖區清空

Buffer.clear();

clear的實現如下:

public final Buffer clear() {
    position = 0;
    limit = capacity;
    mark = -1;
    return this;
}

clear函數就是將position放到起點,並重置limiti為capacity,以及取消mark。

2.1.5 另外一種翻轉

Buffer.rewind();

rewind的實現如下:

public final Buffer rewind() {
    position = 0;
    mark = -1;
    return this;
}

rewind和flip的區別在於沒有改變limit的值。

2.1.6 緩沖區壓縮

Buffer.compact()

2.2 通道(Channel)

開始我不是很理解Channel這個東西為什么要存在,看了書才慢慢明白,緩沖區為我們裝載了數據,但是數據的寫入和讀取並不能直接進行read()和write()這樣的系統調用,而是JVM為我們提供了一層對系統調用的封裝。而Channel可以用最小的開銷來訪問操作系統本身的IO服務,這就是為什么要有Channel的原因。

下面來講講常用的幾個Channel類及其常用的方法。

2.2.1 常見Channel分類

I/O從廣義上可以分為File I/O和Stream I/O,對應到通道來說就有文件通道和socket通道,具體的說是FileChannle類和SocketChannel、ServerSocketChannel和DatagramChannel類。

它們之間的區別還是很大的,從繼承關系上來看:

public abstract class FileChannel
    extends AbstractInterruptibleChannel
    implements SeekableByteChannel, GatheringByteChannel, ScatteringByteChannel

FileChannel主要是繼承了可中斷接口,而對於socket相關的Channel類都繼承AbstractSelectableChannel,這是選擇器(Selector)相關的通道,在下一節中具體講解。

public abstract class SocketChannel
    extends AbstractSelectableChannel
    implements ByteChannel, ScatteringByteChannel, GatheringByteChannel, NetworkChannel

2.2.2 文件通道

2.2.2.1 打開

FileChannel只能通過工廠方法來實例化,那就是調用RandomAccessFile、FileInputStream和FileOutputStream的getChannel()方法。如:

RandomAccessFile file = new RandomAccessFile("a.txt", "r");
FileChannel fc = file.getChannel();
2.2.2.2 使用

先看看FileChannel提供的方法句柄:

public abstract int read(ByteBuffer dst) throws IOException;//把通道中數據傳到目的緩沖區中,dst是destination的縮寫
public abstract int write(ByteBuffer src) throws IOException;//把源緩沖區中的內容寫到指定的通道中去

從句柄可以看出FileChannel是既可以讀又可以寫的,是全雙工的。下面這個例子用來展示FileChannel是如何進行讀和寫的。

public class FileChannelTest {

    public static void readFile(String path) throws IOException {
        FileChannel fc = new FileInputStream(path).getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(128);
        StringBuilder sb = new StringBuilder();
        while ((fc.read(buffer)) >= 0) {
            //翻轉指針
            buffer.flip();
            //remaining = limit - position
            byte[] bytes = new byte[buffer.remaining()];
            buffer.get(bytes);
            String string = new String(bytes, "UTF-8");
            sb.append(string);

            //清空buffer
            buffer.clear();
        }
        System.out.println(sb.toString());
    }

    public static void writeFile(String path, String string) throws IOException {
        FileChannel fc = new FileOutputStream(path).getChannel();
        ByteBuffer buffer = ByteBuffer.allocate(10);
        int current = 0;
        int len = string.getBytes().length;
        while (current < len) {
            for (int i=0;i<10;i++) {
                if (current+i>=len) break;
                buffer.put(string.getBytes()[current+i]);
            }
            current += buffer.position();

            buffer.flip();
            fc.write(buffer);
            buffer.clear();
        }
    }


    public static void main(String[] args) throws IOException {
        String in = "D:/in.txt";
        String out = "D:/out.txt";
        readFile(in);
        writeFile(out, "hello world");
        readFile(out);
    }
}

分析一下上面這段代碼,在readFile()函數中,通過FileInputStream.getChannel()得到FileChannel對象,並創建ByteBuffer對象,接着利用FileChannel的read方法填充buffer,得到填充完的buffer之后我們將buffer的當前指針翻轉一下接着利用buffer的get方法把數據放到byte數組中,接着就可以讀取數據了。

讀取文件的整個過程相比原生的I/O方法還是略顯麻煩,但是我們如果把數據看成一堆煤礦,把ByteBuffer看成裝煤的礦車,而FileChannel看成是運煤的礦道,那么上面的過程就演變成了:先打通一條礦道,然后把煤礦裝在小車里運出來。形象的記憶更利於理解這個過程。

而writeFile()函數也是類似,為了更好的理解Buffer的屬性,我特意將buffer的大小設置為10,為要寫入的字符串長度為11個字節。首先還是通過FileOutputStream.getChannel()方法得到FileChannel對象,並創建一個10字節大小的緩沖區,接着定義一個整型變量current指向要寫入的字符串的當前下標,每次向buffer中put10個字節,並更新current,通過buffer.position()方法可以得到buffer被填充之后指針的位置,也就是緩沖區里的字節個數,然后翻轉指針,最后通過FileChannel.write(buffer)方法將buffer寫入到文件中。

同樣考慮一下形象化的過程:我們首先把煤礦裝入小車(buffer.put()),並打開一條通往礦山的礦道(FileOutputStream.getChannel()),接着把煤礦運輸進去(FileChannel.write(buffer))。還是很容易理解的吧!

2.2.3 Socket通道

另一篇博客中介紹了阻塞式TCP的使用,接下來會介紹一下非阻塞式的TCP使用。

Socket通道與文件通道有着不同的特征,最顯著的就是可以運行非阻塞模式並且是可以選擇的。在2.2.1節中我們講到Socket通道都繼承自AbstractSelectableChannel類,而文件通道沒有,而這個類就是Socket通道擁有非阻塞和可選擇特點的關鍵。下面是SelectableChannel的幾個方法句柄:

public abstract boolean isBlocking();
public abstract SelectableChannel configureBlocking(boolean block)
    throws IOException;

從這兩個方法句柄可以看到,設置一個socket通道的非阻塞模式只需要:

socketChannel.configureBlocking(false)

即可。而有條件的選擇(readiness selection)是一種可以用來查詢通道的機制,該查詢可以判斷通道是否准備好執行一個目標操作,比如read、write或accept。這個特性是在SelectableChannel類和SelectionKey類中進行了定義。

public static final int OP_READ = 1 << 0;
public static final int OP_WRITE = 1 << 2;
public static final int OP_CONNECT = 1 << 3;
public static final int OP_ACCEPT = 1 << 4;

SelectionKey中的四個常量定義了socket通道的四種狀態,而SelectableChannel的register方法正好返回了SelectionKey對象。

public abstract SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException;
2.2.3.1 創建

socket通道與文件通道不同,並不是通過socket.getChannel()來創建對象(盡管socket對象有這個方法),而是通過SocketChannel.open()這樣的靜態工廠方法去創建對象。每一個socket通道有與之關聯的一個socket對象,卻並不是所有的socket對象都有一個關聯的通道,如果用傳統的方法創建了一個socket對象,則它不會有一個關聯的通道並且getChannel()方法總是返回null。

SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);

這樣就創建了一個非阻塞的socket通道。

2.2.3.2 ServerSocketChannel
public abstract class ServerSocketChannel
	extends AbstractSelectableChannel
{
	public static ServerSocketChannel open( ) throws IOException;
	public abstract ServerSocket socket( );
	public abstract ServerSocket accept( ) throws IOException;
	public final int validOps( );
}

ServerSocketChannel與SocketChannel和DatagramChannel不同,它本身是不傳輸數據的,提供的接口非常簡單,如果要進行數據讀寫,需要通過ServerSocketChannel.socket()方法返回一個與之關聯的ServerSocket對象來進行。

ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket ss = ssc.socket();
ss.bind(new InetSocketAddress(port));

ServerSocketChannel同ServerSocket一樣也有accept()方法,當調用ServerSocket的accept()函數時只能是阻塞式的,而調用ServerSocketChannel的accept()函數卻可以是非阻塞式。

下面這個例子展示了ServerSocketChannel的用法:

public class Server {
    static int port = 20001;

    public static void main(String[] args) throws IOException, InterruptedException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.socket().bind(new InetSocketAddress(port));
        ssc.configureBlocking(false);
        String string = "hello client";
        ByteBuffer buffer = ByteBuffer.wrap(string.getBytes());
        ByteBuffer in = ByteBuffer.allocate(1024);
        System.out.println("Server wait for connection...");
        while (true) {
            SocketChannel sc = ssc.accept();
            if (sc == null) {
                TimeUnit.SECONDS.sleep(1);
            }else {
                //rewind只是將position調到0,不會改變Limit的值,而flip是將limit調整成position,再把position改成0
                System.out.println("Server get a connection...");

                sc.read(in);
                in.flip();

                buffer.rewind();
                sc.write(buffer);
                System.out.println("From client:" + new String(in.array(), "UTF-8"));
            }
        }
    }
}

2.3 選擇器(Selector)

選擇器其實是一種多路復用機制在Java語言中的應用,在學習Selector之前有必要學習一下I/O多路復用的概念。

2.3.1 多路復用

之前的文章中我們已經看到對於每個客戶端請求都分配一個線程的設計,或者是利用線程池來處理客戶端請求,但是這樣的設計對於處理客戶端有大量請求的情況都束手無策。原因在於首先線程非常消耗系統資源,其次阻塞式的設計在某一個請求發送的數據很大時會使其他請求等待很久。那么究竟有沒有其他方法來解決這個問題呢?早在上世紀80年代在Unix系統中就已經提出select模型來解決這個問題,在之后對select進行優化又提出了poll模型和epoll模型(Linux專有)。

select/poll/epoll其實都是一種多路復用模型,什么是多路復用,開始聽見這個名詞我也是一臉懵逼,覺得好像很高大上很難理解的樣子。后面通過看書和看知乎上的形象化描述,慢慢理解了其實多路復用也沒有想象着那么難。我們如果把每個客戶端請求看成一個電路,如下圖,那么是否有必要為每條電路都分配一條專有的線路呢?還是當電流來了進行開關切換?很明顯,后者只需要一個開關就可以節省大量的不必要開銷。select模型其實就是這樣做的,監控所有的socket請求,當某個socket准備好(read/write/accept)時就進行處。但是如何做到監控所有socket的狀態呢,select做的很簡單,也許你也想到了,就是去輪詢所有socket的狀態,這樣很明顯當socket數量比較大時效率非常低。並且select對於監控的socket數量有限制,默認是1024個。poll模型進行了一些改進,但是並沒有本質的改變。到了epoll模型,就有了非常大的改觀。假象另一個場景,如果你是一個監考老師,考試結束時要去收卷子,你按照常理一個一個的收着,一旦有一個學生還沒寫完,於是你就會卡(阻塞)在那,並且整個輪詢一遍下來非常慢。所以你想到了嗎?讓那些已經做完的學生舉手告知你他已經做完了,你再過去收一下卷子即可。這樣很明顯阻塞會大幅度減少。這就是epoll,讓那些已經准備好的socket發出通知,然后來處理它。

如果還是不理解,可以看看知乎上的一些回答

2.3.2 NIO多路復用

好了,廢話這么多,已經是可以理解多路復用是什么了。Java語言直到JDK1.4版本才有多路復用這個概念,很大原因也是因為沒人用Java去寫服務器,例如著名的Apache和Nginx都是用C/C++寫的。接下來對NIO中多路復用的實現進行介紹。

NIO處理多路復用請求只需要三個組件:可選擇的通道(SelectableChannels)、選擇器(Selector)和選擇鍵(SelectionKey),他們之間的關系如下圖所示:

可選擇的通道可以主動注冊到一個選擇器上,並指定對哪些動作是感興趣的。這個注冊行為會返回一個選擇鍵,選擇鍵封裝了該通道和選擇器之間的注冊關系,包含兩個比特集:指示該注冊關系所關心的通道操作;通道已經准備好的操作。選擇器是核心組件,它管理着注冊在其上的通道集合的信息和它們的就緒狀態。值得注意的是,通道在注冊到一個選擇器之前,必須設置為非阻塞模式。原因在這里

2.3.3 常用操作

2.3.3.1 創建選擇器

通過靜態工廠方法創建一個選擇器。

Selector selector = Selector.open();
2.3.3.2 通道注冊到選擇器上

這是通道擁有的方法,先看看方法句柄:

public abstract SelectionKey register(Selector sel, int ops, Object att)
    throws ClosedChannelException;

public final SelectionKey register(Selector sel, int ops)
    throws ClosedChannelException
{
    return register(sel, ops, null);
}

值得注意的是第二個參數ops,這個參數表示了該通道感興趣的操作,所有的操作包括讀(read)、寫(write)、連接(connect)和接受(accept)。並不是所有通道都支持這些操作,比如SocketChannel就沒有accept這個操作,因為這是專屬於ServerSocketChannel的操作。可以通過調用Channel.validOps()來查詢支持的操作。

第三個參數是用來傳遞一個對象的引用,在調用新生成的選擇鍵的attach()方法時會返回該對象的引用。

2.3.3.3 選擇過程

選擇器的核心功能是選擇過程,選擇器實際上是對select()、poll()等本地系統調用的一個封裝。每一個選擇器會維護三個鍵集合:已注冊的鍵集合、已選擇的鍵集合和已取消的鍵集合。通過執行Selector.select()、Selector.select(int timeout)或Selector.selectNow(),選擇過程被調用,這時會執行以下步驟:

  • 首先會檢查被取消的鍵的集合。因為在任何時候選擇鍵(通道和選擇器的綁定關系)都可能被取消,所以在正式選擇之前需要先檢查一下被取消的鍵。如果這個集合非空,則其中的鍵會從另外兩個鍵集合中去除。
  • 已注冊的鍵集合中的鍵的interest集合將被檢查。這個過程會調用底層的系統調用(具體調用依賴於特定的操作系統),如果沒有通道准備好,則線程會阻塞在這里。這個操作會更新那些准備好interest集合中至少一種操作的通道的ready集合。這一句非常的拗口,也比較難理解,說的簡單一點,就是每個通道有一個感興趣操作集合,底層的系統調用可以去檢查這些操作是否就緒,如果就緒就會更新該通道綁定的選擇鍵里的相關值。所以你只需要去檢查選擇鍵里的相關值就可以知道該操作是不是准備好了。
  • 完成步驟2可能很耗時。完成后還需要再進行步驟1,因為這個過程中某些選擇鍵也可能被取消,這樣做是為了提高程序的健壯性(robust)。
  • 最后select()操作會返回此次選擇過程中ready()集合被改變的鍵的數量,而不是所有的ready()集合中的鍵的數量。這非常合理,因為你可以知道這次選擇過程到底有幾個通道准備就緒。通過判斷select()返回值是否大於0,就可以知道要不要去操作了。

好不容易才寫完上面這段,因為我在看原書的時候看了2-3遍才看懂,過程還是比較復雜的,我覺得是時候去看看Unix中的select()是怎么做的,也許這樣更利於理解這個選擇過程。

2.3.3.4 綜合使用這三個組件

說了這么多原理,不知道你暈沒暈,反正我是快暈了。這時候來一段實戰代碼,告訴你了解了這么多,到底該怎么用!

通常的做法如下:在選擇器上調用一次select操作(這會更新已選擇鍵的集合),然后遍歷selectedKeys返回的鍵的集合。接着鍵將從已選擇的鍵的集合中被移除(通過Iterator.remove()方法),然后檢測下一個鍵。完成后,繼續下一次select操作。

服務端程序演示:

public class SelectorTest {
    public static void main(String[] args) throws IOException {
        new SelectorTest().select();
    }

    public void select() throws IOException {
        //創建選擇器
        Selector selector = Selector.open();
        //創建serverChannel
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //設置為非阻塞模式
        ssc.configureBlocking(false);
        //綁定監聽的地址
        ssc.socket().bind(new InetSocketAddress(20000), 1024);
        //將serverChannel注冊到選擇器上,監聽accept事件,返回選擇鍵
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            //此次選擇過程准備就緒的通道數量
            int num = selector.select();
            if (num == 0) {
                //若沒有准備好的就繼續循環
                continue;
            }
            //返回已就緒的鍵集合
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                handle(selector, key);
                //因為已經處理了該鍵,所以把當前的key從已選擇的集合中去除
                it.remove();
            }
        }
    }

    public void handle(Selector selector, SelectionKey key) throws IOException {
        if (key.isValid()) {
            //當一個ServerChannel為accept狀態時,注冊這個ServerChannel的SocketChannel為可讀取狀態
            if (key.isAcceptable()) {
                ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                SocketChannel channel = serverChannel.accept();
				//把通道注冊到選擇器之前要設置為非阻塞,否則會報異常
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            }
            //如果channel是可讀取狀態,則讀取其中的數據
            if (key.isReadable()) {
                //只有SocketChannel才能讀寫數據,所以如果是可讀取狀態,只能是SocketChannel
                SocketChannel sc = (SocketChannel) key.channel();
                ByteBuffer in = ByteBuffer.allocate(1024);
                //將socketChannel中的數據讀入到buffer中,返回當前字節的位置
                int readBytes = sc.read(in);
                if (readBytes > 0) {
                    //把buffer的position指針指向buffer的開頭
                    in.flip();
                    byte[] bytes = new byte[in.remaining()];
                    in.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The server receive : " + body);
                    //把response輸出到socket中
                    doWrite(sc, "Hello client");
                } else if (readBytes < 0) {
                    key.cancel();
                    sc.close();
                }
            }
        }
    }

    private void doWrite(SocketChannel sc, String response) throws IOException {
		//把服務器端返回的數據寫到socketChannel中
        if (response == null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            sc.write(writeBuffer);
        }
    }
}

代碼相較於阻塞式TCP服務端程序復雜了太多倍,但是基本思路跟我上面那段話寫的是一樣的,而且基本每一段代碼都寫了注釋,耐心看下去肯定看的懂。我就不再解釋這段代碼啦。

客戶端演示:

public class Client {
    public static final int PORT = 20000;
    public static final String HOST = "127.0.0.1";
    private volatile boolean stop = false;

    public static void main(String[] args) throws IOException {
        new Client().select();
    }

    public void select() throws IOException {
        // 創建選擇器
        Selector selector = Selector.open();
        // 創建SocketChannel
        SocketChannel sc = SocketChannel.open();
        // 設置為非阻塞模式
        sc.configureBlocking(false);
        try {
            doConnect(selector, sc);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }

        while (!stop) {
            int num = selector.select();
            if (num == 0) {
                continue;
            }
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                try {
                    handleKeys(selector, key);
                } catch (Exception e) {
                    e.printStackTrace();
                    if (key != null) {
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
                // 因為已經處理了該鍵,所以把當前的key從已選擇的集合中去除
                it.remove();
            }
        }

        if (selector != null) {
            selector.close();
        }
    }

    private void doWrite(SocketChannel sc, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                System.out.println("Send msg successfully");
            }
        }
    }

    private void handleKeys(Selector selector, SelectionKey key) throws IOException {
        if (key.isValid()) {
            SocketChannel sc = (SocketChannel) key.channel();
            // 判斷是否連接成功
            if (key.isConnectable()) {
                if (sc.finishConnect()) {
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc, "Hello Server");
                } else {
                    System.exit(1);
                }
            }
            if (key.isReadable()) {
                ByteBuffer in = ByteBuffer.allocate(1024);
                // 將socketChannel中的數據讀入到buffer中,返回當前字節的位置
                int readBytes = sc.read(in);
                if (readBytes > 0) {
                    // 把buffer的position指針指向buffer的開頭
                    in.flip();
                    byte[] bytes = new byte[in.remaining()];
                    in.get(bytes);
                    String body = new String(bytes, "UTF-8");
                    System.out.println("The Client receive : " + body);
                    this.stop = true;
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } else {
                    // 讀到0字節,忽略
                }
            }
        }
    }

    private void doConnect(Selector selector, SocketChannel sc) throws IOException {
        if (sc.connect(new InetSocketAddress(HOST, PORT))) {
            System.out.println("Client connect successfully...");
            // 如果直接連接成功,則注冊讀操作
            sc.register(selector, SelectionKey.OP_READ);
            doWrite(sc, "Hello server!");
        } else {
            // 如果沒有連接成功,則注冊連接操作
            sc.register(selector, SelectionKey.OP_CONNECT);
        }
    }
}

客戶端跟服務端很相似,唯一不同的是服務端需要監測的socket行為是OP_ACCEPT和OP_READ,而客戶端需要監控的是OP_CONNECT和OP_READ,其他的區別不是很大。

依次運行服務器端和客戶端,結果如下:

代碼在我的github repo上也可以找到。

3 總結

花了大概三天的時間,把《Java NIO》這本書看了一遍並記錄了下來學習過程,並且結合《Netty權威指南》中的例子去實踐了一下,慢慢感覺到NIO的魅力。反思一下學習的比較慢的原因,應該是對Unix上的I/O模型不熟悉導致的,所以覺得接下來好好學習一下select、poll、epoll,加深對多路復用的理解。

本文中可能存在理解有偏差的地方,也請多多指正。

參考文獻

1.《Java NIO》

2.《Netty權威指南》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM