【原創】基於Java NIO的多人在線聊天工具源碼實現(登錄,單聊,群聊)


  近來在學習Java NIO網絡開發知識,寫了一個基於Java NIO的多人在線聊天工具MyChat練練手。源碼公開在Coding上:

https://coding.net/u/hust_wsh/p/MyChat/git ,開發環境是Ubuntu14.04+Eclipse Mars+JDK1.8。

       編寫一個基於Java NIO的多人在線聊天工具,需要以下幾方面的知識:客戶端服務器模型,Java NIO中的Selector,Channel,ByteBuffer,Collections以及序列化和反序列化的知識。下面來對照源碼逐一剖析MyChat的源碼構成:

一.服務器

  為了便於實時分析服務器在線人數和聊天室列表,需要在服務器端提供一個交互接口,也就是獲取System.in的輸入,執行相應的操作,如下所示:  

                System.out.println("===輸入選擇項===");
                System.out.println("1.獲取用戶列表;2.獲取聊天室列表;3.獲取指定聊天室成員;4.關閉服務器");
                boolean isExit=false;
                Scanner scanner=new Scanner(System.in);

  由於主線程需要運行交互界面,這樣一來執行與客戶端交互任務的代碼就要放在另外一個線程中了:

                ChatServer server=new ChatServer();
                Thread serverThread=new Thread(server,"聊天服務器");
                serverThread.setDaemon(true);//后台進程
                serverThread.start();

  接下來將分別介紹服務器端的實現類ChatServer的關鍵成員:

private Selector mSelector=null;//用於注冊所有連接到服務器的SocketChannel對象
//保存所有用戶的Map private Map<String,UserEntity> mUsers=Collections.synchronizedMap(new HashMap<String,UserEntity>());

//保存所有聊天室的Map
private Map<String,ChatRoom> mRooms=Collections.synchronizedMap(new HashMap<String,ChatRoom>());//聊天室

  第一個成員變量mSelector是一個Selector對象,用於管理所有連接到服務器的Channel,為了管理多個通道的讀寫,要將不同的通道注冊到一個Selector對象上。每個通道分配有一個SelectionKey。然后程序可以詢問這個Selector對象,哪些通道已經准備就緒可以無阻塞的完成你希望完成的操作,可以請求Selector對象返回相應的鍵集合。通過調用Selector類的唯一構造函數:靜態工廠方法open()來創建新的選擇器,並通過register()方法注冊通道。

            mSelector=Selector.open();
            ServerSocketChannel server=ServerSocketChannel.open();
            InetSocketAddress isa=new InetSocketAddress(mHost, mPort);
            server.bind(isa);//綁定指定端口
            server.configureBlocking(false);
            server.register(mSelector, SelectionKey.OP_ACCEPT);
            System.out.println("服務器在"+mPort+"端口啟動成功");

  注冊成功后,就可以通過Selector的select()方法查詢已經就緒的通道。

            while(mSelector.select()>0)
            {
                Iterator<SelectionKey> iterator=mSelector.selectedKeys().iterator();
                while(iterator.hasNext())
                {
                    SelectionKey sk=iterator.next();
                    iterator.remove();

  select()方法用於查詢注冊到Selector上的待處理的就緒Channel,是一個阻塞方法,直到至少有一個注冊的Channel准備好之后就可以進行處理。SelectionKey對象相當於通道的指針,可以保存通道的連接狀態。Selector對象的selectedKeys()方法可以返回所有注冊Channel的SelectionKey。接下來可以通過isAccetable(),isReadable(),isWritable()等方法測試該鍵能進行的操作。

  ServerSocketChannel類只有一個目的:接受入站連接。通過注冊到Selector對象來獲取入站連接通知,如下所示:

                    if(sk.isAcceptable())
                    {
                        SocketChannel sc=server.accept();//開始接收客戶端連接
                        sc.configureBlocking(false);
                        sc.register(mSelector, SelectionKey.OP_READ);
                        sk.interestOps(SelectionKey.OP_ACCEPT);
                    }

  接下來可以通過sk.isReadable()進入處理客戶端數據的代碼塊:

                    if(sk.isReadable())//有數據
                    {
                        SocketChannel sc=(SocketChannel)sk.channel();
                        ByteBuffer buffer=ByteBuffer.allocate(1024);
                        ByteArrayOutputStream boStream=new ByteArrayOutputStream();
                        try 
                        {
                            while(sc.read(buffer)>0)//TODO:性能問題
                            {
                                buffer.flip();
                                boStream.write(Arrays.copyOfRange(buffer.array(), 0, buffer.limit()));
                            }
                            byte[] frame=boStream.toByteArray();
                            boStream.close();

  為了能進一步講明白為什么需要上面這種方式讀取客戶端信息,這里先插入講解一下服務器和客戶端交互的信使類Message。為了提升擴展性,我定義了一個Serializable類Message,用於服務器和客戶端之間進行交互(如登錄,返回結果,創建聊天室等)。Message類的定義如下:

 1 class Message implements Serializable
 2 {
 3     private static final long serialVersionUID = 1L;
 4     private Map<FieldType,String> fields=new HashMap<>();//TODO:泛型支持,任意消息類型,包括文本,圖片,語音,視頻,文件等
 5     private Commands command;
 6     public Message(Commands command)
 7     {
 8         this.command=command;
 9     }
10     public Commands getCommand()
11     {
12         return this.command;
13     }
14     public Message set(FieldType key,String value)
15     {
16         if(key!=null&&value!=null)
17         {
18             fields.put(key,value);
19         }
20         return this;
21     }
22     public String get(FieldType key)
23     {
24         return fields.get(key);
25     }
26     
27     public byte[] toBytes()
28     {
29         return SerializeHelper.serialize(this);
30     }
31     
32     public ByteBuffer wrap()
33     {
34         byte[] frame=toBytes();
35         return ByteBuffer.wrap(frame);
36     }
37 }

  其中有兩個關鍵的成員:一個Map型的用於保存數據的field成員和一個枚舉類型的用於表明命令類型的command成員。其中Command枚舉定義如下:

enum Commands{
    LOG_IN,
    LOG_OUT,
    QUERY_USERS,
    QUERY_ALL_CHAT_ROOMS,        
    QUERY_MY_CHAT_ROOMS,
    QUERY_ROOM_MEMBERS,
    HEART_BEAT,
    MSG_P2P,//個人對個人的消息
    MSG_P2R,//聊天室消息
    CREATE_CHAT_ROOM,
    JOIN_CHAT_ROOM,
    LEAVE_CHAT_ROOM,
    SET_USER_NAME; 
};

  另外,為了指名攜帶數據的類型,定義了一個FieldType枚舉,如下:

enum FieldType{
    USER_ID,
    USER_NAME,
    PASS_WD,
    PEER_ID,//單聊對象的ID
    ROOM_ID,//聊天室ID
    USER_LIST,//用戶列表
    ROOM_LIST_ALL,//所有房間列表
    ROOM_LIST_ME,//我的聊天室列表
    ROOM_MEMBERS,//用戶列表
    MSG_TXT,
    RESPONSE_STATUS,
    ENCODING;
};

  這樣一來,服務器和客戶端就可以通過這種可序列化的Message相互通信了。具體就是客戶端將要發送給服務器的數據封裝在Message對象中后,通過SocketChanne發送到服務器,服務器收到數據后通過反序列化獲取原始的Message對象,並根據command成員來判斷接收到的是什么類型的Message,如登錄,點對點消息等。

Message msg=(Message)SerializeHelper.deSerialize(frame);
if(msg!=null)
{
    String userId=msg.get(FieldType.USER_ID);
    switch (msg.getCommand()) {
    case LOG_IN:
    {
        System.out.println("用戶"+userId+"請求登錄...");
        Message message=new Message(Commands.LOG_IN);
        //TODO:檢查用戶名密碼,暫時沒有注冊功能,就只檢測用戶名是否重復
        if(!mUsers.containsKey(userId))
        {
            message.set(FieldType.RESPONSE_STATUS,"成功");
            System.out.println("用戶"+userId+"登錄成功");
            UserEntity user=new UserEntity(userId,sc);
            mUsers.put(userId,user);
        }
        else {
            message.set(FieldType.RESPONSE_STATUS,"該帳號已經登錄");
        }                                        
        //發送登錄結果                                        
        sendRawMessage(sc, message);
        break;
    }

  這里出現的mUsers對象,就是我要介紹的服務器端第二個重要的成員變量,mUsers是一個用Collections.synchronizedSet封裝的支持多線程訪問的HashSet,用於保存[用戶ID->用戶對象]的映射。所謂用戶對象就是另外定義的一個用於保存用戶基本信息的類,其中包含了用戶的id,passwd,對應的SocketChannel和所加入的聊天室集合。如下所示:

class UserEntity{
    private String mUserId;
    private String mPassWd;
    private SocketChannel mSocketChannel;
    private Set<String> mJoinedRooms=Collections.synchronizedSet(new HashSet<String>());

  服務器端還有一個重要的成員變量,用於保存服務器端所有聊天室的集合,也是一個用Collections.synchronizedSet封裝的HashSet,用於保存[聊天室ID->聊天室對象]的映射。聊天室對象是專門定義的一個保存聊天室基本信息的類,其中包含了聊天室id,聊天室成員集合。如下所示:

final class ChatRoom {
    private String mRoomId=null;
    private Set<String> mUsers=Collections.synchronizedSet(new HashSet<String>());

  到此,服務器端的代碼基本剖析完畢,接下來我們看看客戶端的代碼。

二.客戶端

  客戶端的代碼相對服務器來說要簡單許多,一個典型的NIO客戶端程序連接服務器流程如下所示:

        mSelector=Selector.open();
        InetSocketAddress remote=new InetSocketAddress(host, port);
        mSocketChannel=SocketChannel.open(remote);
        mSocketChannel.configureBlocking(false);
        mSocketChannel.register(mSelector, SelectionKey.OP_READ);

  其中注冊Selector的接口幾乎與服務器一致,除了傳遞給register方法的第二個參數不同。注冊完通道后就可以向服務器發送登錄請求了:

        Message message=new Message(Commands.LOG_IN);
        message.set(FieldType.USER_ID, userid);
        message.set(FieldType.PASS_WD, passwd);
        sendRawMessage(message);

  其中sendRawMessage是一個私有方法,用於將Message序列化后使用ByteBuffer通過SocketChannel發送到服務器端,具體代碼如下:

    private void sendRawMessage(Message message)
    {
        if(mSocketChannel!=null&&message!=null)
        {
            try {
                mSocketChannel.write(message.wrap());
            } catch (Exception e) {
                e.printStackTrace();
            }
            
        }
    }

  我為Message類設計了一個wrap()方法可以將Message序列化后的byte[]包裝成ByteBuffer返回,從而可以直接作為SocketChannel.write()方法的參數。具體代碼可以參考文章開頭的Git倉庫。

  與服務器一樣,客戶端需要接收用戶輸入,從而也將與服務器交互的部分放在單獨的線程運行。我將這個線程類放在ChatClient類的內部作為嵌套類,這樣可以直接訪問外部類的成員變量,為線程之間通信提供便利。

三.實例分析

  介紹完服務器和客戶端的設計之后,下面以創建聊天室為例詳細介紹客戶端和服務器端的通信流程。

  當客戶端登錄到服務器中后,服務器會保存客戶端的用戶ID以及對應的SocketChannel信息,客戶端通過一條創建聊天室的Message向服務器申請創建聊天室:

        Message message=new Message(Commands.CREATE_CHAT_ROOM);
        message.set(FieldType.USER_ID,mUserId );
        message.set(FieldType.ROOM_ID, roomId);
        sendRawMessage(message);

  如上所示,該Message的命令字是Commands.CREATE_CHAT_ROOM,包含了兩個域,分別是創建者的ID和待創建的房間ID(這里為了設計簡便,將ID和名稱等同為一個概念,實際中ID應該是一個唯一的整型量,名稱是聊天室的名字,可以重復)。服務器端通過反序列化Message,並提取對應的命令字進入對應的處理邏輯:

case CREATE_CHAT_ROOM:
{
    System.out.println("用戶"+userId+"請求創建聊天室");
    String roomId=msg.get(FieldType.ROOM_ID);
    Message message=new Message(Commands.CREATE_CHAT_ROOM);
    if(!StringHelper.isNullOrTrimEmpty(roomId))
    {
        if(!mRooms.containsKey(roomId))
        {
            ChatRoom room=new ChatRoom(roomId);
            room.addUser(userId);                
            mRooms.put(roomId, room);
            UserEntity user=mUsers.get(userId);
            if(user!=null)
                user.joinRoom(roomId);
            message.set(FieldType.RESPONSE_STATUS, "成功");
        }
        else
        {
            message.set(FieldType.RESPONSE_STATUS, "創建失敗,已存在同名聊天室");
        }
    }
    else//返回錯誤消息
    {
        message.set(FieldType.RESPONSE_STATUS, "創建失敗,聊天室名稱不能為空");
    }
    sendRawMessage(sc, message);
    break;
}

  我們來仔細分析下上面的代碼。首先從Message中提取到了userId和roomId,然后判斷服務器端mRooms集合是否已經存在同名聊天室,如果不存在,則創建一個新的聊天室:ChatRoom room=new Chat(roomId)。並將創建者本人加入到聊天室用戶列表中:room.addUser(userId)。同時,為了方便查找用戶加入的所有聊天室,還將該聊天室的ID通過UserEntity的joinRoom()方法保存到了UserEntity的聊天室集合中,最后將表示正確結果的Message發送給請求客戶端;反之如果已經存在同名聊天室,則將包含錯誤信息的Message發送給客戶端。而客戶端負責與服務器端交互的線程則通過反序列化Message獲取操作結果,並顯示給用戶。

  為了更加直觀地展示MyChat的工作流程,將終端運行的結果整了幾張截圖附在下面:

客戶端1:

客戶端2:

服務器端:

本文為原創,轉載請聲明:轉載自hust_wsh的技術博客:http://www.cnblogs.com/hust_wsh/p/5166001.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM