實戰即時聊天,一文說明白:聊天服務器+聊天客戶端+Web管理控制台。


一、前言

  說實話,寫這個玩意兒是我上周剛剛產生的想法,本想寫完后把代碼掛上來賺點積分也不錯。寫完后發現這東西值得寫一篇文章,授人予魚不如授人以漁嘛(這句話是這么說的吧),順便賺點應屆學生MM的膜拜那就更妙了。然后再掛一個收款二維碼,一個人1塊錢,一天10000個人付款,一個月30萬,一年360萬。。。可了不得了,離一個億的小目標就差幾十年了。

  不知道博客園對夢話有沒有限制,有的話請告知,我會盡快刪除上述文字。

  那么現在回到現實中,這篇博文如果能有>2個評論,我后續會再出一個Netty相關的專欄。否則,就不出了。有人會好奇,為什么把閾值定義成>2呢?不為什么,因為我肯定會先用我媳婦兒的號留個言,然后用自己的號留個言。

  好了,廢話不多說了,后面還有好多事兒呢,洗菜、做飯、刷碗、跪搓衣。。。好了,言歸正傳吧。

二、最終效果

  為什么先看最終效果?因為此刻代碼已經擼完了。更重要的是我們帶着感官的目標去進行后續的分析,可以更好地理解。標題中提到了,整個工程包含三個部分:

1、聊天服務器

  聊天服務器的職責一句話解釋:負責接收所有用戶發送的消息,並將消息轉發給目標用戶。

  聊天服務器沒有任何界面,但是卻是IM中最重要的角色,為表達敬意,必須要給它放個效果圖:

 

2021-05-11 10:41:40.037  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700900029,"messageType":"99"}
2021-05-11 10:41:50.049  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700910045,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"於老師你好"}
2021-05-11 10:41:50.055  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.executor.SendMsgExecutor   : 消息轉發成功:{"time":1620700910052,"messageType":"14","sendUserName":"guodegang","recvUserName":"yuqian","sendMessage":"於老師你好"}
2021-05-11 10:41:54.068  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700914064,"messageType":"99"}
2021-05-11 10:41:57.302  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.n.handler.BussMessageHandler     : 收到消息:{"time":1620700917301,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老師你好"}
2021-05-11 10:41:57.304  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.executor.SendMsgExecutor   : 消息轉發成功:{"time":1620700917303,"messageType":"14","sendUserName":"yuqian","recvUserName":"guodegang","sendMessage":"郭老師你好"}
2021-05-11 10:42:05.050  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700925049,"messageType":"99"}
2021-05-11 10:42:12.309  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700932304,"messageType":"99"}
2021-05-11 10:42:20.066  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700940050,"messageType":"99"}
2021-05-11 10:42:27.311  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700947309,"messageType":"99"}
2021-05-11 10:42:35.070  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700955068,"messageType":"99"}
2021-05-11 10:42:42.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700962312,"messageType":"99"}
2021-05-11 10:42:50.072  INFO 9392 --- [ntLoopGroup-3-1] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700970071,"messageType":"99"}
2021-05-11 10:42:57.316  INFO 9392 --- [ntLoopGroup-3-2] c.e.o.s.netty.handler.HeartBeatHandler   : server收到心跳包:{"time":1620700977315,"messageType":"99"}

  從效果圖我們看到了一些內容:收到心跳包、收到消息,轉發消息,這些內容后面會詳細講解。

2、聊天客戶端

  聊天客戶端的職責一句話解釋:登陸,給別人發聊天內容,收其它人發給自己的聊天內容。

  下面為方便演示,我會打開兩個客戶端,用兩個不同用戶登陸,然后發消息。

 

3、Web管理控制台

  目前只做了一個賬戶管理,具體看圖吧:

三、需求分析

  無(見第二章節)。

四、概要設計

1、技術選型

1)聊天服務端

  聊天服務器與客戶端通過TCP協議進行通信,使用長連接、全雙工通信模式,基於經典通信框架Netty實現。

  那么什么是長連接?顧名思義,客戶端和服務器連上后,會在這條連接上面反復收發消息,連接不會斷開。與長連接對應的當然就是短連接了,短連接每次發消息之前都需要先建立連接,然后發消息,最后斷開連接。顯然,即時聊天適合使用長連接。

  那么什么又是全雙工?當長連接建立起來后,在這條連接上既有上行的數據,又有下行的數據,這就叫全雙工。那么對應的半雙工、單工,大家自行百度吧。

2)Web管理控制台

  Web管理端使用SpringBoot腳手架,前端使用Layuimini(一個基於Layui前端框架封裝的前端框架),后端使用SpringMVC+Jpa+Shiro。

3)聊天客戶端

  使用SpringBoot+JavaFX,做了一個極其簡陋的客戶端,JavaFX是一個開發Java桌面程序的框架,本人也是第一次使用,代碼中的寫法都是網上查的,這並不是本文的重點,有興趣的仔細百度吧。

4)SpringBoot

  以上三個組件,全部以SpringBoot做為腳手架開發。

5)代碼構建

  Maven。

2、數據庫設計

  我們只簡單用到一張用戶表,比較簡單直接貼腳本:

CREATE TABLE `sys_user` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
  `user_name` varchar(64) DEFAULT NULL COMMENT '用戶名:登陸賬號',
  `pass_word` varchar(128) DEFAULT NULL COMMENT '密碼',
  `name` varchar(16) DEFAULT NULL COMMENT '昵稱',
  `sex` char(1) DEFAULT NULL COMMENT '性別:1-男,2女',
  `status` bit(1) DEFAULT NULL COMMENT '用戶狀態:1-有效,0-無效',
  `online` bit(1) DEFAULT NULL COMMENT '在線狀態:1-在線,0-離線',
  `salt` varchar(128) DEFAULT NULL COMMENT '密碼鹽值',
  `admin` bit(1) DEFAULT NULL COMMENT '是否管理員(只有管理員才能登錄Web端):1-是,0-否',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

  這張表都在什么時候用到?

  1)Web管理端登陸的時候;2)聊天客戶端將登陸請求發送到聊天服務端時,聊天服務端進行用戶認證;3)聊天客戶端的好友列表加載。

3、通信設計

  本節將會是本文的核心內容之一,主要描述通信報文協議格式、以及通信報文的交互場景。

1)報文協議格式

  下面這張圖應該能說明99%了:

  剩下的1%在這里說:

  a)粘包問題,TCP長連接中,粘包是第一個需要解決的問題。通俗的講,粘包的意思是消息接收方往往收到的不是“整個”報文,有時候比“整個”多一點,有時候比“整個”少一點,這樣就導致接收方無法解析這個報文。那么上圖中的頭8個字節就為了解決這個問題,接收方根據頭8個字節標識的長度來獲取到“整個”報文,從而進行正常的業務處理;

  b)2字節報文類型,為了方便解析報文而設計。根據這兩個字節將后面的json轉成相應的實體以便進行后續處理;

  c)變長報文體實際上就是json格式的串,當然,你可以自己設計報文格式,我這里為了方便處理就直接放json了;

  d)當然,你可以把報文設計的更復雜、更專業,比如加密、加簽名等。

2)報文交互場景

  a)登陸

  b)發送消息-成功

  c)發送消息-目標客戶端不在線

  d)發送消息-目標客戶端在線,但消息轉發失敗

五、編碼實現

  前面說了那么多,現在總得說點有用的。

1、先說說Netty

  Netty是一個相當優秀的通信框架,大多數的頂級開源框架中都有Netty的身影。具體它有多么優秀,建議大家自行百度,我不如百度說的好。我只從應用方面說說Netty。應用過程中,它最核心的東西叫handler,我們可以簡單理解它為消息處理器。收到的消息和出去的消息都會經過一系列的handler加工處理。收到的消息我們叫它入站消息,發出去的消息我們叫它出站消息,因此handler又分為出站handler和入站handler。收到的消息只會被入站handler處理,發出去的消息只會被出站handler處理。

  舉個例子,我們從網絡上收到的消息是二進制的字節碼,我們的目標是將消息轉換成java bean,這樣方便我們程序處理,針對這個場景我設計這么幾個入站handler:

  1)將字節轉換成String的handler;

  2)將String轉成java bean的handler;

  3)對java bean進行業務處理的handler。

  發出去的消息呢,我設計這么幾個出站handler:

  1)java bean 轉成String的handler;

  2)String轉成byte的handler。

  以上是關於handler的說明。

  接下來再說一下Netty的異步。異步的意思是當你做完一個操作后,不會立馬得到操作結果,而是有結果后Netty會通知你。通過下面的一段代碼來說明:

channel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()){
                        logger.info("消息發送成功:{}",sendMsgRequest);
                    }else {
                        logger.info("消息發送失敗:{}",sendMsgRequest);
                    }
                }
            });

  上面的writeAndFlush操作無法立即返回結果,如果你關注結果,那么為他添加一個listener,有結果后會在listener中響應。

  到這里,百度上搜到的Netty相關的代碼你基本就能看懂了。

2、聊天服務端

  首先看主入口的代碼

public void start(){
        EventLoopGroup boss = new NioEventLoopGroup();
        EventLoopGroup worker = new NioEventLoopGroup();
        ServerBootstrap serverBootstrap = new ServerBootstrap();
        serverBootstrap.group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .option(ChannelOption.SO_BACKLOG, 1024)
                .handler(new LoggingHandler(LogLevel.INFO))
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        //心跳
                        ch.pipeline().addLast(new IdleStateHandler(25, 20, 0, TimeUnit.SECONDS));
                        //收整包
                        ch.pipeline().addLast(new StringLengthFieldDecoder());
                        //轉字符串
                        ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                        //json轉對象
                        ch.pipeline().addLast(new JsonDecoder());
                        //心跳
                        ch.pipeline().addLast(new HeartBeatHandler());
                        //實體轉json
                        ch.pipeline().addLast(new JsonEncoder());
                        //消息處理
                        ch.pipeline().addLast(bussMessageHandler);
                    }
                });
        try {
            ChannelFuture f = serverBootstrap.bind(port).sync();
            f.channel().closeFuture().sync();
        }catch (InterruptedException e) {
            logger.error("服務啟動失敗:{}", ExceptionUtils.getStackTrace(e));
        }finally {
            worker.shutdownGracefully();
            boss.shutdownGracefully();
        }
    }

  代碼中除了initChannel方法中的代碼,其他代碼都是固定寫法。那么什么叫固定寫法呢?通俗來講就是可以Ctrl+c、Ctrl+v。

  下面我們着重看initChannel方法里面的代碼。這里面就是上面講到的各種handler,我們下面挨個講這些handler都是干啥的。

  1)IdleStateHandler。這個是Netty內置的一個handler,既是出站handler又是入站handler。它的作用一般是用來實現心跳監測。所謂心跳,就是客戶端和服務端建立連接后,服務端要實時監控客戶端的健康狀態,如果客戶端掛了或者hung住了,服務端及時釋放相應的資源,以及做出其他處理比如通知運維。所以在我們的場景中,客戶端需要定時上報自己的心跳,如果服務端檢測到一段時間內沒收到客戶端上報的心跳,那么及時做出處理,我們這里就是簡單的將其連接斷開,並修改數據庫中相應賬戶的在線狀態。

  現在開始說IdleStateHandler,第一個參數叫讀超時時間,第二個參數叫寫超時時間,第三個參數叫讀寫超時時間,第四個參數時時間單位秒。這個handler表達的意思是當25秒內沒讀到客戶端的消息,或者20秒內沒往客戶端發消息,就會產生一個超時事件。那么這個超時事件我們該對他做什么處理呢,請看下一條。

  2)HeartBeatHandler。結合a)一起看,當發生超時事件時,HeartBeatHandler會收到這個事件,並對它做出處理:第一將鏈接斷開;第二講數據庫中相應的賬戶更新為不在線狀態。

public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(HeartBeatHandler.class);

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent){
            IdleStateEvent event = (IdleStateEvent)evt;
            if (event.state() == IdleState.READER_IDLE) {
                //讀超時,應將連接斷掉
                InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
                String ip = socketAddress.getAddress().getHostAddress();
                ctx.channel().disconnect();
                logger.info("【{}】連接超時,斷開",ip);
                String userName = SessionManager.removeSession(ctx.channel());
                SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
            }else {
                super.userEventTriggered(ctx, evt);
            }
        }else {
            super.userEventTriggered(ctx, evt);
        }

    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof HeartBeat){
            //收到心跳包,不處理
            logger.info("server收到心跳包:{}",msg);
            return;
        }
        super.channelRead(ctx, msg);
    }
}
 

  3)StringLengthFieldDecoder。這是個入站handler,他的作用就是解決上面提到的粘包問題:

public class StringLengthFieldDecoder extends LengthFieldBasedFrameDecoder {
    public StringLengthFieldDecoder() {
        super(10*1024*1024,0,8,0,8);
    }


    @Override
    protected long getUnadjustedFrameLength(ByteBuf buf, int offset, int length, ByteOrder order) {
        buf = buf.order(order);
        byte[] lenByte = new byte[length];
        buf.getBytes(offset, lenByte);
        String lenStr = new String(lenByte);
        Long len =  Long.valueOf(lenStr);
        return len;
    }
}

  只需要集成Netty提供的LengthFieldBasedFrameDecoder 類,並重寫getUnadjustedFrameLength方法即可。

  首先看構造方法中的5個參數。第一個表示能處理的包的最大長度;第二三個參數應該結合起來理解,表示長度字段從第幾位開始,長度的長度是多少,也就是上面報文格式協議中的頭8個字節;第四個參數表示長度是否需要校正,舉例理解,比如頭8個字節解析出來的長度=包體長度+頭8個字節的長度,那么這里就需要校正8個字節,我們的協議中長度只包含報文體,因此這個參數填0;最后一個參數,表示接收到的報文是否要跳過一些字節,本例中設置為8,表示跳過頭8個字節,因此經過這個handler后,我們收到的數據就只有報文本身了,不再包含8個長度字節了。

  再看getUnadjustedFrameLength方法,其實就是將頭8個字符串型的長度為轉換成long型。重寫完這個方法后,Netty就知道如何收一個“完整”的數據包了。

  4)StringDecoder。這個是Netty自帶的入站handler,會將字節流以指定的編碼解析成String。

  5)JsonDecoder。是我們自定義的一個入站handler,目的是將json String轉換成java bean,以方便后續處理:

public class JsonDecoder extends MessageToMessageDecoder<String> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, String o, List<Object> list) throws Exception {
        Message msg = MessageEnDeCoder.decode(o);
        list.add(msg);
    }

}

  這里會調用我們自定義的一個編解碼幫助類進行轉換:

public static Message decode(String message){
        if (StringUtils.isEmpty(message) || message.length() < 2){
            return null;
        }
        String type = message.substring(0,2);
        message = message.substring(2);
        if (type.equals(LoginRequest)){
            return JsonUtil.jsonToObject(message,LoginRequest.class);
        }else if (type.equals(LoginResponse)){
            return JsonUtil.jsonToObject(message,LoginResponse.class);
        }else if (type.equals(LogoutRequest)){
            return JsonUtil.jsonToObject(message,LogoutRequest.class);
        }else if (type.equals(LogoutResponse)){
            return JsonUtil.jsonToObject(message,LogoutResponse.class);
        }else if (type.equals(SendMsgRequest)){
            return JsonUtil.jsonToObject(message,SendMsgRequest.class);
        }else if (type.equals(SendMsgResponse)){
            return JsonUtil.jsonToObject(message,SendMsgResponse.class);
        }else if (type.equals(HeartBeat)){
            return JsonUtil.jsonToObject(message,HeartBeat.class);
        }
        return null;
    }

  6)BussMessageHandler。先看這個入站handler,是我們的一個業務處理主入口,他的主要工作就是將消息分發給線程池去處理,另外還負載一個小場景,當客戶端主動斷開時,需要將相應的賬戶數據庫中狀態更新為不在線。

public class BussMessageHandler extends ChannelInboundHandlerAdapter {
    private static Logger logger = LoggerFactory.getLogger(BussMessageHandler.class);

    @Autowired
    private TaskDispatcher taskDispatcher;

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        logger.info("收到消息:{}",msg);
        if (msg instanceof Message){
            taskDispatcher.submit(ctx.channel(),(Message)msg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //客戶端連接斷開
        InetSocketAddress socketAddress = (InetSocketAddress)ctx.channel().remoteAddress();
        String ip = socketAddress.getAddress().getHostAddress();
        logger.info("客戶端斷開:{}",ip);
        String userName = SessionManager.removeSession(ctx.channel());
        SpringContextUtil.getBean(UserService.class).updateOnlineStatus(userName,Boolean.FALSE);
        super.channelInactive(ctx);
    }
}

  接下來還差線程池的處理邏輯,也非常簡單,就是將任務封裝成executor然后交給線程池處理:

public class TaskDispatcher {
    private ThreadPoolExecutor threadPool;

    public TaskDispatcher(){
        int corePoolSize = 15;
        int maxPoolSize = 50;
        int keepAliveSeconds = 30;
        int queueCapacity = 1024;
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(queueCapacity);
        this.threadPool = new ThreadPoolExecutor(
                corePoolSize, maxPoolSize, keepAliveSeconds, TimeUnit.SECONDS,
                queue);
    }

    public void submit(Channel channel, Message msg){
        ExecutorBase executor = null;
        String messageType = msg.getMessageType();
        if (messageType.equals(MessageEnDeCoder.LoginRequest)){
            executor = new LoginExecutor(channel,msg);
        }
        if (messageType.equalsIgnoreCase(MessageEnDeCoder.SendMsgRequest)){
            executor = new SendMsgExecutor(channel,msg);
        }
        if (executor != null){
            this.threadPool.submit(executor);
        }
    }
}
 

  接下來看一下消息轉發executor是怎么做的:

public class SendMsgExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(SendMsgExecutor.class);

    public SendMsgExecutor(Channel channel, Message message) {
        super(channel, message);
    }

    @Override
    public void run() {
        SendMsgResponse response = new SendMsgResponse();
        response.setMessageType(MessageEnDeCoder.SendMsgResponse);
        response.setTime(new Date());
        SendMsgRequest request = (SendMsgRequest)message;
        String recvUserName = request.getRecvUserName();
        String sendContent = request.getSendMessage();
        Channel recvChannel = SessionManager.getSession(recvUserName);
        if (recvChannel != null){
            SendMsgRequest sendMsgRequest = new SendMsgRequest();
            sendMsgRequest.setTime(new Date());
            sendMsgRequest.setMessageType(MessageEnDeCoder.SendMsgRequest);
            sendMsgRequest.setRecvUserName(recvUserName);
            sendMsgRequest.setSendMessage(sendContent);
            sendMsgRequest.setSendUserName(request.getSendUserName());
            recvChannel.writeAndFlush(sendMsgRequest).addListener(new GenericFutureListener<Future<? super Void>>() {
                @Override
                public void operationComplete(Future<? super Void> future) throws Exception {
                    if (future.isSuccess()){
                        logger.info("消息轉發成功:{}",sendMsgRequest);
                        response.setResultCode("0000");
                        response.setResultMessage(String.format("發給用戶[%s]消息成功",recvUserName));
                        channel.writeAndFlush(response);
                    }else {
                        logger.error(ExceptionUtils.getStackTrace(future.cause()));
                        logger.info("消息轉發失敗:{}",sendMsgRequest);
                        response.setResultCode("9999");
                        response.setResultMessage(String.format("發給用戶[%s]消息失敗",recvUserName));
                        channel.writeAndFlush(response);
                    }
                }
            });
        }else {
            logger.info("用戶{}不在線,消息轉發失敗",recvUserName);
            response.setResultCode("9999");
            response.setResultMessage(String.format("用戶[%s]不在線",recvUserName));
            channel.writeAndFlush(response);
        }
    }
}

  整體邏輯:一獲取要把消息發給那個賬號;二獲取該賬號對應的連接;三在此連接上發送消息;四獲取消息發送結果,將結果發給消息“發起者”。

  下面是登陸處理的executor:

public class LoginExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(LoginExecutor.class);

    public LoginExecutor(Channel channel, Message message) {
        super(channel, message);
    }
    @Override
    public void run() {
        LoginRequest request = (LoginRequest)message;
        String userName = request.getUserName();
        String password = request.getPassword();
        UserService userService = SpringContextUtil.getBean(UserService.class);
        boolean check = userService.checkLogin(userName,password);
        LoginResponse response = new LoginResponse();
        response.setUserName(userName);
        response.setMessageType(MessageEnDeCoder.LoginResponse);
        response.setTime(new Date());
        response.setResultCode(check?"0000":"9999");
        response.setResultMessage(check?"登陸成功":"登陸失敗,用戶名或密碼錯");
        if (check){
            userService.updateOnlineStatus(userName,Boolean.TRUE);
            SessionManager.addSession(userName,channel);
        }
        channel.writeAndFlush(response).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                //登陸失敗,斷開連接
                if (!check){
                    logger.info("用戶{}登陸失敗,斷開連接",((LoginRequest) message).getUserName());
                    channel.disconnect();
                }
            }
        });
    }
}

  登陸邏輯也不復雜,登陸成功則更新用戶在線狀態,並且無論登陸成功還是失敗,都會返一個登陸應答。同時,如果登陸校驗失敗,在返回應答成功后,需要將鏈接斷開。

  7)JsonEncoder。最后看這個唯一的出站handler,服務端發出去的消息都會被出站handler處理,他的職責就是將java bean轉成我們之前定義的報文協議格式:

public class JsonEncoder extends MessageToByteEncoder<Message> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
        String msgStr = MessageEnDeCoder.encode(message);
        int length = msgStr.getBytes(Charset.forName("UTF-8")).length;
        String str = String.valueOf(length);
        String lenStr = StringUtils.leftPad(str,8,'0');
        msgStr = lenStr + msgStr;
        byteBuf.writeBytes(msgStr.getBytes("UTF-8"));
    }
}

  8)SessionManager。剩下最后一個東西沒說,這個是用來保存每個登陸成功賬戶的鏈接的,底層是個map,key為用戶賬戶,value為鏈接:

public class SessionManager {
    private static ConcurrentHashMap<String,Channel> sessionMap = new ConcurrentHashMap<>();

    public static void addSession(String userName,Channel channel){
        sessionMap.put(userName,channel);
    }

    public static String removeSession(String userName){
        sessionMap.remove(userName);
        return userName;
    }

    public static String removeSession(Channel channel){
        for (String key:sessionMap.keySet()){
            if (channel.id().asLongText().equalsIgnoreCase(sessionMap.get(key).id().asLongText())){
                sessionMap.remove(key);
                return key;
            }
        }
        return null;
    }

    public static Channel getSession(String userName){
        return sessionMap.get(userName);
    }
}

  到這里,整個服務端的邏輯就走完了,是不是,很簡單呢!

3、聊天客戶端

  客戶端中界面相關的東西是基於JavaFX框架做的,這個我是第一次用,所以不打算講這塊,怕誤導大家。主要還是講Netty作為客戶端是如何跟服務端通信的。

  按照慣例,還是先貼出主入口:

public void login(String userName,String password) throws Exception {
        Bootstrap clientBootstrap = new Bootstrap();
        EventLoopGroup clientGroup = new NioEventLoopGroup();
        try {
            clientBootstrap.group(clientGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.CONNECT_TIMEOUT_MILLIS,10000);
            clientBootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new IdleStateHandler(20, 15, 0, TimeUnit.SECONDS));
                    ch.pipeline().addLast(new StringLengthFieldDecoder());
                    ch.pipeline().addLast(new StringDecoder(Charset.forName("UTF-8")));
                    ch.pipeline().addLast(new JsonDecoder());
                    ch.pipeline().addLast(new JsonEncoder());
                    ch.pipeline().addLast(bussMessageHandler);
                    ch.pipeline().addLast(new HeartBeatHandler());
                }
            });
            ChannelFuture future = clientBootstrap.connect(server,port).sync();
            if (future.isSuccess()){
                channel = (SocketChannel)future.channel();
                LoginRequest request = new LoginRequest();
                request.setTime(new Date());
                request.setUserName(userName);
                request.setPassword(password);
                request.setMessageType(MessageEnDeCoder.LoginRequest);
                channel.writeAndFlush(request).addListener(new GenericFutureListener<Future<? super Void>>() {
                    @Override
                    public void operationComplete(Future<? super Void> future) throws Exception {
                        if (future.isSuccess()){
                            logger.info("登陸消息發送成功");
                        }else {
                            logger.info("登陸消息發送失敗:{}", ExceptionUtils.getStackTrace(future.cause()));
                            Platform.runLater(new Runnable() {
                                @Override
                                public void run() {
                                    LoginController.setLoginResult("網絡錯誤,登陸消息發送失敗");
                                }
                            });
                        }
                    }
                });
            }else {
                clientGroup.shutdownGracefully();
                throw new RuntimeException("網絡錯誤");
            }
        }catch (Exception e){
            clientGroup.shutdownGracefully();
            throw new RuntimeException("網絡錯誤");
        }
    }

  對這段代碼,我們主要關注這幾點:一所有handler的初始化;二connect服務端。

  所有handler中,除了bussMessageHandler是客戶端特有的外,其他的handler在服務端章節已經講過了,不再贅述。

  1)先看連接服務端的操作。首先發起連接,連接成功后發送登陸報文。發起連接需要對成功和失敗進行處理。發送登陸報文也需要對成功和失敗進行處理。注意,這里的成功失敗只是代表當前操作的網絡層面的成功失敗,這時候並不能獲取服務端返回的應答中的業務層面的成功失敗,如果不理解這句話,可以翻看前面講過的“異步”相關內容。

  2)BussMessageHandler。整體流程還是跟服務端一樣,將受到的消息扔給線程池處理,我們直接看處理消息的各個executor。

  先看客戶端發出登陸請求后,收到登陸應答消息后是怎么處理的(這段代碼可以結合1)的內容一起理解):

public class LoginRespExecutor extends ExecutorBase {
    private static Logger logger = LoggerFactory.getLogger(LoginRespExecutor.class);

    public LoginRespExecutor(Channel channel, Message message) {
        super(channel, message);
    }

    @Override
    public void run() {
        LoginResponse response = (LoginResponse)message;
        logger.info("登陸結果:{}->{}",response.getResultCode(),response.getResultMessage());
        if (!response.getResultCode().equals("0000")){
            Platform.runLater(new Runnable() {
                @Override
                public void run() {
                    LoginController.setLoginResult("登陸失敗,用戶名或密碼錯誤");
                }
            });
        }else {
            LoginController.setCurUserName(response.getUserName());
            ClientApplication.getScene().setRoot(SpringContextUtil.getBean(MainView.class).getView());
        }
    }
}

  接下來看客戶端是怎么發聊天信息的:

public void sendMessage(Message message) {
        channel.writeAndFlush(message).addListener(new GenericFutureListener<Future<? super Void>>() {
            @Override
            public void operationComplete(Future<? super Void> future) throws Exception {
                SendMsgRequest send = (SendMsgRequest)message;
                if (future.isSuccess()){
                    Platform.runLater(new Runnable() {
                        @Override
                        public void run() {
                            MainController.setMessageHistory(String.format("[我]在[%s]發給[%s]的消息[%s],發送成功",
                                    DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
                        }
                    });
                }else {
                    Platform.runLater(new Runnable() {
                        @Override
                        public void run() {
                            MainController.setMessageHistory(String.format("[我]在[%s]發給[%s]的消息[%s],發送失敗",
                                    DateFormatUtils.format(send.getTime(),"yyyy-MM-dd HH:mm:ss"),send.getRecvUserName(),send.getSendMessage()));
                        }
                    });
                }
            }
        });
    }

  實際上,到這里通信相關的代碼已經貼完了。剩下的都是界面處理相關的代碼,不再貼了。

  客戶端,是不是,非常簡單!

4、Web管理端

  Web管理端可以說是更沒任何技術含量,就是Shiro登陸認證、列表增刪改查。增刪改沒什么好說的,下面重點說一下Shiro登陸和列表查詢。

  1)Shiro登陸

  首先定義一個Realm,至於這是什么概念,自行百度吧,這里並不是本文重點:

public class UserDbRealm extends AuthorizingRealm {
    @Override
    protected AuthorizationInfo doGetAuthorizationInfo(PrincipalCollection principalCollection) {
        return null;
    }

    @Override
    protected AuthenticationInfo doGetAuthenticationInfo(AuthenticationToken authenticationToken) throws AuthenticationException {
        RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
        
        UsernamePasswordToken upToken = (UsernamePasswordToken) authenticationToken;
        String username = upToken.getUsername();
        String password = "";
        if (upToken.getPassword() != null)
        {
            password = new String(upToken.getPassword());
        }
        // TODO: 2021/5/13 校驗用戶名密碼,不通過則拋認證異常即可 
        ShiroUser user = new ShiroUser();
        SimpleAuthenticationInfo info = new SimpleAuthenticationInfo(user, password, getName());
        return info;
    }
}

  接下來把這個Realm注冊成Spring Bean,同時定義過濾鏈:

    @Bean
    public Realm realm() {
        UserDbRealm realm = new UserDbRealm();
        realm.setAuthorizationCachingEnabled(true);
        realm.setCacheManager(cacheManager());
        return realm;
    }
    
    @Bean
    public ShiroFilterChainDefinition shiroFilterChainDefinition() {
        DefaultShiroFilterChainDefinition chainDefinition = new DefaultShiroFilterChainDefinition();
        chainDefinition.addPathDefinition("/css/**", "anon");
        chainDefinition.addPathDefinition("/img/**", "anon");
        chainDefinition.addPathDefinition("/js/**", "anon");
        chainDefinition.addPathDefinition("/logout", "logout");
        chainDefinition.addPathDefinition("/login", "anon");
        chainDefinition.addPathDefinition("/captchaImage", "anon");
        chainDefinition.addPathDefinition("/**", "authc");
        return chainDefinition;
    }

  到現在為止,Shiro配置好了,下面看如何調起登陸:

    @PostMapping("/login")
    @ResponseBody
    public Result<String> login(String username, String password, Boolean rememberMe)
    {
        Result<String> ret = new Result<>();
        UsernamePasswordToken token = new UsernamePasswordToken(username, password);
        Subject subject = SecurityUtils.getSubject();
        try
        {
            subject.login(token);
            return ret;
        }
        catch (AuthenticationException e)
        {
            String msg = "用戶或密碼錯誤";
            if (StringUtils.isNotEmpty(e.getMessage()))
            {
                msg = e.getMessage();
            }
            ret.setCode(Result.FAIL);
            ret.setMessage(msg);
            return ret;
        }
    }

  登陸代碼就這么愉快的完成了。

  2)列表查詢

  查是個很簡單的操作,但是卻是所有web系統中使用最頻繁的操作。因此,做一個通用性的封裝,非常有必要。以下代碼不做過多講解,初級工程師到高級工程師,就差這段代碼了(手動捂臉):

  a)Controller

    @RequestMapping("/query")
    @ResponseBody
    public Result<Page<User>> query(@RequestParam Map<String,Object> params, String sort, String order, Integer pageIndex, Integer pageSize){
        Page<User> page = userService.query(params,sort,order,pageIndex,pageSize);
        Result<Page<User>> ret = new Result<>();
        ret.setData(page);
        return ret;
    }

  b)Service

    @Autowired
    private UserDao userDao;
    @Autowired
    private QueryService queryService;

    public Page<User> query(Map<String,Object> params, String sort, String order, Integer pageIndex, Integer pageSize){
        return queryService.query(userDao,params,sort,order,pageIndex,pageSize);
    }
public class QueryService {
    public <T> com.easy.okim.common.model.Page<T> query(JpaSpecificationExecutor<T> dao, Map<String,Object> filters, String sort, String order, Integer pageIndex, Integer pageSize){
        com.easy.okim.common.model.Page<T> ret = new com.easy.okim.common.model.Page<T>();
        Map<String,Object> params = new HashMap<>();
        if (filters != null){
            filters.remove("sort");
            filters.remove("order");
            filters.remove("pageIndex");
            filters.remove("pageSize");
            for (String key:filters.keySet()){
                Object value = filters.get(key);
                if (value != null && StringUtils.isNotEmpty(value.toString())){
                    params.put(key,value);
                }
            }
        }
        Pageable pageable = null;
        pageIndex = pageIndex - 1;
        if (StringUtils.isEmpty(sort)){
            pageable = PageRequest.of(pageIndex,pageSize);
        }else {
            Sort s = Sort.by(Sort.Direction.ASC,sort);
            if (StringUtils.isNotEmpty(order) && order.equalsIgnoreCase("desc")){
                s = Sort.by(Sort.Direction.DESC,sort);
            }
            pageable = PageRequest.of(pageIndex,pageSize,s);
        }
        Page<T> page = null;
        if (params.size() ==0){
            page = dao.findAll(null,pageable);
        }else {
            Specification<T> specification = new Specification<T>() {
                @Override
                public Predicate toPredicate(Root<T> root, CriteriaQuery<?> criteriaQuery, CriteriaBuilder builder) {
                    List<Predicate> predicates = new ArrayList<>();
                    for (String filter : params.keySet()) {
                        Object value = params.get(filter);
                        if (value == null || StringUtils.isEmpty(value.toString())) {
                            continue;
                        }
                        String field = filter;
                        String operator = "=";
                        String[] arr = filter.split("\\|");
                        if (arr.length == 2) {
                            field = arr[0];
                            operator = arr[1];
                        }
                        if (arr.length == 3) {
                            field = arr[0];
                            operator = arr[1];
                            String type = arr[2];
                            if (type.equalsIgnoreCase("boolean")){
                                value = Boolean.parseBoolean(value.toString());
                            }else if (type.equalsIgnoreCase("integer")){
                                value = Integer.parseInt(value.toString());
                            }else if (type.equalsIgnoreCase("long")){
                                value = Long.parseLong(value.toString());
                            }
                        }
                        String[] names = StringUtils.split(field, ".");
                        Path expression = root.get(names[0]);
                        for (int i = 1; i < names.length; i++) {
                            expression = expression.get(names[i]);
                        }
                        // logic operator
                        switch (operator) {
                            case "=":
                                predicates.add(builder.equal(expression, value));
                                break;
                            case "!=":
                                predicates.add(builder.notEqual(expression, value));
                                break;
                            case "like":
                                predicates.add(builder.like(expression, "%" + value + "%"));
                                break;
                            case ">":
                                predicates.add(builder.greaterThan(expression, (Comparable) value));
                                break;
                            case "<":
                                predicates.add(builder.lessThan(expression, (Comparable) value));
                                break;
                            case ">=":
                                predicates.add(builder.greaterThanOrEqualTo(expression, (Comparable) value));
                                break;
                            case "<=":
                                predicates.add(builder.lessThanOrEqualTo(expression, (Comparable) value));
                                break;
                            case "isnull":
                                predicates.add(builder.isNull(expression));
                                break;
                            case "isnotnull":
                                predicates.add(builder.isNotNull(expression));
                                break;
                            case "in":
                                CriteriaBuilder.In in = builder.in(expression);
                                String[] arr1 = StringUtils.split(filter.toString(), ",");
                                for (String e : arr1) {
                                    in.value(e);
                                }
                                predicates.add(in);
                                break;
                        }
                    }

                    // 將所有條件用 and 聯合起來
                    if (!predicates.isEmpty()) {
                        return builder.and(predicates.toArray(new Predicate[predicates.size()]));
                    }
                    return builder.conjunction();
                }
            };
            page = dao.findAll(specification,pageable);
        }
        ret.setTotal(page.getTotalElements());
        ret.setRows(page.getContent());
        return ret;
    }
}

  c)Dao

public interface UserDao extends JpaRepository<User,Long>,JpaSpecificationExecutor<User> {
    //啥都不用寫,繼承Spring Data Jpa提供的類就行了
}

五、結語

  雖然標題起的有些嘩眾取寵了,但內容也確實都是實實在在的干貨,希望本文能對大家有一些幫助,源代碼工程不打算貼了,希望你能跟着文章自己手敲一遍。

  開頭說的收款二維碼,只是說笑,如果你真想付款,請私信我索取收款二維碼,金額不設上限的哈哈。

  歡迎閱讀,歡迎轉載,轉載請注明出處,求你了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM