進行這項實驗之前,先讀了xbmchina的簡書文章,感謝這位大神提供的關於channelPipeline和channelHandler文章:
【Netty】ChannelPipeline和ChannelHandler(一)
【Netty】ChannelHandler的添加和刪除(二)
【Netty】inBound和outBound事件的傳播過程
之前想以leonzm的websocket_demo項目為基礎,寫netty4版本的聊天室,但是發現netty4的函數不一樣,messageReceived(建立鏈接/接收數據包)和close(斷開鏈接)不能覆寫,研究了下handler的生命周期。知道channelRead0可以建立鏈接,並接收已建立鏈接的客戶端的數據包;當隧道處於channelInactived階段時,表明數據隧道(鏈接)要斷開了,就要進入channelUnregistered階段,這時就可以在上面執行鏈接相關數據清除工作;隧道的處理器ChannelHandler也有生命周期,handlerRemoved時也可以執行類似操作。
netty的inbound和outbound的區別:除了inbound事件為被動觸發,在某些情況發生時自動觸發,outbound為主動觸發,在需要主動執行某些操作時觸發以外,outBound單獨用不能接收到websocket客戶端的信息(這是向外主動發信息的handler,接收信息要inbound來),outBound這個跟適合在pipeline流水線上嵌入,做AOP(切面編程)。
開始執行channelPipeline流水線程序比較:
Lanucher.java:(開啟netty服務的主函數)
1 package com.company.lanucher; 2 3 import com.company.server.ReversedWebSocketServer; 4 import com.company.server.WebSocketServer; 5 6 public class Lanucher { 7 8 public static void main(String[] args) throws Exception { 9 // 啟動WebSocket,如果想開啟另一個服務器,注釋掉Reversed,再解除WebSocketServer的注釋即可 10 //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT); 11 new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT); 12 } 13 14 }
WebSocketServer.java:(流水線先執行inBoundHandler再執行OutBoundAdapter)
1 package com.company.server; 2 3 import org.apache.log4j.Logger; 4 5 import io.netty.bootstrap.ServerBootstrap; 6 import io.netty.channel.Channel; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelPipeline; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.nio.NioServerSocketChannel; 12 import io.netty.handler.codec.http.HttpObjectAggregator; 13 import io.netty.handler.codec.http.HttpServerCodec; 14 import io.netty.handler.stream.ChunkedWriteHandler; 15 16 /** 17 * WebSocket服務 18 * 19 */ 20 public class WebSocketServer { 21 private static final Logger LOG = Logger.getLogger(WebSocketServer.class); 22 23 // websocket端口 24 public static final int WEBSOCKET_PORT = 9090; 25 26 public void run(int port) throws Exception { 27 EventLoopGroup bossGroup = new NioEventLoopGroup(); 28 EventLoopGroup workerGroup = new NioEventLoopGroup(); 29 try { 30 ServerBootstrap b = new ServerBootstrap(); 31 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() { 32 33 @Override 34 protected void initChannel(Channel channel) throws Exception { 35 ChannelPipeline pipeline = channel.pipeline(); 36 pipeline.addLast("http-codec", new HttpServerCodec()); // Http消息編碼解碼 37 pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // Http消息組裝 38 pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket通信支持 39 pipeline.addLast("adapter", new FunWebSocketServerHandler()); // WebSocket服務端Handler的前置攔截器 40 pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket服務端Handler 41 } 42 }); 43 44 Channel channel = b.bind(port).sync().channel(); 45 LOG.info("WebSocket 已經啟動,端口:" + port + "."); 46 channel.closeFuture().sync(); 47 } finally { 48 bossGroup.shutdownGracefully(); 49 workerGroup.shutdownGracefully(); 50 } 51 } 52 }
ReversedWebSocketServer.java:(流水線先執行OutBoundAdapter再執行inBoundHandler)
1 package com.company.server; 2 3 import org.apache.log4j.Logger; 4 5 import io.netty.bootstrap.ServerBootstrap; 6 import io.netty.channel.Channel; 7 import io.netty.channel.ChannelInitializer; 8 import io.netty.channel.ChannelPipeline; 9 import io.netty.channel.EventLoopGroup; 10 import io.netty.channel.nio.NioEventLoopGroup; 11 import io.netty.channel.socket.nio.NioServerSocketChannel; 12 import io.netty.handler.codec.http.HttpObjectAggregator; 13 import io.netty.handler.codec.http.HttpServerCodec; 14 import io.netty.handler.stream.ChunkedWriteHandler; 15 16 public class ReversedWebSocketServer { 17 private static final Logger LOG = Logger.getLogger(WebSocketServer.class); 18 19 // websocket端口 20 public static final int WEBSOCKET_PORT = 9090; 21 public static final int FUN_WEBSOCKET_PORT = 9091; 22 23 public void run(int port) throws Exception { 24 EventLoopGroup bossGroup = new NioEventLoopGroup(); 25 EventLoopGroup workerGroup = new NioEventLoopGroup(); 26 try { 27 ServerBootstrap b = new ServerBootstrap(); 28 b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() { 29 30 @Override 31 protected void initChannel(Channel channel) throws Exception { 32 ChannelPipeline pipeline = channel.pipeline(); 33 pipeline.addLast("http-codec", new HttpServerCodec()); // Http消息編碼解碼 34 pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // Http消息組裝 35 pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // WebSocket通信支持 36 pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket服務端Handler 37 pipeline.addLast("adapter", new FunWebSocketServerHandler()); // WebSocket服務端Handler 38 } 39 }); 40 41 Channel channel = b.bind(port).sync().channel(); 42 LOG.info("WebSocket 已經啟動,端口:" + port + "."); 43 channel.closeFuture().sync(); 44 } finally { 45 bossGroup.shutdownGracefully(); 46 workerGroup.shutdownGracefully(); 47 } 48 } 49 50 }
BananaWebSocketServerHandler.java:(inBoundHandler,處理從客戶端接收的請求)
1 package com.company.server; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelFuture; 6 import io.netty.channel.ChannelFutureListener; 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelPromise; 9 import io.netty.channel.SimpleChannelInboundHandler; 10 import io.netty.handler.codec.http.DefaultFullHttpResponse; 11 import io.netty.handler.codec.http.FullHttpRequest; 12 import io.netty.handler.codec.http.FullHttpResponse; 13 import io.netty.handler.codec.http.HttpHeaders; 14 import io.netty.handler.codec.http.HttpResponseStatus; 15 import io.netty.handler.codec.http.HttpVersion; 16 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; 17 import io.netty.handler.codec.http.websocketx.PingWebSocketFrame; 18 import io.netty.handler.codec.http.websocketx.PongWebSocketFrame; 19 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; 20 import io.netty.handler.codec.http.websocketx.WebSocketFrame; 21 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; 22 import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; 23 import io.netty.util.CharsetUtil; 24 25 import org.apache.log4j.Logger; 26 27 import com.company.serviceimpl.BananaService; 28 import com.company.util.BufToString; 29 import com.company.util.CODE; 30 import com.company.util.Request; 31 import com.company.util.Response; 32 import com.google.common.base.Strings; 33 import com.google.gson.JsonSyntaxException; 34 35 36 /** 37 * WebSocket服務端Handler 38 * 39 */ 40 public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> { 41 private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName()); 42 43 private WebSocketServerHandshaker handshaker; 44 private ChannelHandlerContext ctx; 45 private String sessionId; 46 private boolean isLog = true; 47 48 public BananaWebSocketServerHandler() { 49 super(); 50 } 51 52 public BananaWebSocketServerHandler(boolean isLog) { 53 this(); 54 this.isLog = isLog; 55 } 56 57 //netty 5的覆寫函數,netty4中用channelRead0代替 58 public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { 59 if(this.isLog) { 60 System.out.print("channel MessageReceived = = " + ctx.name()); 61 } 62 if (msg instanceof FullHttpRequest) { // 傳統的HTTP接入 63 FullHttpRequest mymsg = (FullHttpRequest) msg; 64 System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content())); 65 handleHttpRequest(ctx, mymsg); 66 } else if (msg instanceof WebSocketFrame) { // WebSocket接入 67 WebSocketFrame mymsg = (WebSocketFrame) msg; 68 System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content())); 69 handleWebSocketFrame(ctx, mymsg); 70 } 71 } 72 73 @Override 74 public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ 75 System.out.println("channel handlerAdded = = " + ctx.name()); 76 } 77 78 @Override 79 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{ 80 System.out.println("channel handlerRemoved = = " + ctx.name()); 81 } 82 83 @Override 84 protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { 85 if(this.isLog) { 86 System.out.print("channel Read0 = = " + ctx.name()); 87 } 88 if (msg instanceof FullHttpRequest) { // 傳統的HTTP接入 89 FullHttpRequest mymsg = (FullHttpRequest) msg; 90 System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content())); 91 handleHttpRequest(ctx, mymsg); 92 } else if (msg instanceof WebSocketFrame) { // WebSocket接入 93 WebSocketFrame mymsg = (WebSocketFrame) msg; 94 System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content())); 95 handleWebSocketFrame(ctx, mymsg); 96 } 97 } 98 99 @Override 100 public void channelInactive(ChannelHandlerContext ctx) { 101 if(this.isLog) { 102 System.out.println("channel Inactive = = " + ctx.name()); 103 } 104 try { 105 this.close(ctx, null); 106 } catch (Exception e) { 107 e.printStackTrace(); 108 } 109 } 110 111 @Override 112 public void channelUnregistered(ChannelHandlerContext ctx) { 113 if(this.isLog) { 114 System.out.println("channel Unregistered = = " + ctx.name()); 115 } 116 } 117 118 @Override 119 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 120 ctx.flush(); 121 System.out.println("channel Flush = = " + ctx.name()); 122 } 123 124 @Override 125 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 126 127 ctx.close(); 128 if(this.isLog) { 129 System.err.println("channel exceptionCaught = = " + ctx.name()); 130 cause.printStackTrace(); 131 } 132 BananaService.logout(sessionId); // 注銷 133 BananaService.notifyDownline(sessionId); // 通知有人下線 134 } 135 136 //netty 5的覆寫函數,netty4中用channelInactive代替 137 public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 138 if(this.isLog) { 139 System.out.println("channel close = = " + ctx.name()); 140 } 141 BananaService.logout(sessionId); // 注銷 142 BananaService.notifyDownline(sessionId); // 通知有人下線 143 ctx.close(); 144 } 145 146 /** 147 * 處理Http請求,完成WebSocket握手<br/> 148 * 注意:WebSocket連接第一次請求使用的是Http 149 * @param ctx 150 * @param request 151 * @throws Exception 152 */ 153 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { 154 // 如果HTTP解碼失敗,返回HHTP異常 155 if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) { 156 sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); 157 return; 158 } 159 160 // 正常WebSocket的Http連接請求,構造握手響應返回 161 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false); 162 handshaker = wsFactory.newHandshaker(request); 163 if (handshaker == null) { // 無法處理的websocket版本 164 WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); 165 } else { // 向客戶端發送websocket握手,完成握手 166 handshaker.handshake(ctx.channel(), request); 167 // 記錄管道處理上下文,便於服務器推送數據到客戶端 168 this.ctx = ctx; 169 } 170 } 171 172 /** 173 * 處理Socket請求 174 * @param ctx 175 * @param frame 176 * @throws Exception 177 */ 178 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { 179 // 判斷是否是關閉鏈路的指令 180 if (frame instanceof CloseWebSocketFrame) { 181 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); 182 return; 183 } 184 // 判斷是否是Ping消息 185 if (frame instanceof PingWebSocketFrame) { 186 ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); 187 return; 188 } 189 // 當前只支持文本消息,不支持二進制消息 190 if (!(frame instanceof TextWebSocketFrame)) { 191 throw new UnsupportedOperationException("當前只支持文本消息,不支持二進制消息"); 192 } 193 194 // 處理來自客戶端的WebSocket請求 195 try { 196 /* 197 if(this.isLog) { 198 System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text()); 199 } 200 */ 201 Request request = Request.create(((TextWebSocketFrame)frame).text()); 202 Response response = new Response(); 203 response.setServiceId(request.getServiceId()); 204 if (CODE.online.code.intValue() == request.getServiceId()) { // 客戶端注冊 205 String requestId = request.getRequestId(); 206 if (Strings.isNullOrEmpty(requestId)) { 207 response.setIsSucc(false).setMessage("requestId不能為空"); 208 return; 209 } else if (Strings.isNullOrEmpty(request.getName())) { 210 response.setIsSucc(false).setMessage("name不能為空"); 211 return; 212 } else if (BananaService.bananaWatchMap.containsKey(requestId)) { 213 response.setIsSucc(false).setMessage("您已經注冊了,不能重復注冊"); 214 return; 215 } 216 if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) { 217 response.setIsSucc(false).setMessage("注冊失敗"); 218 } else { 219 response.setIsSucc(true).setMessage("注冊成功"); 220 221 BananaService.bananaWatchMap.forEach((reqId, callBack) -> { 222 response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // 將已經上線的人員返回 223 224 if (!reqId.equals(requestId)) { 225 Request serviceRequest = new Request(); 226 serviceRequest.setServiceId(CODE.online.code); 227 serviceRequest.setRequestId(requestId); 228 serviceRequest.setName(request.getName()); 229 try { 230 callBack.send(serviceRequest); // 通知有人上線 231 } catch (Exception e) { 232 LOG.warn("回調發送消息給客戶端異常", e); 233 } 234 } 235 }); 236 } 237 sendWebSocket(response.toJson()); 238 this.sessionId = requestId; // 記錄會話id,當頁面刷新或瀏覽器關閉時,注銷掉此鏈路 239 } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // 客戶端發送消息到聊天群 240 String requestId = request.getRequestId(); 241 if (Strings.isNullOrEmpty(requestId)) { 242 response.setIsSucc(false).setMessage("requestId不能為空"); 243 } else if (Strings.isNullOrEmpty(request.getName())) { 244 response.setIsSucc(false).setMessage("name不能為空"); 245 } else if (Strings.isNullOrEmpty(request.getMessage())) { 246 response.setIsSucc(false).setMessage("message不能為空"); 247 } else { 248 response.setIsSucc(true).setMessage("發送消息成功"); 249 250 BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // 將消息發送到所有機器 251 Request serviceRequest = new Request(); 252 serviceRequest.setServiceId(CODE.receive_message.code); 253 serviceRequest.setRequestId(requestId); 254 serviceRequest.setName(request.getName()); 255 serviceRequest.setMessage(request.getMessage()); 256 try { 257 callBack.send(serviceRequest); 258 } catch (Exception e) { 259 LOG.warn("回調發送消息給客戶端異常", e); 260 } 261 }); 262 } 263 sendWebSocket(response.toJson()); 264 } else if (CODE.downline.code.intValue() == request.getServiceId()) { // 客戶端下線 265 String requestId = request.getRequestId(); 266 if (Strings.isNullOrEmpty(requestId)) { 267 sendWebSocket(response.setIsSucc(false).setMessage("requestId不能為空").toJson()); 268 } else { 269 BananaService.logout(requestId); 270 response.setIsSucc(true).setMessage("下線成功"); 271 272 BananaService.notifyDownline(requestId); // 通知有人下線 273 274 sendWebSocket(response.toJson()); 275 } 276 277 } else { 278 sendWebSocket(response.setIsSucc(false).setMessage("未知請求").toJson()); 279 } 280 } catch (JsonSyntaxException e1) { 281 LOG.warn("Json解析異常", e1); 282 System.err.println("Json解析異常"); 283 e1.printStackTrace(); 284 } catch (Exception e2) { 285 LOG.error("處理Socket請求異常", e2); 286 System.err.println("處理Socket請求異常"); 287 e2.printStackTrace(); 288 } 289 } 290 291 /** 292 * Http返回 293 * @param ctx 294 * @param request 295 * @param response 296 */ 297 private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) { 298 // 返回應答給客戶端 299 if (response.getStatus().code() != 200) { 300 ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8); 301 response.content().writeBytes(buf); 302 buf.release(); 303 HttpHeaders.setContentLength(response, response.content().readableBytes()); 304 } 305 306 // 如果是非Keep-Alive,關閉連接 307 ChannelFuture f = ctx.channel().writeAndFlush(response); 308 if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) { 309 f.addListener(ChannelFutureListener.CLOSE); 310 } 311 } 312 313 /** 314 * WebSocket返回 315 * @param ctx 316 * @param req 317 * @param res 318 */ 319 public void sendWebSocket(String msg) throws Exception { 320 if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) { 321 throw new Exception("尚未握手成功,無法向客戶端發送WebSocket消息"); 322 } 323 this.ctx.channel().write(new TextWebSocketFrame(msg)); 324 this.ctx.flush(); 325 } 326 327 }
FunWebSocketServerHandler.java:(outBoundAdapter,處理從服務器發出的響應)
1 package com.company.server; 2 3 import java.net.SocketAddress; 4 5 import com.company.util.BufToString; 6 7 import io.netty.channel.ChannelHandlerContext; 8 import io.netty.channel.ChannelOutboundHandlerAdapter; 9 import io.netty.channel.ChannelPromise; 10 import io.netty.handler.codec.http.DefaultFullHttpResponse; 11 import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame; 12 import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; 13 14 public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter{ 15 16 @Override 17 public void read(ChannelHandlerContext ctx) throws Exception { 18 ChannelHandlerContext readRes = ctx.read(); 19 System.out.println(ctx.name() + " is read in " + readRes.toString()); 20 } 21 22 @Override 23 public void handlerAdded(ChannelHandlerContext ctx) throws Exception{ 24 System.out.println(ctx.name() + " handlerAdded = = " + ctx.name()); 25 } 26 27 @Override 28 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{ 29 System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name()); 30 } 31 32 @Override 33 public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, 34 ChannelPromise promise) throws Exception { 35 ctx.bind(localAddress, promise); 36 System.out.println(ctx.name() + " is bind in " + localAddress.toString() + " in " + promise.toString()); 37 } 38 @Override 39 public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress, 40 SocketAddress localAddress, ChannelPromise promise) throws Exception { 41 ctx.connect(remoteAddress, localAddress, promise); 42 System.out.println(ctx.name() + " is connect in " + localAddress.toString() + " in client " + remoteAddress.toString() + " in " + promise.toString()); 43 } 44 @Override 45 public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) 46 throws Exception { 47 ctx.disconnect(promise); 48 System.out.println(ctx.name() + " is disconnect in " + promise.toString()); 49 } 50 @Override 51 public void close(ChannelHandlerContext ctx, ChannelPromise promise) 52 throws Exception { 53 ctx.close(promise); 54 System.out.println(ctx.name() + " is close in " + promise.toString()); 55 } 56 @Override 57 public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { 58 ctx.deregister(promise); 59 System.out.println(ctx.name() + " is deregister in " + promise.toString()); 60 } 61 62 @Override 63 public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { 64 ctx.write(msg, promise); 65 System.out.print(ctx.name() + " is write in " + promise.toString()); 66 if(msg instanceof DefaultFullHttpResponse) { 67 System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content())); 68 } 69 else if(msg instanceof TextWebSocketFrame) { 70 System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text()); 71 } 72 else if(msg instanceof CloseWebSocketFrame) { 73 System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText()); 74 } 75 else { 76 System.out.println(" with message : " + msg.getClass()); 77 } 78 } 79 @Override 80 public void flush(ChannelHandlerContext ctx) throws Exception { 81 ctx.flush(); 82 System.out.println(ctx.name() + " is flush"); 83 } 84 }
banana.html:(聊天室前端)
1 <!DOCTYPE html> 2 <html> 3 <head> 4 <meta charset="UTF-8"> 5 <title>Netty WebSocket 聊天實例</title> 6 </head> 7 <script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script> 8 <script src="map.js" type="text/javascript"></script> 9 <script type="text/javascript"> 10 $(document).ready(function() { 11 var uuid = guid(); // uuid在一個會話唯一 12 var nameOnline = ''; // 上線姓名 13 var onlineName = new Map(); // 已上線人員, <requestId, name> 14 15 $("#name").attr("disabled","disabled"); 16 $("#onlineBtn").attr("disabled","disabled"); 17 $("#downlineBtn").attr("disabled","disabled"); 18 19 $("#banana").hide(); 20 21 // 初始化websocket 22 var socket; 23 if (!window.WebSocket) { 24 window.WebSocket = window.MozWebSocket; 25 } 26 if (window.WebSocket) { 27 socket = new WebSocket("ws://localhost:9090/"); 28 socket.onmessage = function(event) { 29 console.log("收到服務器消息:" + event.data); 30 if (event.data.indexOf("isSucc") != -1) {// 這里需要判斷是客戶端請求服務端返回后的消息(response) 31 var response = JSON.parse(event.data); 32 if (response != undefined && response != null) { 33 if (response.serviceId == 1001) { // 上線 34 if (response.isSucc) { 35 // 上線成功,初始化已上線人員 36 onlineName.clear(); 37 $("#showOnlineNames").empty(); 38 for (var reqId in response.hadOnline) { 39 onlineName.put(reqId, response.hadOnline[reqId]); 40 } 41 initOnline(); 42 43 $("#name").attr("disabled","disabled"); 44 $("#onlineBtn").attr("disabled","disabled"); 45 $("#downlineBtn").removeAttr("disabled"); 46 $("#banana").show(); 47 } else { 48 alert("上線失敗"); 49 } 50 } else if (response.serviceId == 1004) { 51 if (response.isSucc) { 52 onlineName.clear(); 53 $("#showBanana").empty(); 54 $("#showOnlineNames").empty(); 55 $("#name").removeAttr("disabled"); 56 $("#onlineBtn").removeAttr("disabled"); 57 $("#downlineBtn").attr("disabled","disabled"); 58 $("#banana").hide(); 59 } else { 60 alert("下線失敗"); 61 } 62 } 63 } 64 } else {// 還是服務端向客戶端的請求(request) 65 var request = JSON.parse(event.data); 66 if (request != undefined && request != null) { 67 if (request.serviceId == 1001 || request.serviceId == 1004) { // 有人上線/下線 68 if (request.serviceId == 1001) { 69 onlineName.put(request.requestId, request.name); 70 } 71 if (request.serviceId == 1004) { 72 onlineName.removeByKey(request.requestId); 73 } 74 75 initOnline(); 76 } else if (request.serviceId == 1003) { // 有人發消息 77 appendBanana(request.name, request.message); 78 } 79 } 80 } 81 }; 82 socket.onopen = function(event) { 83 $("#name").removeAttr("disabled"); 84 $("#onlineBtn").removeAttr("disabled"); 85 console.log("已連接服務器"); 86 }; 87 socket.onclose = function(event) { // WebSocket 關閉 88 console.log("WebSocket已經關閉!"); 89 }; 90 socket.onerror = function(event) { 91 console.log("WebSocket異常!"); 92 }; 93 } else { 94 alert("抱歉,您的瀏覽器不支持WebSocket協議!"); 95 } 96 97 // WebSocket發送請求 98 function send(message) { 99 if (!window.WebSocket) { return; } 100 if (socket.readyState == WebSocket.OPEN) { 101 socket.send(message); 102 } else { 103 console.log("WebSocket連接沒有建立成功!"); 104 alert("您還未連接上服務器,請刷新頁面重試"); 105 } 106 } 107 108 // 刷新上線人員 109 function initOnline() { 110 $("#showOnlineNames").empty(); 111 for (var i=0;i<onlineName.size();i++) { 112 $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' + 113 '<td>' + onlineName.element(i).value + '</td>' + 114 '</tr>'); 115 } 116 } 117 // 追加聊天信息 118 function appendBanana(name, message) { 119 $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>'); 120 } 121 122 $("#onlineBtn").bind("click", function() { 123 var name = $("#name").val(); 124 if (name == null || name == '') { 125 alert("請輸入您的尊姓大名"); 126 return; 127 } 128 129 nameOnline = name; 130 // 上線 131 send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name})); 132 }); 133 134 $("#downlineBtn").bind("click", function() { 135 // 下線 136 send(JSON.stringify({"requestId":uuid, "serviceId":1004})); 137 }); 138 139 $("#sendBtn").bind("click", function() { 140 var message = $("#messageInput").val(); 141 if (message == null || message == '') { 142 alert("請輸入您的聊天信息"); 143 return; 144 } 145 146 // 發送聊天消息 147 send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message})); 148 $("#messageInput").val(""); 149 }); 150 151 }); 152 153 function guid() { 154 function S4() { 155 return (((1+Math.random())*0x10000)|0).toString(16).substring(1); 156 } 157 return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4()); 158 } 159 </script> 160 <body> 161 <h1>Netty WebSocket 聊天實例</h1> 162 <input type="text" id="name" value="佚名" placeholder="姓名" /> 163 <input type="button" id="onlineBtn" value="上線" /> 164 <input type="button" id="downlineBtn" value="下線" /> 165 <hr/> 166 <table id="banana" border="1" > 167 <tr> 168 <td width="600" align="center">聊天</td> 169 <td width="100" align="center">上線人員</td> 170 </tr> 171 <tr height="200" valign="top"> 172 <td> 173 <table id="showBanana" border="0" width="600"> 174 <!-- 175 <tr> 176 <td>張三: 大家好</td> 177 </tr> 178 <tr> 179 <td>李四: 歡迎加入群聊</td> 180 </tr> 181 --> 182 </table> 183 </td> 184 <td> 185 <table id="showOnlineNames" border="0"> 186 <!-- 187 <tr> 188 <td>1</td> 189 <td>張三</td> 190 <tr/> 191 <tr> 192 <td>2</td> 193 <td>李四</td> 194 <tr/> 195 --> 196 </table> 197 </td> 198 </tr> 199 <tr height="40"> 200 <td></td> 201 <td></td> 202 </tr> 203 <tr> 204 <td> 205 <input type="text" id="messageInput" style="width:590px" placeholder="巴拉巴拉點什么吧" /> 206 </td> 207 <td> 208 <input type="button" id="sendBtn" value="發送" /> 209 </td> 210 </tr> 211 </table> 212 213 </body> 214 </html>
分別運行WebSocketServer和ReservedWebSocketServer,運行日志如下:
============先adapter后handler==============
============連接開始========================
adapter handlerAdded = = adapter
channel handlerAdded = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
channel Read0 = = handler with http request :
adapter is write in DefaultChannelPromise@3b536aab(incomplete) with message :
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============上線用戶========================
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"佚名"}
adapter is write in DefaultChannelPromise@61ace442(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注冊成功","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"佚名"}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============發送信息========================
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in DefaultChannelPromise@5a4c689a(incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"佚名","message":"queue"}
adapter is flush
adapter is write in DefaultChannelPromise@3cfa9e57(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"發送消息成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============下線用戶========================
channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
adapter is write in DefaultChannelPromise@b6ce0dc(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下線成功","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
============用戶斷線========================
channel Read0 = = handler with socket request : �
adapter is write in DefaultChannelPromise@567e3360(incomplete) close reason :
adapter is flush
adapter is close in DefaultChannelPromise@1b6673fb(success)
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
channel Inactive = = handler
channel close = = handler
adapter is close in DefaultChannelPromise@5b261d0a(success)
channel Unregistered = = handler
channel handlerRemoved = = handler
adapter handlerRemoved = = adapter
以及
============先adapter后handler==============
============連接開始========================
channel handlerAdded = = handler
adapter handlerAdded = = adapter
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
channel Read0 = = handler with http request :
adapter is write in DefaultChannelPromise@171fb888(incomplete) with message :
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============上線用戶========================
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"佚名"}
adapter is write in DefaultChannelPromise@54042c55(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"注冊成功","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"佚名"}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============發送信息========================
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"佚名","message":"queue"}
adapter is write in DefaultChannelPromise@324cb9a(incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"佚名","message":"queue"}
adapter is write in DefaultChannelPromise@269f3a70(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"發送消息成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============下線用戶========================
channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
adapter is write in DefaultChannelPromise@2f6a67d7(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"下線成功","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
============用戶斷線========================
channel Read0 = = handler with socket request : �
adapter is write in DefaultChannelPromise@5dff633(incomplete) close reason :
adapter is flush
adapter is close in DefaultChannelPromise@1e58e8f0(success)
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
channel Inactive = = handler
channel close = = handler
channel Unregistered = = handler
adapter handlerRemoved = = adapter
channel handlerRemoved = = handler
除了運行順序不同,outBoundAdapter的flush操作也多了幾次,尤其在發送這一塊,因為不僅要接收數據包,還要發送數據包,要多刷新adapter。
由此可見,netty的pipeline一定要仔細規划,能先讓服務器處理就先讓服務器處理,把outbound攔截器放在inbound攔截器前面。
