【Java TCP/IP Socket】基於NIO的TCP通信(含代碼)


NIO主要原理及使用

    NIO采取通道(Channel)和緩沖區(Buffer)來傳輸和保存數據,它是非阻塞式的I/O,即在等待連接、讀寫數據(這些都是在一線程以客戶端的程序中會阻塞線程的操作)的時候,程序也可以做其他事情,以實現線程的異步操作。

   考慮一個即時消息服務器,可能有上千個客戶端同時連接到服務器,但是在任何時刻只有非常少量的消息需要讀取和分發(如果采用線程池或者一線程一客戶端方式,則會非常浪費資源),這就需要一種方法能阻塞等待,直到有一個信道可以進行I/O操作。NIO的Selector選擇器就實現了這樣的功能,一個Selector實例可以同時檢查一組信道的I/O狀態,它就類似一個觀察者,只要我們把需要探知的SocketChannel告訴Selector,我們接着做別的事情,當有事件(比如,連接打開、數據到達等)發生時,它會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛注冊過的SocketChannel,然后,我們從這個Channel中讀取數據,接着我們可以處理這些數據。

    Selector內部原理實際是在做一個對所注冊的Channel的輪詢訪問,不斷的輪詢(目前就這一個算法),一旦輪詢到一個Channel有所注冊的事情發生,比如數據來了,它就會讀取Channel中的數據,並對其進行處理。

    要使用選擇器,需要創建一個Selector實例,並將其注冊到想要監控的信道上(通過Channel的方法實現)。最后調用選擇器的select()方法,該方法會阻塞等待,直到有一個或多個信道准備好了I/O操作或等待超時,或另一個線程調用了該選擇器的wakeup()方法。現在,在一個單獨的線程中,通過調用select()方法,就能檢查多個信道是否准備好進行I/O操作,由於非阻塞I/O的異步特性,在檢查的同時,我們也可以執行其他任務。

 

 

   基於NIO的TCP連接的建立步驟

 

  服務端

 

    1、傳建一個Selector實例;

    2、將其注冊到各種信道,並指定每個信道上感興趣的I/O操作;

    3、重復執行:

        1)調用一種select()方法;

        2)獲取選取的鍵列表;

        3)對於已選鍵集中的每個鍵:

           a、獲取信道,並從鍵中獲取附件(如果為信道及其相關的key添加了附件的話);

           b、確定准備就緒的操縱並執行,如果是accept操作,將接收的信道設置為非阻塞模式,並注冊到選擇器;

           c、如果需要,修改鍵的興趣操作集;

           d、從已選鍵集中移除鍵

 

     客戶端

   與基於多線程的TCP客戶端大致相同,只是這里是通過信道建立的連接,但在等待連接建立及讀寫時,我們可以異步地執行其他任務。

 

 

   基於NIO的TCP通信Demo

    下面給出一個基於NIO的TCP通信的Demo,客戶端發送一串字符串到服務端,服務端將該字符串原原本本地反饋給客戶端。

    客戶端代碼及其詳細注釋如下:

 

import java.net.InetSocketAddress;  
import java.net.SocketException;  
import java.nio.ByteBuffer;  
import java.nio.channels.SocketChannel;  
  
public class TCPEchoClientNonblocking {  
    public static void main(String args[]) throws Exception{  
        if ((args.length < 2) || (args.length > 3))   
        throw new IllegalArgumentException("參數不正確");  
        //第一個參數作為要連接的服務端的主機名或IP  
        String server = args[0];   
        //第二個參數為要發送到服務端的字符串  
        byte[] argument = args[1].getBytes();  
        //如果有第三個參數,則作為端口號,如果沒有,則端口號設為7  
        int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;  
        //創建一個信道,並設為非阻塞模式  
        SocketChannel clntChan = SocketChannel.open();  
        clntChan.configureBlocking(false);  
        //向服務端發起連接  
        if (!clntChan.connect(new InetSocketAddress(server, servPort))){  
            //不斷地輪詢連接狀態,直到完成連接  
            while (!clntChan.finishConnect()){  
                //在等待連接的時間里,可以執行其他任務,以充分發揮非阻塞IO的異步特性  
                //這里為了演示該方法的使用,只是一直打印"."  
                System.out.print(".");    
            }  
        }  
        //為了與后面打印的"."區別開來,這里輸出換行符  
        System.out.print("\n");  
        //分別實例化用來讀寫的緩沖區  
        ByteBuffer writeBuf = ByteBuffer.wrap(argument);  
        ByteBuffer readBuf = ByteBuffer.allocate(argument.length);  
        //接收到的總的字節數  
        int totalBytesRcvd = 0;   
        //每一次調用read()方法接收到的字節數  
        int bytesRcvd;   
        //循環執行,直到接收到的字節數與發送的字符串的字節數相等  
        while (totalBytesRcvd < argument.length){  
            //如果用來向通道中寫數據的緩沖區中還有剩余的字節,則繼續將數據寫入信道  
            if (writeBuf.hasRemaining()){  
                clntChan.write(writeBuf);  
            }  
            //如果read()接收到-1,表明服務端關閉,拋出異常  
            if ((bytesRcvd = clntChan.read(readBuf)) == -1){  
                throw new SocketException("Connection closed prematurely");  
            }  
            //計算接收到的總字節數  
            totalBytesRcvd += bytesRcvd;  
            //在等待通信完成的過程中,程序可以執行其他任務,以體現非阻塞IO的異步特性  
            //這里為了演示該方法的使用,同樣只是一直打印"."  
            System.out.print(".");   
        }  
        //打印出接收到的數據  
        System.out.println("Received: " +  new String(readBuf.array(), 0, totalBytesRcvd));  
        //關閉信道  
        clntChan.close();  
    }  
}  

服務端用單個線程監控一組信道,代碼如下:

 

import java.io.IOException;  
import java.net.InetSocketAddress;  
import java.nio.channels.SelectionKey;  
import java.nio.channels.Selector;  
import java.nio.channels.ServerSocketChannel;  
import java.util.Iterator;  
  
public class TCPServerSelector{  
    //緩沖區的長度  
    private static final int BUFSIZE = 256;   
    //select方法等待信道准備好的最長時間  
    private static final int TIMEOUT = 3000;   
    public static void main(String[] args) throws IOException {  
        if (args.length < 1){  
            throw new IllegalArgumentException("Parameter(s): <Port> ...");  
        }  
        //創建一個選擇器  
        Selector selector = Selector.open();  
        for (String arg : args){  
            //實例化一個信道  
            ServerSocketChannel listnChannel = ServerSocketChannel.open();  
            //將該信道綁定到指定端口  
            listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));  
            //配置信道為非阻塞模式  
            listnChannel.configureBlocking(false);  
            //將選擇器注冊到各個信道  
            listnChannel.register(selector, SelectionKey.OP_ACCEPT);  
        }  
        //創建一個實現了協議接口的對象  
        TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);  
        //不斷輪詢select方法,獲取准備好的信道所關聯的Key集  
        while (true){  
            //一直等待,直至有信道准備好了I/O操作  
            if (selector.select(TIMEOUT) == 0){  
                //在等待信道准備的同時,也可以異步地執行其他任務,  
                //這里只是簡單地打印"."  
                System.out.print(".");  
                continue;  
            }  
            //獲取准備好的信道所關聯的Key集合的iterator實例  
            Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();  
            //循環取得集合中的每個鍵值  
            while (keyIter.hasNext()){  
                SelectionKey key = keyIter.next();   
                //如果服務端信道感興趣的I/O操作為accept  
                if (key.isAcceptable()){  
                    protocol.handleAccept(key);  
                }  
                //如果客戶端信道感興趣的I/O操作為read  
                if (key.isReadable()){  
                    protocol.handleRead(key);  
                }  
                //如果該鍵值有效,並且其對應的客戶端信道感興趣的I/O操作為write  
                if (key.isValid() && key.isWritable()) {  
                    protocol.handleWrite(key);  
                }  
                //這里需要手動從鍵集中移除當前的key  
                keyIter.remove();   
            }  
        }  
    }  
}  

這里為了使不同協議都能方便地使用這個基本的服務模式,我們把信道中與具體協議相關的處理各種I/O的操作分離了出來,定義了一個接口,如下:

 

import java.nio.channels.SelectionKey;  
import java.io.IOException;  
  
/** 
*該接口定義了通用TCPSelectorServer類與特定協議之間的接口, 
*它把與具體協議相關的處理各種I/O的操作分離了出來, 
*以使不同協議都能方便地使用這個基本的服務模式。 
*/  
public interface TCPProtocol{  
    //accept I/O形式  
    void handleAccept(SelectionKey key) throws IOException;  
    //read I/O形式  
    void handleRead(SelectionKey key) throws IOException;  
    //write I/O形式  
    void handleWrite(SelectionKey key) throws IOException;  
}  

接口的實現類代碼如下:

 

import java.nio.channels.SelectionKey;  
import java.nio.channels.SocketChannel;  
import java.nio.channels.ServerSocketChannel;  
import java.nio.ByteBuffer;  
import java.io.IOException;  
  
public class EchoSelectorProtocol implements TCPProtocol {  
    private int bufSize; // 緩沖區的長度  
    public EchoSelectorProtocol(int bufSize){  
    this.bufSize = bufSize;  
    }  
  
    //服務端信道已經准備好了接收新的客戶端連接  
    public void handleAccept(SelectionKey key) throws IOException {  
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();  
        clntChan.configureBlocking(false);  
        //將選擇器注冊到連接到的客戶端信道,並指定該信道key值的屬性為OP_READ,同時為該信道指定關聯的附件  
        clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));  
    }  
  
    //客戶端信道已經准備好了從信道中讀取數據到緩沖區  
    public void handleRead(SelectionKey key) throws IOException{  
        SocketChannel clntChan = (SocketChannel) key.channel();  
        //獲取該信道所關聯的附件,這里為緩沖區  
        ByteBuffer buf = (ByteBuffer) key.attachment();  
        long bytesRead = clntChan.read(buf);  
        //如果read()方法返回-1,說明客戶端關閉了連接,那么客戶端已經接收到了與自己發送字節數相等的數據,可以安全地關閉  
        if (bytesRead == -1){   
            clntChan.close();  
        }else if(bytesRead > 0){  
        //如果緩沖區總讀入了數據,則將該信道感興趣的操作設置為為可讀可寫  
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);  
        }  
    }  
      
    //客戶端信道已經准備好了將數據從緩沖區寫入信道  
    public void handleWrite(SelectionKey key) throws IOException {  
    //獲取與該信道關聯的緩沖區,里面有之前讀取到的數據  
    ByteBuffer buf = (ByteBuffer) key.attachment();  
    //重置緩沖區,准備將數據寫入信道  
    buf.flip();   
    SocketChannel clntChan = (SocketChannel) key.channel();  
    //將數據寫入到信道中  
    clntChan.write(buf);  
    if (!buf.hasRemaining()){   
    //如果緩沖區中的數據已經全部寫入了信道,則將該信道感興趣的操作設置為可讀  
      key.interestOps(SelectionKey.OP_READ);  
    }  
    //為讀入更多的數據騰出空間  
    buf.compact();   
  }  
  
}  

執行結果如下:

      
   說明:以上的服務端程序,select()方法第一次能選擇出來的准備好的信道都是服務端信道,其關聯鍵值的屬性都為OP_ACCEPT,亦及有效操作都為accept,在執行handleAccept方法時,為取得連接的客戶端信道也進行了注冊,屬性為OP_READ,這樣下次輪詢調用select()方法時,便會檢查到對read操作感興趣的客戶端信道(當然也有可能有關聯accept操作興趣集的信道),從而調用handleRead方法,在該方法中又注冊了OP_WRITE屬性,那么第三次調用select()方法時,便會檢測到對write操作感興趣的客戶端信道(當然也有可能有關聯read操作興趣集的信道),從而調用handleWrite方法。
    
   結果:從結果中很明顯地可以看出,服務器端在等待信道准備好的時候,線程沒有阻塞,而是可以執行其他任務,這里只是簡單的打印".",客戶端在等待連接和等待數據讀寫完成的時候,線程沒有阻塞,也可以執行其他任務,這里也正是簡單的打印"."。
 

   幾個需要注意的地方

    1、對於非阻塞SocketChannel來說,一旦已經調用connect()方法發起連接,底層套接字可能既不是已經連接,也不是沒有連接,而是正在連接。由於底層協議的工作機制,套接字可能會在這個狀態一直保持下去,這時候就需要循環地調用finishConnect()方法來檢查是否完成連接,在等待連接的同時,線程也可以做其他事情,這便實現了線程的異步操作。
 
    2、write()方法的非阻塞調用哦只會寫出其能夠發送的數據,而不會阻塞等待所有數據,而后一起發送,因此在調用write()方法將數據寫入信道時,一般要用到while循環,如:
while(buf.hasRemaining())
    channel.write(buf);
 
    3、任何對key(信道)所關聯的興趣操作集的改變,都只在下次調用了select()方法后才會生效。
 
    4、selectedKeys()方法返回的鍵集是可修改的,實際上在兩次調用select()方法之間,都必須手動將其清空,否則,它就會在下次調用select()方法時仍然保留在集合中,而且可能會有無用的操作來調用它,換句話說,select()方法只會在已有的所選鍵集上添加鍵,它們不會創建新的建集。
 
    5、對於ServerSocketChannel來說,accept是唯一的有效操作,而對於SocketChannel來說,有效操作包括讀、寫和連接,另外,對於DatagramChannle,只有讀寫操作是有效的。
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM