簡單的實現聊天,發送至服務器端之后由服務器轉發給其他在線的用戶。
1. pom
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>cn.qz</groupId> <artifactId>xm</artifactId> <version>0.0.1-SNAPSHOT</version> <name>blog</name> <description>blog-server</description> <packaging>jar</packaging> <properties> <java.version>1.8</java.version> <!--<maven-jar-plugin.version>3.2.0</maven-jar-plugin.version> --> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13</version> <scope>test</scope> </dependency> <!-- spring-boot整合mybatis-plus --> <dependency> <groupId>com.baomidou</groupId> <artifactId>mybatis-plus-boot-starter</artifactId> <version>2.3</version> </dependency> <dependency> <groupId>com.github.pagehelper</groupId> <artifactId>pagehelper</artifactId> <version>5.1.2</version> </dependency> <!-- spring-boot整合mysql --> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.49</version> </dependency> <!-- 引入 redis 依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- spring-boot整合druid --> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.1.22</version> </dependency> <!-- 使用事務需要引入這個包 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <!-- 引入 spring aop 依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-aop</artifactId> </dependency> <!-- commons工具包 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-collections4</artifactId> <version>4.4</version> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.4</version> </dependency> <dependency> <groupId>commons-collections</groupId> <artifactId>commons-collections</artifactId> <version>3.2</version> </dependency> <!-- 阿里的fastjson用於手動轉JSON --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.56</version> </dependency> <!--httpclient相關包 --> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpclient</artifactId> <version>4.3.1</version> </dependency> <dependency> <groupId>org.apache.httpcomponents</groupId> <artifactId>httpmime</artifactId> <version>4.3.1</version> </dependency> <!--tika解析文本內容 --> <dependency> <groupId>org.apache.tika</groupId> <artifactId>tika-parsers</artifactId> <version>1.17</version> </dependency> <!--POI --> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>3.16</version> </dependency> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>3.16</version> </dependency> <!-- springdata jpa依賴 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-jpa</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.jsoup/jsoup --> <dependency> <groupId>org.jsoup</groupId> <artifactId>jsoup</artifactId> <version>1.12.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.0</version> </dependency> <!--netty--> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.11.Final</version> </dependency> <!-- poi依賴 <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi</artifactId> <version>RELEASE</version> </dependency> <dependency> <groupId>org.apache.poi</groupId> <artifactId>poi-ooxml</artifactId> <version>RELEASE</version> </dependency> --> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.8.1</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> </configuration> </plugin> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> <dependencies> <!-- spring熱部署 --> <!-- 該依賴在此處下載不下來,可以放置在build標簽外部下載完成后再粘貼進plugin中 --> <dependency> <groupId>org.springframework</groupId> <artifactId>springloaded</artifactId> <version>1.2.6.RELEASE</version> </dependency> </dependencies> <configuration> <fork>true</fork> </configuration> </plugin> <!-- 要將源碼放上去,需要加入這個插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <executions> <execution> <id>attach-sources</id> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <!-- 執行Junit測試(測試所有類) --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-surefire-plugin</artifactId> <version>2.10</version> <configuration> <includes> <!--<include>*\*\*\*Test.java</include>--> <include>**\*</include> </includes> </configuration> </plugin> </plugins> </build> </project>
核心是netty-all, 其他依賴按需引入即可
2. 主要類信息
1. 服務端程序
package com.xm.ggn.netty; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j public class NettyServer { private final int port; public NettyServer(int port) { this.port = port; } public void start() throws Exception { // 修改bossGroup的數量,2線程足夠用 EventLoopGroup bossGroup = new NioEventLoopGroup(2); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap sb = new ServerBootstrap(); sb.option(ChannelOption.SO_BACKLOG, 1024); sb.group(workerGroup, bossGroup) // 綁定線程池 .channel(NioServerSocketChannel.class) // 指定使用的channel .localAddress(this.port)// 綁定監聽端口 .childHandler(new MyChannelInitializer()); ChannelFuture cf = sb.bind().sync(); // 服務器異步創建綁定 log.info(NettyServer.class + " 啟動正在監聽: " + cf.channel().localAddress()); cf.channel().closeFuture().sync(); // 關閉服務器通道 } finally { workerGroup.shutdownGracefully().sync(); // 釋放線程池資源 bossGroup.shutdownGracefully().sync(); } } }
2. Initializer
package com.xm.ggn.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("收到新的客戶端連接: {}", socketChannel.toString()); // websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器 socketChannel.pipeline().addLast(new HttpServerCodec()); // 以塊的方式來寫的處理器(添加對於讀寫大數據流的支持) socketChannel.pipeline().addLast(new ChunkedWriteHandler()); // 對httpMessage進行聚合 socketChannel.pipeline().addLast(new HttpObjectAggregator(8192)); // ================= 上述是用於支持http協議的 ============= // websocket 服務器處理的協議,用於給指定的客戶端進行連接訪問的路由地址 socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); // 添加自己的handler socketChannel.pipeline().addLast(new MyWebSocketHandler()); } }
3.handler
package com.xm.ggn.netty; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.concurrent.ConcurrentHashMap; /** * 自定義服務器端處理handler,繼承SimpleChannelInboundHandler,處理WebSocket 連接數據 */ @Slf4j public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 用戶id=>channel示例 // 可以通過用戶的唯一標識保存用戶的channel // 這樣就可以發送給指定的用戶 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 獲取連接的channel Channel incomming = ctx.channel(); //通知所有已經連接到服務器的客戶端,有一個新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n"); }*/ channelGroup.add(incomming); } /** * 每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //獲取連接的channel /*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n"); }*/ //從服務端的channelGroup中移除當前離開的客戶端 channelGroup.remove(ctx.channel()); //從服務端的channelMap中移除當前離開的客戶端 Collection<Channel> col = channelMap.values(); while (true == col.contains(ctx.channel())) { col.remove(ctx.channel()); log.info("netty客戶端連接刪除成功!"); } } /** * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel. * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("netty客戶端收到服務器數據, 客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息處理類 message(ctx, msg.text(), date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 當服務端的IO 拋出異常時被調用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); log.error("SimpleChatClient:" + incoming.remoteAddress() + "異常", cause); //異常出現就關閉連接 ctx.close(); } //消息處理類 public void message(ChannelHandlerContext ctx, String msg, String date) { try { // 消息轉發給在線的其他用戶 Channel channel = ctx.channel(); for (Channel tmpChannel : channelGroup) { if (!tmpChannel.equals(channel)) { String sendedMsg = date + ":" + msg; log.info("服務器轉發消息,客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), sendedMsg); tmpChannel.writeAndFlush(new TextWebSocketFrame(sendedMsg)); } } } catch (Exception e) { log.error("message 處理異常, msg: {}, date: {}", msg, date, e); } } }
4. Springboot主啟動類: 也可以將啟動nettyServer代碼移動至監聽Spring容器啟動事件類中
package com.xm.ggn; import com.xm.ggn.netty.NettyServer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.scheduling.annotation.EnableAsync; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @ServletComponentScan("com") @EntityScan(basePackages = {"com"}) @EnableScheduling // 允許通過AopContext.currentProxy() 獲取代理類 @EnableAspectJAutoProxy(proxyTargetClass = true, exposeProxy = true) @EnableAsync public class BlogApplication { public static void main(String[] args) { SpringApplication.run(BlogApplication.class, args); // 啟動netty服務器 try { new NettyServer(8091).start(); } catch (Exception e) { System.out.println("NettyServerError:" + e.getMessage()); } } }
5. 前端就用HTML界面簡單的測試
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Netty-Websocket</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:8091/ws"); socket.onmessage = function(event){ var ta = document.getElementById('responseText'); ta.value += event.data+"\r\n"; }; socket.onopen = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服務器。。。。。。連接 \r\n"; }; socket.onclose = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n"; }; }else{ alert("您的瀏覽器不支持WebSocket協議!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 連接沒有建立成功!"); } } </script> </head> <body> <form onSubmit="return false;"> <label>TEXT</label><input type="text" name="message" value="這里輸入消息" style="width: 1024px;height: 100px;"/> <br /> <br /> <input type="button" value="發送ws消息" onClick="send(this.form.message.value)" /> <hr color="black" /> <h3>服務端返回的應答消息</h3> <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea> </form> </body> </html>
3. 測試
1. 啟動boot應用
2. 前端用兩個瀏覽器打開
3. 查看服務器端控制台:
2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091 2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288] 2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369]
3. 兩個控制台分別發幾條信息
查看兩個界面的服務器端消息:
(1) 第一個
(2) 第二個:
4. 查看服務器端日志
2021-03-02 18:14:00.644 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091 2021-03-02 18:14:07.304 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0x94e8a9ec, L:/127.0.0.1:8091 - R:/127.0.0.1:65288] 2021-03-02 18:14:18.861 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xaf71586a, L:/127.0.0.1:8091 - R:/127.0.0.1:65369] 2021-03-02 18:15:20.947 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65288, msg: 我說是什么 2021-03-02 18:15:20.948 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:20:我說是什么 2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65369, msg: 我說不知道 2021-03-02 18:15:29.342 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:29:我說不知道 2021-03-02 18:15:34.745 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65369, msg: 我說不知道個鬼 2021-03-02 18:15:34.746 | cmdb - INFO | nioEventLoopGroup-4-2 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65369, msg: 2021-03-02 18:15:34:我說不知道個鬼 2021-03-02 18:15:44.819 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:82 - netty客戶端收到服務器數據, 客戶端地址: /127.0.0.1:65288, msg: 你說身子 2021-03-02 18:15:44.820 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:113 - 服務器轉發消息,客戶端地址: /127.0.0.1:65288, msg: 2021-03-02 18:15:44:你說身子
接下來就是基於上面的代碼簡單的實現基於vue的聊天設計。
補充: 關於WebSocketServerProtocolHandler 這個處理器用於處理WebSocket請求套路
驗證URL是否是WebSocke的URL,主要就是判斷創建時候傳進去的這個"/ws"。默認是根據equals來匹配,也可以通過參數來設置進行startWith 匹配,如下方法:
io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler#isNotWebSocketPath
private boolean isNotWebSocketPath(FullHttpRequest req) { return checkStartsWith ? !req.uri().startsWith(websocketPath) : !req.uri().equals(websocketPath); }
(1) 第一種:
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10));
上面這中實際調了重載構造方法傳遞的checkStartsWith 為false
(2) 第二種: 也可以直接調用參數設置checkStartsWith 為true
socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true));
對應的構造方法是:
public WebSocketServerProtocolHandler(String websocketPath, String subprotocols, boolean allowExtensions, int maxFrameSize, boolean allowMaskMismatch, boolean checkStartsWith) { this.websocketPath = websocketPath; this.subprotocols = subprotocols; this.allowExtensions = allowExtensions; maxFramePayloadLength = maxFrameSize; this.allowMaskMismatch = allowMaskMismatch; this.checkStartsWith = checkStartsWith; }
補充: socket建立連接的時候我們希望獲取到用戶的標識信息,然后將用戶信息和channel維護起來
1. 調整MyChannelInitializer 中的handler
package com.xm.ggn.netty; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.stream.ChunkedWriteHandler; import lombok.extern.slf4j.Slf4j; @Slf4j public class MyChannelInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { log.info("收到新的客戶端連接: {}", socketChannel.toString()); // websocket協議本身是基於http協議的,所以這邊也要使用http解編碼器 socketChannel.pipeline().addLast(new HttpServerCodec()); // 以塊的方式來寫的處理器(添加對於讀寫大數據流的支持) socketChannel.pipeline().addLast(new ChunkedWriteHandler()); // 對httpMessage進行聚合 socketChannel.pipeline().addLast(new HttpObjectAggregator(8192)); // ================= 上述是用於支持http協議的 ============= // 添加自己的handler socketChannel.pipeline().addLast(new MyWebSocketHandler()); // websocket 服務器處理的協議,用於給指定的客戶端進行連接訪問的路由地址 // 這個主要就是驗證URL是否是WebSocke的URL,主要就是判斷創建時候傳進去的這個"/ws"。 下面四個參數的是比較路徑相等io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandshakeHandler.isNotWebSocketPath // socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10)); // 也可以用下面參數用於比較startWith socketChannel.pipeline().addLast(new WebSocketServerProtocolHandler("/ws", "WebSocket", true, 65536 * 10, true, true)); } }
2. 修改MyWebSocketHandler 重寫channelRead 方法,注意不是channelRead0 方法
/** * 處理建立連接時候請求(用於拿參數) * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg && msg instanceof FullHttpRequest) { log.info("連接請求,准備提取參數"); //轉化為http請求 FullHttpRequest request = (FullHttpRequest) msg; //拿到請求地址 String uri = request.uri(); log.info("uri: " + uri); if (StringUtils.isNotBlank(uri)) { String path = StringUtils.substringBefore(uri, "?"); log.info("path: {}", path); String username = StringUtils.substringAfterLast(path, "/"); log.info(username); channelMap.put(username, ctx.channel()); log.info("channelMap: {}", channelMap); } //重新設置請求地址為WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的時候checkStartsWith 為true則不需要設置,會根據前綴匹配) // request.setUri("/ws"); } //接着建立請求 super.channelRead(ctx, msg); }
3. 調整前端請求連接地址增加用戶姓名
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1transitional.dtd"> <html xmlns="http://www.w3.org/1999/xhtml"> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8" /> <title>Netty-Websocket</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } if(window.WebSocket){ socket = new WebSocket("ws://127.0.0.1:8091/ws/admin?username=admin"); socket.onmessage = function(event){ var ta = document.getElementById('responseText'); ta.value += event.data+"\r\n"; }; socket.onopen = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服務器。。。。。。連接 \r\n"; }; socket.onclose = function(event){ var ta = document.getElementById('responseText'); ta.value = "Netty-WebSocket服務器。。。。。。關閉 \r\n"; }; }else{ alert("您的瀏覽器不支持WebSocket協議!"); } function send(message){ if(!window.WebSocket){return;} if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket 連接沒有建立成功!"); } } </script> </head> <body> <form onSubmit="return false;"> <label>TEXT</label><input type="text" name="message" value="這里輸入消息" style="width: 1024px;height: 100px;"/> <br /> <br /> <input type="button" value="發送ws消息" onClick="send(this.form.message.value)" /> <hr color="black" /> <h3>服務端返回的應答消息</h3> <textarea id="responseText" style="width: 1024px;height: 300px;"></textarea> </form> </body> </html>
4. 測試服務器端日志:(可以看到正確的拿到參數信息並且建立連接,也可以通過?傳遞參數)
2021-03-02 22:43:07.196 | cmdb - INFO | main | com.xm.ggn.netty.NettyServer | line:32 - class com.xm.ggn.netty.NettyServer 啟動正在監聽: /0:0:0:0:0:0:0:0:8091 2021-03-02 22:43:11.026 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyChannelInitializer | line:17 - 收到新的客戶端連接: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269] 2021-03-02 22:43:12.702 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:47 - 添加新的channel, incomming: [id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269] 2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:85 - 連接請求,准備提取參數 2021-03-02 22:43:12.928 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:90 - uri: /ws/admin?username=admin 2021-03-02 22:43:12.934 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:93 - path: /ws/admin 2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:95 - admin 2021-03-02 22:43:12.935 | cmdb - INFO | nioEventLoopGroup-4-1 | com.xm.ggn.netty.MyWebSocketHandler | line:97 - channelMap: {admin=[id: 0xa9fd1310, L:/127.0.0.1:8091 - R:/127.0.0.1:50269]}
補充:整合Vue的jwchat實現聊天 ,jwchat是基於Elementui封裝的聊天插件
jwchat官網: https://codegi.gitee.io/jwchatdoc/
這里用了jwchat的兩個組件: JwChat-rightbox 展示在線用戶、JwChat-index 展示聊天窗口
0. 界面截圖如下:
1. 前端核心vue:
<template> <div class="dashboard-container"> <div class="dashboard-text"> <el-row> <el-col :span="6" ><div class="grid-content bg-purple"> <JwChat-rightbox :config="onlineUsers" @click="rightClick" /></div ></el-col> <!-- 如果選擇了在線用戶顯示聊天窗口 --> <el-col :span="18" v-if="chatUserConfig.name != ''" ><div class="grid-content bg-purple-light"> <JwChat-index :config="chatUserConfig" :showRightBox="true" :taleList="chatlogTaleList" @enter="bindEnter" v-model="inputMsg" :toolConfig="toolConfig" scrollType="scroll" @clickTalk="clickTalk" > <!-- 右邊插槽 --> <template> <h3>聊天愉快</h3> </template> </JwChat-index> </div></el-col > </el-row> </div> </div> </template> <script> // other.png 表示對方頭像; myself.png 表示我自己 import { MessageBox } from "element-ui"; import {findCurrentUsername} from "@/utils/auth" export default { data() { return { // 在線用戶相關信息 onlineUsers: { tip: "選擇在線人開始聊天", listTip: "當前在線", list: [], }, // 輸入框內默認的消息 inputMsg: "", // 聊天記錄 chatlogTaleList: [ // { // date: "2020/04/25 21:19:07", // text: { text: "起床不" }, // mine: false, // name: "留戀人間不羡仙", // img: "/images/other.png", // } ], // 展示的工具欄配置 toolConfig: { // show: ['file', 'history', 'img', ['文件1', '', '美圖']], show: null, // 關閉所有其他組件 showEmoji: true, callback: this.toolEvent, }, // 正在聊天的用戶的信息 chatUserConfig: { img: "/images/other.png", name: "", username: "", fullname: "", dept: "大部門", callback: this.bindCover, historyConfig: { show: true, tip: "加載更多", callback: this.bindLoadHistory, }, }, // 當前用戶信息 currentUser: { username: "", fullname: "", }, socket: new Object(), }; }, created() { this.listOnlineUsers(); this.findCurrentUserInfo(); this.webSocket(); }, methods: { webSocket() { // 先記錄this對象 const that = this; if (typeof WebSocket == "undefined") { MessageBox.alert("瀏覽器暫不支持聊天", "提示信息"); } else { // 實例化socket,這里我把用戶名傳給了后台,使后台能判斷要把消息發給哪個用戶,其實也可以后台直接獲取用戶IP來判斷並推送 const socketUrl = "ws://127.0.0.1:8091/ws/" + findCurrentUsername(); this.socket = new WebSocket(socketUrl); // 監聽socket打開 this.socket.onopen = function () { console.log("瀏覽器WebSocket已打開"); }; // 監聽socket消息接收 this.socket.onmessage = function (messageEvent) { // 轉換為json對象然后添加到chatlogTaleList let receivedLog = JSON.parse(messageEvent.data); console.log(receivedLog); let receivedLogs = new Array(); receivedLogs.push(receivedLog); receivedLogs = that.rehandleChatLogs(receivedLogs); if (!that.chatlogTaleList) { that.chatlogTaleList = new Array(); } that.chatlogTaleList = that.chatlogTaleList.concat(receivedLogs); }; // 監聽socket錯誤 this.socket.onerror = function () {}; // 監聽socket關閉 this.socket.onclose = function () { MessageBox.alert("WebSocket已關閉"); }; } }, // 查詢當前用戶信息 findCurrentUserInfo() { let url = "/user/getInfo"; this.$http.post(url).then((res) => { this.currentUser = res.data; }); }, // 發送websocket 消息 send(message) { if (!window.WebSocket) { return; } // 封裝消息,然后發送消息 const chatLog = { sendUsername: this.currentUser.username, sendFullname: this.currentUser.fullname, receiveUsername: this.chatUserConfig.username, receiveFullname: this.chatUserConfig.fullname, content: message, readed: false, }; let socket = this.socket; if (socket.readyState == WebSocket.OPEN) { socket.send(JSON.stringify(chatLog)); } else { MessageBox.alert("WebSocket 連接沒有建立成功!"); } }, // 獲取在線用戶(有在線用戶的情況下賦值到右邊窗口) listOnlineUsers() { let url = "/user/listOnlineUser"; this.$http.get(url).then((res) => { var onlineUsers = res.data; if (!onlineUsers || onlineUsers.length < 1) { return; } onlineUsers.forEach((element) => { element.name = element.username; element.img = "/images/cover.png"; }); this.onlineUsers.list = onlineUsers; }); }, // 點擊在線人事件 rightClick(type) { // 1.賦值給聊天人信息 let chatUser = type.value; this.chatUserConfig.name = chatUser.fullname; this.chatUserConfig.username = chatUser.username; this.chatUserConfig.fullname = chatUser.fullname; // 2. 查詢聊天記錄 let listChatlogurl = "/chat/log/list"; let requestVO = { sendUsername: this.currentUser.username, receiveUsername: this.chatUserConfig.username, queryChangeRole: true, }; this.$http.post(listChatlogurl, requestVO).then((res) => { this.chatlogTaleList = this.rehandleChatLogs(res.data); }); }, // 重新處理聊天記錄, 主要是做特殊標記以及設置圖像等操作 rehandleChatLogs(chatlogs) { if (!chatlogs || chatlogs.length < 1) { return new Array(); } chatlogs.forEach((element) => { element.date = element.createtimeStr; element.name = element.sendFullname; // 聊天內容(如下為設置文本,也可以設置其他video、圖片等) element.text = new Object(); element.text.text = element.content; if (element.sendUsername == this.currentUser.username) { element.mine = true; element.img = "/images/myself.png"; } else { element.mine = false; element.img = "/images/other.png"; } }); return chatlogs; }, // 點擊左上角用戶名稱事件 clickTalk(obj) { console.log(obj); }, // 點擊發送或者回車事件 bindEnter(obj) { const msg = this.inputMsg; if (!msg) { MessageBox.alert("您不能發送空消息"); return; } // 發送消息 this.send(msg); }, /** * @description: * @param {*} type 當前點擊的按鈕 * @param {*} plyload 附加文件或者需要處理的數據 * @return {*} */ toolEvent(type, plyload) { console.log("tools", type, plyload); }, /** * @description: 點擊加載更多的回調函數 * @param {*} * @return {*} */ bindLoadHistory() { const history = new Array(3).fill().map((i, j) => { return { date: "2020/05/20 23:19:07", text: { text: j + new Date() }, mine: false, name: "JwChat", img: "image/three.jpeg", }; }); let list = history.concat(this.list); this.list = list; }, bindCover(type) { console.log("header", type); }, }, }; </script>
涉及到主要邏輯:
(1) 點擊頁面進行如下操作:
1》創建WebSocket連接,創建WebSocket的時候將當前的用戶名傳到后端,后端記錄當前用戶名與連接到的netty的channel
2》 查詢當前在線用戶,並且展示到JwChat-rightbox 列表內。(也可以展示所有用戶、如果有群聊體系展示所有的群)
(2) 點擊在線用戶的時候獲取到在線用戶的信息並記錄下來,然后用ajax異步獲取聊天記錄(后台根據發布者和接收者按時間升序排序),然后前台根據聊天記錄做對應的轉換。這里是ajax獲取,也可以用websocket拿,對發送的消息做處理,后端接收到消息處理對應的業務即可
(3) 聊天的時候判斷是否輸入有信息,有信息的時候將信息包裝一下(增加發送者、接收者信息)發到后端,后端存入數據庫之后再發送到對應的channel返回給前端,前端接收到后做處理完加入聊天記錄數組展示在界面
2. 后端主要文件:
(1) 聊天記錄表
package com.xm.ggn.bean.chat; import com.xm.ggn.bean.AbstractSequenceEntity; import lombok.EqualsAndHashCode; import lombok.Getter; import lombok.Setter; import lombok.ToString; import javax.persistence.Entity; /** * 聊天記錄 */ @Entity @Getter @Setter @EqualsAndHashCode(callSuper = true) @ToString(callSuper = true) public class ChatLog extends AbstractSequenceEntity { private String sendUsername; private String sendFullname; private String receiveUsername; private String receiveFullname; private String content; private String remark; /** * 是否已讀 */ private boolean readed; }
包含繼承的通用字段:
@Id @GeneratedValue(strategy = GenerationType.IDENTITY) @TableId(type = IdType.AUTO) // 增加該注解,mybatis plus insert之后會給bean設上Id protected long id; /** * 創建者 */ @Index(name = "creator") @TableField(update = "%s") protected String creator; /** * 唯一編號 */ @Index(name = "uniqueCode") @TableField(update = "%s") protected String uniqueCode; /** * 創建時間 */ @Index(name = "createtime") @TableField(update = "%s") protected Date createtime;
(2) MyWebSocketHandler消息處理者類:
package com.xm.ggn.netty; import com.alibaba.fastjson.JSONObject; import com.xm.ggn.bean.chat.ChatLog; import com.xm.ggn.service.chat.ChatLogService; import com.xm.ggn.utils.system.SpringBootUtils; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.GlobalEventExecutor; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import java.text.SimpleDateFormat; import java.util.Collection; import java.util.Date; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * 自定義服務器端處理handler,繼承SimpleChannelInboundHandler,處理WebSocket 連接數據 */ @Slf4j public class MyWebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { public static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); // 用戶id=>channel示例 // 可以通過用戶的唯一標識保存用戶的channel // 這樣就可以發送給指定的用戶 public static ConcurrentHashMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * 每當服務端收到新的客戶端連接時,客戶端的channel存入ChannelGroup列表中,並通知列表中其他客戶端channel * * @param ctx * @throws Exception */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { // 獲取連接的channel Channel incomming = ctx.channel(); //通知所有已經連接到服務器的客戶端,有一個新的通道加入 /*for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"加入\n"); }*/ channelGroup.add(incomming); log.info("添加新的channel, incomming: {}", incomming); } /** * 每當服務端斷開客戶端連接時,客戶端的channel從ChannelGroup中移除,並通知列表中其他客戶端channel * * @param ctx * @throws Exception */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { //獲取連接的channel /*Channel incomming = ctx.channel(); for(Channel channel:channelGroup){ channel.writeAndFlush("[SERVER]-"+incomming.remoteAddress()+"離開\n"); }*/ //從服務端的channelGroup中移除當前離開的客戶端 channelGroup.remove(ctx.channel()); //從服務端的channelMap中移除當前離開的客戶端 Collection<Channel> col = channelMap.values(); while (true == col.contains(ctx.channel())) { col.remove(ctx.channel()); log.info("netty客戶端連接刪除成功!"); } } /** * 處理建立連接時候請求(用於拿參數) * * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (null != msg && msg instanceof FullHttpRequest) { log.info("連接請求,准備提取參數"); //轉化為http請求 FullHttpRequest request = (FullHttpRequest) msg; //拿到請求地址 String uri = request.uri(); log.info("uri: " + uri); if (StringUtils.isNotBlank(uri)) { String path = StringUtils.substringBefore(uri, "?"); log.info("path: {}", path); String username = StringUtils.substringAfterLast(path, "/"); log.info(username); channelMap.put(username, ctx.channel()); log.info("channelMap: {}", channelMap); } //重新設置請求地址為WebSocketServerProtocolHandler 匹配的地址(如果WebSocketServerProtocolHandler 的時候checkStartsWith 為true則不需要設置,會根據前綴匹配) // request.setUri("/ws"); } //接着建立請求 super.channelRead(ctx, msg); } /** * 每當從服務端讀到客戶端寫入信息時,將信息轉發給其他客戶端的Channel. * * @param ctx * @param msg * @throws Exception */ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { log.info("netty客戶端收到服務器數據, 客戶端地址: {}, msg: {}", ctx.channel().remoteAddress(), msg.text()); String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()); //消息處理類 handleMessage(ctx, msg.text(), date); //channelGroup.writeAndFlush( new TextWebSocketFrame(msg.text())); } /** * 當服務端的IO 拋出異常時被調用 * * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); log.error("SimpleChatClient:" + incoming.remoteAddress() + "異常", cause); //異常出現就關閉連接 ctx.close(); } /** * 處理讀取到的消息 * * @param ctx * @param msg * @param date */ private void handleMessage(ChannelHandlerContext ctx, String msg, String date) { try { // 消息入庫 ChatLog chatLog = JSONObject.parseObject(msg, ChatLog.class); log.info("chatLog: {}", chatLog); ChatLogService chatLogService = SpringBootUtils.getBean(ChatLogService.class); chatLogService.insert(chatLog); // 消息轉發給對應用戶(發給發送者和接收者) String receiveUsername = chatLog.getReceiveUsername(); String sendUsername = chatLog.getSendUsername(); Set<Map.Entry<String, Channel>> entries = channelMap.entrySet(); String key = null; for (Map.Entry<String, Channel> entry : entries) { key = entry.getKey(); if (key.equals(receiveUsername) || key.equals(sendUsername)) { log.info("服務器轉發消息, key: {}, msg: {}", key, JSONObject.toJSONString(chatLog)); entry.getValue().writeAndFlush(new TextWebSocketFrame(JSONObject.toJSONString(chatLog))); } } } catch (Exception e) { log.error("message 處理異常, msg: {}, date: {}", msg, date, e); } } }
這個只是簡單的實現了在線用戶的單聊,如果要做的好可以添加通訊錄功能、群聊,其實這個就是發送消息的時候接受者是群號等。待有這方面需求的時候會繼續完善。