How the execution order of Netty's ChannelPipeline affects inbound and outbound handlers


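  Before running this experiment I read xbmchina's articles on Jianshu. Many thanks to the author for these articles on ChannelPipeline and ChannelHandler:

  [Netty] ChannelPipeline and ChannelHandler (Part 1)

  [Netty] Adding and removing a ChannelHandler (Part 2)

  [Netty] How inbound and outbound events propagate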

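  I originally wanted to write a Netty 4 chat room based on leonzm's websocket_demo project, but the Netty 4 API turned out to be different: messageReceived (connection setup / receiving packets) and close (disconnection) cannot be overridden on the inbound handler. So I looked into the handler lifecycle. channelRead0 receives both the initial request that establishes the connection and the packets sent afterwards by connected clients. When the channel reaches channelInactive, the connection is being torn down and the channel is about to enter channelUnregistered, so connection-related data can be cleaned up in these callbacks. The ChannelHandler itself also has a lifecycle, and the same kind of cleanup can be done in handlerRemoved.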
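Because cleanup can be reached from several callbacks (in the handler listed later, logout is called both from exceptionCaught and from close, which channelInactive invokes), it may help to make the cleanup idempotent. The following is a minimal sketch in plain Java, not code from the project: SessionCleanupSketch, Session, ONLINE and cleanupOnce are hypothetical names, and ONLINE only stands in for whatever registry the real service keeps.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

final class SessionCleanupSketch {
    // Hypothetical stand-in for the registry of online users (requestId -> name).
    static final Map<String, String> ONLINE = new ConcurrentHashMap<>();

    /** Per-connection state; one instance per connection. */
    static final class Session {
        private final String sessionId;
        private final AtomicBoolean cleaned = new AtomicBoolean(false);

        Session(String sessionId, String name) {
            this.sessionId = sessionId;
            ONLINE.put(sessionId, name);
        }

        /** Returns true only for the first caller; later calls are no-ops. */
        boolean cleanupOnce() {
            if (!cleaned.compareAndSet(false, true)) {
                return false; // some other callback already cleaned up
            }
            ONLINE.remove(sessionId);
            return true;
        }
    }

    public static void main(String[] args) {
        Session s = new Session("id-1", "alice");
        System.out.println(s.cleanupOnce());   // true: e.g. reached from exceptionCaught
        System.out.println(s.cleanupOnce());   // false: e.g. reached again from channelInactive
        System.out.println(ONLINE.isEmpty());  // true
    }
}
```

With a guard like this, it no longer matters which lifecycle callback fires first; notifications to other users would be sent only by the call that returns true.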

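  The difference between inbound and outbound in Netty: inbound events are triggered passively, that is, fired automatically when something happens, while outbound operations are triggered actively, when the application needs to perform some operation. Beyond that, an outbound handler on its own cannot receive messages from a WebSocket client (it handles what the server sends out; receiving messages requires an inbound handler). An outbound handler is better suited to being embedded in the pipeline to do AOP (aspect-oriented programming).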
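To make the two directions concrete, here is a toy model in plain Java. It is only a sketch of the routing rule (inbound events travel from head to tail and visit inbound handlers; outbound operations issued on the channel travel from tail to head and visit outbound handlers), not Netty's implementation. The class name and the handler names decoder, logger, encoder and business are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

final class PipelineDirectionSketch {
    enum Kind { INBOUND, OUTBOUND }

    /** A handler reduced to its name and direction. */
    static final class Handler {
        final String name;
        final Kind kind;

        Handler(String name, Kind kind) {
            this.name = name;
            this.kind = kind;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    /** Appends a handler at the tail, like pipeline.addLast(...). */
    PipelineDirectionSketch addLast(String name, Kind kind) {
        handlers.add(new Handler(name, kind));
        return this;
    }

    /** Inbound events (e.g. a read) go head -> tail, inbound handlers only. */
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Handler h : handlers) {
            if (h.kind == Kind.INBOUND) {
                path.add(h.name);
            }
        }
        return path;
    }

    /** Outbound operations issued on the channel go tail -> head, outbound handlers only. */
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = handlers.size() - 1; i >= 0; i--) {
            if (handlers.get(i).kind == Kind.OUTBOUND) {
                path.add(handlers.get(i).name);
            }
        }
        return path;
    }

    public static void main(String[] args) {
        PipelineDirectionSketch p = new PipelineDirectionSketch()
                .addLast("decoder", Kind.INBOUND)
                .addLast("logger", Kind.OUTBOUND)
                .addLast("encoder", Kind.OUTBOUND)
                .addLast("business", Kind.INBOUND);
        System.out.println(p.inboundPath());   // [decoder, business]
        System.out.println(p.outboundPath());  // [encoder, logger]
    }
}
```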

  Now to compare how the ChannelPipeline executes in the two setups:

  Lanucher.java: (main function that starts the Netty server)

package com.company.lanucher;

import com.company.server.ReversedWebSocketServer;
import com.company.server.WebSocketServer;

public class Lanucher {

    public static void main(String[] args) throws Exception {
        // Start the WebSocket server. To run the other server instead, comment out the
        // ReversedWebSocketServer line and uncomment the WebSocketServer line.
        //new WebSocketServer().run(WebSocketServer.WEBSOCKET_PORT);
        new ReversedWebSocketServer().run(ReversedWebSocketServer.WEBSOCKET_PORT);
    }

}

  WebSocketServer.java: (the pipeline runs the inbound handler first, then the outbound adapter; here the adapter is added to the pipeline before the handler)

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

/**
 * WebSocket server
 *
 */
public class WebSocketServer {
    private static final Logger LOG = Logger.getLogger(WebSocketServer.class);

    // WebSocket port
    public static final int WEBSOCKET_PORT = 9090;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data in chunks
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // interceptor placed before the WebSocket server handler
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已經啟動,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

  ReversedWebSocketServer.java: (the pipeline runs the outbound adapter first, then the inbound handler; here the adapter is added to the pipeline after the handler)

package com.company.server;

import org.apache.log4j.Logger;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ReversedWebSocketServer {
    // Log under this class's own name (the original used WebSocketServer.class here)
    private static final Logger LOG = Logger.getLogger(ReversedWebSocketServer.class);

    // WebSocket ports
    public static final int WEBSOCKET_PORT = 9090;
    public static final int FUN_WEBSOCKET_PORT = 9091;

    public void run(int port) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<Channel>() {

                @Override
                protected void initChannel(Channel channel) throws Exception {
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("http-codec", new HttpServerCodec()); // HTTP message encoding and decoding
                    pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); // aggregates HTTP message parts
                    pipeline.addLast("http-chunked", new ChunkedWriteHandler()); // support for writing large data in chunks
                    pipeline.addLast("handler", new BananaWebSocketServerHandler()); // WebSocket server handler
                    pipeline.addLast("adapter", new FunWebSocketServerHandler()); // outbound adapter, added after the handler
                }
            });

            Channel channel = b.bind(port).sync().channel();
            LOG.info("WebSocket 已經啟動,端口:" + port + ".");
            channel.closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

}
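The only difference between the two servers is the position of "adapter" relative to "handler". Whether that position matters depends on where an outbound operation starts: in Netty 4, an operation issued on the channel (for example ctx.channel().write(...)) starts at the tail of the pipeline, while one issued on the context (for example ctx.write(...) or ctx.flush()) starts at the handler just before the calling one. The toy model below sketches only this rule; WriteOriginSketch and its method names are hypothetical, and the HTTP codec handlers are left out for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class WriteOriginSketch {
    /** Outbound handlers in this toy model; every other name is treated as inbound. */
    static final List<String> OUTBOUND = Arrays.asList("adapter");

    /** Models channel.write(): starts at the tail and walks toward the head. */
    static List<String> channelWrite(List<String> pipeline) {
        return walkTowardHead(pipeline, pipeline.size() - 1);
    }

    /** Models ctx.write() called inside `from`: starts at the handler just before `from`. */
    static List<String> contextWrite(List<String> pipeline, String from) {
        return walkTowardHead(pipeline, pipeline.indexOf(from) - 1);
    }

    /** Collects the outbound handlers visited from index `start` down to the head. */
    private static List<String> walkTowardHead(List<String> pipeline, int start) {
        List<String> visited = new ArrayList<>();
        for (int i = start; i >= 0; i--) {
            if (OUTBOUND.contains(pipeline.get(i))) {
                visited.add(pipeline.get(i));
            }
        }
        return visited;
    }

    public static void main(String[] args) {
        List<String> adapterFirst = Arrays.asList("adapter", "handler"); // WebSocketServer order
        List<String> handlerFirst = Arrays.asList("handler", "adapter"); // ReversedWebSocketServer order

        System.out.println(channelWrite(adapterFirst));             // [adapter]
        System.out.println(contextWrite(adapterFirst, "handler"));  // [adapter]
        System.out.println(channelWrite(handlerFirst));             // [adapter]
        System.out.println(contextWrite(handlerFirst, "handler"));  // []
    }
}
```

In this model, an outbound adapter added after the handler is skipped only by operations that the handler issues on its own context; operations issued on the channel still pass through it.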

  BananaWebSocketServerHandler.java: (inbound handler; handles requests received from clients)

package com.company.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

import org.apache.log4j.Logger;

import com.company.serviceimpl.BananaService;
import com.company.util.BufToString;
import com.company.util.CODE;
import com.company.util.Request;
import com.company.util.Response;
import com.google.common.base.Strings;
import com.google.gson.JsonSyntaxException;


/**
 * WebSocket server handler
 *
 */
public class BananaWebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
    private static final Logger LOG = Logger.getLogger(BananaWebSocketServerHandler.class.getName());

    private WebSocketServerHandshaker handshaker;
    private ChannelHandlerContext ctx;
    private String sessionId;
    private boolean isLog = true;

    public BananaWebSocketServerHandler() {
        super();
    }

    public BananaWebSocketServerHandler(boolean isLog) {
        this();
        this.isLog = isLog;
    }

    // Override point in Netty 5; Netty 4 uses channelRead0 instead
    public void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel MessageReceived = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println("channel handlerRemoved = = " + ctx.name());
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if(this.isLog) {
            System.out.print("channel Read0 = = " + ctx.name());
        }
        if (msg instanceof FullHttpRequest) { // plain HTTP request
            FullHttpRequest mymsg = (FullHttpRequest) msg;
            System.out.println(" with http request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleHttpRequest(ctx, mymsg);
        } else if (msg instanceof WebSocketFrame) { // WebSocket frame
            WebSocketFrame mymsg = (WebSocketFrame) msg;
            System.out.println(" with socket request : " + BufToString.convertByteBufToString(mymsg.content()));
            handleWebSocketFrame(ctx, mymsg);
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Inactive = = " + ctx.name());
        }
        try {
            this.close(ctx, null);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) {
        if(this.isLog) {
            System.out.println("channel Unregistered = = " + ctx.name());
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("channel Flush = = " + ctx.name());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {

        ctx.close();
        if(this.isLog) {
            System.err.println("channel exceptionCaught = = " + ctx.name());
            cause.printStackTrace();
        }
        BananaService.logout(sessionId); // log out
        BananaService.notifyDownline(sessionId); // notify others that someone went offline
    }

    // Override point in Netty 5; Netty 4 uses channelInactive instead
    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        if(this.isLog) {
            System.out.println("channel close = = " + ctx.name());
        }
        BananaService.logout(sessionId); // log out
        BananaService.notifyDownline(sessionId); // notify others that someone went offline
        ctx.close();
    }

    /**
     * Handles an HTTP request and completes the WebSocket handshake<br/>
     * Note: the first request of a WebSocket connection is sent over HTTP
     * @param ctx
     * @param request
     * @throws Exception
     */
    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception {
        // If HTTP decoding failed or this is not a WebSocket upgrade, return an HTTP error
        if (!request.getDecoderResult().isSuccess() || (!"websocket".equals(request.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, request, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }

        // Normal WebSocket upgrade request over HTTP: build and return the handshake response
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://" + request.headers().get(HttpHeaders.Names.HOST), null, false);
        handshaker = wsFactory.newHandshaker(request);
        if (handshaker == null) { // unsupported WebSocket version
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else { // send the WebSocket handshake to the client to complete it
            handshaker.handshake(ctx.channel(), request);
            // Remember the handler context so the server can push data to the client later
            this.ctx = ctx;
        }
    }

    /**
     * Handles a WebSocket frame
     * @param ctx
     * @param frame
     * @throws Exception
     */
    private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // Is this a close frame?
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // Is this a ping frame?
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // Only text frames are supported for now; binary frames are not
        if (!(frame instanceof TextWebSocketFrame)) {
            throw new UnsupportedOperationException("當前只支持文本消息,不支持二進制消息");
        }

        // Handle the WebSocket request from the client
        try {
            /*
            if(this.isLog) {
                System.out.println("handleWebSocketFrame-=-=-" + ((TextWebSocketFrame)frame).text());
            }
            */
            Request request = Request.create(((TextWebSocketFrame)frame).text());
            Response response = new Response();
            response.setServiceId(request.getServiceId());
            if (CODE.online.code.intValue() == request.getServiceId()) { // client registration
                String requestId = request.getRequestId();
                // The original returned here without sending the failure response;
                // each failure is now reported to the client before returning.
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId不能為空");
                    sendWebSocket(response.toJson());
                    return;
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name不能為空");
                    sendWebSocket(response.toJson());
                    return;
                } else if (BananaService.bananaWatchMap.containsKey(requestId)) {
                    response.setIsSucc(false).setMessage("您已經注冊了,不能重復注冊");
                    sendWebSocket(response.toJson());
                    return;
                }
                if (!BananaService.register(requestId, new BananaService(ctx, request.getName()))) {
                    response.setIsSucc(false).setMessage("注冊失敗");
                } else {
                    response.setIsSucc(true).setMessage("注冊成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> {
                        response.getHadOnline().put(reqId, ((BananaService)callBack).getName()); // return the users already online

                        if (!reqId.equals(requestId)) {
                            Request serviceRequest = new Request();
                            serviceRequest.setServiceId(CODE.online.code);
                            serviceRequest.setRequestId(requestId);
                            serviceRequest.setName(request.getName());
                            try {
                                callBack.send(serviceRequest); // notify others that someone came online
                            } catch (Exception e) {
                                LOG.warn("回調發送消息給客戶端異常", e);
                            }
                        }
                    });
                }
                sendWebSocket(response.toJson());
                this.sessionId = requestId; // remember the session id so the link can be logged out when the page is refreshed or the browser is closed
            } else if (CODE.send_message.code.intValue() == request.getServiceId()) { // client sends a message to the chat group
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    response.setIsSucc(false).setMessage("requestId不能為空");
                } else if (Strings.isNullOrEmpty(request.getName())) {
                    response.setIsSucc(false).setMessage("name不能為空");
                } else if (Strings.isNullOrEmpty(request.getMessage())) {
                    response.setIsSucc(false).setMessage("message不能為空");
                } else {
                    response.setIsSucc(true).setMessage("發送消息成功");

                    BananaService.bananaWatchMap.forEach((reqId, callBack) -> { // send the message to every client
                        Request serviceRequest = new Request();
                        serviceRequest.setServiceId(CODE.receive_message.code);
                        serviceRequest.setRequestId(requestId);
                        serviceRequest.setName(request.getName());
                        serviceRequest.setMessage(request.getMessage());
                        try {
                            callBack.send(serviceRequest);
                        } catch (Exception e) {
                            LOG.warn("回調發送消息給客戶端異常", e);
                        }
                    });
                }
                sendWebSocket(response.toJson());
            } else if (CODE.downline.code.intValue() == request.getServiceId()) { // client goes offline
                String requestId = request.getRequestId();
                if (Strings.isNullOrEmpty(requestId)) {
                    sendWebSocket(response.setIsSucc(false).setMessage("requestId不能為空").toJson());
                } else {
                    BananaService.logout(requestId);
                    response.setIsSucc(true).setMessage("下線成功");

                    BananaService.notifyDownline(requestId); // notify others that someone went offline

                    sendWebSocket(response.toJson());
                }

            } else {
                sendWebSocket(response.setIsSucc(false).setMessage("未知請求").toJson());
            }
        } catch (JsonSyntaxException e1) {
            LOG.warn("Json解析異常", e1);
            System.err.println("Json解析異常");
            e1.printStackTrace();
        } catch (Exception e2) {
            LOG.error("處理Socket請求異常", e2);
            System.err.println("處理Socket請求異常");
            e2.printStackTrace();
        }
    }

    /**
     * Sends an HTTP response
     * @param ctx
     * @param request
     * @param response
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest request, FullHttpResponse response) {
        // Return the response to the client
        if (response.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(response.getStatus().toString(), CharsetUtil.UTF_8);
            response.content().writeBytes(buf);
            buf.release();
            HttpHeaders.setContentLength(response, response.content().readableBytes());
        }

        // Close the connection if it is not keep-alive or the status is not 200
        ChannelFuture f = ctx.channel().writeAndFlush(response);
        if (!HttpHeaders.isKeepAlive(request) || response.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     * Sends a WebSocket text message
     * @param msg the text to send to the client
     */
    public void sendWebSocket(String msg) throws Exception {
        if (this.handshaker == null || this.ctx == null || this.ctx.isRemoved()) {
            throw new Exception("尚未握手成功,無法向客戶端發送WebSocket消息");
        }
        this.ctx.channel().write(new TextWebSocketFrame(msg));
        this.ctx.flush();
    }

}

  FunWebSocketServerHandler.java: (outbound adapter; handles responses sent out by the server)

package com.company.server;

import java.net.SocketAddress;

import com.company.util.BufToString;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

// Each override forwards the operation to the next outbound handler, then logs it.
public class FunWebSocketServerHandler extends ChannelOutboundHandlerAdapter{

    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ChannelHandlerContext readRes = ctx.read();
        System.out.println(ctx.name() + " is read in " + readRes.toString());
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerAdded = = " + ctx.name());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception{
        System.out.println(ctx.name() + " handlerRemoved = = " + ctx.name());
    }

    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
        System.out.println(ctx.name() + " is bind in " + localAddress + " in " + promise);
    }

    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
        // Plain concatenation instead of toString(), because localAddress may be null here
        System.out.println(ctx.name() + " is connect in " + localAddress + " in client "  + remoteAddress + " in " + promise);
    }

    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
        System.out.println(ctx.name() + " is disconnect in " + promise);
    }

    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
        System.out.println(ctx.name() + " is close in " + promise);
    }

    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
        System.out.println(ctx.name() + " is deregister in " + promise);
    }

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
        System.out.print(ctx.name() + " is write in " + promise);
        if(msg instanceof DefaultFullHttpResponse) {
            System.out.println(" with message : " + BufToString.convertByteBufToString(((DefaultFullHttpResponse)msg).content()));
        }
        else if(msg instanceof TextWebSocketFrame) {
            System.out.println(" with socket message : " + ((TextWebSocketFrame)msg).text());
        }
        else if(msg instanceof CloseWebSocketFrame) {
            System.out.println(" close reason : " + ((CloseWebSocketFrame)msg).reasonText());
        }
        else {
            System.out.println(" with message : " + msg.getClass());
        }
    }

    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println(ctx.name() + " is flush");
    }
}

  banana.html: (chat room front end)

<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Netty WebSocket 聊天實例</title>
</head>
<script src="https://cdn.staticfile.org/jquery/1.10.2/jquery.min.js" type="text/javascript"></script>
<script src="map.js" type="text/javascript"></script>
<script type="text/javascript">
$(document).ready(function() {
    var uuid = guid(); // uuid is unique within one session
    var nameOnline = ''; // name used when going online
    var onlineName = new Map(); // users already online, <requestId, name>

    $("#name").attr("disabled","disabled");
    $("#onlineBtn").attr("disabled","disabled");
    $("#downlineBtn").attr("disabled","disabled");

    $("#banana").hide();

    // Initialize the WebSocket
    var socket;
    if (!window.WebSocket) {
        window.WebSocket = window.MozWebSocket;
    }
    if (window.WebSocket) {
        socket = new WebSocket("ws://localhost:9090/");
        socket.onmessage = function(event) {
            console.log("收到服務器消息:" + event.data);
            if (event.data.indexOf("isSucc") != -1) {// the server's reply to a client request (response)
                var response = JSON.parse(event.data);
                if (response != undefined && response != null) {
                    if (response.serviceId == 1001) { // online
                        if (response.isSucc) {
                            // Online succeeded; initialize the list of online users
                            onlineName.clear();
                            $("#showOnlineNames").empty();
                            for (var reqId in response.hadOnline) {
                                onlineName.put(reqId, response.hadOnline[reqId]);
                            }
                            initOnline();

                            $("#name").attr("disabled","disabled");
                            $("#onlineBtn").attr("disabled","disabled");
                            $("#downlineBtn").removeAttr("disabled");
                            $("#banana").show();
                        } else {
                            alert("上線失敗");
                        }
                    } else if (response.serviceId == 1004) { // offline
                        if (response.isSucc) {
                            onlineName.clear();
                            $("#showBanana").empty();
                            $("#showOnlineNames").empty();
                            $("#name").removeAttr("disabled");
                            $("#onlineBtn").removeAttr("disabled");
                            $("#downlineBtn").attr("disabled","disabled");
                            $("#banana").hide();
                        } else {
                            alert("下線失敗");
                        }
                    }
                }
            } else {// otherwise a request pushed from the server to the client (request)
                var request = JSON.parse(event.data);
                if (request != undefined && request != null) {
                    if (request.serviceId == 1001 || request.serviceId == 1004) { // someone went online/offline
                        if (request.serviceId == 1001) {
                            onlineName.put(request.requestId, request.name);
                        }
                        if (request.serviceId == 1004) {
                            onlineName.removeByKey(request.requestId);
                        }

                        initOnline();
                    } else if (request.serviceId == 1003) { // someone sent a message
                        appendBanana(request.name, request.message);
                    }
                }
            }
        };
        socket.onopen = function(event) {
            $("#name").removeAttr("disabled");
            $("#onlineBtn").removeAttr("disabled");
            console.log("已連接服務器");
        };
        socket.onclose = function(event) { // WebSocket closed
            console.log("WebSocket已經關閉!");
        };
        socket.onerror = function(event) {
            console.log("WebSocket異常!");
        };
    } else {
        alert("抱歉,您的瀏覽器不支持WebSocket協議!");
    }

    // Send a request over the WebSocket
    function send(message) {
        if (!window.WebSocket) { return; }
        if (socket.readyState == WebSocket.OPEN) {
            socket.send(message);
        } else {
            console.log("WebSocket連接沒有建立成功!");
            alert("您還未連接上服務器,請刷新頁面重試");
        }
    }

    // Refresh the list of online users
    function initOnline() {
        $("#showOnlineNames").empty();
        for (var i=0;i<onlineName.size();i++) {
            $("#showOnlineNames").append('<tr><td>' + (i+1) + '</td>' +
            '<td>' + onlineName.element(i).value + '</td>' +
            '</tr>');
        }
    }
    // Append a chat message
    function appendBanana(name, message) {
        $("#showBanana").append('<tr><td>' + name + ': ' + message + '</td></tr>');
    }

    $("#onlineBtn").bind("click", function() {
        var name = $("#name").val();
        if (name == null || name == '') {
            alert("請輸入您的尊姓大名");
            return;
        }

        nameOnline = name;
        // Go online
        send(JSON.stringify({"requestId":uuid, "serviceId":1001, "name":name}));
    });

    $("#downlineBtn").bind("click", function() {
        // Go offline
        send(JSON.stringify({"requestId":uuid, "serviceId":1004}));
    });

    $("#sendBtn").bind("click", function() {
        var message = $("#messageInput").val();
        if (message == null || message == '') {
            alert("請輸入您的聊天信息");
            return;
        }

        // Send a chat message
        send(JSON.stringify({"requestId":uuid, "serviceId":1002, "name":nameOnline, "message":message}));
        $("#messageInput").val("");
    });

});

function guid() {
    function S4() {
       return (((1+Math.random())*0x10000)|0).toString(16).substring(1);
    }
    return (S4()+S4()+"-"+S4()+"-"+S4()+"-"+S4()+"-"+S4()+S4()+S4());
}
</script>
<body>
  <h1>Netty WebSocket 聊天實例</h1>
  <input type="text" id="name" value="佚名" placeholder="姓名" />
  <input type="button" id="onlineBtn" value="上線" />
  <input type="button" id="downlineBtn" value="下線" />
  <hr/>
  <table id="banana" border="1" >
    <tr>
      <td width="600" align="center">聊天</td>
      <td width="100" align="center">上線人員</td>
    </tr>
    <tr height="200" valign="top">
      <td>
        <table id="showBanana" border="0" width="600">
            <!--
            <tr>
              <td>Zhang San: Hello everyone</td>
            </tr>
            <tr>
              <td>Li Si: Welcome to the group chat</td>
            </tr>
            -->
        </table>
      </td>
      <td>
        <table id="showOnlineNames" border="0">
            <!--
            <tr>
              <td>1</td>
              <td>Zhang San</td>
            <tr/>
            <tr>
              <td>2</td>
              <td>Li Si</td>
            <tr/>
            -->
        </table>
      </td>
    </tr>
    <tr height="40">
      <td></td>
      <td></td>
    </tr>
    <tr>
      <td>
        <input type="text" id="messageInput"  style="width:590px" placeholder="巴拉巴拉點什么吧" />
      </td>
      <td>
        <input type="button" id="sendBtn" value="發送" />
      </td>
    </tr>
  </table>

</body>
</html>
banana.html

  Running WebSocketServer and ReversedWebSocketServer separately produces the following logs:

============adapter first, then handler==============
============connection start========================

adapter handlerAdded = = adapter
channel handlerAdded = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])
channel Read0 = = handler with http request : 
adapter is write in DefaultChannelPromise@3b536aab(incomplete) with message : 
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============user goes online========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1001,"name":"Anonymous"}
adapter is write in DefaultChannelPromise@61ace442(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"Registration successful","hadOnline":{"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d":"Anonymous"}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============send message========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1002,"name":"Anonymous","message":"queue"}
adapter is write in DefaultChannelPromise@5a4c689a(incomplete) with socket message : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1003,"name":"Anonymous","message":"queue"}
adapter is flush
adapter is write in DefaultChannelPromise@3cfa9e57(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"Message sent successfully","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============user goes offline========================

channel Read0 = = handler with socket request : {"requestId":"edaffbe7-efd7-d7a7-e8bd-4a42c24bfe6d","serviceId":1004}
adapter is write in DefaultChannelPromise@b6ce0dc(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"Went offline successfully","hadOnline":{}}
adapter is flush
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:49245])

============user disconnects========================

channel Read0 = = handler with socket request : �
adapter is write in DefaultChannelPromise@567e3360(incomplete) close reason : 
adapter is flush
adapter is close in DefaultChannelPromise@1b6673fb(success)
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0x5560f152, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:49245])
channel Inactive = = handler
channel close = = handler
adapter is close in DefaultChannelPromise@5b261d0a(success)
channel Unregistered = = handler
channel handlerRemoved = = handler
adapter handlerRemoved = = adapter
WebSocketServer run output

  and

============handler first, then adapter==============
============connection start========================

channel handlerAdded = = handler
adapter handlerAdded = = adapter
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])
channel Read0 = = handler with http request : 
adapter is write in DefaultChannelPromise@171fb888(incomplete) with message : 
adapter is flush
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============user goes online========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1001,"name":"Anonymous"}
adapter is write in DefaultChannelPromise@54042c55(incomplete) with socket message : {"serviceId":1001,"isSucc":true,"message":"Registration successful","hadOnline":{"70d182cf-b0ae-27ba-296d-33bd3ab5177b":"Anonymous"}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============send message========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1002,"name":"Anonymous","message":"queue"}
adapter is write in DefaultChannelPromise@324cb9a(incomplete) with socket message : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1003,"name":"Anonymous","message":"queue"}
adapter is write in DefaultChannelPromise@269f3a70(incomplete) with socket message : {"serviceId":1002,"isSucc":true,"message":"Message sent successfully","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============user goes offline========================

channel Read0 = = handler with socket request : {"requestId":"70d182cf-b0ae-27ba-296d-33bd3ab5177b","serviceId":1004}
adapter is write in DefaultChannelPromise@2f6a67d7(incomplete) with socket message : {"serviceId":1004,"isSucc":true,"message":"Went offline successfully","hadOnline":{}}
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 - R:/0:0:0:0:0:0:0:1:64671])

============user disconnects========================

channel Read0 = = handler with socket request : �
adapter is write in DefaultChannelPromise@5dff633(incomplete) close reason : 
adapter is flush
adapter is close in DefaultChannelPromise@1e58e8f0(success)
channel Flush = = handler
adapter is read in ChannelHandlerContext(adapter, [id: 0xa48d64f4, L:/0:0:0:0:0:0:0:1:9090 ! R:/0:0:0:0:0:0:0:1:64671])
channel Inactive = = handler
channel close = = handler
channel Unregistered = = handler
adapter handlerRemoved = = adapter
channel handlerRemoved = = handler
ReversedWebSocketServer run output

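  Besides the different handlerAdded and handlerRemoved order, the two runs differ in how often the outbound adapter's flush is invoked. With the adapter placed before the handler (WebSocketServer), "adapter is flush" is logged on every exchange, and one extra time when a chat message is sent, because the server not only receives a packet but also sends packets back. In the reversed pipeline the adapter's flush shows up only during the handshake and the close.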

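  This shows that a Netty pipeline has to be planned carefully: let the server-side processing run first wherever possible, and add the outbound interceptor ahead of the inbound one. In Netty, an outbound call made through a ChannelHandlerContext travels from that handler toward the head of the pipeline, so an outbound handler added after the inbound handler does not see it, which is consistent with the missing flush lines in the second log.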
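
  The ordering effect can be reproduced with a small toy model in plain Java. This is only a sketch of the propagation rule, not Netty's implementation: the ToyPipeline class and its fireFromChannel / fireFromContext methods are hypothetical names invented for illustration. It also assumes that the inbound handler writes through the channel (which starts at the tail of the pipeline) and flushes through its own context (which starts just before the handler); the logs above suggest this, but the handler source is not part of this section.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of outbound propagation; a simplification, not Netty's implementation. */
final class ToyPipeline {
    /** One pipeline slot: a handler name plus whether it handles outbound events. */
    private static final class Node {
        final String name;
        final boolean outbound;

        Node(String name, boolean outbound) {
            this.name = name;
            this.outbound = outbound;
        }
    }

    private final List<Node> nodes = new ArrayList<>();
    private final List<String> log = new ArrayList<>();

    ToyPipeline addLast(String name, boolean outbound) {
        nodes.add(new Node(name, outbound));
        return this;
    }

    /** Channel-level outbound call: starts at the tail, so every outbound node sees it. */
    void fireFromChannel(String event) {
        propagateTowardHead(nodes.size() - 1, event);
    }

    /** Context-level outbound call: starts just before the named handler. */
    void fireFromContext(String handlerName, String event) {
        propagateTowardHead(indexOf(handlerName) - 1, event);
    }

    /** Walk from the start index toward the head, logging each outbound node that is passed. */
    private void propagateTowardHead(int start, String event) {
        for (int i = start; i >= 0; i--) {
            Node node = nodes.get(i);
            if (node.outbound) {
                log.add(node.name + " " + event);
            }
        }
    }

    private int indexOf(String name) {
        for (int i = 0; i < nodes.size(); i++) {
            if (nodes.get(i).name.equals(name)) {
                return i;
            }
        }
        throw new IllegalArgumentException("no such handler: " + name);
    }

    /** Assumed scenario: the inbound handler writes via the channel, then flushes via its context. */
    static List<String> run(boolean adapterFirst) {
        ToyPipeline p = new ToyPipeline();
        if (adapterFirst) {
            p.addLast("adapter", true).addLast("handler", false);
        } else {
            p.addLast("handler", false).addLast("adapter", true);
        }
        p.fireFromChannel("write");
        p.fireFromContext("handler", "flush");
        return p.log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));   // prints [adapter write, adapter flush]
        System.out.println(run(false));  // prints [adapter write]
    }
}
```

  With the adapter added first, both the write and the flush pass through it; with the order reversed, the context-level flush starts in front of the handler and never reaches the adapter, while the channel-level write still does.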

