這兩天仿hadoop 寫java RPC框架,使用PB作為序列號工具,在寫讀數據的時候遇到一個小坑。之前寫過NIO代碼,恰好是錯誤的代碼產生正確的邏輯,誤以為自己寫對了。現在簡單整理一下。
使用NIO,select()到讀事件時,要處理4種情況:
1. channel還有數據,繼續讀。
2. channel中暫時沒數據,但channel還沒斷開,這是讀取到的數據個數為0,結束讀,繼續到select()處阻塞等待數據。
3. 另一端channel.close()關閉連接,這時候讀channel返回的讀取數是-1,表示已經到末尾,跟讀文件到末尾時是一樣的。既然已經結束了,就把對應的SelectionKey給cancel掉,表示selector不再監聽這個channel上的讀事件。並且關閉連接,本端channel.close()。
4. 另一端被強制關閉,也就是channel沒有close()就被強制斷開了,這時候本端會拋出一個IOException異常,要處理這個異常。
之前對 另一端channel.close()關閉連接 沒有細究,不清楚 讀channel返回的讀取數-1 是什么意思。然后沒有cancel對應的SelectionKey,也沒關閉連接,結果就是selector.select()一直返回讀事件,但是沒有數據。
直接貼服務器和客戶端代碼:
Server:
package socket; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; public class NIOServer2 { private void startServer() throws IOException { Selector selector = Selector.open(); { ServerSocketChannel ssc = ServerSocketChannel.open(); ssc.configureBlocking(false); ServerSocket ss = ssc.socket(); InetSocketAddress address = new InetSocketAddress(9000); ss.bind(address); System.out.println("ssc 0 : " + ssc); System.out.println("ss 0 : " + ss); SelectionKey acceptKey = ssc.register(selector, SelectionKey.OP_ACCEPT); System.out.println("acceptKey: " + acceptKey); printKeyInfo(acceptKey); System.out.println("Going to listen on 9000"); } while (true) { System.out.println("===================================\nstart select..."); int num = selector.select(); System.out.println("NIOServer: Number of keys after select operation: " + num); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); System.out.println("key: " + key); printKeyInfo(key); it.remove(); if ((key.readyOps() & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT) { System.out.println("select ACCEPT"); ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); System.out.println("ssc 1 : " + ssc); System.out.println("sc 1 : " + sc); SelectionKey newKey = sc.register(selector, SelectionKey.OP_READ); System.out.println("new key:" + newKey); printKeyInfo(newKey); } else if ((key.readyOps() & SelectionKey.OP_READ) == SelectionKey.OP_READ) { // System.out.println("select READ"); // System.out.print("before cancel:");printKeyInfo(key); // key.cancel(); // System.out.println("after cancel:");printKeyInfo(key); SocketChannel sc = (SocketChannel) key.channel(); System.out.println("sc 2 : " + sc); //echo data //下面的處理是正確的,count<0則cancel key。count=0則進入下一輪select()阻塞等待數據。 // try { // int count = doRead(key); // if (count < 0) { // key.cancel(); // System.out.println("cancel key for < 0"); // sc.read(ByteBuffer.allocate(2)); // } // } catch(IOException e) { // e.printStackTrace(); // key.cancel(); // System.out.println("cancel key"); // } //下面的處理過程是錯誤的,偶然情況下會出現正確邏輯。在客戶端連續寫,寫完馬上關閉連接,這時下面代碼能打印出客戶端的輸出, //客戶端關閉連接,下面的代碼馬上爆出異常,是這行代碼。java.io.IOException: 您的主機中的軟件中止了一個已建立的連接。 // int nbytes = 0; // ByteBuffer echoBuffer = ByteBuffer.allocate(16); // while (true) { // echoBuffer.clear(); // int r = sc.read(echoBuffer); // System.out.println(new String(echoBuffer.array())); // if (r <= 0) break; // echoBuffer.flip(); // sc.write(echoBuffer); // nbytes += r; // } // System.out.println("echoed " + nbytes + " from " + sc); //下面的是處理過程是正確的。正確的做法就是對讀取到n,0,-1分別處理,還要對客戶端強制關閉的異常做處理 while (true) { ByteBuffer buffer = ByteBuffer.allocate(2); buffer.clear(); int r; try { r = sc.read(buffer); System.out.println("r = " + r); System.out.println(new String(buffer.array())); if (r < 0) { //客戶端socket.close()會到這里,讀取數r=-1 key.cancel(); System.out.println("cancel key for < 0"); break; } else if (r == 0) { //客戶端socket沒有關閉,而channel沒有數據,數據數r=0。 //有時候select()返回了,但channel不一定有數據。可能select()是被其他方法喚醒 break; } } catch (IOException e) { //客戶端強制關閉會來這里報異常 e.printStackTrace(); key.cancel(); System.out.println("cancel key for Exception"); break; } }//while }// if ... else if // try { // Thread.sleep(500); // } catch (InterruptedException e) { // e.printStackTrace(); // } }//while }//while } private int doRead(SelectionKey key) throws IOException { SocketChannel channel = (SocketChannel) key.channel(); while (true) { int count = -1; ByteBuffer buffer = ByteBuffer.allocate(2); if (buffer.remaining() > 0) { count = channel.read(buffer); System.out.println("count = " + count); if (count <= 0) return count; } } } private static void printKeyInfo(SelectionKey sk) { String s = new String(); s = "Att: " + (sk.attachment() == null ? "no" : "yes"); s += ", Read: " + sk.isReadable(); s += ", Acpt: " + sk.isAcceptable(); s += ", Cnct: " + sk.isConnectable(); s += ", Wrt: " + sk.isWritable(); s += ", Valid: " + sk.isValid(); s += ", interestOps: " + sk.interestOps(); s += ", readyOps: " + sk.readyOps(); System.out.println(s); } public static void main(String[] args) { try { new NIOServer2().startServer(); } catch (IOException e) { e.printStackTrace(); } } }
Client:
package socket; import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; import java.net.UnknownHostException; public class SocketClient { public static void main(String[] args) throws UnknownHostException, IOException, InterruptedException { Socket socket = new Socket("localhost", 9000); DataOutputStream out = new DataOutputStream(socket.getOutputStream()); byte[] bytes = "fdfd".getBytes(); // System.out.println("send fdfd"); out.write(bytes); out.flush(); // Thread.sleep(15*1000); // System.out.println("send loll"); out.write("loull".getBytes()); out.flush(); // Thread.sleep(1*1000); socket.close(); System.out.println("client socket close"); } }
浪費了一些時間,一方面因為自己對網絡編程不夠熟悉,比如不清楚-1什么意思。另一方面Java NIO的API還是略顯難用。