一、AIO簡介
AIO是java中IO模型的一種,作為NIO的改進和增強隨JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被稱作是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在於JDK中,NIO於JDK1.4版本發布更新)的阻塞式讀寫,AIO提供了從建立連接到讀、寫的全異步操作。AIO可用於異步的文件讀寫和網絡通信。
二、同步/異步、阻塞/非阻塞
我們先來了解下什么是同步/異步,以及什么是阻塞/非阻塞。在IO操作中,IO分兩階段(一旦拿到數據后就變成了數據操作,不再是IO):
- 數據准備階段
- 內核空間復制數據到用戶進程緩沖區(用戶空間)階段 在操作系統中,程序運行的空間分為內核空間和用戶空間。 應用程序都是運行在用戶空間的,所以它們能操作的數據也都在用戶空間。
- 同步和異步IO的概念:同步是用戶線程發起I/O請求后需要等待或者輪詢內核I/O操作完成后才能繼續執行 異步是用戶線程發起I/O請求后仍需要繼續執行,當內核I/O操作完成后會通知用戶線程,或者調用用戶線程注冊的回調函數。
- 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要徹底完成后才能返回用戶空間 非阻塞是指I/O操作被調用后立即返回一個狀態值,無需等I/O操作徹底完成。
一般來講: 阻塞IO模型、非阻塞IO模型、IO復用模型(select/poll/epoll)、信號驅動IO模型都屬於同步IO,因為階段2是阻塞的(盡管時間很短)。同步IO和異步IO的區別就在於第二個步驟是否阻塞: 如果不阻塞,而是操作系統幫你做完IO操作再將結果返回給你,那么就是異步IO。
三、異步IO模型
異步IO則采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,並且准備好數據后,在主動通知應用程序,觸發相應的函數。也可以如下圖理解:
和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O CompletionPort,I/O完成端口);Linux下由於沒有這種異步IO技術,所以使用的是epoll對異步IO進行模擬。
四、JAVA AIO框架簡析
JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路復用IO技術模擬異步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下負責實現套接字通道的具體類是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”,如下圖在Mac上安裝的JDK可以看到:
另外特別說明一下,請注意在上圖中的“java.nio.channels.NetworkChannel”接口,這個接口同樣被JAVA NIO框架實現了,如上圖所示:SocketChannel以及ServerSocketChannel就是NetworkChannel的實現。
在java中,支持異步模型的方式有兩個類:
- Future類
- Callable接口
五、AIO重要類
實現一個最簡單的AIO socket通信server、client,主要需要這些相關的類和接口:
AsynchronousServerSocketChannel
服務端Socket通道類,負責服務端Socket的創建和監聽;
AsynchronousSocketChannel
客戶端Socket通道類,負責客戶端消息讀寫;
CompletionHandler<A,V>
消息處理回調接口,是一個負責消費異步IO操作結果的消息處理器;
ByteBuffer
負責承載通信過程中需要讀、寫的消息。
此外,還有可選的用於異步通道資源共享的AsynchronousChannelGroup
類,接下來將一一介紹這些類的主要接口及使用。
1、AsynchronousServerSocketChannel
AsynchronousServerSocketChannel是一個流式監聽套接字的異步通道,是ServerSocketChannel的異步版本的通道,支持異步處理。AsynchronousServerSocketChannel的使用和ServerSocketChannel一樣需要經過三個步驟:創建/打開通道、綁定地址和端口和監聽客戶端連接請求。
1.1 創建/打開通道
try { AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); }
當打開通道失敗時,會拋出一個IOException異常。
1.2 綁定地址和端口
通過調用AsynchronousServerSocketChannel.bind(SocketAddress)方法來綁定監聽地址和端口:
// 構建一個InetSocketAddress實例以指定監聽的地址和端口,如果需要指定ip,則調用InetSocketAddress(ip,port)構造方法創建即可 serverSocketChannel.bind(new InetSocketAddress(port));
1.3 監聽和接收客戶端連接請求
監聽客戶端連接請求,主要通過調用AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個重載方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>); public abstract Future<AsynchronousSocketChannel> accept();
這兩個重載方法的行為方式完全相同一種基於Future,一種基於回調,事實上,AIO的很多異步API都封裝了諸如此類的重載方法:提供CompletionHandle回調參數或者返回一個Future<T>類型變量。用過Feture接口的都知道,可以調用Feture.get()方法阻塞等待調用結果。無論是哪種方式來獲取連接,最終的處理對象都是Socket,和ServerSocketChannel不同的是,這里的socket是封裝在AsynchronousSocketChannel
中的。
基於Future實現:
public void AsynchronousServerSocketChannel() { try { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(8888)); while (true) { Future<AsynchronousSocketChannel> conn = channel.accept(); // 阻塞等待直到future有結果 AsynchronousSocketChannel asyncSocketChannel = conn.get(); // 異步處理連接 asyncHandle(asyncSocketChannel); } } catch (IOException | InterruptedException | ExecutionException e) { e.printStackTrace(); } }
基於回調:
public void AsynchronousServerSocketChannelCallback() { try { AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(); channel.bind(new InetSocketAddress(8888)); channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { @Override public void completed(AsynchronousSocketChannel result, Void attachment) {
// 接收到新的客戶端連接時調用,result就是和客戶端的連接對話,此時可以通過result和客戶端進行通信 System.out.println("accept completed"); // 異步處理連接 asyncHandle(result); // 繼續監聽accept channel.accept(null, this); } @Override public void failed(Throwable exc, Void attachment) {
// accept失敗時回調 System.out.println("accept failed"); } }); // 讓主線程保持存活 while (true) { System.in.read(); } } catch (IOException e) { e.printStackTrace(); } }
需要注意的是,AsynchronousServerSocketChannel是線程安全的,但在任何時候同一時間內只能允許有一個accept操作。因此,必須得等待前一個accept操作完成之后才能啟動下一個accept:
serverSocketChannel .accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel, AsynchronousServerSocketChannel>() { @Override public void completed(final AsynchronousSocketChannel result, final AsynchronousServerSocketChannel attachment) { // 接收到新的客戶端連接,此時本次accept已經完成 // 繼續監聽下一個客戶端連接到來 serverSocketChannel.accept(serverSocketChannel,this); // result即和該客戶端的連接會話 // 此時可以通過result與客戶端進行交互 } ... });
此外,還可以通過以下方法獲取和設置AsynchronousServerSocketChannel的socket選項:
// 設置socket選項 serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
其中StandardSocketOptions類封裝了常用的socket設置選項。
獲取本地地址:
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
1.4 AsynchronousChannelGroup異步通道組
try { ExecutorService pool = Executors.newCachedThreadPool(); AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10); AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group); } catch (IOException e) { e.printStackTrace(); }
AsynchronousServerSocketChannel提供了設置通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。可以調用open(AsynchronousChannelGroup)重載方法創建指定分組的通道,默認情況下,具有 open() 方法的通道屬於一個全局通道組,可利用如下系統變量對其進行配置:
java.nio.channels.DefaultThreadPoolthreadFactory
,其不采用默認設置,而是定義一個 java.util.concurrent.ThreadFactoryjava.nio.channels.DefaultThreadPool.initialSize
,指定線程池的初始規模
java.nio.channels.AsynchronousChannelGroup 中的三個實用方法提供了創建新通道組的方法:
withCachedThreadPool()
withFixedThreadPool()
withThreadPool()
這些方法或者對線程池進行定義,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下調用創建了具有線程池的新的通道組,該線程池包含 10 個線程,其中每個都構造為來自 Executors 類的線程工廠:
AsynchronousChannelGroup tenThreadGroup =
AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
三個異步網絡通道都具有 open() 方法的替代版本,它們采用給出的通道組而不是默認通道組。例如,當有異步操作請求時,此調用告訴 channel 使用 tenThreadGroup 而不是默認通道組來獲取線程:
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(tenThreadGroup);
定義自己的通道組可更好地控制服務於操作的線程,並能提供關閉線程或者等待終止的機制。
AsynchronousChannelGroup封裝了處理由綁定到組的異步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的異步操作結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其他一些用於支持完成異步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式打開AsynchronousServerSocketChannel,可以定制server channel執行的線程池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個默認的分組中。
2、AsynchronousSocketChannel
AsynchronousSocketChannel和NIO通道是SocketChannel功能相似。是一個流式連接套接字的異步通道。
AsynchronousSocketChannel表示服務端與客戶端之間的連接通道。客戶端可以通過調用AsynchronousSocketChannel靜態方法open()創建,而服務端則通過調用AsynchronousServerSocketChannel.accept()方法后由AIO內部在合適的時候創建。下面以客戶端實現為例,介紹AsynchronousSocketChannel。
2.1 創建AsynchronousSocketChannel
需要通過open()創建和打開一個AsynchronousSocketChannel實例,再調用其connect()方法連接到服務端,接着才可以與服務端交互:
// 打開一個socket通道 AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open(); // 阻塞等待連接成功 socketChannel.connect(new InetSocketAddress(ip,port)).get(); // 連接成功,接下來可以進行read、write操作
同AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定制線程池。
2.2 connect
socketChannel.connect()也提供了CompletionHandler回調和Future返回值兩個重載方法,上面例子使用帶Future返回值的重載,並調用get()方法阻塞等待連接建立完成。
// 基於回調 public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler); // 基於Future 調用get()方法阻塞等待連接建立完成 public abstract Future<Void> connect(SocketAddress remote);
2.3 發送消息
可以構建一個ByteBuffer對象並調用socketChannel.write(ByteBuffer)方法異步發送消息,並通過CompletionHandler回調接收處理發送結果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes()); socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 發送完成,result:總共寫入的字節數 } @Override public void failed(final Throwable exc, final Object attachment) { // 發送失敗 } });
2.4 讀取消息
構建一個指定接收長度的ByteBuffer用於接收數據,調用socketChannel.read()方法讀取消息並通過CompletionHandler處理讀取結果:
ByteBuffer readBuffer = ByteBuffer.allocate(128); socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() { @Override public void completed(final Integer result, final Object attachment) { // 讀取完成,result:實際讀取的字節數。如果通道中沒有數據可讀則result=-1。 } @Override public void failed(final Throwable exc, final Object attachment) { // 讀取失敗 } });
此外,AsynchronousSocketChannel也封裝了設置/獲取socket選項的方法:
// 設置socket選項 socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true); // 獲取socket選項設置 boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
注意:讀寫操作,有多個重載的Future和回調式的read和write方法:
public abstract <A> void read(ByteBuffer dst, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler); public final <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler) public abstract Future<Integer> read(ByteBuffer dst); public abstract <A> void read(ByteBuffer[] dsts, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler); // write public abstract <A> void write(ByteBuffer src, long timeout, TimeUnit unit, A attachment, CompletionHandler<Integer,? super A> handler); public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler); public abstract Future<Integer> write(ByteBuffer src); public abstract <A> void write(ByteBuffer[] srcs, int offset, int length, long timeout, TimeUnit unit, A attachment, CompletionHandler<Long,? super A> handler);
如下服務器端示例,使用的是accept返回的channel:
// 基於future 實際上是同步的讀取方式 private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) { ByteBuffer dst = ByteBuffer.allocate(1024); // based on Future, // 實際上是同步處理的方式,為了不將處理變成阻塞式單連接的socket形式,使用子線程來獲取輸入流 new Thread(() -> { while (asyncSocketChannel.isOpen()) { Future<Integer> readFuture = asyncSocketChannel.read(dst); try { // 阻塞等待讀取結果 Integer readResult = readFuture.get(); if (readResult > 0) { System.out.println(new String(dst.array(), StandardCharsets.UTF_8)); dst.clear(); } else { // doOtherthing } } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } }).start(); } // 基於回調 private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) { asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() { @Override public void completed(Integer result, Void attachment) { if (result > 0) { System.out.println(new String(dst.array(), StandardCharsets.UTF_8)); dst.clear(); } // 注冊回調,繼續讀取輸入 asyncSocketChannel.read(dst, null, this); } @Override public void failed(Throwable exc, Void attachment) { // TODO Auto-generated method stub } }); }
3、CompletionHandler
CompletionHandler是一個用於消費異步I/O操作結果的處理器。
AIO中定義的異步通道允許指定一個CompletionHandler處理器消費一個異步操作的結果。從上文中也可以看到,AIO中大部分的異步I/O操作接口都封裝了一個帶CompletionHandler類型參數的重載方法,使用CompletionHandler可以很方便地處理AIO中的異步I/O操作結果。CompletionHandler是一個具有兩個泛型類型參數的接口,聲明了兩個接口方法:
public interface CompletionHandler<V,A> { void completed(V result, A attachment); void failed(Throwable exc, A attachment); }
其中,泛型V表示I/O操作的結果類型,通過該類型參數消費I/O操作的結果;泛型A為附加到I/O操作中的對象類型,可以通過該類型參數將需要的變量傳入到CompletionHandler實現中使用。因此,AIO中大部分的異步I/O操作都有一個類似這樣的重載方法:
<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);
例如,AsynchronousServerSocketChannel.accept()方法:
public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
AsynchronousSocketChannel.write()方法等:
public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)
當I/O操作成功完成時,會回調到completed方法,failed方法則在I/O操作失敗時被回調。需要注意的是:在CompletionHandler的實現中應當即使處理操作結果,以避免一直占用調用線程而不能分發其他的CompletionHandler處理器。
六、AIO代碼實現
1、服務端
public class Server { private static int DEFAULT_PORT = 8888; private static AsyncServerHandler serverHandle; public volatile static long clientCount = 0; public static void start(){ start(DEFAULT_PORT); } public static synchronized void start(int port){ if(serverHandle!=null) return; serverHandle = new AsyncServerHandler(port); new Thread(serverHandle,"Server").start(); } public static void main(String[] args){ Server.start(); } }
public class AsyncServerHandler implements Runnable { public CountDownLatch latch; public AsynchronousServerSocketChannel channel; public AsyncServerHandler(int port) { try { //創建服務端通道 channel = AsynchronousServerSocketChannel.open(); //綁定端口 channel.bind(new InetSocketAddress(port)); System.out.println("服務器已啟動,端口號:" + port); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //CountDownLatch初始化 //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞 //此處,讓現場在此阻塞,防止服務端執行完成后退出 //也可以使用while(true)+sleep //生成環境就不需要擔心這個問題,以為服務端是不會退出的 latch = new CountDownLatch(1); //用於接收客戶端的連接 channel.accept(this,new AcceptHandler()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } }
//作為handler接收客戶端連接 public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> { @Override public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) { //繼續接受其他客戶端的請求 Server.clientCount++; System.out.println("連接的客戶端數:" + Server.clientCount); serverHandler.channel.accept(serverHandler, this); //創建新的Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); //異步讀 第三個參數為接收消息回調的業務Handler channel.read(buffer, buffer, new ReadHandler(channel)); } @Override public void failed(Throwable exc, AsyncServerHandler serverHandler) { exc.printStackTrace(); serverHandler.latch.countDown(); } }
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { //用於讀取半包消息和發送應答 private AsynchronousSocketChannel channel; public ReadHandler(AsynchronousSocketChannel channel) { this.channel = channel; } //讀取到消息后的處理 @Override public void completed(Integer result, ByteBuffer attachment) { //flip操作 attachment.flip(); //根據 byte[] message = new byte[attachment.remaining()]; attachment.get(message); try { String expression = new String(message, "UTF-8"); System.out.println("服務器收到消息: " + expression); String calrResult = null; try{ calrResult = Caculator.cal(expression).toString(); }catch(Exception e){ calrResult = "計算錯誤:" + e.getMessage(); } //向客戶端發送消息 doWrite(calrResult); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } //發送消息 private void doWrite(String result) { byte[] bytes = result.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); //異步寫數據 參數與前面的read一樣 channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { //如果沒有發送完,就繼續發送直到完成 if (buffer.hasRemaining()) channel.write(buffer, buffer, this); else{ //創建新的Buffer ByteBuffer readBuffer = ByteBuffer.allocate(1024); //異步讀 第三個參數為接收消息回調的業務Handler channel.read(readBuffer, readBuffer, new ReadHandler(channel)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { } } }); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } }
2、客戶端
public class Client { private static String DEFAULT_HOST = "localhost"; private static int DEFAULT_PORT = 8888; private static AsyncClientHandler clientHandle; public static void start(){ start(DEFAULT_HOST,DEFAULT_PORT); } public static synchronized void start(String ip,int port){ if(clientHandle!=null) return; clientHandle = new AsyncClientHandler(ip,port); new Thread(clientHandle,"Client").start(); } //向服務器發送消息 public static boolean sendMsg(String msg) throws Exception{ if(msg.equals("q")) return false; clientHandle.sendMsg(msg); return true; } @SuppressWarnings("resource") public static void main(String[] args) throws Exception{ Client.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while(Client.sendMsg(scanner.nextLine())); } }
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable { private AsynchronousSocketChannel clientChannel; private String host; private int port; private CountDownLatch latch; public AsyncClientHandler(String host, int port) { this.host = host; this.port = port; try { //創建異步的客戶端通道 clientChannel = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); } } @Override public void run() { //創建CountDownLatch等待 latch = new CountDownLatch(1); //發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法 clientChannel.connect(new InetSocketAddress(host, port), this, this); try { latch.await(); } catch (InterruptedException e1) { e1.printStackTrace(); } try { clientChannel.close(); } catch (IOException e) { e.printStackTrace(); } } //連接服務器成功 //意味着TCP三次握手完成 @Override public void completed(Void result, AsyncClientHandler attachment) { System.out.println("客戶端成功連接到服務器..."); } //連接服務器失敗 @Override public void failed(Throwable exc, AsyncClientHandler attachment) { System.err.println("連接服務器失敗..."); exc.printStackTrace(); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { e.printStackTrace(); } } //向服務器發送消息 public void sendMsg(String msg){ byte[] req = msg.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(req.length); writeBuffer.put(req); writeBuffer.flip(); //異步寫 clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch)); } }
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public WriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result, ByteBuffer buffer) { //完成全部數據的寫入 if (buffer.hasRemaining()) { clientChannel.write(buffer, buffer, this); } else { //讀取數據 ByteBuffer readBuffer = ByteBuffer.allocate(1024); clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch)); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { System.err.println("數據發送失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel clientChannel; private CountDownLatch latch; public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) { this.clientChannel = clientChannel; this.latch = latch; } @Override public void completed(Integer result,ByteBuffer buffer) { buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; buffer.get(bytes); String body; try { body = new String(bytes,"UTF-8"); System.out.println("客戶端收到結果:"+ body); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } } @Override public void failed(Throwable exc,ByteBuffer attachment) { System.err.println("數據讀取失敗..."); try { clientChannel.close(); latch.countDown(); } catch (IOException e) { } } }
3、測試類
public class Test { //測試主方法 @SuppressWarnings("resource") public static void main(String[] args) throws Exception { //運行服務器 Server.start(); //避免客戶端先於服務器啟動前執行代碼 Thread.sleep(100); //運行客戶端 Client.start(); System.out.println("請輸入請求消息:"); Scanner scanner = new Scanner(System.in); while (Client.sendMsg(scanner.nextLine())) ; } }
public final class Caculator { private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript"); public static Object cal(String expression) throws ScriptException { return jse.eval(expression); } }