Java NIO通信的基礎,基於TCP C/S例子介紹


為了更好的理解Netty異步事件驅動網絡通信框架,有必要先了解一點Java NIO原生的通信理論,下面將結合基於TCP的例子程序,含客戶端和服務端的源碼,實現了Echo流程。

 

Java NIO的核心概念有三個:ChannelSelectorByteBuffer

而這當中,Channel的比重最大,NIO的功能主要基於Channel來實現,進行業務邏輯操作。Selector主要是IO事件選擇器,當一個Channel創建並配置好后,注冊到Selector上,與Selector相關的重要概念是SelectionKey,這個上面綁定了IO事件相關的Channel。在獲取到Channel后,進行數據的讀寫操作,Channel的數據讀寫是不能直接操作數據的,必須基於ByteBuffer進行,然而,Java NIO原生的ByteBuffer操作比較繁瑣,要flip和clear操作。

 

1. 而我們在業務邏輯操作中,用到的channel,主要有ServerSocketChannel,SocketChannel,DataGramChannel。下面,用一個圖,來簡要的描述下Channel到這三個具體之類之間的繼承/實現關系(該圖來自網絡,若有不妥,請告知,謝謝)。

 

 

2. Selector,是事件選擇器,創建Selector后,在調用select之前,在注冊Channel到這個Selector上時,必須指定關注的事件類型(interestOps)。通過這個類的select函數,可以獲取選擇上監聽到的IO事件。一旦select函數檢測到事件,就可以從Selector上獲取到具體有哪些IO事件,這些事件通過SelectionKey承載,SelectionKey上標記出該事件的類型,比如是OP_CONNECT,OP_ACCEPT還是OP_READ等。另外,SelectionKey還記錄了對應該IO事件發生的Channel,可以通過SelectionKey得到該Channel。

 

 

3. ByteBuffer。 因為字節操作,是操作系統與IO設備之間進行通信的基本數據單元,在Java NIO中,各通道Channel之間進行數據通信時,指定必須是基於ByteBuffer的。 ByteBuffer有兩個重要的函數,flip和clear。當Channel調用read函數,將數據讀到ByteBuffer中后,ByteBuffer的數據長度指針將會移動到數據長度所在的位置,這個位置是小於等於ByteBuffer容量capacity值的。當業務邏輯操作讀取到的數據前,需要對ByteBuffer做一下flip操作,就是將limit指針指向當前數據指針position的位置,然后,將position指針指向0的位置。數據邏輯結束后,一般要恢復ByteBuffer,即調用clear函數。

 

這三個重要的概念,做了一番解釋和描述后,就以一個demo程序,基於Java NIO的TCP C/S源碼,代碼中帶有了重要邏輯的注釋,后續不再單獨解釋。

A. TCP Server:

/**
 * @author "shihuc"
 * @date   2017年3月16日
 */
package javaSocket.tcp.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

import javaSocket.tcp.Constants;

/**
 * @author chengsh05
 *
 */
public class TcpServer {

    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            startServer(Constants.SERVER_PORT);
        } catch (IOException e) {            
            e.printStackTrace();
        }
    }
    
    public static void startServer(int port) throws IOException{
        /*
         *開啟一個服務channel, 
         *A selectable channel for stream-oriented listening sockets.
         */
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(port));

        /*
         * 創建一個selector
         */
        Selector selector = Selector.open();    
        /*
         * 將創建的serverChannel注冊到selector選擇器上,指定這個channel只關心OP_ACCEPT事件
         */
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            /*
             * select()操作,默認是阻塞模式的,即,當沒有accept或者read時間到來時,將一直阻塞不往下面繼續執行。
             */
            int readyChannels = selector.select();  
            if (readyChannels <= 0) {
                continue;
            }

            /*
             * 從selector上獲取到了IO事件,可能是accept,也有可能是read
             */
            Set<SelectionKey> SelectonKeySet = selector.selectedKeys(); 
            Iterator<SelectionKey> iterator = SelectonKeySet.iterator();

            /*
             * 循環遍歷SelectionKeySet中的所有的SelectionKey
             */
            while (iterator.hasNext()) {    
                SelectionKey key = iterator.next();                
                if (key.isAcceptable()) {          //處理OP_ACCEPT事件 
                    SocketChannel socketChannel = serverChannel.accept();
                    socketChannel.configureBlocking(false);
                    socketChannel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {  //處理OP_READ事件
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    StringBuilder sb = new StringBuilder();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    
                    int readBytes = 0;
                    int ret = 0;
                    /*
                     * 注意讀數據的時候,ByteBuffer的操作,需要flip,clear進行指針位置的調整
                     */
                    while ((ret = socketChannel.read(byteBuffer)) > 0) {
                        readBytes += ret;
                        byteBuffer.flip();                        
                        sb.append(Charset.forName("UTF-8").decode(byteBuffer).toString());                        
                        byteBuffer.clear();
                    }
                    
                    if (readBytes == 0) {
                        System.err.println("handle opposite close Exception");
                        socketChannel.close();
                    }
                    
                    String message = sb.toString();
                    System.out.println("Message from client: " + message);
                    if (Constants.CLIENT_CLOSE.equalsIgnoreCase(message.toString().trim())) {
                        System.out.println("Client is going to shutdown!");                        
                        socketChannel.close();
                    } else if (Constants.SERVER_CLOSE.equalsIgnoreCase(message.trim())) {
                        System.out.println("Server is going to shutdown!");                        
                        socketChannel.close();
                        serverChannel.close();
                        selector.close();
                        System.exit(0);
                    } else {
                        String outMessage = "Server response:" + message;
                        socketChannel.write(Charset.forName("UTF-8").encode(outMessage));
                    }
                }
                /*
                 * 將selector上當前已經監聽到的且已經處理了的事件標記清除掉。
                 */
                iterator.remove();
            }
        }
    }
}

 

B. TCP Client

/**
 * @author "shihuc"
 * @date   2017年3月16日
 */
package javaSocket.tcp.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

import javaSocket.tcp.Constants;

/**
 * @author chengsh05
 *
 */
public class TcpClient {

    /**
     * @param args
     */
    public static void main(String[] args) {
        try {
            startClient(Constants.SERVER_IP, Constants.SERVER_PORT);
        } catch (IOException e) {        
            e.printStackTrace();
        }
    }
    
    public static void startClient(String serverIp, int serverPort) throws IOException{
        /*
         * 創建一個SocketChannel,指定為非阻塞模式
         * A selectable channel for stream-oriented connecting sockets. 
         */
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        
        /*
         * 連接到指定的服務地址
         */
        socketChannel.connect(new InetSocketAddress(serverIp, serverPort));

        /*
         * 創建一個事件選擇器Selector
         */
        Selector selector = Selector.open();
        
        /*
         * 將創建的SocketChannel注冊到指定的Selector上,並指定關注的事件類型為OP_CONNECT
         */
        socketChannel.register(selector, SelectionKey.OP_CONNECT);

        /*
         * 從系統輸入終端讀取數據,作為客戶端信息輸入源
         */
        Scanner sc = new Scanner(System.in);
        String cont = null;        
        while(true){
            if(socketChannel.isConnected()){
                cont = sc.nextLine();
                socketChannel.write(Charset.forName("UTF-8").encode(cont));
                if(cont == null || cont.equalsIgnoreCase(Constants.CLIENT_CLOSE)){
                    socketChannel.close();
                    selector.close();
                    sc.close();
                    System.out.println("See you, 客戶端退出系統了");
                    System.exit(0);
                }
            }
            /*
             * 設置1sec的超時時間,進行IO事件選擇操作
             */
            int nSelectedKeys = selector.select(5000);
            if(nSelectedKeys > 0){
                for(SelectionKey skey: selector.selectedKeys()){
                    /*
                     * 判斷檢測到的channel是不是可連接的,將對應的channel注冊到選擇器上,指定關心的事件類型為OP_READ
                     */
                    if(skey.isConnectable()){
                        SocketChannel connChannel = (SocketChannel) skey.channel();
                        connChannel.configureBlocking(false);
                        connChannel.register(selector, SelectionKey.OP_READ);
                        connChannel.finishConnect();
                    }
                    /*
                     * 若檢測到的IO事件是讀事件,則處理相關數據的讀相關的業務邏輯
                     */
                    else if(skey.isReadable()){
                        SocketChannel readChannel = (SocketChannel) skey.channel();
                        StringBuilder sb = new StringBuilder();
                        /*
                         * 定義一個ByteBuffer的容器,容量為1k
                         */
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                       
                        int readBytes = 0;
                        int ret = 0;
                        /*
                         * 注意,對ByteBuffer的操作,需要關心的是flip,clear等。
                         */
                        while ((ret = readChannel.read(byteBuffer)) > 0) {
                            readBytes += ret;
                            byteBuffer.flip();                            
                            sb.append(Charset.forName("UTF-8").decode(byteBuffer).toString());                            
                            byteBuffer.clear();
                        }
                        
                        if (readBytes == 0) {
                            System.err.println("handle opposite close Exception");
                            readChannel.close();
                        }
                    }                                        
                }
                /*
                 * 一次監聽的事件處理完畢后,需要將已經記錄的事件清除掉,准備下一輪的事件標記
                 */
                selector.selectedKeys().clear();
            }else{
                System.err.println("handle select timeout Exception");
                socketChannel.close();
            }
        }        
    }
}

 

閱讀上述代碼時,請注意,server和client的實現風格不太一樣,主要是針對SelectionKeySet的遍歷,一次select操作獲取到的所有的SelectionKey處理完后的掃尾工作,體現出Selector的工作邏輯,若寫過C程序實現過TCP server/client程序,對事件選擇的過程應該就更清楚了。

 

最后,總結一下Java NIO TCP協議下的C/S結構程序流程圖,為徹底理解Java NIO服務。

 

基於這個例子引出的Java NIO的邏輯過程和思想,再去研讀Netty的代碼,相信會容易理解Netty的核心reactor模型工作原理。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM