Java NIO 讀數據處理過程


這兩天仿hadoop 寫java RPC框架,使用PB作為序列號工具,在寫讀數據的時候遇到一個小坑。之前寫過NIO代碼,恰好是錯誤的代碼產生正確的邏輯,誤以為自己寫對了。現在簡單整理一下。

 

使用NIO,select()到讀事件時,要處理4種情況:

1. channel還有數據,繼續讀。

2. channel中暫時沒數據,但channel還沒斷開,這是讀取到的數據個數為0,結束讀,繼續到select()處阻塞等待數據。

3. 另一端channel.close()關閉連接,這時候讀channel返回的讀取數是-1,表示已經到末尾,跟讀文件到末尾時是一樣的。既然已經結束了,就把對應的SelectionKey給cancel掉,表示selector不再監聽這個channel上的讀事件。並且關閉連接,本端channel.close()。

4. 另一端被強制關閉,也就是channel沒有close()就被強制斷開了,這時候本端會拋出一個IOException異常,要處理這個異常。

 

之前對 另一端channel.close()關閉連接 沒有細究,不清楚 讀channel返回的讀取數-1 是什么意思。然后沒有cancel對應的SelectionKey,也沒關閉連接,結果就是selector.select()一直返回讀事件,但是沒有數據。

 

直接貼服務器和客戶端代碼:

Server:

package socket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer2 {

    private void startServer() throws IOException {
        Selector selector = Selector.open();
        
        {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            ServerSocket ss = ssc.socket();
            InetSocketAddress address = new InetSocketAddress(9000);
            ss.bind(address);
            
            System.out.println("ssc 0 : " + ssc);
            System.out.println("ss 0 : " + ss);
            
            SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("acceptKey: " + acceptKey);
            printKeyInfo(acceptKey);
            System.out.println("Going to listen on 9000");
        }
        
        while (true) {
            System.out.println("===================================\nstart select...");
            int num = selector.select();
            System.out.println("NIOServer: Number of keys after select operation: " + num);
            
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> it = selectionKeys.iterator();
            
            while (it.hasNext()) {
                SelectionKey key = it.next();
                System.out.println("key: " + key);
                printKeyInfo(key);
                
                it.remove();
                
                if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) {
                    System.out.println("select ACCEPT");
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    
                    System.out.println("ssc 1 : " + ssc);
                    System.out.println("sc 1 : " + sc);
                    
                    SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ);
                    System.out.println("new key:" + newKey);
                    printKeyInfo(newKey);
                }
                else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) {
//                    System.out.println("select READ");
//                    System.out.print("before cancel:");printKeyInfo(key);
//                    key.cancel();
//                    System.out.println("after cancel:");printKeyInfo(key);
                    SocketChannel sc = (SocketChannel) key.channel();
                    System.out.println("sc 2 : " + sc);
                    
                    //echo data
                    //下面的處理是正確的,count<0則cancel key。count=0則進入下一輪select()阻塞等待數據。
//                    try {
//                        int count = doRead(key);
//                        if (count < 0) {
//                            key.cancel();
//                            System.out.println("cancel key for < 0");
//                            sc.read(ByteBuffer.allocate(2));
//                        }
//                    } catch(IOException e) {
//                        e.printStackTrace();
//                        key.cancel();
//                        System.out.println("cancel key");
//                    }
                    
                    //下面的處理過程是錯誤的,偶然情況下會出現正確邏輯。在客戶端連續寫,寫完馬上關閉連接,這時下面代碼能打印出客戶端的輸出,
                    //客戶端關閉連接,下面的代碼馬上爆出異常,是這行代碼。java.io.IOException: 您的主機中的軟件中止了一個已建立的連接。
//                    int nbytes = 0;
//                    ByteBuffer echoBuffer = ByteBuffer.allocate(16);
//                    while (true) {
//                        echoBuffer.clear();
//                        int r = sc.read(echoBuffer);
//                        System.out.println(new String(echoBuffer.array()));
//                        if (r <= 0) break;
//                        echoBuffer.flip();
//                        sc.write(echoBuffer);
//                        nbytes += r;
//                    }
//                    System.out.println("echoed " + nbytes + " from " + sc);
                    
                    //下面的是處理過程是正確的。正確的做法就是對讀取到n,0,-1分別處理,還要對客戶端強制關閉的異常做處理
                    while (true) {
                        ByteBuffer buffer = ByteBuffer.allocate(2);
                        buffer.clear();
                        int r;
                        try {
                            r = sc.read(buffer);
                            System.out.println("r = " + r);
                            System.out.println(new String(buffer.array()));
                            if (r < 0) {
                                //客戶端socket.close()會到這里,讀取數r=-1
                                key.cancel();
                                System.out.println("cancel key for < 0");
                                break;
                            } else if (r == 0) {
                                //客戶端socket沒有關閉,而channel沒有數據,數據數r=0。
                                //有時候select()返回了,但channel不一定有數據。可能select()是被其他方法喚醒
                                break;
                            }
                        } catch (IOException e) {
                            //客戶端強制關閉會來這里報異常
                            e.printStackTrace();
                            key.cancel();
                            System.out.println("cancel key for Exception");
                            break;
                        }
                    }//while
                }// if ... else if
//                try {
//                    Thread.sleep(500);
//                } catch (InterruptedException e) {
//                    e.printStackTrace();
//                }
            }//while
        }//while
    }
    
    private int doRead(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        while (true) {
            int count = -1;
            ByteBuffer buffer = ByteBuffer.allocate(2);
            if (buffer.remaining() > 0) {
                count = channel.read(buffer);
                System.out.println("count = " + count);
                if (count <= 0) return count;
            }
        }
    }
    
    private static void printKeyInfo(SelectionKey sk) {
        String s = new String();

        s = "Att: " + (sk.attachment() == null ? "no" : "yes");
        s += ", Read: " + sk.isReadable();
        s += ", Acpt: " + sk.isAcceptable();
        s += ", Cnct: " + sk.isConnectable();
        s += ", Wrt: " + sk.isWritable();
        s += ", Valid: " + sk.isValid();
        s += ", interestOps: " + sk.interestOps();
        s += ", readyOps: " + sk.readyOps();
        System.out.println(s);
    }
    
    public static void main(String[] args) {
        try {
            new NIOServer2().startServer();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

 

Client:

package socket;

import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.net.UnknownHostException;

public class SocketClient {

    public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException {
        Socket socket = new Socket("localhost", 9000);
        DataOutputStream out = new DataOutputStream(socket.getOutputStream());
        byte[] bytes = "fdfd".getBytes();
//        System.out.println("send fdfd");
        out.write(bytes);
        out.flush();
        
//        Thread.sleep(15*1000);
        
//        System.out.println("send loll");
        out.write("loull".getBytes());
        out.flush();
        
//        Thread.sleep(1*1000);
        socket.close();
        System.out.println("client socket close");
    }
}

 

 

浪費了一些時間,一方面因為自己對網絡編程不夠熟悉,比如不清楚-1什么意思。另一方面Java NIO的API還是略顯難用。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM