java根据Netty的WebSocket协议开发


1、Websocketservice

 1 package websocket;
 2 
 3 import io.netty.bootstrap.ServerBootstrap;
 4 import io.netty.channel.Channel;
 5 import io.netty.channel.ChannelInitializer;
 6 import io.netty.channel.ChannelPipeline;
 7 import io.netty.channel.EventLoopGroup;
 8 import io.netty.channel.nio.NioEventLoopGroup;
 9 import io.netty.channel.socket.SocketChannel;
10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 import io.netty.handler.codec.http.HttpObjectAggregator;
12 import io.netty.handler.codec.http.HttpServerCodec;
13 import io.netty.handler.stream.ChunkedWriteHandler;
14 
15 
16 /**
17  * Created by lz on 2016/8/12.
18  */
19 public class WebSocketServer {
20     public void run(int port) throws Exception{
21         //创建两组线程,监听连接和工作
22         EventLoopGroup bossGroup = new NioEventLoopGroup();
23         EventLoopGroup workerGroup = new NioEventLoopGroup();
24         try{
25             //Netty用于启动Nio服务端的启动类
26             ServerBootstrap b = new ServerBootstrap();
27             b.group(bossGroup,workerGroup)
28                     //注册NioServerSocketChannel
29                     .channel(NioServerSocketChannel.class)
30                      //注册处理器
31                     .childHandler(new ChannelInitializer<SocketChannel>() {
32                         @Override
33                         protected void initChannel(SocketChannel socketChannel) throws Exception {
34                             ChannelPipeline pipeline = socketChannel.pipeline();
35                             //用于Http请求的编码或者解码
36                             pipeline.addLast("http-codec", new HttpServerCodec());
37                             //把Http消息组成完整地HTTP消息
38                             pipeline.addLast("aggregator", new HttpObjectAggregator(65536));
39                             //向客户端发送HTML5文件
40                             pipeline.addLast("http-chunked", new ChunkedWriteHandler());
41                             //实际处理的Handler
42                             pipeline.addLast("handler", new WebSocketServerHandler());
43                         }
44                     });
45             Channel ch = b.bind(port).sync().channel();
46             System.out.println("Web socket server started at port " + port + '.');
47             System.out.println("Open your browser and navigate to http://localhost:" + port + '/');
48             ch.closeFuture().sync();
49         }finally {
50             bossGroup.shutdownGracefully();
51             workerGroup.shutdownGracefully();
52         }
53     }
54 
55     public static void main(String[] args) throws Exception {
56         int port = 8080;
57          new WebSocketServer().run(port);
58     }
59 }

2、主要是处理我们系统逻辑的Handler;

  1 package websocket;
  2 
  3 import io.netty.buffer.ByteBuf;
  4 import io.netty.buffer.Unpooled;
  5 import io.netty.channel.*;
  6 import io.netty.handler.codec.http.*;
  7 import io.netty.handler.codec.http.websocketx.*;
  8 import io.netty.util.CharsetUtil;
  9 
 10 import java.util.Date;
 11 import java.util.logging.Level;
 12 import java.util.logging.Logger;
 13 
 14 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
 15 import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
 16 
 17 /**
 18  * Created by lz on 2016/8/12.
 19  */
 20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
 21     private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName());
 22 
 23     private WebSocketServerHandshaker handshaker;
 24     @Override
 25     protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
 26         //传统的HTTP接入
 27         if (msg instanceof FullHttpRequest){
 28             handleHttpRequest(ctx,(FullHttpRequest)msg);
 29         }//WebSocket接入
 30         else if (msg instanceof WebSocketFrame){
 31             handleWebSocketFrame(ctx,(WebSocketFrame)msg);
 32         }
 33     }
 34 
 35     private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
 36         //判断是否关闭链路指令
 37         if (frame instanceof CloseWebSocketFrame){
 38             handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
 39             return;
 40         }
 41         //判断是否是Ping消息
 42         if (frame instanceof PingWebSocketFrame){
 43             ctx.channel().write(new PongWebSocketFrame(frame.content()).retain());
 44             return;
 45         }
 46         //本例程仅支持文本信息,不支持二进制消息
 47         if (!(frame instanceof TextWebSocketFrame)){
 48             throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName()));
 49         }
 50         //返回应答消息
 51         String request = ((TextWebSocketFrame) frame).text();
 52         if (logger.isLoggable(Level.FINE)){
 53             logger.fine(String.format("%s received %s",ctx.channel(),request));
 54         }
 55         ctx.channel().write(new TextWebSocketFrame(request+"欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString()));
 56     }
 57     private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse res){
 58         //返回应答给客户端
 59         if (res.getStatus().code() != 200){
 60             ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
 61             res.content().writeBytes(buf);
 62             buf.release();
 63             setContentLength(res,res.content().readableBytes());
 64         }
 65         //如果是非Keep-Alive,关闭连接
 66         ChannelFuture f = ctx.channel().writeAndFlush(res);
 67         if (!isKeepAlive(req) || res.getStatus().code() != 200){
 68             f.addListener(ChannelFutureListener.CLOSE);
 69         }
 70     }
 71 
 72 
 73     private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{
 74         //如果HTTP解码失败,返回HTTP异常
 75         if (!req.getDecoderResult().isSuccess()
 76                 || (!"websocket".equals(req.headers().get("Upgrade")))){
 77             sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
 78             return;
 79         }
 80         //构造握手响应返回,本机测试
 81         WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket",null,false);
 82         handshaker = wsFactory.newHandshaker(req);
 83         if (handshaker == null){
 84             WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
 85         }else {
 86             //把握手消息返回给客户端
 87             handshaker.handshake(ctx.channel(),req);
 88         }
 89     }
 90 
 91 
 92     @Override
 93     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
 94         ctx.flush();
 95     }
 96 
 97     @Override
 98     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
 99         cause.printStackTrace();
100         ctx.close();
101     }
102 }

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM