上篇文章已经详细介绍了如何基于netty搭建一个多人单聊天室(https://www.cnblogs.com/junehozhao/p/11853800.html),那么怎么建一个多人多聊天室呢?
其实,实现的方法并不难,我是借助redis来实现的。从代码中可以知道,每个聊天的channel都是通过ChannelGroup来管理的,也就是每个进来的通道,我都会把它放进ChannelGroup中。那么每当我们接收到请求时,可以把聊天室的唯一ID一并传过来,根据这个聊天室ID,把所有属于该聊天室的channel的ID放到同一个HashMap中,并且缓存到redis里(缓存的是ChannelId,channel本身仍然由ChannelGroup持有)。这样,每当一个channel需要推送消息,我们就根据消息中的聊天室ID去redis中取出该聊天室所有的ChannelId,再到ChannelGroup中找到对应的channel,然后向这些channel循环推送消息,这样,一个简单的多聊天室就实现了。
代码如下,同样的,把http和websocket的两个方法抽到了接口文件中,首先是 IHttp接口:
package com.june.netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.FullHttpRequest;

/**
 * Callback contract for HTTP requests that arrive on a Netty channel.
 */
public interface IHttp {

    /**
     * Processes a single HTTP request.
     *
     * <p>The method name keeps its existing spelling so that current
     * implementers and callers continue to compile.
     *
     * @param ctx     context of the channel the request was read from
     * @param request the HTTP request to process
     */
    void handleHttpRequset(ChannelHandlerContext ctx, FullHttpRequest request);
}
IWebSocket接口:
package com.june.netty.socket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * Callback contract for websocket text frames that arrive on a Netty channel.
 */
public interface IWebSocket {

    /**
     * Processes a single websocket text frame.
     *
     * @param ctx context of the channel the frame was read from
     * @param msg the text frame to process
     */
    void textdoMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg);
}
然后是实现类WebsocketImpl,实现了IHttp和IWebSocket两个接口,关于消息的发送方法和初始化方法就在这个类中,并且相对于单聊天室,这里面做了比较大的改动:
package com.june.netty.socket;

import com.alibaba.fastjson.JSON;
import com.google.gson.Gson;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Netty websocket server that relays chat messages between the members of a
 * chat room.
 *
 * <p>Every accepted channel is tracked in a {@link ChannelGroup}. Room
 * membership is kept in Redis (through {@code JedisUtils}) as a hash stored
 * under {@code "channel:" + groupId} whose values are the member channel ids.
 * When a text frame arrives, the sender is added to its room and the message
 * is written to every room member that is still present in the group.
 *
 * <p>NOTE(review): room membership is updated with a non-atomic
 * read / delete / write sequence against Redis, and entries are never removed
 * when a channel closes. Stale ids are skipped on delivery, but the hash only
 * grows; consider a per-field Redis update and a cleanup on close.
 *
 * <p>NOTE(review): the Redis calls run on the channel's event loop thread;
 * confirm that blocking there is acceptable for the expected load.
 */
public class WebsocketImpl implements IHttp, IWebSocket {

    private static final String HN_HTTP_CODEC = "HN_HTTP_CODEC";
    private static final String NH_HTTP_AGGREGATOR = "NH_HTTP_AGGREGATOR";
    private static final String NH_HTTP_CHUNK = "HN_HTTP_CHUNK";
    private static final String NH_SERVER = "NH_LOGIC";
    private static final AttributeKey<WebSocketServerHandshaker> ATTR_HANDSHAKER =
            AttributeKey.newInstance("ATTR_KEY_CHANNELID");
    private static final int MAX_CONTENT_LENGTH = 65536;
    private static final String WEBSOCKET_URI_ROOT_PATTERN = "ws://%s:%d";

    /** Prefix of the Redis key that holds the members of one chat room. */
    private static final String GROUP_KEY_PREFIX = "channel:";
    /** JSON field carrying the chat room id. */
    private static final String FIELD_GROUP_ID = "groupId";
    /** JSON field carrying the chat text. */
    private static final String FIELD_MSG = "msg";

    /** Shared serializer; one instance is reused instead of one per message. */
    private static final Gson GSON = new Gson();

    /** Address the websocket server binds to. */
    private final String host;
    /** Port the websocket server binds to. */
    private final int port;
    /** Currently open websocket connections, keyed by channel id. */
    private final Map<ChannelId, Channel> channelMap = new ConcurrentHashMap<ChannelId, Channel>();
    /** All accepted channels; used to resolve the channel ids cached in Redis. */
    private final ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /** Websocket URL advertised to the handshaker factory. */
    private final String websocketUriRoot;

    /**
     * Creates a server for the given address. Nothing is bound until
     * {@link #start()} is called.
     *
     * @param host address to bind to
     * @param port port to bind to
     */
    public WebsocketImpl(String host, int port) {
        super();
        this.host = host;
        this.port = port;
        this.websocketUriRoot = String.format(WEBSOCKET_URI_ROOT_PATTERN, host, port);
    }

    /**
     * Binds the server and blocks the calling thread until the server channel
     * is closed or the thread is interrupted, then shuts both event loop
     * groups down.
     */
    public void start() {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap sb = new ServerBootstrap();
        sb.group(bossGroup, workerGroup);
        sb.channel(NioServerSocketChannel.class);
        sb.childHandler(new ChannelInitializer<Channel>() {
            @Override
            protected void initChannel(Channel ch) throws Exception {
                // Keep a reference for as long as the connection is open.
                channelMap.put(ch.id(), ch);
                group.add(ch);
                ch.closeFuture().addListener((ChannelFutureListener) closed -> {
                    // Drop the reference once the connection is gone.
                    channelMap.remove(closed.channel().id());
                    group.remove(ch);
                });

                ChannelPipeline pl = ch.pipeline();
                pl.addLast(HN_HTTP_CODEC, new HttpServerCodec());
                pl.addLast(NH_HTTP_AGGREGATOR, new HttpObjectAggregator(MAX_CONTENT_LENGTH));
                pl.addLast(NH_HTTP_CHUNK, new ChunkedWriteHandler());
                pl.addLast(NH_SERVER, new WebSocketServerHandler(WebsocketImpl.this, WebsocketImpl.this));
            }
        });
        try {
            ChannelFuture bound = sb.bind(host, port)
                    .addListener((ChannelFutureListener) f -> {
                        if (f.isSuccess()) {
                            System.out.println("websocket started");
                        }
                    })
                    .sync();
            bound.channel().closeFuture()
                    .addListener((ChannelFutureListener) f -> System.out.println("channel is closed"))
                    .sync();
        } catch (InterruptedException e) {
            // Restore the flag so the caller can observe the interruption.
            Thread.currentThread().interrupt();
            System.err.println("websocket server interrupted, shutting down");
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
        System.out.println("websocket stoped");
    }

    /**
     * Performs the websocket handshake for an incoming HTTP request, or
     * answers with an "unsupported version" response when no handshaker is
     * available for the request.
     *
     * @param ctx     context of the channel the request arrived on
     * @param request the HTTP request that starts the handshake
     */
    @Override
    public void handleHttpRequset(ChannelHandlerContext ctx, FullHttpRequest request) {
        String subProtocols = request.headers().get(HttpHeaderNames.SEC_WEBSOCKET_PROTOCOL);
        WebSocketServerHandshakerFactory factory =
                new WebSocketServerHandshakerFactory(websocketUriRoot, subProtocols, false);
        WebSocketServerHandshaker handshaker = factory.newHandshaker(request);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
            return;
        }
        handshaker.handshake(ctx.channel(), request);
        ctx.channel().attr(ATTR_HANDSHAKER).set(handshaker);
    }

    /**
     * Registers the sending channel as a member of the room named by the
     * frame's {@code groupId} field and relays the frame's {@code msg} field
     * to every member of that room.
     *
     * <p>Frames that are not a JSON object, or that carry no {@code groupId},
     * are ignored instead of raising an exception on the event loop.
     *
     * @param ctx context of the sending channel
     * @param msg text frame whose payload is a JSON object
     */
    @Override
    public void textdoMessage(ChannelHandlerContext ctx, TextWebSocketFrame msg) {
        Map<String, Object> payload = parsePayload(msg.text());
        if (payload == null || payload.get(FIELD_GROUP_ID) == null) {
            return;
        }

        // Look the room members up in Redis by room id.
        String key = GROUP_KEY_PREFIX + payload.get(FIELD_GROUP_ID);
        Channel channel = ctx.channel();
        Map<String, Object> members = JedisUtils.getObjectMap(key);
        if (members == null) {
            members = new HashMap<>();
        }
        members.put(String.valueOf(channel.id()), channel.id());
        JedisUtils.del(key);
        JedisUtils.setObjectMap(key, members, 0);

        group.add(channel);
        sendGroupText(members, payload);
    }

    /**
     * Sends the {@code msg} field of {@code maps} as a JSON text frame to
     * every channel whose id appears as a value of {@code map}.
     *
     * <p>Values that are not a {@link ChannelId}, and ids with no matching
     * channel in this server's group, are skipped.
     *
     * @param map  room members; the values are expected to be channel ids
     * @param maps decoded client message; only its {@code msg} field is read
     */
    public void sendGroupText(Map<String, Object> map, Map<String, Object> maps) {
        if (map == null || map.isEmpty() || maps == null) {
            return;
        }
        Object rawText = maps.get(FIELD_MSG);
        Map<String, String> backMap = new HashMap<String, String>();
        backMap.put(FIELD_MSG, rawText == null ? null : rawText.toString());

        // The payload is identical for every recipient, so serialize it once.
        String json = GSON.toJson(backMap);
        for (Object member : map.values()) {
            if (!(member instanceof ChannelId)) {
                continue;
            }
            Channel target = group.find((ChannelId) member);
            if (target != null) {
                // Each write gets its own frame instance.
                target.writeAndFlush(new TextWebSocketFrame(json));
            }
        }
    }

    /**
     * Decodes a client frame into a map.
     *
     * <p>NOTE(review): the text comes straight from the network. Old fastjson
     * releases have known autoType deserialization issues; make sure the
     * project uses a patched version before exposing this endpoint.
     *
     * @param text raw frame text
     * @return the decoded JSON object, or {@code null} when the text is
     *         malformed or is not a JSON object
     */
    @SuppressWarnings("unchecked")
    private static Map<String, Object> parsePayload(String text) {
        try {
            Object parsed = JSON.parse(text);
            // Safe: a JSON object is decoded to a map with string keys.
            return parsed instanceof Map ? (Map<String, Object>) parsed : null;
        } catch (RuntimeException e) {
            // The parser reports malformed input with an unchecked exception.
            return null;
        }
    }
}
里面有一个细节就是使用redis缓存HashMap,我把代码放上。至于springboot如何整合redis,不是本文重点而且也很简单,有兴趣的小伙伴可以关注后续的文章,会有介绍:
/**
 * Reads a cached map from Redis.
 *
 * <p>The map is stored as a Redis hash under {@code KEY_PREFIX + key}. Any
 * failure is logged and reported as a missing entry.
 *
 * @param key cache key, without the prefix
 * @return the cached map, or {@code null} when the key does not exist or the
 *         lookup failed
 */
public static Map<String, Object> getObjectMap(String key) {
    key = KEY_PREFIX + key;
    Map<String, Object> value = null;
    Jedis jedis = null;
    try {
        jedis = getResource();
        byte[] bytesKey = getBytesKey(key);
        if (jedis.exists(bytesKey)) {
            value = Maps.newHashMap();
            Map<byte[], byte[]> map = jedis.hgetAll(bytesKey);
            for (Map.Entry<byte[], byte[]> e : map.entrySet()) {
                value.put(StringUtils.toString(e.getKey()), toObject(e.getValue()));
            }
            logger.debug("getObjectMap {} = {}", key, value);
        }
    } catch (Exception e) {
        logger.warn("getObjectMap {} = {}", key, value, e);
    } finally {
        returnResource(jedis);
    }
    return value;
}

/**
 * Replaces a cached map in Redis.
 *
 * <p>Any hash already stored under {@code KEY_PREFIX + key} is deleted first.
 * The same byte-encoded key is used for the delete, the write and the expiry
 * so that all three address the same Redis entry. Any failure is logged and
 * reported as {@code null}.
 *
 * @param key          cache key, without the prefix
 * @param value        entries to store; when {@code null} or empty the key is
 *                     only deleted, because a hash cannot be written without
 *                     fields
 * @param cacheSeconds time to live in seconds, 0 for no expiry
 * @return the reply of the hash write, or {@code null} when nothing was
 *         written or the operation failed
 */
public static String setObjectMap(String key, Map<String, Object> value, int cacheSeconds) {
    key = KEY_PREFIX + key;
    String result = null;
    Jedis jedis = null;
    try {
        jedis = getResource();
        byte[] bytesKey = getBytesKey(key);
        if (jedis.exists(bytesKey)) {
            jedis.del(bytesKey);
        }
        if (value == null || value.isEmpty()) {
            logger.debug("setObjectMap {} = {}", key, value);
            return null;
        }
        Map<byte[], byte[]> map = Maps.newHashMap();
        for (Map.Entry<String, Object> e : value.entrySet()) {
            map.put(getBytesKey(e.getKey()), toBytes(e.getValue()));
        }
        result = jedis.hmset(bytesKey, map);
        if (cacheSeconds != 0) {
            jedis.expire(bytesKey, cacheSeconds);
        }
        logger.debug("setObjectMap {} = {}", key, value);
    } catch (Exception e) {
        logger.warn("setObjectMap {} = {}", key, value, e);
    } finally {
        returnResource(jedis);
    }
    return result;
}
然后是消息的接收方法类,和单聊天室的是一样的:
package com.june.netty.socket; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>{ private IWebSocket iWebSocket; private IHttp iHttp; public WebSocketServerHandler(IWebSocket iWebSocket, IHttp iHttp) { super(); this.iWebSocket = iWebSocket; this.iHttp = iHttp; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // TODO Auto-generated method stub if (msg instanceof TextWebSocketFrame){ iWebSocket.textdoMessage(ctx,(TextWebSocketFrame)msg); }if(msg instanceof FullHttpRequest){ iHttp.handleHttpRequset(ctx, (FullHttpRequest)msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { // TODO Auto-generated method stub ctx.flush(); } }
最后是随着项目一起启动和初始化websocket的类SocketStartupRunner:
package com.june.netty.runner;

import com.june.netty.socket.WebsocketImpl;
import org.springframework.boot.CommandLineRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * Starts the Netty websocket server when the Spring Boot application starts.
 *
 * <p>{@code WebsocketImpl.start()} blocks until the server channel closes, so
 * it is run on its own thread. Calling it directly from {@link #run} would
 * keep the application startup thread from ever returning.
 *
 * <p>NOTE(review): because the server now starts in the background, a bind
 * failure no longer aborts application startup; it only terminates the server
 * thread. Add explicit failure reporting if startup must fail fast.
 */
@Component
@Order(value = 2)
public class SocketStartupRunner implements CommandLineRunner {

    /** Address the websocket server binds to. */
    private static final String HOST = "localhost";
    /** Port the websocket server binds to. */
    private static final int PORT = 8009;
    /** Name of the thread that hosts the blocking server loop. */
    private static final String SERVER_THREAD_NAME = "netty-websocket-server";

    /**
     * Launches the websocket server in the background and returns immediately.
     *
     * @param args command line arguments; not used
     */
    @Override
    public void run(String... args) throws Exception {
        WebsocketImpl socket = new WebsocketImpl(HOST, PORT);
        Thread serverThread = new Thread(socket::start, SERVER_THREAD_NAME);
        serverThread.start();
    }
}
对于pom文件的配置,主要是添加netty包和redis的配置,主要代码如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.7.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.june</groupId>
    <artifactId>netty</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>chat</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <!-- The server parses untrusted websocket text with fastjson, so use a
             release that contains the autoType deserialization fixes. -->
        <fastjson.version>1.2.83</fastjson.version>
        <guava.version>25.1-jre</guava.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <!-- fastjson json -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>${fastjson.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
            <scope>compile</scope>
        </dependency>
        <!-- JSON Web Token support -->
        <dependency>
            <groupId>io.jsonwebtoken</groupId>
            <artifactId>jjwt</artifactId>
            <version>0.7.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.8.1</version>
        </dependency>
        <!-- Uses the guava.version property declared above instead of a second,
             hard-coded version. -->
        <dependency>
            <groupId>com.google.guava</groupId>
            <artifactId>guava</artifactId>
            <version>${guava.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- gson.version is not declared in this file; presumably it is
             inherited from the Spring Boot parent. Verify when changing the parent. -->
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.5</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.6.Final</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
对于页面的更改,相比较单聊天室,只是添加了一个聊天室的唯一ID:groupId
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
<html xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<title></title>
<script type="text/javascript">
    // Id of the chat room this page talks to; sent with every message.
    var GROUP_ID = "admin";

    var socket;
    // Fall back to the prefixed constructor when the standard one is missing.
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:8009");
        // Each server frame is a JSON object with the chat text in "msg".
        socket.onmessage = function (event) {
            appendln("收到:" + JSON.parse(event.data).msg);
        };
        socket.onopen = function (event) {
            appendln("链接已打开");
        };
        socket.onclose = function (event) {
            appendln("链接已关闭");
        };
    } else {
        alert("链接丢失");
    }

    // Sends one chat message to the room; does nothing without websocket support.
    function send(message) {
        if (!window.WebSocket) {
            return;
        }
        if (socket.readyState == WebSocket.OPEN) {
            var messages = {"groupId": GROUP_ID, "msg": message};
            socket.send(JSON.stringify(messages));
            appendln("发送:" + message);
        } else {
            alert("发送失败,链接丢失");
        }
    }

    // Appends one line to the conversation box.
    function appendln(text) {
        var ta = document.getElementById('responseMsg');
        ta.value += text + "\r\n";
    }

    // Empties the conversation box.
    function clear() {
        var ta = document.getElementById('responseMsg');
        ta.value = "";
    }
</script>
</head>
<body>
<form onSubmit="return false;">
    <h3>聊天室</h3>
    <textarea id="responseMsg" style="width: 1000px;height: 600px;"></textarea>
    <br/>
    <input type = "text" name="message" value="你好啊,朋友"/>
    <input type="button" value="发送" onClick="send(this.form.message.value)"/>
</form>
</body>
</html>
这样,一个基于netty的多聊天室就完成了。