服務端:
Loader.java
package net.chatroom.server; public class Loader { public static void main(String[] args) { Deamon deamon = new Deamon(9999); new Thread(deamon).start(); } }
Util.java
package net.chatroom.server; import java.nio.charset.Charset; import java.util.HashSet; public class Util { public static Charset charset = Charset.forName("UTF-8"); // 相當於自定義協議格式,與客戶端協商好 public static String USER_CONTENT_SPILIT = "#@#"; // 用來記錄在線人數,以及昵稱 public static HashSet<String> users = new HashSet<String>(); public static String USER_EXIST = "system message: user exist, please change a name"; }
Deamon.java
package net.chatroom.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.ArrayList; import java.util.HashSet; import java.util.Iterator; import java.util.List; public class Deamon implements Runnable { private boolean flag = true; private ServerSocketChannel serverChannel = null; private Selector selector = null; /** * 記錄進來的所有的客戶端連接 * */ private List<SocketChannel> clientChannels = null; public void setFlag(boolean flag) { this.flag = flag; } public Deamon(int port) { try { serverChannel = ServerSocketChannel.open(); serverChannel.socket().bind(new InetSocketAddress(port)); selector = Selector.open(); serverChannel.configureBlocking(false); serverChannel.register(selector, SelectionKey.OP_ACCEPT); this.clientChannels = new ArrayList<SocketChannel>(); System.out.println("Server is listening now..."); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { // System.out.println("server listening.."); while (this.flag) { int num = 0; try { //此處select()阻塞了線程 num = selector.select(); } catch (IOException e) { System.out.println("Error while select channel:" + e); } if (num > 0) { Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { SelectionKey key = it.next(); it.remove(); if (key.isAcceptable()) { // 監聽到有新的連接則再注冊讀操作 this.clientChannels.add(Dealer.accept(selector, serverChannel)); } else if (key.isReadable()) { // 監聽到讀操作 try { Dealer.read(selector, key, clientChannels); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } } } try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } System.out.println("server to close.."); if (this.serverChannel != null && this.serverChannel.isOpen()) { try { this.serverChannel.close(); } catch (IOException e) { e.printStackTrace(); } } if (this.selector != null && this.selector.isOpen()) { try { this.selector.close(); } catch (IOException e) { e.printStackTrace(); } } } }
Dealer.java
package net.chatroom.server; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.rmi.server.Skeleton; import java.util.List; import java.util.Scanner; public class Dealer { public static SocketChannel accept(Selector selector, ServerSocketChannel serverChannel) { SocketChannel channel = null; try { channel = serverChannel.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); channel.write(Util.charset.encode("Please input your name.")); } catch (Exception e) { System.out.println("Error while configure socket channel :" + e); if (channel != null) { try { channel.close(); } catch (IOException e1) { e1.printStackTrace(); } } } return channel; } public static void read(Selector selector, SelectionKey selectionkey, List<SocketChannel> clientChannels) throws IOException { SocketChannel socketClientChannel = (SocketChannel) selectionkey .channel(); ByteBuffer buffer = ByteBuffer.allocateDirect(6 * 1024); StringBuilder content = new StringBuilder(); int num = 0; try { // 將客戶端發上來的消息讀到buffer //循環將通道中數據讀入buffer while (socketClientChannel.read(buffer) > 0) { buffer.flip();// 切換成讀 content.append(Util.charset.decode(buffer)); } System.out.println("num:" + num); System.out.println("Server is listening from client :" + socketClientChannel.getRemoteAddress() + " data rev is: " + content); } catch (IOException e) { /** * 如果出現異常,則需要關閉連接。故把num設置為-1,用下面的關閉邏輯來關閉channel */ num = -1; } if (num >= 0) { if (content.length() > 0) { String[] arrayContent = content.toString().split( Util.USER_CONTENT_SPILIT); // 注冊用戶 if (arrayContent != null && arrayContent.length == 1) { String name = arrayContent[0]; if (Util.users.contains(name)) { socketClientChannel.write(Util.charset .encode(Util.USER_EXIST)); } else { Util.users.add(name); int onlineNum = clientChannels.size(); String message = "welcome " + name + " to chat room! Online numbers:" + onlineNum; BroadCast2(clientChannels, null, message); } } // 注冊完了,發送消息 else if (arrayContent != null && arrayContent.length > 1) { String name = arrayContent[0]; String message = content.substring(name.length() + Util.USER_CONTENT_SPILIT.length()); message = name + " say: " + message; if (Util.users.contains(name)) { // 不回發給發送此內容的客戶端 BroadCast2(clientChannels, socketClientChannel, message); } } // /** // * 把讀到的數據原封不動的下發給客戶端 // */ // int length = clientChannels.size(); // for (int index = 0; index < length; index++) { // // 循環所有的客戶端連接,下發數據 // buffer.flip(); // try { // // 將buffer里的數據再下發給客戶端的通道 // clientChannels.get(index).write(buffer); // } catch (IOException e) { // e.printStackTrace(); // } // } } } else { /** * 如果未讀到數據,對方關閉了SocketChannel 所以服務器這邊也要關閉 */ try { socketClientChannel.close(); int length = clientChannels.size(); for (int index = 0; index < length; index++) { if (clientChannels.get(index).equals(socketClientChannel)) { // 移除當前接受的通道 clientChannels.remove(index); break; } } } catch (IOException e) { e.printStackTrace(); } } } // TODO 要是能檢測下線,就不用這么統計了 public static int OnlineNum(Selector selector) { int res = 0; for (SelectionKey key : selector.keys()) { Channel targetchannel = key.channel(); if (targetchannel instanceof SocketChannel) { res++; } } return res; } public void BroadCast(Selector selector, SocketChannel except, String content) throws IOException { // 廣播數據到所有的SocketChannel中 for (SelectionKey key : selector.keys()) { Channel targetchannel = key.channel(); // 如果except不為空,不回發給發送此內容的客戶端 if (targetchannel instanceof SocketChannel && targetchannel != except) { SocketChannel dest = (SocketChannel) targetchannel; dest.write(Util.charset.encode(content)); } } } public static void BroadCast2(List<SocketChannel> socketChannels, SocketChannel except, String content) throws IOException { for (SocketChannel socketChannel : socketChannels) { if (!socketChannel.equals(except)) { // 除了自己,其它通道都通知 socketChannel.write(Util.charset.encode(content)); } } } }
客戶端:
Loader.java
package net.chatroom.client; import java.util.Scanner; import net.chatroom.server.Util; public class Loader { public static void main(String[] args) { String name = ""; Deamon deamon = new Deamon("127.0.0.1", 9999); new Thread(deamon).start(); // 在主線程中 從鍵盤讀取數據輸入到服務器端 Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { String line = scan.nextLine(); if ("".equals(line)) continue; // 不允許發空消息 if ("".equals(name)) { name = line; line = name + Util.USER_CONTENT_SPILIT; } else { line = name + Util.USER_CONTENT_SPILIT + line; } deamon.chancelToWrite(Util.charset.encode(line));// sc既能寫也能讀,這邊是寫 } } }
Deamon.java
package net.chatroom.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.Buffer; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import net.chatroom.server.Util; public class Deamon implements Runnable { /** * 選擇器,用於監聽注冊在上面的SocketChannel的狀態 */ private Selector selector = null; /** * SocketChannel 用戶發送和接受數據的信道 */ private SocketChannel channel = null; /** * 運行標識。在線程里此標識為false的時候會推出線程 * 該屬性在ExitCommandListener里通過調用setFlag方法修改,用於通知線程用戶要求退出的程序 */ private boolean flag = true; public void setFlag(boolean flag) { this.flag = flag; } public Deamon(String address, int port) { try { channel = SocketChannel.open(new InetSocketAddress(address, port)); channel.configureBlocking(false); selector = Selector.open(); // 客戶端直接注冊讀和寫操作 channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); } catch (IOException e) { e.printStackTrace(); } } public void chancelToWrite(ByteBuffer buffer){ try { channel.write(buffer); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } @Override public void run() { System.out.println("client run.."); while (this.flag) { /** * 如果可以繼續執行,則在循環體內循環執行監聽選擇操作 */ int num = 0; try { /** * 得到處於可讀或者可寫狀態的SocketChannel對象的個數 */ // 客戶端的select()並不阻塞線程,是因為客戶端一啟動就是SelectionKey.OP_WRITE狀態 // System.out.println("client select.."); num = this.selector.select(); // System.out.println("client num:"+num); } catch (IOException e) { /** * 如果出現異常,則此處應該加上日志打印,然后跳出循環,執行循環體下面的釋放資源操作 */ break; } if (num > 0) { /** * 如果有多個SocketChannel處於可讀或者可寫狀態,則輪詢注冊在Selector上面的SelectionKey */ Iterator<SelectionKey> keys = selector.selectedKeys() .iterator(); while (keys.hasNext()) { SelectionKey key = keys.next(); /** * 此步操作用於刪除該SelectionKey的被選中狀態 */ keys.remove(); if (key.isReadable()) { System.out.println("client isReadable.."); /** * 如果是讀操作,則調用讀操作的處理邏輯 */ try { Dealer.read((SocketChannel) key.channel()); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } else if (key.isWritable()) { //客戶端的寫狀態是一直就緒的 // System.out.println("client isWritable.."); /** * 如果是寫操作,則調用寫操作的處理邏輯 */ // Dealer.write((SocketChannel) key.channel()); } } } /*取消關注,多用在多線程的時候 * key.interestOps(key.interestOps() & (~SelectionKey.OP_READ)); * * 增加關注 * key.interestOps(key.interestOps() | SelectionKey.OP_READ); * */ try { Thread.sleep(500); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } } if (this.channel != null && this.channel.isOpen()) { /** * 關閉SocketChannel */ try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } if (this.selector != null && this.selector.isOpen()) { /** * 關閉Selector選擇器 */ try { this.selector.close(); } catch (IOException e) { e.printStackTrace(); } } } }
Dealer.java
package net.chatroom.client; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import net.chatroom.server.Util; public class Dealer { /** * 從SocketChannel中讀取信息 * * @param channel * @throws IOException */ public static void read(SocketChannel channel) throws IOException { /** * 初始化緩沖區 */ ByteBuffer buffer = ByteBuffer.allocateDirect(6 * 1024); /** * 讀到的字節數 */ int num = 0; String content = ""; while ((num = channel.read(buffer)) > 0) { buffer.flip(); content += Util.charset.decode(buffer); } //若系統發送通知名字已經存在,則需要換個昵稱 if(Util.USER_EXIST.equals(content)) { // name = ""; System.out.println("name has exists."); } System.out.println(content); } /** * 想SocketChannel中寫入數據 * * @param channel */ public static void write(SocketChannel channel) { // /** // * 從消息隊列中獲取要發送的消息 // */ // String msg = MsgQueue.getInstance().get(); // if (msg == null) { // /** // * 如果消息隊列中沒有要發送的消息,則返回。 // */ // return; // } // /** // * 初始化緩沖區 // */ // ByteBuffer buffer = ByteBuffer.allocateDirect(6 * 1024); // // /** // * 把消息放到緩沖區中 // */ // buffer.put(msg.getBytes()); // // /** // * 重置緩沖區指針 // */ // buffer.flip(); // try { // /** // * 把緩沖區中的數據寫到SocketChannel里 // */ // channel.write(buffer); // } catch (IOException e) { // e.printStackTrace(); // } } }