AIO簡介
AIO是java中IO模型的一種,作為NIO的改進和增強隨JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被稱作是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在於JDK中,NIO於JDK1.4版本發布更新)的阻塞式讀寫,AIO提供了從建立連接到讀、寫的全異步操作。AIO可用於異步的文件讀寫和網絡通信。
同步/異步、阻塞/非阻塞
我們先來了解下什么是同步/異步,以及什么是阻塞/非阻塞。在IO操作中,IO分兩階段(一旦拿到數據后就變成了數據操作,不再是IO):
- 數據准備階段
- 內核空間復制數據到用戶進程緩沖區(用戶空間)階段 在操作系統中,程序運行的空間分為內核空間和用戶空間。 應用程序都是運行在用戶空間的,所以它們能操作的數據也都在用戶空間。
- 同步和異步IO的概念:同步是用戶線程發起I/O請求后需要等待或者輪詢內核I/O操作完成后才能繼續執行;異步是用戶線程發起I/O請求后仍需要繼續執行,當內核I/O操作完成后會通知用戶線程,或者調用用戶線程注冊的回調函數。
- 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要徹底完成后才能返回用戶空間;非阻塞是指I/O操作被調用后立即返回一個狀態值,無需等I/O操作徹底完成。
一般來講: 阻塞IO模型、非阻塞IO模型、IO復用模型(select/poll/epoll)、信號驅動IO模型都屬於同步IO,因為階段2是阻塞的(盡管時間很短)。同步IO和異步IO的區別就在於第二個步驟是否阻塞: 如果不阻塞,而是操作系統幫你做完IO操作再將結果返回給你,那么就是異步IO。
異步IO模型
異步IO則采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,並且准備好數據后,在主動通知應用程序,觸發相應的函數。也可以如下圖理解:
和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O CompletionPort,I/O完成端口);Linux下由於沒有這種異步IO技術,所以使用的是epoll對異步IO進行模擬。
JAVA AIO框架簡析
JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路復用IO技術模擬異步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下負責實現套接字通道的具體類是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”,如下圖在Mac上安裝的JDK可以看到:
另外特別說明一下,請注意在上圖中的“java.nio.channels.NetworkChannel”接口,這個接口同樣被JAVA NIO框架實現了,如上圖所示:SocketChannel以及ServerSocketChannel就是NetworkChannel的實現。
AIO和同步IO(BIO和NIO)不同在於,IO操作全部委托給了被調用者(操作系統),在阻塞和非阻塞IO中,不管是使用阻塞流還是使用select選擇器,用戶進程下一步操作都是依賴操作系統的IO操作結果的,也就是需要同步的。而在AIO中,也就是前面通俗說的,(先寫好回調操作)調用系統的IO操作。
在java中,支持異步模型的方式有兩個類:
- Future類
- Callable接口
嚴格來說,Future不能算是異步模型的類,因為future.get()方法是阻塞的,需要等待處理完成;而Callable是回調,是正宗的異步模型工具。一般來說,異步編程都是基於回調的。
AIO重要類
實現一個最簡單的AIO socket通信server、client,主要需要這些相關的類和接口:
1)AsynchronousServerSocketChannel
服務端Socket通道類,負責服務端Socket的創建和監聽;
2)AsynchronousSocketChannel
客戶端Socket通道類,負責客戶端消息讀寫;
3)CompletionHandler<A,V>
消息處理回調接口,是一個負責消費異步IO操作結果的消息處理器;
4) ByteBuffer
負責承載通信過程中需要讀、寫的消息。
此外,還有可選的用於異步通道資源共享的AsynchronousChannelGroup類,接下來將一一介紹這些類的主要接口及使用。
AsynchronousServerSocketChannel
AsynchronousServerSocketChannel是一個流式監聽套接字的異步通道,是ServerSocketChannel的異步版本的通道,支持異步處理。AsynchronousServerSocketChannel的使用和ServerSocketChannel一樣需要經過三個步驟:創建/打開通道、綁定地址和端口和監聽客戶端連接請求。
創建/打開通道
簡單地,可以通過調用AsynchronousServerSocketChannel的靜態方法open()來創建AsynchronousServerSocketChannel實例:
try {
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
當打開通道失敗時,會拋出一個IOException異常。
綁定地址和端口
通過調用AsynchronousServerSocketChannel.bind(SocketAddress)方法來綁定監聽地址和端口:
// 構建一個InetSocketAddress實例以指定監聽的地址和端口,如果需要指定ip,則調用InetSocketAddress(ip,port)構造方法創建即可
serverSocketChannel.bind(new InetSocketAddress(port));
監聽和接收客戶端連接請求
監聽客戶端連接請求,主要通過調用AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個重載方法:
public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>);
public abstract Future<AsynchronousSocketChannel> accept();
這兩個重載方法的行為方式完全相同一種基於Future,一種基於回調,事實上,AIO的很多異步API都封裝了諸如此類的重載方法:提供CompletionHandle回調參數或者返回一個Future<T>類型變量。用過Feture接口的都知道,可以調用Future.get()方法阻塞等待調用結果。無論是哪種方式來獲取連接,最終的處理對象都是Socket,和ServerSocketChannel不同的是,這里的socket是封裝在AsynchronousSocketChannel中的。
基於Future實現:
public void AsynchronousServerSocketChannel() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
while (true) {
Future<AsynchronousSocketChannel> conn = channel.accept();
// 阻塞等待直到future有結果
AsynchronousSocketChannel asyncSocketChannel = conn.get();
// 異步處理連接
asyncHandle(asyncSocketChannel);
}
} catch (IOException | InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
基於回調:
public void AsynchronousServerSocketChannelCallback() {
try {
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
channel.bind(new InetSocketAddress(8888));
channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {
@Override
public void completed(AsynchronousSocketChannel result, Void attachment) { // 接收到新的客戶端連接時調用,result就是和客戶端的連接對話,此時可以通過result和客戶端進行通信
System.out.println("accept completed");
// 異步處理連接
asyncHandle(result);
// 繼續監聽accept
channel.accept(null, this);
}
@Override
public void failed(Throwable exc, Void attachment) { // accept失敗時回調
System.out.println("accept failed");
}
});
// 讓主線程保持存活
while (true) {
System.in.read();
}
} catch (IOException e) {
e.printStackTrace();
}
}
需要注意的是,AsynchronousServerSocketChannel是線程安全的,但在任何時候同一時間內只能允許有一個accept操作。因此,必須得等待前一個accept操作完成之后才能啟動下一個accept:
serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
AsynchronousServerSocketChannel>() {
@Override
public void completed(final AsynchronousSocketChannel result,
final AsynchronousServerSocketChannel attachment) {
// 接收到新的客戶端連接,此時本次accept已經完成
// 繼續監聽下一個客戶端連接到來
serverSocketChannel.accept(serverSocketChannel,this);
// result即和該客戶端的連接會話
// 此時可以通過result與客戶端進行交互
}
...
});
此外,還可以通過以下方法獲取和設置AsynchronousServerSocketChannel的socket選項:
// 設置socket選項
serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
其中StandardSocketOptions類封裝了常用的socket設置選項。
獲取本地地址:
InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();
AsynchronousChannelGroup異步通道組
try {
ExecutorService pool = Executors.newCachedThreadPool();
AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
e.printStackTrace();
}
AsynchronousServerSocketChannel提供了設置通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。可以調用open(AsynchronousChannelGroup)重載方法創建指定分組的通道,默認情況下,具有 open() 方法的通道屬於一個全局通道組,可利用如下系統變量對其進行配置:
- java.nio.channels.DefaultThreadPoolthreadFactory,其不采用默認設置,而是定義一個 java.util.concurrent.ThreadFactory
- java.nio.channels.DefaultThreadPool.initialSize,指定線程池的初始規模
java.nio.channels.AsynchronousChannelGroup 中的三個實用方法提供了創建新通道組的方法:
withCachedThreadPool()
withFixedThreadPool()
withThreadPool()
這些方法或者對線程池進行定義,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下調用創建了具有線程池的新的通道組,該線程池包含 10 個線程,其中每個都構造為來自 Executors 類的線程工廠:
AsynchronousChannelGroup tenThreadGroup =
AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());
三個異步網絡通道都具有 open() 方法的替代版本,它們采用給出的通道組而不是默認通道組。例如,當有異步操作請求時,此調用告訴 channel 使用 tenThreadGroup 而不是默認通道組來獲取線程:
AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(tenThreadGroup);
定義自己的通道組可更好地控制服務於操作的線程,並能提供關閉線程或者等待終止的機制。
AsynchronousChannelGroup封裝了處理由綁定到組的異步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的異步操作結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其他一些用於支持完成異步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式打開AsynchronousServerSocketChannel,可以定制server channel執行的線程池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個默認的分組中。
AsynchronousSocketChannel
AsynchronousSocketChannel和NIO通道是SocketChannel功能相似。是一個流式連接套接字的異步通道。
AsynchronousSocketChannel表示服務端與客戶端之間的連接通道。客戶端可以通過調用AsynchronousSocketChannel靜態方法open()創建,而服務端則通過調用AsynchronousServerSocketChannel.accept()方法后由AIO內部在合適的時候創建。下面以客戶端實現為例,介紹AsynchronousSocketChannel。
創建AsynchronousSocketChannel
需要通過open()創建和打開一個AsynchronousSocketChannel實例,再調用其connect()方法連接到服務端,接着才可以與服務端交互:
// 打開一個socket通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞等待連接成功
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// 連接成功,接下來可以進行read、write操作
同AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定制線程池。
connect
socketChannel.connect()也提供了CompletionHandler回調和Future返回值兩個重載方法,上面例子使用帶Future返回值的重載,並調用get()方法阻塞等待連接建立完成。
// 基於回調
public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler);
// 基於Future 調用get()方法阻塞等待連接建立完成
public abstract Future<Void> connect(SocketAddress remote);
發送消息
可以構建一個ByteBuffer對象並調用socketChannel.write(ByteBuffer)方法異步發送消息,並通過CompletionHandler回調接收處理發送結果:
ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(final Integer result, final Object attachment) {
// 發送完成,result:總共寫入的字節數
}
@Override
public void failed(final Throwable exc, final Object attachment) {
// 發送失敗
}
});
讀取消息
構建一個指定接收長度的ByteBuffer用於接收數據,調用socketChannel.read()方法讀取消息並通過CompletionHandler處理讀取結果:
ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
@Override
public void completed(final Integer result, final Object attachment) {
// 讀取完成,result:實際讀取的字節數。如果通道中沒有數據可讀則result=-1。
}
@Override
public void failed(final Throwable exc, final Object attachment) {
// 讀取失敗
}
});
此外,AsynchronousSocketChannel也封裝了設置/獲取socket選項的方法:
// 設置socket選項
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);
注意:讀寫操作,有多個重載的Future和回調式的read和write方法:
public abstract <A> void read(ByteBuffer dst,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler);
public abstract Future<Integer> read(ByteBuffer dst);
public abstract <A> void read(ByteBuffer[] dsts,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
// write
public abstract <A> void write(ByteBuffer src,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Integer,? super A> handler);
public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler);
public abstract Future<Integer> write(ByteBuffer src);
public abstract <A> void write(ByteBuffer[] srcs,
int offset,
int length,
long timeout,
TimeUnit unit,
A attachment,
CompletionHandler<Long,? super A> handler);
如下服務器端示例,使用的是accept返回的channel:
// 基於future 實際上是同步的讀取方式
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
ByteBuffer dst = ByteBuffer.allocate(1024);
// based on Future,
// 實際上是同步處理的方式,為了不將處理變成阻塞式單連接的socket形式,使用子線程來獲取輸入流
new Thread(() -> {
while (asyncSocketChannel.isOpen()) {
Future<Integer> readFuture = asyncSocketChannel.read(dst);
try {
// 阻塞等待讀取結果
Integer readResult = readFuture.get();
if (readResult > 0) {
System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
} else {
// doOtherthing
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
}).start();
}
// 基於回調
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {
@Override
public void completed(Integer result, Void attachment) {
if (result > 0) {
System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
dst.clear();
}
// 注冊回調,繼續讀取輸入
asyncSocketChannel.read(dst, null, this);
}
@Override
public void failed(Throwable exc, Void attachment) {
// TODO Auto-generated method stub
}
});
}
CompletionHandler
CompletionHandler是一個用於消費異步I/O操作結果的處理器。
AIO中定義的異步通道允許指定一個CompletionHandler處理器消費一個異步操作的結果。從上文中也可以看到,AIO中大部分的異步I/O操作接口都封裝了一個帶CompletionHandler類型參數的重載方法,使用CompletionHandler可以很方便地處理AIO中的異步I/O操作結果。CompletionHandler是一個具有兩個泛型類型參數的接口,聲明了兩個接口方法:
public interface CompletionHandler<V,A> {
void completed(V result, A attachment);
void failed(Throwable exc, A attachment);
}
其中,泛型V表示I/O操作的結果類型,通過該類型參數消費I/O操作的結果;泛型A為附加到I/O操作中的對象類型,可以通過該類型參數將需要的變量傳入到CompletionHandler實現中使用。因此,AIO中大部分的異步I/O操作都有一個類似這樣的重載方法:
<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);
例如,AsynchronousServerSocketChannel.accept()方法:
public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);
AsynchronousSocketChannel.write()方法等:
public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)
當I/O操作成功完成時,會回調到completed方法,failed方法則在I/O操作失敗時被回調。需要注意的是:在CompletionHandler的實現中應當即使處理操作結果,以避免一直占用調用線程而不能分發其他的CompletionHandler處理器。
AIO代碼實現
服務端
public class Server {
private static int DEFAULT_PORT = 8888;
private static AsyncServerHandler serverHandle;
public volatile static long clientCount = 0;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null)
return;
serverHandle = new AsyncServerHandler(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args){
Server.start();
}
}
public class AsyncServerHandler implements Runnable {
public CountDownLatch latch;
public AsynchronousServerSocketChannel channel;
public AsyncServerHandler(int port) {
try {
//創建服務端通道
channel = AsynchronousServerSocketChannel.open();
//綁定端口
channel.bind(new InetSocketAddress(port));
System.out.println("服務器已啟動,端口號:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//CountDownLatch初始化
//它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
//此處,讓現場在此阻塞,防止服務端執行完成后退出
//也可以使用while(true)+sleep
//生成環境就不需要擔心這個問題,以為服務端是不會退出的
latch = new CountDownLatch(1);
//用於接收客戶端的連接
channel.accept(this,new AcceptHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
//繼續接受其他客戶端的請求
Server.clientCount++;
System.out.println("連接的客戶端數:" + Server.clientCount);
serverHandler.channel.accept(serverHandler, this);
//創建新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//異步讀 第三個參數為接收消息回調的業務Handler
channel.read(buffer, buffer, new ReadHandler(channel));
}
@Override
public void failed(Throwable exc, AsyncServerHandler serverHandler) {
exc.printStackTrace();
serverHandler.latch.countDown();
}
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
//用於讀取半包消息和發送應答
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
//讀取到消息后的處理
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip操作
attachment.flip();
//根據
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {
String expression = new String(message, "UTF-8");
System.out.println("服務器收到消息: " + expression);
String calrResult = null;
try{
calrResult = Caculator.cal(expression).toString();
}catch(Exception e){
calrResult = "計算錯誤:" + e.getMessage();
}
//向客戶端發送消息
doWrite(calrResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//發送消息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//異步寫數據 參數與前面的read一樣
channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果沒有發送完,就繼續發送直到完成
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
else{
//創建新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//異步讀 第三個參數為接收消息回調的業務Handler
channel.read(readBuffer, readBuffer, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
客戶端
public class Client {
private static String DEFAULT_HOST = "localhost";
private static int DEFAULT_PORT = 8888;
private static AsyncClientHandler clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
return;
clientHandle = new AsyncClientHandler(ip,port);
new Thread(clientHandle,"Client").start();
}
//向服務器發送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception{
Client.start();
System.out.println("請輸入請求消息:");
Scanner scanner = new Scanner(System.in);
while(Client.sendMsg(scanner.nextLine()));
}
}
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public AsyncClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//創建異步的客戶端通道
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//創建CountDownLatch等待
latch = new CountDownLatch(1);
//發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
clientChannel.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//連接服務器成功
//意味着TCP三次握手完成
@Override
public void completed(Void result, AsyncClientHandler attachment) {
System.out.println("客戶端成功連接到服務器...");
}
//連接服務器失敗
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
System.err.println("連接服務器失敗...");
exc.printStackTrace();
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
//向服務器發送消息
public void sendMsg(String msg){
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
//異步寫
clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
}
}
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public WriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
//完成全部數據的寫入
if (buffer.hasRemaining()) {
clientChannel.write(buffer, buffer, this);
} else {
//讀取數據
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("數據發送失敗...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("客戶端收到結果:"+ body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
System.err.println("數據讀取失敗...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}
測試類
public class Test {
//測試主方法
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception {
//運行服務器
Server.start();
//避免客戶端先於服務器啟動前執行代碼
Thread.sleep(100);
//運行客戶端
Client.start();
System.out.println("請輸入請求消息:");
Scanner scanner = new Scanner(System.in);
while (Client.sendMsg(scanner.nextLine())) ;
}
}
public final class Caculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}
參考: |