深入理解Java AIO(一)—— Java AIO的簡單使用
深入理解AIO系列分為三個部分
- 第一部分也就是本節的Java AIO的簡單使用
- 第二部分是AIO源碼解析(只解析關鍵部分)(待更新)
- 第三部分是Linux中的AIO實現
Future和CompletionHandler
Java 異步 IO 提供了兩種使用方式,分別是返回 Future 實例和使用回調函數。
Future 實例
-
future.isDone();
判斷操作是否已經完成,包括了正常完成、異常拋出、取消
-
future.cancel(true);
取消操作,方式是中斷。參數 true 說的是,即使這個任務正在執行,也會進行中斷。
-
future.isCancelled();
是否被取消,只有在任務正常結束之前被取消,這個方法才會返回 true
-
future.get();
這是我們的老朋友,獲取執行結果,阻塞。
-
future.get(10, TimeUnit.SECONDS);
如果上面的 get() 方法的阻塞你不滿意,那就設置個超時時間。
Futrue這個東西和在線程池里用的應該差不多,不過我暫時沒看明白怎么用,也沒查到什么資料,之后看一下線程池再過來更新這塊。
CompletionHandler 回調函數
java.nio.channels.CompletionHandler 接口定義:
public interface CompletionHandler<V,A> {
// attachment是用來傳遞參數進去的 void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
用法:
AsynchronousServerSocketChannel listener = AsynchronousServerSocketChannel.open().bind(null); // accept 方法的第一個參數可以傳遞 attachment listener.accept(attachment, new CompletionHandler<AsynchronousSocketChannel, Object>() {
@Override public void completed( AsynchronousSocketChannel client, Object attachment) { // }
@Override public void failed(Throwable exc, Object attachment) { // } });
簡單實例
老樣子只貼服務端部分的:
Server
public class Server { public static void main(String[] args) throws IOException { // 實例化,並監聽端口 AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(8080)); // 自己定義一個 Attachment 類,用於傳遞一些信息 Attachment att = new Attachment(); att.setServer(server); server.accept(att, new CompletionHandler<AsynchronousSocketChannel, Attachment>() { @Override public void completed(AsynchronousSocketChannel client, Attachment att) { try { SocketAddress clientAddr = client.getRemoteAddress(); System.out.println("收到新的連接:" + clientAddr); // 收到新的連接后,server 應該重新調用 accept 方法等待新的連接進來 att.getServer().accept(att, this); Attachment newAtt = new Attachment(); newAtt.setServer(server); newAtt.setClient(client); newAtt.setReadMode(true); newAtt.setBuffer(ByteBuffer.allocate(2048)); // 這里也可以繼續使用匿名實現類,不過代碼不好看,所以這里專門定義一個類 client.read(newAtt.getBuffer(), newAtt, new ChannelHandler()); } catch (IOException ex) { ex.printStackTrace(); } } @Override public void failed(Throwable t, Attachment att) { System.out.println("accept failed"); } }); // 為了防止 main 線程退出 try { Thread.currentThread().join(); } catch (InterruptedException e) { } } }
ChannelHandler
public class ChannelHandler implements CompletionHandler<Integer, Attachment> { @Override public void completed(Integer result, Attachment att) { if (att.isReadMode()) { // 讀取來自客戶端的數據 ByteBuffer buffer = att.getBuffer(); buffer.flip(); byte bytes[] = new byte[buffer.limit()]; buffer.get(bytes); String msg = new String(buffer.array()).toString().trim(); System.out.println("收到來自客戶端的數據: " + msg); // 響應客戶端請求,返回數據 buffer.clear(); buffer.put("Response from server!".getBytes(Charset.forName("UTF-8"))); att.setReadMode(false); buffer.flip(); // 寫數據到客戶端也是異步 att.getClient().write(buffer, att, this); } else { // 到這里,說明往客戶端寫數據也結束了,有以下兩種選擇: // 1. 繼續等待客戶端發送新的數據過來 // att.setReadMode(true); // att.getBuffer().clear(); // att.getClient().read(att.getBuffer(), att, this); // 2. 既然服務端已經返回數據給客戶端,斷開這次的連接 try { att.getClient().close(); } catch (IOException e) { } } } @Override public void failed(Throwable t, Attachment att) { System.out.println("連接斷開"); } }
Attachment
public class Attachment { private AsynchronousServerSocketChannel server; private AsynchronousSocketChannel client; private boolean isReadMode; private ByteBuffer buffer; // getter & setter }
因為網上資料較少,只找到了
參考資料:Java 非阻塞 IO 和異步 IO