手動搭建I/O網絡通信框架4:AIO編程模型,聊天室終極改造


第一章:手動搭建I/O網絡通信框架1:Socket和ServerSocket入門實戰,實現單聊

第二章:手動搭建I/O網絡通信框架2:BIO編程模型實現群聊

第三章:手動搭建I/O網絡通信框架3:NIO編程模型,升級改造聊天室

  上一章講到的NIO編程模型比較主流,非常著名的Netty就是基於NIO編程模型的。這一章說的是AIO編程模型,是異步非阻塞的。雖然同樣實現的是聊天室功能,但是實現邏輯上稍微要比NIO和BIO復雜一點。不過理好整體脈絡,會好理解一些。首先還是講講概念:

  BIO和NIO的區別是阻塞和非阻塞,而AIO代表的是異步IO。在此之前只提到了阻塞和非阻塞,沒有提到異步還是同步。可以用我在知乎上看到的一句話表示:【在處理 IO 的時候,阻塞和非阻塞都是同步 IO,只有使用了特殊的 API 才是異步 IO】。這些“特殊的API”下面會講到。在說AIO之前,先總結一下阻塞非阻塞、異步同步的概念。

  阻塞和非阻塞,描述的是結果的請求阻塞:在得到結果之前就一直呆在那,啥也不干,此時線程掛起,就如其名,線程被阻塞了。非阻塞:如果沒得到結果就返回,等一會再去請求,直到得到結果為止。異步和同步,描述的是結果的發出,當調用方的請求進來。同步:在沒獲取到結果前就不返回給調用方,如果調用方是阻塞的,那么調用方就會一直等着。如果調用方是非阻塞的,調用方就會先回去,等一會再來問問得到結果沒。異步:調用方一來,會直接返回,等執行完實際的邏輯后在通過回調函數把結果返回給調用方。

  AIO中的異步操作

  CompletionHandler

  在AIO編程模型中,常用的API,如connect、accept、read、write都是支持異步操作的。當調用這些方法時,可以攜帶一個CompletionHandler參數,它會提供一些回調函數。這些回調函數包括:1.當這些操作成功時你需要怎么做;2.如果這些操作失敗了你要這么做。關於這個CompletionHandler參數,你只需要寫一個類實現CompletionHandler口,並實現里面兩個方法就行了。

  那如何在調用connect、accept、read、write這四個方法時,傳入CompletionHandler參數從而實現異步呢?下面分別舉例這四個方法的使用。

  先說說Socket和ServerSocket,在NIO中,它們變成了通道,配合緩沖區,從而實現了非阻塞。而在AIO中它們變成了異步通道。也就是AsynchronousServerSocketChannel和AsynchronousSocketChannel,下面例子中對象名分別是serverSocket和socket.

  accept:serverSocket.accept(attachment,handler)。handler就是實現了CompletionHandler接口並實現兩個回調函數的類,它具體怎么寫可以看下面的實戰代碼。attachment為handler里面可能需要用到的輔助數據,如果沒有就填null。

  read:socket.read(buffer,attachment,handler)。buffer是緩沖區,用以存放讀取到的信息。后面兩個參數和accept一樣。

  write:socket.write(buffer,attachment,handler)。和read參數一樣。

  connect:socket.connect(address,attachment,handler)。address為服務器的IP和端口,后面兩個參數與前幾個一樣。

  Future

  既然說到了異步操作,除了使用實現CompletionHandler接口的方式,不得不想到Future。客戶端邏輯較為簡單,如果使用CompletionHandler的話代碼反而更復雜,所以下面的實戰客戶端代碼就會使用Future的方式。簡單來說,Future表示的是異步操作未來的結果,怎么理解未來。比如,客戶端調用read方法獲取服務器發來得消息:

Future<Integer> readResult=clientChannel.read(buffer)

  Integer是read()的返回類型,此時變量readResult實際上並不一定有數據,而是表示read()方法未來的結果,這時候readResult有兩個方法,isDone():返回boolean,查看程序是否完成處理,如果返回true,有結果了,這時候可以通過get()獲取結果。如果你不事先判斷isDone()直接調用get()也行,只不過它是阻塞的。如果你不想阻塞,想在這期間做點什么,就用isDone()。

  還有一個問題:這些handler的方法是在哪個線程執行的?serverSocket.accept這個方法肯定是在主線程里面調用的,而傳入的這些回調方法其實是在其他線程執行的。在AIO中,會有一個AsynchronousChannelGroup,它和AsynchronousServerSocketChannel是綁定在一起的,它會為這些異步通道提供系統資源,線程就算其中一種系統資源,所以為了方便理解,我們暫時可以把他看作一個線程池,它會為這些handler分配線程,而不是在主線程中去執行。

   AIO編程模型

  上面只說了些零碎的概念,為了更好的理解,下面講一講大概的工作流程(主要針對服務器,客戶端邏輯較為簡單,代碼注釋也比較少,可以看前面幾章):

  1.首先做准備工作。跟NIO一樣,先要創建好通道,只不過AIO是異步通道。然后創建好AsyncChannelGroup,可以選擇自定義線程池。最后把AsyncServerSocket和AsyncChannelGroup綁定在一起,這樣處於同一個AsyncChannelGroup里的通道就可以共享系統資源。

  2.最后一步准備工作,創建好handler類,並實現接口和里面兩個回調方法。(如圖:客戶端1對應的handler,里面的回調方法會實現讀取消息和轉發消息的功能;serverSocket的handler里的回調方法會實現accept功能。)

  3.准備工作完成,當客戶端1連接請求進來,客戶端會馬上回去,ServerSocket的異步方法會在連接成功后把客戶端的SocketChannel存進在線用戶列表,並利用客戶端1的handler開始異步監聽客戶端1發送的消息。

  4.當客戶端1發送消息時,如果上一步中的handler成功監聽到,就會回調成功后的回調方法,這個方法里會把這個消息轉發給其他客戶端。轉發完成后,接着利用handler監聽客戶端1發送的消息。

 

  代碼一共有三個類:

  ChatServer:功能基本上和上面講的工作流程差不多,還會有一些工具方法,都比較簡單,就不多說了,如:轉發消息,客戶端下線后從在線列表移除客戶端等。

  ChatClient:基本和前兩章的BIO、NIO沒什么區別,一個線程監聽用戶輸入信息並發送,主線程異步的讀取服務器信息。

  UserInputHandler:監聽用戶輸入信息的線程。

  ChatServer

public class ChatServer {
    //設置緩沖區字節大小
    private static final int BUFFER = 1024;

    //聲明AsynchronousServerSocketChannel和AsynchronousChannelGroup
    private AsynchronousServerSocketChannel serverSocketChannel;
    private AsynchronousChannelGroup channelGroup;

    //在線用戶列表。為了並發下的線程安全,所以使用CopyOnWriteArrayList
    //CopyOnWriteArrayList在寫時加鎖,讀時不加鎖,而本項目正好在轉發消息時需要頻繁讀取.
    //ClientHandler包含每個客戶端的通道,類型選擇為ClientHandler是為了在write的時候調用每個客戶端的handler
    private CopyOnWriteArrayList<ClientHandler> clientHandlerList;
    //字符和字符串互轉需要用到,規定編碼方式,避免中文亂碼
    private Charset charset = Charset.forName("UTF-8");

    //通過構造函數設置監聽端口
    private int port;
    public ChatServer(int port) {
        this.port = port;
        clientHandlerList=new CopyOnWriteArrayList<>();
    }

    public void start() {
        try {
            /**
             *創建一個線程池並把線程池和AsynchronousChannelGroup綁定,前面提到了AsynchronousChannelGroup包括一些系統資源,而線程就是其中一種。
             *為了方便理解我們就暫且把它當作線程池,實際上並不止包含線程池。如果你需要自己選定線程池類型和數量,就可以如下操作
             *如果不需要自定義線程池類型和數量,可以不用寫下面兩行代碼。
             * */
            ExecutorService executorService = Executors.newFixedThreadPool(10);
            channelGroup = AsynchronousChannelGroup.withThreadPool(executorService);
            serverSocketChannel=AsynchronousServerSocketChannel.open(channelGroup);
            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1",port));
            System.out.println("服務器啟動:端口【"+port+"】");
            /**
             * AIO中accept可以異步調用,就用上面說到的CompletionHandler方式
             * 第一個參數是輔助參數,回調函數中可能會用上的,如果沒有就填null;第二個參數為CompletionHandler接口的實現
             * 這里使用while和System.in.read()的原因:
             * while是為了讓服務器保持運行狀態,前面的NIO,BIO都有用到while無限循環來保持服務器運行,但是它們用的地方可能更好理解
             * System.in.read()是阻塞式的調用,只是單純的避免無限循環而讓accept頻繁被調用,無實際業務功能。
             */
            while (true) {
                serverSocketChannel.accept(null, new AcceptHandler());
                System.in.read();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            if(serverSocketChannel!=null){
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    //AsynchronousSocketChannel為accept返回的類型,Object為輔助參數類型,沒有就填Object
    private class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel,Object>{
        //如果成功,執行的回調方法
        @Override
        public void completed(AsynchronousSocketChannel clientChannel, Object attachment) {
            //如果服務器沒關閉,在接收完當前客戶端的請求后,再次調用,以接着接收其他客戶端的請求
            if(serverSocketChannel.isOpen()){
                serverSocketChannel.accept(null,this);
            }
            //如果客戶端的channel沒有關閉
            if(clientChannel!=null&&clientChannel.isOpen()){
                //這個就是異步read和write要用到的handler,並傳入當前客戶端的channel
                ClientHandler handler=new ClientHandler(clientChannel);
                //把新用戶添加到在線用戶列表里
                clientHandlerList.add(handler);
                System.out.println(getPort(clientChannel)+"上線啦!");
                ByteBuffer buffer=ByteBuffer.allocate(BUFFER);
                //異步調用read,第一個buffer是存放讀到數據的容器,第二個是輔助參數。
                //因為真正的處理是在handler里的回調函數進行的,輔助參數會直接傳進回調函數,所以為了方便使用,buffer就當作輔助參數
                clientChannel.read(buffer,buffer,handler);
            }
        }
        //如果失敗,執行的回調方法
        @Override
        public void failed(Throwable exc, Object attachment) {
            System.out.println("連接失敗"+exc);
        }
    }

    private class ClientHandler implements CompletionHandler<Integer, ByteBuffer>{
        private AsynchronousSocketChannel clientChannel;
        public ClientHandler(AsynchronousSocketChannel clientChannel) {
            this.clientChannel = clientChannel;
        }
        @Override
        public void completed(Integer result, ByteBuffer buffer) {
            if(buffer!=null){
                //如果read返回的結果小於等於0,而buffer不為空,說明客戶端通道出現異常,做下線操作
                if(result<=0){
                    removeClient(this);
                }else {
                    //轉換buffer讀寫模式並獲取消息
                    buffer.flip();
                    String msg=String.valueOf(charset.decode(buffer));
                    //在服務器上打印客戶端發來的消息
                    System.out.println(getPort(clientChannel)+msg);
                    //把消息轉發給其他客戶端
                    sendMessage(clientChannel,getPort(clientChannel)+msg);
                    buffer=ByteBuffer.allocate(BUFFER);

                    //如果用戶輸入的是退出,就從在線列表里移除。否則接着監聽這個用戶發送消息
                    if(msg.equals("quit"))
                        removeClient(this);
                    else
                        clientChannel.read(buffer, buffer, this);
                }
            }
        }

        @Override
        public void failed(Throwable exc, ByteBuffer attachment) {
            System.out.println("客戶端讀寫異常:"+exc);
        }
    }

    //轉發消息的方法
    private void sendMessage(AsynchronousSocketChannel clientChannel,String msg){
        for(ClientHandler handler:clientHandlerList){
            if(!handler.clientChannel.equals(clientChannel)){
                ByteBuffer buffer=charset.encode(msg);
                //write不需要buffer當輔助參數,因為寫到客戶端的通道就完事了,而讀還需要回調函數轉發給其他客戶端。
                handler.clientChannel.write(buffer,null,handler);
            }
        }
    }
    //根據客戶端channel獲取對應端口號的方法
    private String getPort(AsynchronousSocketChannel clientChannel){
        try {
            InetSocketAddress address=(InetSocketAddress)clientChannel.getRemoteAddress();
            return "客戶端["+address.getPort()+"]:";
        } catch (IOException e) {
            e.printStackTrace();
            return "客戶端[Undefined]:";
        }
    }
    //移除客戶端
    private void removeClient(ClientHandler handler){
        clientHandlerList.remove(handler);
        System.out.println(getPort(handler.clientChannel)+"斷開連接...");
        if(handler.clientChannel!=null){
            try {
                handler.clientChannel.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new ChatServer(8888).start();
    }
}

  ChatClient

public class ChatClient {
    private static final int BUFFER = 1024;
    private AsynchronousSocketChannel clientChannel;
    private Charset charset = Charset.forName("UTF-8");

    private String host;
    private int port;
    //設置服務器IP和端口
    public ChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() {
        try {
            clientChannel = AsynchronousSocketChannel.open();
            //連接服務器
            Future<Void> future = clientChannel.connect(new InetSocketAddress(host, port));
            future.get();
            //新建一個線程去等待用戶輸入
            new Thread(new UserInputHandler(this)).start();
            ByteBuffer buffer=ByteBuffer.allocate(BUFFER);
            //無限循環讓客戶端保持運行狀態
            while (true){
                //獲取服務器發來的消息並存入到buffer
                Future<Integer> read=clientChannel.read(buffer);
                if(read.get()>0){
                    buffer.flip();
                    String msg=String.valueOf(charset.decode(buffer));
                    System.out.println(msg);
                    buffer.clear();
                }else {
                    //如果read的結果小於等於0說明和服務器連接出現異常
                    System.out.println("服務器斷開連接");
                    if(clientChannel!=null){
                        clientChannel.close();
                    }
                    System.exit(-1);
                }
            }
        } catch (IOException | InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
    }

    public void send(String msg) {
        if (msg.isEmpty())
            return;
        ByteBuffer buffer = charset.encode(msg);
        Future<Integer> write=clientChannel.write(buffer);
        try {
            //獲取發送結果,如果get方法發生異常說明發送失敗
            write.get();
        } catch (ExecutionException|InterruptedException e) {
            System.out.println("消息發送失敗");
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new ChatClient("127.0.0.1",8888).start();
    }
}

  UserInputHandler

public class UserInputHandler implements Runnable {
    ChatClient client;
    public UserInputHandler(ChatClient chatClient) {
        this.client=chatClient;
    }
    @Override
    public void run() {
        BufferedReader read=new BufferedReader(
                new InputStreamReader(System.in)
        );
        while (true){
            try {
                String input=read.readLine();
                client.send(input);
                if(input.equals("quit"))
                    break;
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

  運行測試:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM