BIO
BIO(Blocking I/O),同步阻塞,實現模式為一個連接一個線程,即當有客戶端連接時,服務器端需為其單獨分配一個線程,如果該連接不做任何操作就會造成不必要的線程開銷。BIO是傳統的Java io編程,其相關的類和接口在java.io 包下。
BIO適用於連接數目較小且固定的架構,對服務器資源的要求較高,是JDK1.4以前的唯一選擇,但程序簡單易理解。
BIO編程流程
-
服務器端啟動一個SeverSocket
-
客戶端啟動Socket對服務器端發起通信,默認情況下服務器端需為每個客戶端創建一個線程與之通訊
-
客戶端發起請求后,先咨詢服務器端是否有線程響應,如果沒有則會等待或被拒絕
-
如果有線程響應,客戶端線程會等待請求結束后,再繼續執行
簡單代碼實現
//BIO-服務器端 public class BIOSever { public static void main(String[] args) throws IOException { //在BIO中,可以使用線程池進行優化 ExecutorService cachedThreadPool = Executors.newCachedThreadPool(); ServerSocket serverSocket = new ServerSocket(6666); System.out.println("服務器已啟動"); while (true){ System.out.println("等待客戶端連接.....(阻塞中)"); Socket socket = serverSocket.accept(); System.out.println("客戶端連接"); cachedThreadPool.execute(new Runnable() { public void run() { handler(socket); } }); } } //從客服端socket讀取數據 public static void handler(Socket socket){ try{ InputStream inputStream = socket.getInputStream(); byte[] b = new byte[1024]; while (true){ System.out.println("等待客戶端輸入.....(阻塞中)"); int read = inputStream.read(b); if (read != -1){ System.out.println(new String(b, 0, read)); }else { break; } } inputStream.close(); }catch (Exception e){ e.printStackTrace(); }finally { try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } }
//BIO-客戶端 public class BIOClient { public static void main(String[] args) throws IOException { Socket socket = new Socket("localhost", 6666); OutputStream outputStream = socket.getOutputStream(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String message = scanner.nextLine(); if ("exit".equals(message)) { break; } outputStream.write(message.getBytes()); } outputStream.close(); socket.close(); } }
BIO問題分析
從上面代碼中可以看出BIO編程的兩個問題:
-
服務器端在監聽客戶端連接時(serverSocket.accept()),服務器端處於阻塞狀態,不能處理其他事務
-
服務器端需要為每個客戶端建立一個線程,雖然可以用線程池來優化,但在並發較大時,線程開銷依舊很大
-
當連接的客戶端沒有發送數據時,服務器端會阻塞在read操作上,等待客戶端輸入,造成線程資源浪費
NIO
從JDK1.4開始,java提供了一系列改進輸入/輸出的新特性,統稱為NIO,全稱n為new I/O,是同步非阻塞的,所以也有人稱為non-blocking I/O。NIO的相關類都放在java.nio包或其子包下,並對原先java.io包中許多類進行了改寫。
NIO的三大核心
緩沖區(Buffer)
NIO是面向緩沖區, 或者說是面向塊編程的。在NIO的IO傳輸中,數據會先讀入到緩沖區,當需要時再從緩沖區寫出,這樣減少了直接讀寫磁盤的次數,提高了IO傳輸的效率。
緩沖區(buffer)本質上是一個可以讀寫數據的內存塊,即在內存空間中預留了一定的存儲空間,這些存儲空間用來緩沖輸入和輸出的數據,這部分預留的存儲空間就叫緩沖區。
在NIO程序中,通道channel雖然負責數據的傳輸,但是輸入和輸出的數據都必須經過緩沖區buffer。
在java中,緩沖區的相關類都在java.nio包下,其最頂層的類是 Buffer,它是一個抽象類。
Buffer類的4個重要屬性:
-
mark:標記
-
position:位置,下一個要被讀或寫的元素的索引,每次讀寫緩沖區都會改變該值,為下次讀寫做准備
-
limit:表示緩沖區的終點,不能對緩沖區中超過極限的位置進行讀寫操作,且極限是可修改的
-
capacity:容量,即緩沖區的最多可容納的數據量,該值在創建緩沖區時被設立,且不可修改
Buffer類常用方法:
Buffer的常用子類(它們之間最大區別在於底層實現數組的數據類型):
-
ByteBuffer:存儲字節數據到緩沖區
-
CharBuffer:存儲字符數據到緩沖區
-
IntBuffer:存儲整型數據到緩沖區
-
ShortBuffer:存儲短整型數據到緩沖區
-
LongBuffer:存儲長整型數據到緩沖區
-
FloatBuffer:存儲浮點型數據到緩沖區
-
DoubleBuffer:存儲雙精度浮點型數據到緩沖區
ByteBuffer
在Buffer的所有子類中,最常用的還是ByteBuffer,它的常用方法:
通道(Channel)
在NIO程序中服務器端和客戶端之間的數據讀寫不是通過流,而是通過通道來讀寫的。
通道類似於流,都是用來讀寫數據的,但它們之間也是有區別的:
-
通道是雙向的,即可以讀也可以寫,而流是單向的,只能讀或寫
-
通道可以實現異步讀寫數據
-
通道可以從緩沖區讀數據,也可以把數據寫入緩沖區
java中channel的相關類在java.nio.channel包下。Channel是一個接口,其常用的實現類有:
-
FileChannel:用於文件的數據讀寫,其真正的實現類為FileChannelImpl
-
DatagramChannel:用於UDP的數據讀寫,其真正的實現類為DatagramChannelImpl
-
ServerSocketChannel:用於監聽TCP連接,每當有客戶端連接時都會創建一個SocketChannel,功能類似ServerSocket,其真正的實現類為ServerSocketChannelImpl
-
SocketChannel:用於TCP的數據讀寫,功能類似節點流+Socket,其真正的實現類為SocketChannelImpl
FileChannel
FileChannel主要用於對本地文件進行IO操作,如文件復制等。它的常用方法有:
在文件傳輸流中有個屬性channel,它默認是空的,可以通過流中的getChanel()方法根據當前文件流的屬性生成對應的FileChannel。
public FileChannel getChannel() { synchronized (this) { if (channel == null) { channel = FileChannelImpl.open(fd, path, false, true, append, this); } return channel; } } }
public class NIOChannel { public static void main(String[] args) throws IOException { } //將數據寫入目標文件 public static void writeFile() throws IOException{ String str = "Hello, gofy"; //創建文件輸出流 FileOutputStream fileOutputStream = new FileOutputStream("f:\\file.txt"); //根據文件輸出流生成文件通道 FileChannel fileChannel = fileOutputStream.getChannel(); //創建字節緩沖區,並將字符串轉成字節存入 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put(str.getBytes()); //注意,在存入后需要進行寫出操作時,需將緩沖區翻轉 byteBuffer.flip(); //將緩沖區數據寫入通道 fileChannel.write(byteBuffer); //將文件輸出流關閉(該方法同時會關閉通道) fileOutputStream.close(); } //從文件中讀取數據 public static void readFile() throws IOException{ //創建文件輸入流 File file = new File("f:\\file.txt"); FileInputStream fileInputStream = new FileInputStream(file); //根據文件輸入流生成文件通道 FileChannel fileChannel = fileInputStream.getChannel(); //創建字節緩沖區,大小為文件大小 ByteBuffer byteBuffer = ByteBuffer.allocate((int)file.length()); //將通道數據讀入緩沖區 fileChannel.read(byteBuffer); //同樣,在讀入后需要取出緩沖區內所有數據時,需將緩沖區翻轉 byteBuffer.flip(); System.out.println(new String(byteBuffer.array())); fileInputStream.close(); } //將文件數據傳輸到另一個文件 public static void readAndWriteFile() throws IOException{ //創建文件輸入流和文件輸出流,並生成對應的通道 FileInputStream fileInputStream = new FileInputStream("file1.txt"); FileChannel inputStreamChannel= fileInputStream.getChannel(); FileOutputStream fileOutputStream = new FileOutputStream("file2.txt"); FileChannel outputStreamChannel = fileOutputStream.getChannel(); //創建字節緩沖區 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); //進行數據讀取 while (true){ //在讀取前需清除緩沖區 byteBuffer.clear(); //將文件輸入的通道的數據讀入緩沖區 int read = inputStreamChannel.read(byteBuffer); //當read為-1時,即通道數據已讀取完畢 if (read == -1){ break; } //將緩沖區翻轉后,將緩沖區數據寫入文件輸出的通道 byteBuffer.flip(); outputStreamChannel.write(byteBuffer); } fileInputStream.close(); fileOutputStream.close(); } //文件的復制粘貼 public static void copyAndPaste() throws IOException{ //復制的文件輸入流 FileInputStream fileInputStream = new FileInputStream("f:\\a.jpg"); FileChannel srcChannel = fileInputStream.getChannel(); //粘貼的文件輸出流 FileOutputStream fileOutputStream = new FileOutputStream("f:\\b.jpg"); FileChannel targetChannel = fileOutputStream.getChannel(); //使用transferFrom進行復制粘貼 targetChannel.transferFrom(srcChannel, 0, srcChannel.size()); fileInputStream.close(); fileOutputStream.close(); } }
選擇器(Selector)
在NIO程序中,可以用選擇器Selector實現一個選擇器處理多個通道,即一個線程處理多個連接
選擇器的相關類在java.nio.channels包和其子包下,頂層類是Selector,它是一個抽象類,它的常用方法有:
通道的注冊
在ServerSocketChannel和SocketChannel類里都有一個注冊方法 register(Selector sel, int ops),sel為要注冊到的選擇器,ops為該通道監聽的操作事件的類型,可以通過該方法將ServerSocketChannel或SocketChannel注冊到目標選擇器中,該方法會返回一個SelectionKey(真正實現類為SelectionKeyImpl)儲存在注冊的Selector的publicKeys集合屬性里。SelectionKey儲存了通道的事件類型和該注冊的通道對象,可以通過SelectionKey.channel()方法獲取SelectionKey對應的通道。
每個注冊到選擇器的通道都需定義需進行的操作事件類型,通過查看SelectionKey類的屬性可以知道操作事件的類型有4種:
public static final int OP_READ = 1 << 0; //讀操作 public static final int OP_WRITE = 1 << 2; //寫操作 public static final int OP_CONNECT = 1 << 3; //連接操作 public static final int OP_ACCEPT = 1 << 4; //接收操作
選擇器的檢查
我們可以通過選擇器的檢查方法,如select()
public final boolean isReadable() { //判斷是否是讀操作 return (readyOps() & OP_READ) != 0; } public final boolean isWritable() { //判斷是否是寫操作 return (readyOps() & OP_WRITE) != 0; } public final boolean isConnectable() { //判斷是否是連接操作 return (readyOps() & OP_CONNECT) != 0; } public final boolean isAcceptable() { //判斷是否是接收操作 return (readyOps() & OP_ACCEPT) != 0; }
//服務器端 public class GroupChatSever { private final static int PORT = 6666;//監聽端口 private Selector selector;//選擇器 private ServerSocketChannel serverSocketChannel; public GroupChatSever(){ try{ selector = Selector.open();//開啟選擇器 serverSocketChannel = ServerSocketChannel.open();//開啟通道 serverSocketChannel.configureBlocking(false);//將通道設為非阻塞狀態 serverSocketChannel.socket().bind(new InetSocketAddress(PORT));//通道綁定監聽端口 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//將通道注冊到選擇器上,事件類型為接收 listen(); }catch (IOException e){ e.printStackTrace(); } } //對端口進行監聽 public void listen(){ try { while (true){ //檢查注冊通道是否有事件發生,檢查時長為2秒 int count = selector.select(2000); if (count > 0){//如果注冊通道有事件發生則進行處理 //獲取所有發生事件的通道對應的SelectionKey Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); if (key.isAcceptable()){//判斷該key對應的通道是否需進行接收操作 //雖然accept()方法是阻塞的,但是因為對通道進行過判斷, //可以確定是有客戶端連接的,所以此時調用accept並不會阻塞 SocketChannel socketChannel = serverSocketChannel.accept(); socketChannel.configureBlocking(false); //接收后,將獲取的客戶端通道注冊到選擇器上,事件類型為讀 socketChannel.register(selector, SelectionKey.OP_READ); System.out.println(socketChannel.getRemoteAddress() + "上線!"); } if (key.isReadable()){//判斷該key對應的通道是否需進行讀操作 readFromClient(key); } //注意當處理完一個通道key時,需將它從迭代器中移除 keyIterator.remove(); } } } }catch (IOException e){ e.printStackTrace(); } } /** * 讀取客戶端發來的消息 * @param key 需讀取的通道對應的SelectionKey */ public void readFromClient(SelectionKey key){ SocketChannel socketChannel = null; try{ //通過SelectionKey獲取對應通道 socketChannel = (SocketChannel)key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); int read = socketChannel.read(byteBuffer); if (read > 0){ String message = new String(byteBuffer.array()); System.out.println("客戶端: " + message); sendToOtherClient(message, socketChannel); } }catch (IOException e){ //這里做了簡化,將所有異常都當做是客戶端斷開連接觸發的異常,實際項目中請不要這樣做 try{ System.out.println(socketChannel.getRemoteAddress() + "下線"); key.cancel();//將該SelectionKey撤銷 socketChannel.close();//再關閉對應通道 }catch (IOException e2){ e2.printStackTrace(); } } } /** * 將客戶端發送的消息轉發到其他客戶端 * @param message 轉發的消息 * @param from 發送消息的客戶端通道 * @throws IOException */ public void sendToOtherClient(String message, SocketChannel from) throws IOException{ System.out.println("消息轉發中......"); for (SelectionKey key : selector.keys()){//遍歷選擇器中所有SelectionKey Channel channel = key.channel();//根據SelectionKey獲取對應通道 //排除掉發送消息的通道,將消息寫入到其他客戶端通道 if (channel instanceof SocketChannel && channel != from){ SocketChannel socketChannel = (SocketChannel)channel; ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes()); socketChannel.write(byteBuffer); } } } public static void main(String[] args) { GroupChatSever groupChatSever = new GroupChatSever(); } }
//客戶端 public class GroupChatClient { private final static String SEVER_HOST = "127.0.0.1";//連接的客戶端主機 private final static int SEVER_PORT = 6666;//連接的客戶端端口 private Selector selector;//選擇器 private SocketChannel socketChannel; private String username;//儲存客戶端ip地址 public GroupChatClient(){ try { selector = Selector.open();//開啟選擇器 socketChannel = SocketChannel.open(new InetSocketAddress(SEVER_HOST, SEVER_PORT));//開啟通道 socketChannel.configureBlocking(false);//將通道設為非阻塞 socketChannel.register(selector, SelectionKey.OP_READ);//將通道注冊在選擇器上,事件類型為讀 username = socketChannel.getLocalAddress().toString().substring(1);//獲取客戶端ip地址 String message = " 進入聊天群!"; sendMessage(message); }catch (IOException e){ e.printStackTrace(); } } //發送消息 public void sendMessage(String message){ message = username+": "+message; try{ ByteBuffer byteBuffer = ByteBuffer.wrap(message.getBytes()); socketChannel.write(byteBuffer); }catch (IOException e){ e.printStackTrace(); } } //讀取從服務器轉發送過來的消息 public void readMessage(){ try{ int read = selector.select(); if (read > 0){ Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator(); while (keyIterator.hasNext()){ SelectionKey key = keyIterator.next(); if (key.isReadable()){ SocketChannel socketChannel = (SocketChannel)key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array())); } keyIterator.remove(); } } }catch (IOException e){ e.printStackTrace(); } } public static void main(String[] args) { final GroupChatClient groupChatClient = new GroupChatClient(); //客戶端開啟一個線程來監聽是否有服務器轉發來消息 new Thread(){ @Override public void run() { while (true){ groupChatClient.readMessage(); try { Thread.currentThread().sleep(1000); }catch (InterruptedException e){ e.printStackTrace(); } } } }.start(); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String message = scanner.nextLine(); groupChatClient.sendMessage(message); } } }