Springboot + Netty + WebSocket 實現簡單的聊天


  簡單的實現聊天,發送至服務器端之后由服務器轉發給其他在線的用戶。

1. pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>cn.qz</groupId>
    <artifactId>xm</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>blog</name>
    <description>blog-server</description>
    <packaging>jar</packaging>

    <properties>
        <java.version>1.8</java.version>
        <!--<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> -->
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-tomcat</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13</version>
            <scope>test</scope>
        </dependency>

        <!-- spring-boot整合mybatis-plus -->
        <dependency>
            <groupId>com.baomidou</groupId>
            <artifactId>mybatis-plus-boot-starter</artifactId>
            <version>2.3</version>
        </dependency>
        <dependency>
            <groupId>com.github.pagehelper</groupId>
            <artifactId>pagehelper</artifactId>
            <version>5.1.2</version>
        </dependency>

        <!-- spring-boot整合mysql -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <!-- 引入 redis 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>

        <!-- spring-boot整合druid -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <version>1.1.22</version>
        </dependency>

        <!-- 使用事務需要引入這個包 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-jdbc</artifactId>
        </dependency>

        <!-- 引入 spring aop 依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-aop</artifactId>
        </dependency>

        <!-- commons工具包 -->

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-collections4</artifactId>
            <version>4.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-lang3</artifactId>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.4</version>
        </dependency>
        <dependency>
            <groupId>commons-collections</groupId>
            <artifactId>commons-collections</artifactId>
            <version>3.2</version>
        </dependency>
        <!-- 阿里的fastjson用於手動轉JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.56</version>
        </dependency>
        <!--httpclient相關包 -->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpmime</artifactId>
            <version>4.3.1</version>
        </dependency>

        <!--tika解析文本內容 -->
        <dependency>
            <groupId>org.apache.tika</groupId>
            <artifactId>tika-parsers</artifactId>
            <version>1.17</version>
        </dependency>

        <!--POI -->
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId>
            <version>3.16</version>
        </dependency>

        <!-- springdata jpa依賴 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.jsoup/jsoup -->
        <dependency>
            <groupId>org.jsoup</groupId>
            <artifactId>jsoup</artifactId>
            <version>1.12.2</version>
        </dependency>

        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger-ui</artifactId>
            <version>2.9.2</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.15.0</version>
        </dependency>

        <!--netty-->
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.11.Final</version>
        </dependency>
        <!-- poi依賴 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId>
            <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.poi</groupId>
            <artifactId>poi-ooxml</artifactId> <version>RELEASE</version> </dependency> -->
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <dependencies>
                    <!-- spring熱部署 -->
                    <!-- 該依賴在此處下載不下來,可以放置在build標簽外部下載完成后再粘貼進plugin中 -->
                    <dependency>
                        <groupId>org.springframework</groupId>
                        <artifactId>springloaded</artifactId>
                        <version>1.2.6.RELEASE</version>
                    </dependency>
                </dependencies>
                <configuration>
                    <fork>true</fork>
                </configuration>
            </plugin>

            <!-- 要將源碼放上去,需要加入這個插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-source-plugin</artifactId>
                <executions>
                    <execution>
                        <id>attach-sources</id>
                        <goals>
                            <goal>jar</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

            <!-- 執行Junit測試(測試所有類) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.10</version>
                <configuration>
                    <includes>
                        <!--<include>*\*\*\*Test.java</include>-->
                        <include>**\*</include>
                    </includes>
                </configuration>
            </plugin>
        </plugins>
    </build>

</project>

核心是netty-all, 其他依賴按需引入即可

2. 主要類信息

1. 服務端程序

package com.xm.ggn.netty;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NettyServer {

    private final int port;

    public NettyServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        // 修改bossGroup的數量,2線程足夠用
        EventLoopGroup bossGroup = new NioEventLoopGroup(2);
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap sb = new ServerBootstrap();
            sb.option(ChannelOption.SO_BACKLOG, 1024);
            sb.group(workerGroup, bossGroup) // 綁定線程池
                    .channel(NioServerSocketChannel.class) // 指定使用的channel
                    .localAddress(this.port)// 綁定監聽端口
                    .childHandler(new MyChannelInitializer());
            ChannelFuture cf = sb.bind().sync(); // 服務器異步創建綁定
            log.info(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress());
            cf.channel().closeFuture().sync(); // 關閉服務器通道
        } finally {
            workerGroup.shutdownGracefully().sync(); // 釋放線程池資源
            bossGroup.shutdownGracefully().sync();
        }
    }
}

2. Initializer

package com.xm.ggn.netty;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        log.info("收到新的客戶端連接: {}", socketChannel.toString());
        // websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器
        socketChannel.pipeline().addLast(new HttpServerCodec());
        // 以塊的方式來寫的處理器(添加對於讀寫大數據流的支持)
        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
        // 對httpMessage進行聚合
        socketChannel.pipeline().addLast(new HttpObjectAggregator(8192));

        // ================= 上述是用於支持http協議的 =============

        // websocket 服務器處理的協議,用於給指定的客戶端進行連接訪問的路由地址
        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));

        // 添加自己的handler
        socketChannel.pipeline().addLast(new MyWebSocketHandler());
    }
}

3.handler

package com.xm.ggn.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 自定義服務器端處理handler,繼承SimpleChannelInboundHandler,處理WebSocket 連接數據
 */
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 用戶id=>channel示例
    // 可以通過用戶的唯一標識保存用戶的channel
    // 這樣就可以發送給指定的用戶
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 獲取連接的channel
        Channel incomming = ctx.channel();
        //通知所有已經連接到服務器的客戶端,有一個新的通道加入
        /*for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n");
        }*/
        channelGroup.add(incomming);
    }

    /**
     * 每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //獲取連接的channel
        /*Channel incomming = ctx.channel();
        for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n");
        }*/
        //從服務端的channelGroup中移除當前離開的客戶端
        channelGroup.remove(ctx.channel());

        //從服務端的channelMap中移除當前離開的客戶端
        Collection<Channel> col = channelMap.values();
        while (true == col.contains(ctx.channel())) {
            col.remove(ctx.channel());
            log.info("netty客戶端連接刪除成功!");
        }

    }


    /**
     * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("netty客戶端收到服務器數據, 客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text());
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //消息處理類
        message(ctx, msg.text(), date);

        //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text()));
    }

    /**
     * 當服務端的IO 拋出異常時被調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        log.error("SimpleChatClient:" + incoming.remoteAddress() + "異常", cause);
        //異常出現就關閉連接
        ctx.close();
    }

    //消息處理類
    public void message(ChannelHandlerContext ctx, String msg, String date) {
        try {
            // 消息轉發給在線的其他用戶
            Channel channel = ctx.channel();
            for (Channel tmpChannel : channelGroup) {
                if (!tmpChannel.equals(channel)) {
                    String sendedMsg = date + ":" + msg;
                    log.info("服務器轉發消息,客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), sendedMsg);
                    tmpChannel.writeAndFlush(new TextWebSocketFrame(sendedMsg));
                }
            }
        } catch (Exception e) {
            log.error("message 處理異常, msg: {}, date: {}", msg, date, e);
        }
    }
}

4. Springboot主啟動類: 也可以將啟動nettyServer代碼移動至監聽Spring容器啟動事件類中

package com.xm.ggn;

import com.xm.ggn.netty.NettyServer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.domain.EntityScan;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@ServletComponentScan("com")
@EntityScan(basePackages = {"com"})
@EnableScheduling
// 允許通過AopContext.currentProxy() 獲取代理類
@EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true)
@EnableAsync
public class BlogApplication {

    public static void main(String[] args) {
        SpringApplication.run(BlogApplication.class, args);

        // 啟動netty服務器
        try {
            new NettyServer(8091).start();
        } catch (Exception e) {
            System.out.println("NettyServerError:" + e.getMessage());
        }
    }
}

5. 前端就用HTML界面簡單的測試

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:8091/ws");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"\r\n";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。連接  \r\n";
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n";
            };
        }else{
            alert("您的瀏覽器不支持WebSocket協議!");
        }
        function send(message){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket 連接沒有建立成功!");
            }
 
        }
 
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="這里輸入消息" 
    style="width: 1024px;height: 100px;"/> <br />
    <br /> <input type="button" value="發送ws消息"
                  onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服務端返回的應答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

3. 測試

1. 啟動boot應用

2. 前端用兩個瀏覽器打開

3. 查看服務器端控制台:

2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091
2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288]
2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]

3. 兩個控制台分別發幾條信息

查看兩個界面的服務器端消息:

(1) 第一個

 (2) 第二個:

 4. 查看服務器端日志

2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091
2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288]
2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]
2021-03-02 18:15:20.947 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65288, msg: 我說是什么
2021-03-02 18:15:20.948 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:20:我說是什么
2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65369, msg: 我說不知道
2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:29:我說不知道
2021-03-02 18:15:34.745 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65369, msg: 我說不知道個鬼
2021-03-02 18:15:34.746 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:34:我說不知道個鬼
2021-03-02 18:15:44.819 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65288, msg: 你說身子
2021-03-02 18:15:44.820 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:44:你說身子

 

  接下來就是基於上面的代碼簡單的實現基於vue的聊天設計。

 

補充: 關於WebSocketServerProtocolHandler 這個處理器用於處理WebSocket請求套路

  驗證URL是否是WebSocke的URL,主要就是判斷創建時候傳進去的這個"/ws"。默認是根據equals來匹配,也可以通過參數來設置進行startWith 匹配,如下方法:

io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler#isNotWebSocketPath

    private boolean isNotWebSocketPath(FullHttpRequest req) {
        return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath);
    }

(1) 第一種:

socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));

  上面這中實際調了重載構造方法傳遞的checkStartsWith  為false

(2) 第二種: 也可以直接調用參數設置checkStartsWith  為true

socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));

對應的構造方法是:

    public WebSocketServerProtocolHandler(String websocketPath, String subprotocols,
            boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith) {
        this.websocketPath = websocketPath;
        this.subprotocols = subprotocols;
        this.allowExtensions = allowExtensions;
        maxFramePayloadLength = maxFrameSize;
        this.allowMaskMismatch = allowMaskMismatch;
        this.checkStartsWith = checkStartsWith;
    }

 補充: socket建立連接的時候我們希望獲取到用戶的標識信息,然后將用戶信息和channel維護起來

1. 調整MyChannelInitializer 中的handler

package com.xm.ggn.netty;


import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        log.info("收到新的客戶端連接: {}", socketChannel.toString());
        // websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器
        socketChannel.pipeline().addLast(new HttpServerCodec());
        // 以塊的方式來寫的處理器(添加對於讀寫大數據流的支持)
        socketChannel.pipeline().addLast(new ChunkedWriteHandler());
        // 對httpMessage進行聚合
        socketChannel.pipeline().addLast(new HttpObjectAggregator(8192));

        // ================= 上述是用於支持http協議的 =============

        // 添加自己的handler
        socketChannel.pipeline().addLast(new MyWebSocketHandler());

        // websocket 服務器處理的協議,用於給指定的客戶端進行連接訪問的路由地址
        // 這個主要就是驗證URL是否是WebSocke的URL,主要就是判斷創建時候傳進去的這個"/ws"。 下面四個參數的是比較路徑相等io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.isNotWebSocketPath
//        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
        // 也可以用下面參數用於比較startWith
        socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));
    }
}

2. 修改MyWebSocketHandler 重寫channelRead 方法,注意不是channelRead0 方法

    /**
     * 處理建立連接時候請求(用於拿參數)
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg && msg instanceof FullHttpRequest) {
            log.info("連接請求,准備提取參數");
            //轉化為http請求
            FullHttpRequest request = (FullHttpRequest) msg;
            //拿到請求地址
            String uri = request.uri();
            log.info("uri: " + uri);
            if (StringUtils.isNotBlank(uri)) {
                String path = StringUtils.substringBefore(uri, "?");
                log.info("path: {}", path);
                String username = StringUtils.substringAfterLast(path, "/");
                log.info(username);
                channelMap.put(username, ctx.channel());
                log.info("channelMap: {}", channelMap);
            }

            //重新設置請求地址為WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的時候checkStartsWith   為true則不需要設置,會根據前綴匹配)
//            request.setUri("/ws");
        }

        //接着建立請求
        super.channelRead(ctx, msg);
    }

3. 調整前端請求連接地址增加用戶姓名

<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
    <title>Netty-Websocket</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
        if(window.WebSocket){
            socket = new WebSocket("ws://127.0.0.1:8091/ws/admin?username=admin");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseText');
                ta.value += event.data+"\r\n";
            };
            socket.onopen = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。連接  \r\n";
            };
            socket.onclose = function(event){
                var ta = document.getElementById('responseText');
                ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n";
            };
        }else{
            alert("您的瀏覽器不支持WebSocket協議!");
        }
        function send(message){
            if(!window.WebSocket){return;}
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket 連接沒有建立成功!");
            }
 
        }
 
    </script>
</head>
<body>
<form onSubmit="return false;">
    <label>TEXT</label><input type="text" name="message" value="這里輸入消息" 
    style="width: 1024px;height: 100px;"/> <br />
    <br /> <input type="button" value="發送ws消息"
                  onClick="send(this.form.message.value)" />
    <hr color="black" />
    <h3>服務端返回的應答消息</h3>
    <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea>
</form>
</body>
</html>

4. 測試服務器端日志:(可以看到正確的拿到參數信息並且建立連接,也可以通過?傳遞參數)

2021-03-02 22:43:07.196 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091
2021-03-02 22:43:11.026 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]
2021-03-02 22:43:12.702 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:47 - 添加新的channel, incomming: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]
2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:85 - 連接請求,准備提取參數
2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:90 - uri: /ws/admin?username=admin
2021-03-02 22:43:12.934 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:93 - path: /ws/admin
2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:95 - admin
2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:97 - channelMap: {admin=[id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]}

補充:整合Vue的jwchat實現聊天 ,jwchat是基於Elementui封裝的聊天插件

jwchat官網: https://codegi.gitee.io/jwchatdoc/

這里用了jwchat的兩個組件: JwChat-rightbox 展示在線用戶、JwChat-index 展示聊天窗口

0. 界面截圖如下:

1. 前端核心vue:

<template>
  <div class="dashboard-container">
    <div class="dashboard-text">
      <el-row>
        <el-col :span="6"
          ><div class="grid-content bg-purple">
            <JwChat-rightbox :config="onlineUsers" @click="rightClick" /></div
        ></el-col>

        <!-- 如果選擇了在線用戶顯示聊天窗口 -->
        <el-col :span="18" v-if="chatUserConfig.name != ''"
          ><div class="grid-content bg-purple-light">
            <JwChat-index
              :config="chatUserConfig"
              :showRightBox="true"
              :taleList="chatlogTaleList"
              @enter="bindEnter"
              v-model="inputMsg"
              :toolConfig="toolConfig"
              scrollType="scroll"
              @clickTalk="clickTalk"
            >
              <!-- 右邊插槽 -->
              <template>
                <h3>聊天愉快</h3>
              </template>
            </JwChat-index>
          </div></el-col
        >
      </el-row>
    </div>
  </div>
</template>

<script>
// other.png 表示對方頭像; myself.png 表示我自己
import { MessageBox } from "element-ui";
import {findCurrentUsername} from "@/utils/auth"

export default {
  data() {
    return {
      // 在線用戶相關信息
      onlineUsers: {
        tip: "選擇在線人開始聊天",
        listTip: "當前在線",
        list: [],
      },

      // 輸入框內默認的消息
      inputMsg: "",
      // 聊天記錄
      chatlogTaleList: [
        // {
        //   date: "2020/04/25 21:19:07",
        //   text: { text: "起床不" },
        //   mine: false,
        //   name: "留戀人間不羡仙",
        //   img: "/images/other.png",
        // }
      ],
      // 展示的工具欄配置
      toolConfig: {
        // show: ['file', 'history', 'img', ['文件1', '', '美圖']],
        show: null, // 關閉所有其他組件
        showEmoji: true,
        callback: this.toolEvent,
      },

      // 正在聊天的用戶的信息
      chatUserConfig: {
        img: "/images/other.png",
        name: "",
        username: "",
        fullname: "",
        dept: "大部門",
        callback: this.bindCover,
        historyConfig: {
          show: true,
          tip: "加載更多",
          callback: this.bindLoadHistory,
        },
      },

      // 當前用戶信息
      currentUser: {
        username: "",
        fullname: "",
      },

      socket: new Object(),
    };
  },

  created() {
    this.listOnlineUsers();
    this.findCurrentUserInfo();
    this.webSocket();
  },

  methods: {
    webSocket() {
      // 先記錄this對象
      const that = this;
      if (typeof WebSocket == "undefined") {
        MessageBox.alert("瀏覽器暫不支持聊天", "提示信息");
      } else {
        // 實例化socket,這里我把用戶名傳給了后台,使后台能判斷要把消息發給哪個用戶,其實也可以后台直接獲取用戶IP來判斷並推送
        const socketUrl = "ws://127.0.0.1:8091/ws/" + findCurrentUsername();
        this.socket = new WebSocket(socketUrl);
        // 監聽socket打開
        this.socket.onopen = function () {
          console.log("瀏覽器WebSocket已打開");
        };
        // 監聽socket消息接收
        this.socket.onmessage = function (messageEvent) {
          // 轉換為json對象然后添加到chatlogTaleList
          let receivedLog = JSON.parse(messageEvent.data);
          console.log(receivedLog);
          let receivedLogs = new Array();
          receivedLogs.push(receivedLog);
          receivedLogs = that.rehandleChatLogs(receivedLogs);
          if (!that.chatlogTaleList) {
            that.chatlogTaleList = new Array();  
          }
          that.chatlogTaleList = that.chatlogTaleList.concat(receivedLogs);
        };
        // 監聽socket錯誤
        this.socket.onerror = function () {};
        // 監聽socket關閉
        this.socket.onclose = function () {
          MessageBox.alert("WebSocket已關閉");
        };
      }
    },
    // 查詢當前用戶信息
    findCurrentUserInfo() {
      let url = "/user/getInfo";
      this.$http.post(url).then((res) => {
        this.currentUser = res.data;
      });
    },
    // 發送websocket 消息
    send(message) {
      if (!window.WebSocket) {
        return;
      }

      // 封裝消息,然后發送消息
      const chatLog = {
        sendUsername: this.currentUser.username,
        sendFullname: this.currentUser.fullname,
        receiveUsername: this.chatUserConfig.username,
        receiveFullname: this.chatUserConfig.fullname,
        content: message,
        readed: false,
      };
      let socket = this.socket;
      if (socket.readyState == WebSocket.OPEN) {
        socket.send(JSON.stringify(chatLog));
      } else {
        MessageBox.alert("WebSocket 連接沒有建立成功!");
      }
    },

    // 獲取在線用戶(有在線用戶的情況下賦值到右邊窗口)
    listOnlineUsers() {
      let url = "/user/listOnlineUser";
      this.$http.get(url).then((res) => {
        var onlineUsers = res.data;
        if (!onlineUsers || onlineUsers.length < 1) {
          return;
        }

        onlineUsers.forEach((element) => {
          element.name = element.username;
          element.img = "/images/cover.png";
        });
        this.onlineUsers.list = onlineUsers;
      });
    },
    // 點擊在線人事件
    rightClick(type) {
      // 1.賦值給聊天人信息
      let chatUser = type.value;
      this.chatUserConfig.name = chatUser.fullname;
      this.chatUserConfig.username = chatUser.username;
      this.chatUserConfig.fullname = chatUser.fullname;
      // 2. 查詢聊天記錄
      let listChatlogurl = "/chat/log/list";
      let requestVO = {
        sendUsername: this.currentUser.username,
        receiveUsername: this.chatUserConfig.username,
        queryChangeRole: true,
      };
      this.$http.post(listChatlogurl, requestVO).then((res) => {
        this.chatlogTaleList = this.rehandleChatLogs(res.data);
      });
    },
    // 重新處理聊天記錄, 主要是做特殊標記以及設置圖像等操作
    rehandleChatLogs(chatlogs) {
      if (!chatlogs || chatlogs.length < 1) {
        return new Array();
      }

      chatlogs.forEach((element) => {
        element.date = element.createtimeStr;
        element.name = element.sendFullname;
        // 聊天內容(如下為設置文本,也可以設置其他video、圖片等)
        element.text = new Object();
        element.text.text = element.content;
        if (element.sendUsername == this.currentUser.username) {
          element.mine = true;
          element.img = "/images/myself.png";
        } else {
          element.mine = false;
          element.img = "/images/other.png";
        }
      });
      return chatlogs;
    },

    // 點擊左上角用戶名稱事件
    clickTalk(obj) {
      console.log(obj);
    },
    // 點擊發送或者回車事件
    bindEnter(obj) {
      const msg = this.inputMsg;
      if (!msg) {
        MessageBox.alert("您不能發送空消息");
        return;
      }
      // 發送消息
      this.send(msg);
    },
    /**
     * @description:
     * @param {*} type 當前點擊的按鈕
     * @param {*} plyload 附加文件或者需要處理的數據
     * @return {*}
     */
    toolEvent(type, plyload) {
      console.log("tools", type, plyload);
    },
    /**
     * @description: 點擊加載更多的回調函數
     * @param {*}
     * @return {*}
     */
    bindLoadHistory() {
      const history = new Array(3).fill().map((i, j) => {
        return {
          date: "2020/05/20 23:19:07",
          text: { text: j + new Date() },
          mine: false,
          name: "JwChat",
          img: "image/three.jpeg",
        };
      });
      let list = history.concat(this.list);
      this.list = list;
    },
    bindCover(type) {
      console.log("header", type);
    },
  },
};
</script>

涉及到主要邏輯:

(1)  點擊頁面進行如下操作:

1》創建WebSocket連接,創建WebSocket的時候將當前的用戶名傳到后端,后端記錄當前用戶名與連接到的netty的channel

2》 查詢當前在線用戶,並且展示到JwChat-rightbox 列表內。(也可以展示所有用戶、如果有群聊體系展示所有的群)

(2) 點擊在線用戶的時候獲取到在線用戶的信息並記錄下來,然后用ajax異步獲取聊天記錄(后台根據發布者和接收者按時間升序排序),然后前台根據聊天記錄做對應的轉換。這里是ajax獲取,也可以用websocket拿,對發送的消息做處理,后端接收到消息處理對應的業務即可

(3) 聊天的時候判斷是否輸入有信息,有信息的時候將信息包裝一下(增加發送者、接收者信息)發到后端,后端存入數據庫之后再發送到對應的channel返回給前端,前端接收到后做處理完加入聊天記錄數組展示在界面

2. 后端主要文件:

(1) 聊天記錄表

package com.xm.ggn.bean.chat;

import com.xm.ggn.bean.AbstractSequenceEntity;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import javax.persistence.Entity;

/**
 * 聊天記錄
 */
@Entity
@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString(callSuper = true)
public class ChatLog extends AbstractSequenceEntity {

    private String sendUsername;

    private String sendFullname;

    private String receiveUsername;

    private String receiveFullname;

    private String content;

    private String remark;

    /**
     * 是否已讀
     */
    private boolean readed;
}

包含繼承的通用字段:

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    @TableId(type = IdType.AUTO) // 增加該注解,mybatis plus insert之后會給bean設上Id
    protected long id;

    /**
     * 創建者
     */
    @Index(name = "creator")
    @TableField(update = "%s")
    protected String creator;

    /**
     * 唯一編號
     */
    @Index(name = "uniqueCode")
    @TableField(update = "%s")
    protected String uniqueCode;

    /**
     * 創建時間
     */
    @Index(name = "createtime")
    @TableField(update = "%s")
    protected Date createtime;

(2) MyWebSocketHandler消息處理者類:

package com.xm.ggn.netty;

import com.alibaba.fastjson.JSONObject;
import com.xm.ggn.bean.chat.ChatLog;
import com.xm.ggn.service.chat.ChatLogService;
import com.xm.ggn.utils.system.SpringBootUtils;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * 自定義服務器端處理handler,繼承SimpleChannelInboundHandler,處理WebSocket 連接數據
 */
@Slf4j
public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    // 用戶id=>channel示例
    // 可以通過用戶的唯一標識保存用戶的channel
    // 這樣就可以發送給指定的用戶
    public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>();

    /**
     * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        // 獲取連接的channel
        Channel incomming = ctx.channel();
        //通知所有已經連接到服務器的客戶端,有一個新的通道加入
        /*for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n");
        }*/
        channelGroup.add(incomming);
        log.info("添加新的channel, incomming: {}", incomming);
    }

    /**
     * 每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //獲取連接的channel
        /*Channel incomming = ctx.channel();
        for(Channel channel:channelGroup){
            channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n");
        }*/
        //從服務端的channelGroup中移除當前離開的客戶端
        channelGroup.remove(ctx.channel());

        //從服務端的channelMap中移除當前離開的客戶端
        Collection<Channel> col = channelMap.values();
        while (true == col.contains(ctx.channel())) {
            col.remove(ctx.channel());
            log.info("netty客戶端連接刪除成功!");
        }

    }

    /**
     * 處理建立連接時候請求(用於拿參數)
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (null != msg && msg instanceof FullHttpRequest) {
            log.info("連接請求,准備提取參數");
            //轉化為http請求
            FullHttpRequest request = (FullHttpRequest) msg;
            //拿到請求地址
            String uri = request.uri();
            log.info("uri: " + uri);
            if (StringUtils.isNotBlank(uri)) {
                String path = StringUtils.substringBefore(uri, "?");
                log.info("path: {}", path);
                String username = StringUtils.substringAfterLast(path, "/");
                log.info(username);
                channelMap.put(username, ctx.channel());
                log.info("channelMap: {}", channelMap);
            }

            //重新設置請求地址為WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的時候checkStartsWith   為true則不需要設置,會根據前綴匹配)
//            request.setUri("/ws");
        }

        //接着建立請求
        super.channelRead(ctx, msg);
    }

    /**
     * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel.
     *
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
        log.info("netty客戶端收到服務器數據, 客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text());
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        //消息處理類
        handleMessage(ctx, msg.text(), date);
        //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text()));
    }

    /**
     * 當服務端的IO 拋出異常時被調用
     *
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        log.error("SimpleChatClient:" + incoming.remoteAddress() + "異常", cause);
        //異常出現就關閉連接
        ctx.close();
    }

    /**
     * 處理讀取到的消息
     *
     * @param ctx
     * @param msg
     * @param date
     */
    private void handleMessage(ChannelHandlerContext ctx, String msg, String date) {
        try {
            // 消息入庫
            ChatLog chatLog = JSONObject.parseObject(msg, ChatLog.class);
            log.info("chatLog: {}", chatLog);
            ChatLogService chatLogService = SpringBootUtils.getBean(ChatLogService.class);
            chatLogService.insert(chatLog);

            // 消息轉發給對應用戶(發給發送者和接收者)
            String receiveUsername = chatLog.getReceiveUsername();
            String sendUsername = chatLog.getSendUsername();
            Set<Map.Entry<String, Channel>> entries = channelMap.entrySet();
            String key = null;
            for (Map.Entry<String, Channel> entry : entries) {
                key = entry.getKey();
                if (key.equals(receiveUsername) || key.equals(sendUsername)) {
                    log.info("服務器轉發消息, key: {}, msg: {}", key, JSONObject.toJSONString(chatLog));
                    entry.getValue().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(chatLog)));
                }
            }
        } catch (Exception e) {
            log.error("message 處理異常, msg: {}, date: {}", msg, date, e);
        }
    }
}

   這個只是簡單的實現了在線用戶的單聊,如果要做的好可以添加通訊錄功能、群聊,其實這個就是發送消息的時候接受者是群號等。待有這方面需求的時候會繼續完善。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM