手動搭建I/O網絡通信框架2:BIO編程模型實現群聊


第一章:手動搭建I/O網絡通信框架1:Socket和ServerSocket入門實戰,實現單聊

第三章:手動搭建I/O網絡通信框架3:NIO編程模型,升級改造聊天室

第四章:手動搭建I/O網絡通信框架4:AIO編程模型,聊天室終極改造

  在第一章中運用Socket和ServerSocket簡單的實現了網絡通信。這一章,利用BIO編程模型進行升級改造,實現群聊聊天室。

  所謂BIO,就是Block IO,阻塞式的IO。這個阻塞主要發生在:ServerSocket接收請求時(accept()方法)、InputStream、OutputStream(輸入輸出流的讀和寫)都是阻塞的。這個可以在下面代碼的調試中發現,比如在客戶端接收服務器消息的輸入流處打上斷點,除非服務器發來消息,不然斷點是一直停在這個地方的。也就是說這個線程在這時間是被阻塞的。

  

  如圖:當一個客戶端請求進來時,接收器會為這個客戶端分配一個工作線程,這個工作線程專職處理客戶端的操作。在上一章中,服務器接收到客戶端請求后就跑去專門服務這個客戶端了,所以當其他請求進來時,是處理不到的。

  看到這個圖,很容易就會想到線程池,BIO是一個相對簡單的模型,實現它的關鍵之處也在於線程池。

  在上代碼之前,先大概說清楚每個類的作用,以免弄混淆。更詳細的說明,都寫在注釋當中。

  服務器端:

  ChatServer:這個類的作用就像圖中的Acceptor。它有兩個比較關鍵的全局變量,一個就是存儲在線用戶信息的Map,一個就是線程池。這個類會監聽端口,接收客戶端的請求,然后為客戶端分配工作線程。還會提供一些常用的工具方法給每個工作線程調用,比如:發送消息、添加在線用戶等。我之前簡單用過Netty和WebSocket,這個類看上去就已經和這些框架有點相似了。學習IO編程模型也是為了接下來深入學習Netty做准備。

  ChatHandler:這個類就是工作線程的類。在這個項目中,它的工作很簡單:把接收到的消息轉發給其他客戶端,當然還有一些小功能,比如添加\移除在線用戶。

  客戶端:

  相較於服務器,客戶端的改動較小,主要是把等待用戶輸入信息這個功能分到其他線程做,不然這個功能會一直阻塞主線程,導致無法接收其他客戶端的消息。

  ChatClient:客戶端啟動類,也就是主線程,會通過Socket和服務器連接。也提供了兩個工具方法:發送消息和接收消息。

  UserInputHandler:專門負責等待用戶輸入信息的線程,一旦有信息鍵入,就馬上發送給服務器。

  首先創建兩個包區分一下客戶端和服務器,client和server

  服務器端ChatServer:

public class ChatServer {
    private int DEFAULT_PORT = 8888;
    /**
     * 創建一個Map存儲在線用戶的信息。這個map可以統計在線用戶、針對這些用戶可以轉發其他用戶發送的消息
     * 因為會有多個線程操作這個map,所以為了安全起見用ConcurrentHashMap
     * 在這里key就是客戶端的端口號,但在實際中肯定不會用端口號區分用戶,如果是web的話一般用session。
     * value是IO的Writer,用以存儲客戶端發送的消息
     */
    private Map<Integer, Writer> map=new ConcurrentHashMap<>();
    /**
     * 創建線程池,線程上限為10個,如果第11個客戶端請求進來,服務器會接收但是不會去分配線程處理它。
     * 前10個客戶端的聊天記錄,它看不見。當有一個客戶端下線時,這第11個客戶端就會被分配線程,服務器顯示在線
     * 大家可以把10再設置小一點,測試看看
     * */
    private ExecutorService executorService= Executors.newFixedThreadPool(10);
    //客戶端連接時往map添加客戶端
    public void addClient(Socket socket) throws IOException {
        if (socket != null) {
            BufferedWriter writer = new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            map.put(socket.getPort(), writer);
            System.out.println("Client["+socket.getPort()+"]:Online");
        }
    }

    //斷開連接時map里移除客戶端
    public void removeClient(Socket socket) throws Exception {
        if (socket != null) {
            if (map.containsKey(socket.getPort())) {
                map.get(socket.getPort()).close();
                map.remove(socket.getPort());
            }
            System.out.println("Client[" + socket.getPort() + "]Offline");
        }
    }

    //轉發客戶端消息,這個方法就是把消息發送給在線的其他的所有客戶端
    public void sendMessage(Socket socket, String msg) throws IOException {
        //遍歷在線客戶端
        for (Integer port : map.keySet()) {
            //發送給在線的其他客戶端
            if (port != socket.getPort()) {
                Writer writer = map.get(port);
                writer.write(msg);
                writer.flush();
            }
        }
    }

    //接收客戶端請求,並分配Handler去處理請求
    public void start() {
        try (ServerSocket serverSocket = new ServerSocket(DEFAULT_PORT)) {
            System.out.println("Server Start,The Port is:"+DEFAULT_PORT);
            while (true){
                //等待客戶端連接
                Socket socket=serverSocket.accept();
                //為客戶端分配一個ChatHandler線程
                executorService.execute(new ChatHandler(this,socket));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        ChatServer server=new ChatServer();
        server.start();
    }
}

  服務器端ChatHandler:

public class ChatHandler implements Runnable {
    private ChatServer server;
    private Socket socket;

    //構造函數,ChatServer通過這個分配Handler線程
    public ChatHandler(ChatServer server, Socket socket) {
        this.server = server;
        this.socket = socket;
    }

    @Override
    public void run() {
        try {
            //往map里添加這個客戶端
            server.addClient(socket);
            //讀取這個客戶端發送的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            String msg = null;
            while ((msg = reader.readLine()) != null) {
                //這樣拼接是為了讓其他客戶端也能看清是誰發送的消息
                String sendmsg = "Client[" + socket.getPort() + "]:" + msg;
                //服務器打印這個消息
                System.out.println(sendmsg);
                //將收到的消息轉發給其他在線客戶端
                server.sendMessage(socket, sendmsg + "\n");
                if (msg.equals("quit")) {
                    break;
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            //如果用戶退出或者發生異常,就在map中移除該客戶端
            try {
                server.removeClient(socket);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

  客戶端ChatClient:

public class ChatClient {
    private BufferedReader reader;
    private BufferedWriter writer;
    private Socket socket;
    //發送消息給服務器
    public void sendToServer(String msg) throws IOException {
        //發送之前,判斷socket的輸出流是否關閉
        if (!socket.isOutputShutdown()) {
            //如果沒有關閉就把用戶鍵入的消息放到writer里面
            writer.write(msg + "\n");
            writer.flush();
        }
    }
    //從服務器接收消息
    public String receive() throws IOException {
        String msg = null;
        //判斷socket的輸入流是否關閉
        if (!socket.isInputShutdown()) {
            //沒有關閉的話就可以通過reader讀取服務器發送來的消息。注意:如果沒有讀取到消息線程會阻塞在這里
            msg = reader.readLine();
        }
        return msg;
    }

    public void start() {
        //和服務創建連接
        try {
            socket = new Socket("127.0.0.1", 8888);
            reader=new BufferedReader(
                    new InputStreamReader(socket.getInputStream())
            );
            writer=new BufferedWriter(
                    new OutputStreamWriter(socket.getOutputStream())
            );
            //新建一個線程去監聽用戶輸入的消息
            new Thread(new UserInputHandler(this)).start();
            /**
             * 不停的讀取服務器轉發的其他客戶端的信息
             * 記錄一下之前踩過的小坑:
             * 這里一定要創建一個msg接收信息,如果直接用receive()方法判斷和輸出receive()的話會造成有的消息不會顯示
             * 因為receive()獲取時,在返回之前是阻塞的,一旦接收到消息才會返回,也就是while這里是阻塞的,一旦有消息就會進入到while里面
             * 這時候如果輸出的是receive(),那么上次獲取的信息就會丟失,然后阻塞在System.out.println
             * */
            String msg=null;
            while ((msg=receive())!=null){
                System.out.println(msg);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            try {
               if(writer!=null){
                   writer.close();
               }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new ChatClient().start();
    }
}

  客戶端UserInputHandler:

public class UserInputHandler implements Runnable {
    private ChatClient client;

    public UserInputHandler(ChatClient client) {
        this.client = client;
    }

    @Override
    public void run() {
        try {
            //接收用戶輸入的消息
            BufferedReader reader = new BufferedReader(
                    new InputStreamReader(System.in)
            );
            //不停的獲取reader中的System.in,實現了等待用戶輸入的效果
            while (true) {
                String input = reader.readLine();
                //向服務器發送消息
                client.sendToServer(input);
                if (input.equals("quit"))
                    break;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

 

  運行測試:

  通過打開終端,通過javac編譯。如果大家是在IDEA上編碼的話可能會報編碼錯誤,在javac后面加上-encoding utf-8再接java文件就好了。

  編譯后運行,通過java運行時,又遇到了一個坑。會報找不到主類的錯誤,原來是因為加上兩個包,要在class文件名前面加上包名。比如當前在src目錄,下面有client和server兩個包,要這么運行:java client.XXXX。可我之前明明在client文件夾下運行的java,也是不行,不知道為什么。

  接着測試:

  1.首先在一個終端里運行ChatServer,打開服務器

  2.在第二個終端里打開ChatClient,暫且叫A,此時服務器的終端顯示:

  3.類似的,在第三個終端里打開ChatClient,暫且叫B,此時服務器顯示:

  4.A中輸入hi,除了服務器會打印hi外,B中也會顯示,圖片中的端口號和前面的不一樣,是因為中間出了點小問題,前三張截圖和后面的不是同時運行的。實際中同一個客戶端會顯示一樣的端口號:

  5.當客戶端輸入quit時就會斷開連接,最后,服務器的顯示為:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM