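The previous article (https://www.cnblogs.com/junehozhao/p/11853800.html) covered in detail how to build a single chat room for multiple users on netty. So how do we build a chat server with multiple rooms?
It is not hard, and I use redis to do it. As the code shows, every chat channel is managed through a ChannelGroup: each channel that connects is added to the ChannelGroup. Each incoming request also carries the unique ID of its chat room. Using that room ID, we collect the IDs of all channels belonging to the room in one HashMap and cache that map in redis (the live Channel objects themselves stay in the ChannelGroup). Whenever a channel pushes a message, we use the room ID in the message to fetch the room's channel IDs from redis, look up each channel, and push the message to them in a loop. That gives us a simple multi-room chat.
The code follows. As before, the http and websocket methods are extracted into interface files. First, the IHttp interface: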
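To make the idea concrete before the real code, here is a minimal sketch of the room lookup and broadcast loop. It is not the article's implementation: an in-memory map stands in for redis, channel IDs are plain strings, and the names RoomRegistry, join and broadcast are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical in-memory stand-in for the redis-backed room table. */
final class RoomRegistry {
    // roomId -> IDs of the channels that have joined that room
    private final Map<String, Set<String>> rooms = new ConcurrentHashMap<>();

    /** Registers a channel in a room; returns false if it was already a member. */
    boolean join(String roomId, String channelId) {
        return rooms.computeIfAbsent(roomId, id -> ConcurrentHashMap.newKeySet())
                    .add(channelId);
    }

    /** Calls sender once per member of the room and returns the delivery count. */
    int broadcast(String roomId, String message, BiConsumer<String, String> sender) {
        Set<String> members = rooms.getOrDefault(roomId, Collections.<String>emptySet());
        int delivered = 0;
        for (String channelId : members) {
            sender.accept(channelId, message); // in the real server: channel.writeAndFlush(...)
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        RoomRegistry registry = new RoomRegistry();
        registry.join("admin", "channel-1");
        registry.join("admin", "channel-2");
        registry.join("other", "channel-3");
        // Only the two members of "admin" receive the message.
        registry.broadcast("admin", "hello",
                (id, msg) -> System.out.println(id + " <- " + msg));
    }
}
```

The sender callback is where the real server would resolve the ID to a Channel and write a TextWebSocketFrame to it.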
package com.june.netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;

public interface IHttp {

    void handleHttpRequset(ChannelHandlerContext ctx, FullHttpRequest request);
}
The IWebSocket interface:
package com.june.netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public interface IWebSocket {

    void textdoMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg);
}
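Next is the implementation class WebsocketImpl, which implements both the IHttp and IWebSocket interfaces. The message-sending and initialization methods live in this class, and it changes considerably compared with the single-room version: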
package com.june.netty.socket;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class WebsocketImpl implements IHttp, IWebSocket {

    private static final String HN_HTTP_CODEC = "HN_HTTP_CODEC";
    private static final String NH_HTTP_AGGREGATOR = "NH_HTTP_AGGREGATOR";
    private static final String NH_HTTP_CHUNK = "HN_HTTP_CHUNK";
    private static final String NH_SERVER = "NH_LOGIC";

    private static final AttributeKey<WebSocketServerHandshaker> ATTR_HANDSHAKER =
            AttributeKey.newInstance("ATTR_KEY_CHANNELID");

    private static final int MAX_CONTENT_LENGTH = 65536;

    private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d";

    // websocket host
    private String host;
    // websocket port
    private int port;
    // open websocket connections, keyed by channel ID
    private Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<ChannelId, Channel>();
    private ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private final String WEBSOCKET_URI_ROOT;

    public WebsocketImpl(String host, int port) {
        super();
        this.host = host;
        this.port = port;
        WEBSOCKET_URI_ROOT = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port);
    }

    // Initializes the socket server; blocks until the server channel is closed.
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.childHandler(new ChannelInitializer<Channel>() {

            @Override
            protected void initChannel(Channel ch) throws Exception {
                ChannelPipeline pl = ch.pipeline();
                // keep a reference to the new channel
                channelMap.put(ch.id(), ch);
                group.add(ch);
                ch.closeFuture().addListener(new ChannelFutureListener() {

                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        // drop the reference once the channel is closed
                        channelMap.remove(future.channel().id());
                        group.remove(ch);
                    }
                });
                pl.addLast(HN_HTTP_CODEC, new HttpServerCodec());
                pl.addLast(NH_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH));
                pl.addLast(NH_HTTP_CHUNK, new ChunkedWriteHandler());
                pl.addLast(NH_SERVER, new WebSocketServerHandler(WebsocketImpl.this, WebsocketImpl.this));
            }
        });
        try {
            ChannelFuture future = sb.bind(host, port).addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        System.out.println("websocket started");
                    }
                }
            }).sync();

            future.channel().closeFuture().addListener(new ChannelFutureListener() {

                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    System.out.println("channel is closed");
                }
            }).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        System.out.println("websocket stopped");
    }

    @Override
    public void handleHttpRequset(ChannelHandlerContext ctx, FullHttpRequest request) {
        String subProtocols = request.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
        WebSocketServerHandshakerFactory factory =
                new WebSocketServerHandshakerFactory(WEBSOCKET_URI_ROOT, subProtocols, false);
        WebSocketServerHandshaker handshaker = factory.newHandshaker(request);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), request);
            ctx.channel().attr(ATTR_HANDSHAKER).set(handshaker);
        }
    }

    @Override
    public void textdoMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        Map<String, Object> maps = JSON.parseObject(msg.text());
        // fetch the room's channel IDs from redis by groupId
        String key = "channel:" + maps.get("groupId");
        Channel channel = ctx.channel();
        Map<String, Object> map = JedisUtils.getObjectMap(key);
        if (map == null) {
            map = new HashMap<>();
        }
        // add the sender to the room, then write the whole map back
        map.put(channel.id() + "", channel.id());
        JedisUtils.del(key);
        JedisUtils.setObjectMap(key, map, 0);
        group.add(ctx.channel());
        sendGroupText(map, maps);
    }

    public void sendGroupText(Map<String, Object> map, Map<String, Object> maps) {
        Gson gson = new Gson();
        String value = (String) maps.get("msg");
        Map<String, String> backMap = new HashMap<String, String>();
        backMap.put("msg", value);
        // push the message to every channel of the room that is still in the group
        for (Map.Entry<String, Object> entry : map.entrySet()) {
            Channel channel = group.find((ChannelId) entry.getValue());
            if (channel != null) {
                channel.writeAndFlush(new TextWebSocketFrame(gson.toJson(backMap)));
            }
        }
    }
}
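One thing the class above does not do is remove a channel's ID from the room map in redis when the connection closes: the close listener only cleans up channelMap and the ChannelGroup, so stale IDs stay in the cached map. A possible way to handle this, sketched here with in-memory maps rather than redis and with hypothetical names (MembershipIndex, join, disconnect), is to keep a reverse index from channel to rooms so the close listener knows which rooms to update.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-way index: room -> channels and channel -> rooms. */
final class MembershipIndex {
    private final Map<String, Set<String>> membersByRoom = new ConcurrentHashMap<>();
    private final Map<String, Set<String>> roomsByChannel = new ConcurrentHashMap<>();

    /** Records that a channel has joined a room, updating both directions. */
    void join(String roomId, String channelId) {
        add(membersByRoom, roomId, channelId);
        add(roomsByChannel, channelId, roomId);
    }

    /** Intended for a close listener: removes the channel from every room it joined. */
    Set<String> disconnect(String channelId) {
        Set<String> rooms = roomsByChannel.remove(channelId);
        if (rooms == null) {
            return Collections.emptySet();
        }
        for (String roomId : rooms) {
            membersByRoom.computeIfPresent(roomId, (id, members) -> {
                members.remove(channelId);
                return members.isEmpty() ? null : members; // returning null drops the empty room
            });
        }
        return rooms;
    }

    /** Current members of a room, or an empty set for an unknown room. */
    Set<String> members(String roomId) {
        return membersByRoom.getOrDefault(roomId, Collections.<String>emptySet());
    }

    // Adds inside compute() so a concurrent disconnect cannot orphan the set.
    private static void add(Map<String, Set<String>> index, String key, String value) {
        index.compute(key, (k, set) -> {
            Set<String> target = (set != null) ? set : ConcurrentHashMap.<String>newKeySet();
            target.add(value);
            return target;
        });
    }

    public static void main(String[] args) {
        MembershipIndex index = new MembershipIndex();
        index.join("admin", "channel-1");
        index.join("admin", "channel-2");
        index.disconnect("channel-1");
        System.out.println(index.members("admin"));
    }
}
```

In the real server the same bookkeeping would have to be applied to the map stored in redis, which this sketch does not attempt.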
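One detail here is caching the HashMap in redis, so I include that code below. How to integrate redis with springboot is not the focus of this article and is simple anyway; interested readers can follow the later articles, which will cover it: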
/**
 * Reads a cached Map.
 * @param key the key
 * @return the cached map, or null if the key does not exist
 */
public static Map<String, Object> getObjectMap(String key) {
    key = KEY_PREFIX + key;
    Map<String, Object> value = null;
    Jedis jedis = null;
    try {
        jedis = getResource();
        if (jedis.exists(getBytesKey(key))) {
            value = Maps.newHashMap();
            Map<byte[], byte[]> map = jedis.hgetAll(getBytesKey(key));
            for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
                value.put(StringUtils.toString(e.getKey()), toObject(e.getValue()));
            }
            logger.debug("getObjectMap {} = {}", key, value);
        }
    } catch (Exception e) {
        logger.warn("getObjectMap {} = {}", key, value, e);
    } finally {
        returnResource(jedis);
    }
    return value;
}

/**
 * Caches a Map.
 * @param key the key
 * @param value the map to cache
 * @param cacheSeconds expiry in seconds; 0 means no expiry
 * @return the status reply of hmset, or null on failure
 */
public static String setObjectMap(String key, Map<String, Object> value, int cacheSeconds) {
    key = KEY_PREFIX + key;
    String result = null;
    Jedis jedis = null;
    try {
        jedis = getResource();
        // replace any previous value stored under this key
        if (jedis.exists(getBytesKey(key))) {
            jedis.del(key);
        }
        Map<byte[], byte[]> map = Maps.newHashMap();
        for (Map.Entry<String, Object> e : value.entrySet()) {
            map.put(getBytesKey(e.getKey()), toBytes(e.getValue()));
        }
        result = jedis.hmset(getBytesKey(key), map);
        if (cacheSeconds != 0) {
            jedis.expire(key, cacheSeconds);
        }
        logger.debug("setObjectMap {} = {}", key, value);
    } catch (Exception e) {
        logger.warn("setObjectMap {} = {}", key, value, e);
    } finally {
        returnResource(jedis);
    }
    return result;
}
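The helpers toBytes and toObject are called above but not shown in this article. As a rough guess at what such helpers could look like, the sketch below uses Java's built-in serialization; this is an assumption, and the real JedisUtils may be implemented differently. It only works for values that implement Serializable, which holds for the stored channel IDs because Netty's ChannelId interface extends Serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

/** Hypothetical versions of the toBytes/toObject helpers, based on Java serialization. */
final class SerializationSketch {

    /** Serializes a Serializable value into the byte[] form stored in redis. */
    static byte[] toBytes(Object value) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Restores a value from bytes produced by toBytes. */
    static Object toObject(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("class of the cached value is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        byte[] stored = toBytes("channel-1");
        System.out.println(toObject(stored));
    }
}
```

Deserializing data with Java serialization is only reasonable when the redis instance is trusted, since the bytes decide which classes get instantiated.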
Next is the class that receives messages, which is the same as in the single-room version:
package com.june.netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {

    private IWebSocket iWebSocket;
    private IHttp iHttp;

    public WebSocketServerHandler(IWebSocket iWebSocket, IHttp iHttp) {
        super();
        this.iWebSocket = iWebSocket;
        this.iHttp = iHttp;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        // text frames are chat messages; a full HTTP request is the websocket handshake
        if (msg instanceof TextWebSocketFrame) {
            iWebSocket.textdoMessage(ctx, (TextWebSocketFrame) msg);
        } else if (msg instanceof FullHttpRequest) {
            iHttp.handleHttpRequset(ctx, (FullHttpRequest) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
Finally, the class SocketStartupRunner starts and initializes the websocket together with the project:
package com.june.netty.runner;

import com.june.netty.socket.WebsocketImpl;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

@Component
@Order(value = 2)
public class SocketStartupRunner implements CommandLineRunner {

    @Override
    public void run(String... args) throws Exception {
        WebsocketImpl socket = new WebsocketImpl("localhost", 8009);
        socket.start();
    }
}
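Note that start() waits on the server channel's close future, so run() above does not return while the server is up, and Spring Boot invokes its runners one after another on the startup thread. If that matters in your project, one option is to hand the blocking call to its own thread. The sketch below shows only that idea with plain JDK classes; BackgroundStarter and startInBackground are hypothetical names, and the Runnable stands in for a call such as socket.start().

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical helper that runs a blocking start routine on a dedicated thread. */
final class BackgroundStarter {

    /**
     * Runs blockingStart on a new named thread and returns immediately.
     * The returned future completes when blockingStart returns or throws.
     */
    static CompletableFuture<Void> startInBackground(Runnable blockingStart, String threadName) {
        CompletableFuture<Void> stopped = new CompletableFuture<>();
        Thread thread = new Thread(() -> {
            try {
                blockingStart.run();
                stopped.complete(null);
            } catch (Throwable t) {
                stopped.completeExceptionally(t);
            }
        }, threadName);
        thread.start();
        return stopped;
    }

    public static void main(String[] args) {
        // Stand-in for: () -> new WebsocketImpl("localhost", 8009).start()
        CompletableFuture<Void> stopped = startInBackground(
                () -> System.out.println("server loop on " + Thread.currentThread().getName()),
                "websocket-server");
        stopped.join(); // a runner would normally return here without waiting
    }
}
```

Inside the runner, the call would then be startInBackground(socket::start, "websocket-server"), leaving the startup thread free.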
For the pom file, the main additions are the netty package and the redis configuration. The main content is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.june</groupId>
    <artifactId>netty</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chat</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <fastjson.version>1.2.15</fastjson.version>
        <guava.version>25.1-jre</guava.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- fastjson json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>compile</scope>
        </dependency>
        <!-- spring security -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.7.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>18.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.6.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
The page changes little compared with the single-room version: it only adds the chat room's unique ID, groupId:
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title></title>
<script type="text/javascript">
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8009");
        socket.onmessage = function(event) {
            appendln("Received: " + JSON.parse(event.data).msg);
        };
        socket.onopen = function(event) {
            appendln("Connection opened");
        };
        socket.onclose = function(event) {
            appendln("Connection closed");
        };
    } else {
        alert("This browser does not support WebSocket");
    }

    function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            // groupId is the unique ID of the chat room this message belongs to
            var messages = {"groupId": "admin", "msg": message};
            socket.send(JSON.stringify(messages));
            appendln("Sent: " + message);
        } else {
            alert("Send failed, connection lost");
        }
    }

    function appendln(text) {
        var ta = document.getElementById('responseMsg');
        ta.value += text + "\r\n";
    }

    function clear() {
        var ta = document.getElementById('responseMsg');
        ta.value = "";
    }
</script>
</head>
<body>
    <form onSubmit="return false;">
        <h3>Chat room</h3>
        <textarea id="responseMsg" style="width: 1000px;height: 600px;"></textarea>
        <br/>
        <input type="text" name="message" value="Hello, friend"/>
        <input type="button" value="Send" onClick="send(this.form.message.value)"/>
    </form>
</body>
</html>
With that, a netty-based multi-room chat is complete.
