IO模型之AIO代碼及其實踐詳解


AIO簡介

AIO是java中IO模型的一種,作為NIO的改進和增強隨JDK1.7版本更新被集成在JDK的nio包中,因此AIO也被稱作是NIO2.0。區別於傳統的BIO(Blocking IO,同步阻塞式模型,JDK1.4之前就存在於JDK中,NIO於JDK1.4版本發布更新)的阻塞式讀寫,AIO提供了從建立連接到讀、寫的全異步操作。AIO可用於異步的文件讀寫和網絡通信。

同步/異步、阻塞/非阻塞

我們先來了解下什么是同步/異步,以及什么是阻塞/非阻塞。在IO操作中,IO分兩階段(一旦拿到數據后就變成了數據操作,不再是IO):

  1. 數據准備階段
  2. 內核空間復制數據到用戶進程緩沖區(用戶空間)階段 在操作系統中,程序運行的空間分為內核空間和用戶空間。 應用程序都是運行在用戶空間的,所以它們能操作的數據也都在用戶空間。
  • 同步和異步IO的概念:同步是用戶線程發起I/O請求后需要等待或者輪詢內核I/O操作完成后才能繼續執行;異步是用戶線程發起I/O請求后仍需要繼續執行,當內核I/O操作完成后會通知用戶線程,或者調用用戶線程注冊的回調函數。
  • 阻塞和非阻塞IO的概念: 阻塞是指I/O操作需要徹底完成后才能返回用戶空間;非阻塞是指I/O操作被調用后立即返回一個狀態值,無需等I/O操作徹底完成。

一般來講: 阻塞IO模型、非阻塞IO模型、IO復用模型(select/poll/epoll)、信號驅動IO模型都屬於同步IO,因為階段2是阻塞的(盡管時間很短)。同步IO和異步IO的區別就在於第二個步驟是否阻塞: 如果不阻塞,而是操作系統幫你做完IO操作再將結果返回給你,那么就是異步IO。

異步IO模型

異步IO則采用“訂閱-通知”模式:即應用程序向操作系統注冊IO監聽,然后繼續做自己的事情。當操作系統發生IO事件,並且准備好數據后,在主動通知應用程序,觸發相應的函數。也可以如下圖理解:

和同步IO一樣,異步IO也是由操作系統進行支持的。微軟的windows系統提供了一種異步IO技術:IOCP(I/O CompletionPort,I/O完成端口);Linux下由於沒有這種異步IO技術,所以使用的是epoll對異步IO進行模擬。

JAVA AIO框架簡析

JAVA AIO框架在windows下使用windows IOCP技術,在Linux下使用epoll多路復用IO技術模擬異步IO,這個從JAVA AIO框架的部分類設計上就可以看出來。例如框架中,在Windows下負責實現套接字通道的具體類是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,在Linux下負責實現套接字通道的具體類是“sun.nio.ch.UnixAsynchronousServerSocketChannelImpl”,如下圖在Mac上安裝的JDK可以看到:

另外特別說明一下,請注意在上圖中的“java.nio.channels.NetworkChannel”接口,這個接口同樣被JAVA NIO框架實現了,如上圖所示:SocketChannel以及ServerSocketChannel就是NetworkChannel的實現。

AIO和同步IO(BIO和NIO)不同在於,IO操作全部委托給了被調用者(操作系統),在阻塞和非阻塞IO中,不管是使用阻塞流還是使用select選擇器,用戶進程下一步操作都是依賴操作系統的IO操作結果的,也就是需要同步的。而在AIO中,也就是前面通俗說的,(先寫好回調操作)調用系統的IO操作。

在java中,支持異步模型的方式有兩個類:

  • Future類
  • Callable接口

嚴格來說,Future不能算是異步模型的類,因為future.get()方法是阻塞的,需要等待處理完成;而Callable是回調,是正宗的異步模型工具。一般來說,異步編程都是基於回調的。

AIO重要類

實現一個最簡單的AIO socket通信server、client,主要需要這些相關的類和接口:

1)AsynchronousServerSocketChannel

服務端Socket通道類,負責服務端Socket的創建和監聽;

2)AsynchronousSocketChannel

客戶端Socket通道類,負責客戶端消息讀寫;

3)CompletionHandler<A,V>

消息處理回調接口,是一個負責消費異步IO操作結果的消息處理器;

4) ByteBuffer

負責承載通信過程中需要讀、寫的消息。

此外,還有可選的用於異步通道資源共享的AsynchronousChannelGroup類,接下來將一一介紹這些類的主要接口及使用。

AsynchronousServerSocketChannel

AsynchronousServerSocketChannel是一個流式監聽套接字的異步通道,是ServerSocketChannel的異步版本的通道,支持異步處理。AsynchronousServerSocketChannel的使用和ServerSocketChannel一樣需要經過三個步驟:創建/打開通道、綁定地址和端口和監聽客戶端連接請求。

創建/打開通道

簡單地,可以通過調用AsynchronousServerSocketChannel的靜態方法open()來創建AsynchronousServerSocketChannel實例:

try {
    AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
} catch (IOException e) {
    e.printStackTrace();
}

當打開通道失敗時,會拋出一個IOException異常。

綁定地址和端口

通過調用AsynchronousServerSocketChannel.bind(SocketAddress)方法來綁定監聽地址和端口:

// 構建一個InetSocketAddress實例以指定監聽的地址和端口,如果需要指定ip,則調用InetSocketAddress(ip,port)構造方法創建即可
serverSocketChannel.bind(new InetSocketAddress(port));

監聽和接收客戶端連接請求

監聽客戶端連接請求,主要通過調用AsynchronousServerSocketChannel.accept()方法完成。accept()有兩個重載方法:

public abstract <A> void accept(A,CompletionHandler<AsynchronousSocketChannel,? super A>);
public abstract Future<AsynchronousSocketChannel> accept();

這兩個重載方法的行為方式完全相同一種基於Future,一種基於回調,事實上,AIO的很多異步API都封裝了諸如此類的重載方法:提供CompletionHandle回調參數或者返回一個Future<T>類型變量。用過Feture接口的都知道,可以調用Future.get()方法阻塞等待調用結果。無論是哪種方式來獲取連接,最終的處理對象都是Socket,和ServerSocketChannel不同的是,這里的socket是封裝在AsynchronousSocketChannel中的。

基於Future實現:

public void AsynchronousServerSocketChannel() {
    try {
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
        channel.bind(new InetSocketAddress(8888));
        while (true) {
            Future<AsynchronousSocketChannel> conn = channel.accept();
            // 阻塞等待直到future有結果
            AsynchronousSocketChannel asyncSocketChannel = conn.get();
            // 異步處理連接
            asyncHandle(asyncSocketChannel);
        }
    } catch (IOException | InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

基於回調: 

public void AsynchronousServerSocketChannelCallback() {
    try {
        AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open();
        channel.bind(new InetSocketAddress(8888));
        channel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() {

            @Override
            public void completed(AsynchronousSocketChannel result, Void attachment) {            // 接收到新的客戶端連接時調用,result就是和客戶端的連接對話,此時可以通過result和客戶端進行通信
                System.out.println("accept completed");
                // 異步處理連接
                asyncHandle(result);
                // 繼續監聽accept
                channel.accept(null, this);
            }

            @Override
            public void failed(Throwable exc, Void attachment) {            // accept失敗時回調
                System.out.println("accept failed");
            }
        });
        // 讓主線程保持存活
        while (true) {
            System.in.read();
        }

    } catch (IOException e) {
        e.printStackTrace();
    }
}

需要注意的是,AsynchronousServerSocketChannel是線程安全的,但在任何時候同一時間內只能允許有一個accept操作。因此,必須得等待前一個accept操作完成之后才能啟動下一個accept:

serverSocketChannel
.accept(serverSocketChannel, new CompletionHandler<AsynchronousSocketChannel,
        AsynchronousServerSocketChannel>() {
          @Override
          public void completed(final AsynchronousSocketChannel result,
                                final AsynchronousServerSocketChannel attachment) {
            // 接收到新的客戶端連接,此時本次accept已經完成
            // 繼續監聽下一個客戶端連接到來
            serverSocketChannel.accept(serverSocketChannel,this);
            // result即和該客戶端的連接會話
            // 此時可以通過result與客戶端進行交互
          }
          ...
});

此外,還可以通過以下方法獲取和設置AsynchronousServerSocketChannel的socket選項: 

// 設置socket選項
serverSocketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = serverSocketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

其中StandardSocketOptions類封裝了常用的socket設置選項。 

獲取本地地址:

InetSocketAddress address = (InetSocketAddress) serverSocketChannel.getLocalAddress();

AsynchronousChannelGroup異步通道組

try {
    ExecutorService pool = Executors.newCachedThreadPool();
    AsynchronousChannelGroup group = AsynchronousChannelGroup.withCachedThreadPool(pool, 10);
    AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open(group);
} catch (IOException e) {
    e.printStackTrace();
}

AsynchronousServerSocketChannel提供了設置通道分組(AsynchronousChannelGroup)的功能,以實現組內通道資源共享。可以調用open(AsynchronousChannelGroup)重載方法創建指定分組的通道,默認情況下,具有 open() 方法的通道屬於一個全局通道組,可利用如下系統變量對其進行配置:

  • java.nio.channels.DefaultThreadPoolthreadFactory,其不采用默認設置,而是定義一個 java.util.concurrent.ThreadFactory
  • java.nio.channels.DefaultThreadPool.initialSize,指定線程池的初始規模

java.nio.channels.AsynchronousChannelGroup 中的三個實用方法提供了創建新通道組的方法:

withCachedThreadPool()
withFixedThreadPool()
withThreadPool()

這些方法或者對線程池進行定義,如 java.util.concurrent.ExecutorService,或者是 java.util.concurrent.ThreadFactory。例如,以下調用創建了具有線程池的新的通道組,該線程池包含 10 個線程,其中每個都構造為來自 Executors 類的線程工廠:

AsynchronousChannelGroup tenThreadGroup =
AsynchronousChannelGroup.withFixedThreadPool(10, Executors.defaultThreadFactory());

三個異步網絡通道都具有 open() 方法的替代版本,它們采用給出的通道組而不是默認通道組。例如,當有異步操作請求時,此調用告訴 channel 使用 tenThreadGroup 而不是默認通道組來獲取線程:

AsynchronousServerSocketChannel channel = AsynchronousServerSocketChannel.open(tenThreadGroup);

定義自己的通道組可更好地控制服務於操作的線程,並能提供關閉線程或者等待終止的機制。

AsynchronousChannelGroup封裝了處理由綁定到組的異步通道所觸發的I/O操作完成所需的機制。每個AsynchronousChannelGroup關聯了一個被用於提交處理I/O事件和分發消費在組內通道上執行的異步操作結果的completion-handlers的線程池。除了處理I/O事件,該線程池還有可能處理其他一些用於支持完成異步I/O操作的任務。從上面例子可以看到,通過指定AsynchronousChannelGroup的方式打開AsynchronousServerSocketChannel,可以定制server channel執行的線程池。如果不指定AsynchronousChannelGroup,則AsynchronousServerSocketChannel會歸類到一個默認的分組中。

AsynchronousSocketChannel

AsynchronousSocketChannel和NIO通道是SocketChannel功能相似。是一個流式連接套接字的異步通道。

AsynchronousSocketChannel表示服務端與客戶端之間的連接通道。客戶端可以通過調用AsynchronousSocketChannel靜態方法open()創建,而服務端則通過調用AsynchronousServerSocketChannel.accept()方法后由AIO內部在合適的時候創建。下面以客戶端實現為例,介紹AsynchronousSocketChannel。

創建AsynchronousSocketChannel

需要通過open()創建和打開一個AsynchronousSocketChannel實例,再調用其connect()方法連接到服務端,接着才可以與服務端交互:

// 打開一個socket通道
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
// 阻塞等待連接成功
socketChannel.connect(new InetSocketAddress(ip,port)).get();
// 連接成功,接下來可以進行read、write操作

同AsynchronousServerSocketChannel,AsynchronousSocketChannel也提供了open(AsynchronousChannelGroup)方法用於指定通道分組和定制線程池。

connect

socketChannel.connect()也提供了CompletionHandler回調和Future返回值兩個重載方法,上面例子使用帶Future返回值的重載,並調用get()方法阻塞等待連接建立完成。

// 基於回調
public abstract <A> void connect(SocketAddress remote, A attachment, CompletionHandler<Void,? super A> handler);
// 基於Future 調用get()方法阻塞等待連接建立完成
public abstract Future<Void> connect(SocketAddress remote);

發送消息

可以構建一個ByteBuffer對象並調用socketChannel.write(ByteBuffer)方法異步發送消息,並通過CompletionHandler回調接收處理發送結果:

ByteBuffer writeBuf = ByteBuffer.wrap("From socketChannel:Hello i am socketChannel".getBytes());
socketChannel.write(writeBuf, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 發送完成,result:總共寫入的字節數
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 發送失敗
  }
});

讀取消息

構建一個指定接收長度的ByteBuffer用於接收數據,調用socketChannel.read()方法讀取消息並通過CompletionHandler處理讀取結果:

ByteBuffer readBuffer = ByteBuffer.allocate(128);
socketChannel.read(readBuffer, null, new CompletionHandler<Integer, Object>() {
  @Override
  public void completed(final Integer result, final Object attachment) {
    // 讀取完成,result:實際讀取的字節數。如果通道中沒有數據可讀則result=-1。
  }

  @Override
  public void failed(final Throwable exc, final Object attachment) {
    // 讀取失敗
  }
});

此外,AsynchronousSocketChannel也封裝了設置/獲取socket選項的方法:

// 設置socket選項
socketChannel.setOption(StandardSocketOptions.SO_KEEPALIVE,true);
// 獲取socket選項設置
boolean keepAlive = socketChannel.getOption(StandardSocketOptions.SO_KEEPALIVE);

注意:讀寫操作,有多個重載的Future和回調式的read和write方法:

public abstract <A> void read(ByteBuffer dst,
                                  long timeout,
                                  TimeUnit unit,
                                  A attachment,
                                  CompletionHandler<Integer,? super A> handler);

public final <A> void read(ByteBuffer dst,
                           A attachment,
                           CompletionHandler<Integer,? super A> handler);

public abstract Future<Integer> read(ByteBuffer dst);

public abstract <A> void read(ByteBuffer[] dsts,
                              int offset,
                              int length,
                              long timeout,
                              TimeUnit unit,
                              A attachment,
                              CompletionHandler<Long,? super A> handler);
// write
public abstract <A> void write(ByteBuffer src,
                               long timeout,
                               TimeUnit unit,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler);

public final <A> void write(ByteBuffer src,
                            A attachment,
                            CompletionHandler<Integer,? super A> handler);

public abstract Future<Integer> write(ByteBuffer src);

public abstract <A> void write(ByteBuffer[] srcs,
                               int offset,
                               int length,
                               long timeout,
                               TimeUnit unit,
                               A attachment,
                               CompletionHandler<Long,? super A> handler);

如下服務器端示例,使用的是accept返回的channel:

// 基於future 實際上是同步的讀取方式
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
    ByteBuffer dst = ByteBuffer.allocate(1024);
    // based on Future,
    // 實際上是同步處理的方式,為了不將處理變成阻塞式單連接的socket形式,使用子線程來獲取輸入流
    new Thread(() -> {
        while (asyncSocketChannel.isOpen()) {
            Future<Integer> readFuture = asyncSocketChannel.read(dst);
            try {
                // 阻塞等待讀取結果
                Integer readResult = readFuture.get();
                if (readResult > 0) {

                    System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
                    dst.clear();

                } else {
                    // doOtherthing
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }).start();

}

// 基於回調
private void asyncHandle(AsynchronousSocketChannel asyncSocketChannel) {
    asyncSocketChannel.read(dst, null, new CompletionHandler<Integer, Void>() {

        @Override
        public void completed(Integer result, Void attachment) {
            if (result > 0) {
                System.out.println(new String(dst.array(), StandardCharsets.UTF_8));
                dst.clear();
            }
            // 注冊回調,繼續讀取輸入
            asyncSocketChannel.read(dst, null, this);

        }

        @Override
        public void failed(Throwable exc, Void attachment) {
            // TODO Auto-generated method stub

        }
    });
}

CompletionHandler

CompletionHandler是一個用於消費異步I/O操作結果的處理器。

AIO中定義的異步通道允許指定一個CompletionHandler處理器消費一個異步操作的結果。從上文中也可以看到,AIO中大部分的異步I/O操作接口都封裝了一個帶CompletionHandler類型參數的重載方法,使用CompletionHandler可以很方便地處理AIO中的異步I/O操作結果。CompletionHandler是一個具有兩個泛型類型參數的接口,聲明了兩個接口方法:

public interface CompletionHandler<V,A> {
    void completed(V result, A attachment);
    void failed(Throwable exc, A attachment);
}

其中,泛型V表示I/O操作的結果類型,通過該類型參數消費I/O操作的結果;泛型A為附加到I/O操作中的對象類型,可以通過該類型參數將需要的變量傳入到CompletionHandler實現中使用。因此,AIO中大部分的異步I/O操作都有一個類似這樣的重載方法:

<V,A> void ioOperate(params,A attachment,CompletionHandler<V,A> handler);

例如,AsynchronousServerSocketChannel.accept()方法:

public abstract <A> void accept(A attachment,CompletionHandler<AsynchronousSocketChannel,? super A> handler);

AsynchronousSocketChannel.write()方法等:

public final <A> void write(ByteBuffer src,A attachment,CompletionHandler<Integer,? super A> handler)

當I/O操作成功完成時,會回調到completed方法,failed方法則在I/O操作失敗時被回調。需要注意的是:在CompletionHandler的實現中應當即使處理操作結果,以避免一直占用調用線程而不能分發其他的CompletionHandler處理器。

AIO代碼實現

服務端

public class Server {
    private static int DEFAULT_PORT = 8888;
    private static AsyncServerHandler serverHandle;
    public volatile static long clientCount = 0;
    public static void start(){
        start(DEFAULT_PORT);
    }
    public static synchronized void start(int port){
        if(serverHandle!=null)
            return;
        serverHandle = new AsyncServerHandler(port);
        new Thread(serverHandle,"Server").start();
    }
    public static void main(String[] args){
        Server.start();
    }
}
public class AsyncServerHandler implements Runnable {
    public CountDownLatch latch;
    public AsynchronousServerSocketChannel channel;
    public AsyncServerHandler(int port) {
        try {
            //創建服務端通道
            channel = AsynchronousServerSocketChannel.open();
            //綁定端口
            channel.bind(new InetSocketAddress(port));
            System.out.println("服務器已啟動,端口號:" + port);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //CountDownLatch初始化
        //它的作用:在完成一組正在執行的操作之前,允許當前的現場一直阻塞
        //此處,讓現場在此阻塞,防止服務端執行完成后退出
        //也可以使用while(true)+sleep
        //生成環境就不需要擔心這個問題,以為服務端是不會退出的
        latch = new CountDownLatch(1);
        //用於接收客戶端的連接
        channel.accept(this,new AcceptHandler());
        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
//作為handler接收客戶端連接
public class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
    @Override
    public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
        //繼續接受其他客戶端的請求
        Server.clientCount++;
        System.out.println("連接的客戶端數:" + Server.clientCount);
        serverHandler.channel.accept(serverHandler, this);
        //創建新的Buffer
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        //異步讀  第三個參數為接收消息回調的業務Handler
        channel.read(buffer, buffer, new ReadHandler(channel));
    }
    @Override
    public void failed(Throwable exc, AsyncServerHandler serverHandler) {
        exc.printStackTrace();
        serverHandler.latch.countDown();
    }
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    //用於讀取半包消息和發送應答
    private AsynchronousSocketChannel channel;
    public ReadHandler(AsynchronousSocketChannel channel) {
        this.channel = channel;
    }
    //讀取到消息后的處理
    @Override
    public void completed(Integer result, ByteBuffer attachment) {
        //flip操作
        attachment.flip();
        //根據
        byte[] message = new byte[attachment.remaining()];
        attachment.get(message);
        try {
            String expression = new String(message, "UTF-8");
            System.out.println("服務器收到消息: " + expression);
            String calrResult = null;
            try{
                calrResult = Caculator.cal(expression).toString();
            }catch(Exception e){
                calrResult = "計算錯誤:" + e.getMessage();
            }
            //向客戶端發送消息
            doWrite(calrResult);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    //發送消息
    private void doWrite(String result) {
        byte[] bytes = result.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
        writeBuffer.put(bytes);
        writeBuffer.flip();
        //異步寫數據 參數與前面的read一樣
        channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
            @Override
            public void completed(Integer result, ByteBuffer buffer) {
                //如果沒有發送完,就繼續發送直到完成
                if (buffer.hasRemaining())
                    channel.write(buffer, buffer, this);
                else{
                    //創建新的Buffer
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    //異步讀  第三個參數為接收消息回調的業務Handler
                    channel.read(readBuffer, readBuffer, new ReadHandler(channel));
                }
            }
            @Override
            public void failed(Throwable exc, ByteBuffer attachment) {
                try {
                    channel.close();
                } catch (IOException e) {
                }
            }
        });
    }
    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        try {
            this.channel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端

public class Client {
    private static String DEFAULT_HOST = "localhost";
    private static int DEFAULT_PORT = 8888;
    private static AsyncClientHandler clientHandle;
    public static void start(){
        start(DEFAULT_HOST,DEFAULT_PORT);
    }
    public static synchronized void start(String ip,int port){
        if(clientHandle!=null)
            return;
        clientHandle = new AsyncClientHandler(ip,port);
        new Thread(clientHandle,"Client").start();
    }
    //向服務器發送消息
    public static boolean sendMsg(String msg) throws Exception{
        if(msg.equals("q")) return false;
        clientHandle.sendMsg(msg);
        return true;
    }
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception{
        Client.start();
        System.out.println("請輸入請求消息:");
        Scanner scanner = new Scanner(System.in);
        while(Client.sendMsg(scanner.nextLine()));
    }
}
public class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
    private AsynchronousSocketChannel clientChannel;
    private String host;
    private int port;
    private CountDownLatch latch;
    public AsyncClientHandler(String host, int port) {
        this.host = host;
        this.port = port;
        try {
            //創建異步的客戶端通道
            clientChannel = AsynchronousSocketChannel.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void run() {
        //創建CountDownLatch等待
        latch = new CountDownLatch(1);
        //發起異步連接操作,回調參數就是這個類本身,如果連接成功會回調completed方法
        clientChannel.connect(new InetSocketAddress(host, port), this, this);
        try {
            latch.await();
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
        try {
            clientChannel.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //連接服務器成功
    //意味着TCP三次握手完成
    @Override
    public void completed(Void result, AsyncClientHandler attachment) {
        System.out.println("客戶端成功連接到服務器...");
    }
    //連接服務器失敗
    @Override
    public void failed(Throwable exc, AsyncClientHandler attachment) {
        System.err.println("連接服務器失敗...");
        exc.printStackTrace();
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    //向服務器發送消息
    public void sendMsg(String msg){
        byte[] req = msg.getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        //異步寫
        clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
    }
}
public class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;

    public WriteHandler(AsynchronousSocketChannel clientChannel, CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }

    @Override
    public void completed(Integer result, ByteBuffer buffer) {
        //完成全部數據的寫入
        if (buffer.hasRemaining()) {
            clientChannel.write(buffer, buffer, this);
        } else {
            //讀取數據
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            clientChannel.read(readBuffer, readBuffer, new ReadHandler(clientChannel, latch));
        }
    }

    @Override
    public void failed(Throwable exc, ByteBuffer attachment) {
        System.err.println("數據發送失敗...");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
        }
    }
}
public class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
    private AsynchronousSocketChannel clientChannel;
    private CountDownLatch latch;
    public ReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
        this.clientChannel = clientChannel;
        this.latch = latch;
    }
    @Override
    public void completed(Integer result,ByteBuffer buffer) {
        buffer.flip();
        byte[] bytes = new byte[buffer.remaining()];
        buffer.get(bytes);
        String body;
        try {
            body = new String(bytes,"UTF-8");
            System.out.println("客戶端收到結果:"+ body);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
    }
    @Override
    public void failed(Throwable exc,ByteBuffer attachment) {
        System.err.println("數據讀取失敗...");
        try {
            clientChannel.close();
            latch.countDown();
        } catch (IOException e) {
        }
    }
}

測試類

public class Test {
    //測試主方法
    @SuppressWarnings("resource")
    public static void main(String[] args) throws Exception {
        //運行服務器
        Server.start();
        //避免客戶端先於服務器啟動前執行代碼
        Thread.sleep(100);
        //運行客戶端
        Client.start();
        System.out.println("請輸入請求消息:");
        Scanner scanner = new Scanner(System.in);
        while (Client.sendMsg(scanner.nextLine())) ;
    }
}
public final class Caculator {
    private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
    public static Object cal(String expression) throws ScriptException {
        return jse.eval(expression);
    }
}

 

參考:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM