例子中有些寫法參考自Netty4源碼,建議在實際運用中采用Netty,而非原生的Java NIO(小心epoll空轉)。
1. 服務器端
public class NioServer { static SelectorProvider provider = SelectorProvider.provider(); static Selector selector = null; static ServerSocketChannel server = null; private static void accept() throws IOException { SocketChannel channel = null; try { channel = server.accept(); // 接受連接 channel.configureBlocking(false); // 非阻塞模式 channel.register(selector, SelectionKey.OP_READ, null); // 監聽讀就緒 } catch (IOException e) { if (channel != null) channel.close(); } } private static int read(SocketChannel channel) throws IOException { try { ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配HeapByteBuffer int len = channel.read(buffer); // 直到沒有數據 || buffer滿 if (len > 0) System.out.println(new String(buffer.array(), 0, len, Charset.forName("UTF-8"))); // buffer.array():取HeapByteBuffer中的原始byte[] return len; } catch (IOException e) { if (channel != null) channel.close(); return -1; } } private static void write(SocketChannel channel, String msg) throws IOException { try { byte[] bytes = msg.getBytes(Charset.forName("UTF-8")); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); // 分配HeapByteBuffer buffer.put(bytes); buffer.flip(); // 切換為讀模式 channel.write(buffer); } catch (IOException e) { if (channel != null) channel.close(); } } public static void main(String[] args) throws IOException { try { selector = provider.openSelector(); server = provider.openServerSocketChannel(); server.configureBlocking(false); // 非阻塞模式 SelectionKey key = server.register(selector, 0, null); // 注冊 if (server.bind(new InetSocketAddress(8888)).socket().isBound()) // 綁定成功 key.interestOps(SelectionKey.OP_ACCEPT); // 監聽連接請求 while (true) { selector.select(); // 監聽就緒事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { key = it.next(); it.remove(); // 從已選擇鍵集中移除key if (key.isAcceptable()) { // 連接請求到來 System.out.println("accept..."); accept(); } else { SocketChannel channel = (SocketChannel) key.channel(); if (key.isWritable()) { // 寫就緒 System.out.println("write..."); write(channel, "Hello NioClient!"); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); // 取消寫就緒,否則會一直觸發寫就緒(寫就緒為代碼觸發) key.channel().close(); // 關閉channel(key將失效) } if (key.isValid() && key.isReadable()) { // key有效(避免在寫就緒時關閉了channel或者取消了key) && 讀就緒 System.out.println("read..."); int len = read(channel); if (len >= 0) key.interestOps(key.interestOps() | SelectionKey.OP_WRITE); // 寫就緒,准備寫數據 else if (len < 0) // 客戶端已關閉socket channel.close(); // 關閉channel(key將失效) } } } } } finally { if (server != null) server.close(); if (selector != null) selector.close(); } } }
2. 客戶端
public class NioClient { static SelectorProvider provider = SelectorProvider.provider(); static Selector selector = null; static SocketChannel client = null; static boolean close = false; private static void write(String msg) throws IOException { byte[] bytes = msg.getBytes(Charset.forName("UTF-8")); ByteBuffer buffer = ByteBuffer.allocate(bytes.length); // 建立HeapByteBuffer(DirectByteBuffer以后有機會再討論) buffer.put(bytes); buffer.flip(); // 切換為讀模式 client.write(buffer); } private static int read() throws IOException { ByteBuffer buffer = ByteBuffer.allocate(1024); // 分配HeapByteBuffer int len = client.read(buffer); // 直到沒有數據 || buffer滿 if (len > 0) System.out.println(new String(buffer.array(), 0, len, Charset.forName("UTF-8"))); // buffer.array():取HeapByteBuffer中的原始byte[] return len; } public static void main(String[] args) throws IOException { try { selector = provider.openSelector(); client = provider.openSocketChannel(); client.configureBlocking(false); // 非阻塞模式 SelectionKey key = client.register(selector, 0, null); // 注冊 if (client.connect(new InetSocketAddress("127.0.0.1", 8888))) { // 連接成功(很難) System.out.println("connected..."); key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 監聽讀就緒和寫就緒(准備寫數據) } else // 連接失敗(正常情況下) key.interestOps(SelectionKey.OP_CONNECT); // 監聽連接就緒 while (!close) { selector.select(); // 監聽就緒事件 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); while (it.hasNext()) { key = it.next(); it.remove(); // 從已選擇鍵集移除key if (key.isConnectable()) { // 連接就緒 client.finishConnect(); // 完成連接 System.out.println("connected..."); key.interestOps(key.interestOps() & ~SelectionKey.OP_CONNECT); // 取消監聽連接就緒(否則selector會不斷提醒連接就緒) key.interestOps(key.interestOps() | SelectionKey.OP_READ | SelectionKey.OP_WRITE); // 監聽讀就緒和寫就緒 } else { if (key.isWritable()) { // 寫就緒 System.out.println("write..."); write("Hello NioServer!"); key.interestOps(key.interestOps() & ~SelectionKey.OP_WRITE); // 取消寫就緒,否則會一直觸發寫就緒(寫就緒為代碼觸發) } if (key.isValid() && key.isReadable()) { // key有效(避免在寫就緒時關閉了channel或者取消了key) && 讀就緒 System.out.println("read..."); if (read() < 0) // 服務器已關閉socket close = true; // 退出循環 } } } } } finally { if (client != null) client.close(); if (selector != null) selector.close(); } } }