1、WebSocketServer:WebSocket 服务端启动类
1 package websocket; 2 3 import io.netty.bootstrap.ServerBootstrap; 4 import io.netty.channel.Channel; 5 import io.netty.channel.ChannelInitializer; 6 import io.netty.channel.ChannelPipeline; 7 import io.netty.channel.EventLoopGroup; 8 import io.netty.channel.nio.NioEventLoopGroup; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel; 11 import io.netty.handler.codec.http.HttpObjectAggregator; 12 import io.netty.handler.codec.http.HttpServerCodec; 13 import io.netty.handler.stream.ChunkedWriteHandler; 14 15 16 /** 17 * Created by lz on 2016/8/12. 18 */ 19 public class WebSocketServer { 20 public void run(int port) throws Exception{ 21 //创建两组线程,监听连接和工作 22 EventLoopGroup bossGroup = new NioEventLoopGroup(); 23 EventLoopGroup workerGroup = new NioEventLoopGroup(); 24 try{ 25 //Netty用于启动Nio服务端的启动类 26 ServerBootstrap b = new ServerBootstrap(); 27 b.group(bossGroup,workerGroup) 28 //注册NioServerSocketChannel 29 .channel(NioServerSocketChannel.class) 30 //注册处理器 31 .childHandler(new ChannelInitializer<SocketChannel>() { 32 @Override 33 protected void initChannel(SocketChannel socketChannel) throws Exception { 34 ChannelPipeline pipeline = socketChannel.pipeline(); 35 //用于Http请求的编码或者解码 36 pipeline.addLast("http-codec", new HttpServerCodec()); 37 //把Http消息组成完整地HTTP消息 38 pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); 39 //向客户端发送HTML5文件 40 pipeline.addLast("http-chunked", new ChunkedWriteHandler()); 41 //实际处理的Handler 42 pipeline.addLast("handler", new WebSocketServerHandler()); 43 } 44 }); 45 Channel ch = b.bind(port).sync().channel(); 46 System.out.println("Web socket server started at port " + port + '.'); 47 System.out.println("Open your browser and navigate to http://localhost:" + port + '/'); 48 ch.closeFuture().sync(); 49 }finally { 50 bossGroup.shutdownGracefully(); 51 workerGroup.shutdownGracefully(); 52 } 53 } 54 55 public static void main(String[] args) throws Exception { 56 int port = 8080; 57 
new WebSocketServer().run(port); 58 } 59 }
2、WebSocketServerHandler:主要负责处理我们系统业务逻辑的 Handler。
1 package websocket; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.*; 6 import io.netty.handler.codec.http.*; 7 import io.netty.handler.codec.http.websocketx.*; 8 import io.netty.util.CharsetUtil; 9 10 import java.util.Date; 11 import java.util.logging.Level; 12 import java.util.logging.Logger; 13 14 import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive; 15 import static io.netty.handler.codec.http.HttpHeaders.setContentLength; 16 17 /** 18 * Created by lz on 2016/8/12. 19 */ 20 public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> { 21 private static final Logger logger = Logger.getLogger(WebSocketServerHandler.class.getName()); 22 23 private WebSocketServerHandshaker handshaker; 24 @Override 25 protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { 26 //传统的HTTP接入 27 if (msg instanceof FullHttpRequest){ 28 handleHttpRequest(ctx,(FullHttpRequest)msg); 29 }//WebSocket接入 30 else if (msg instanceof WebSocketFrame){ 31 handleWebSocketFrame(ctx,(WebSocketFrame)msg); 32 } 33 } 34 35 private void handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) { 36 //判断是否关闭链路指令 37 if (frame instanceof CloseWebSocketFrame){ 38 handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); 39 return; 40 } 41 //判断是否是Ping消息 42 if (frame instanceof PingWebSocketFrame){ 43 ctx.channel().write(new PongWebSocketFrame(frame.content()).retain()); 44 return; 45 } 46 //本例程仅支持文本信息,不支持二进制消息 47 if (!(frame instanceof TextWebSocketFrame)){ 48 throw new UnsupportedOperationException(String.format("%s frame types not supported",frame.getClass().getName())); 49 } 50 //返回应答消息 51 String request = ((TextWebSocketFrame) frame).text(); 52 if (logger.isLoggable(Level.FINE)){ 53 logger.fine(String.format("%s received %s",ctx.channel(),request)); 54 } 55 ctx.channel().write(new TextWebSocketFrame(request+"欢迎使用Netty WebSocket服务,现在时刻:" + new Date().toString())); 56 } 
57 private static void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse res){ 58 //返回应答给客户端 59 if (res.getStatus().code() != 200){ 60 ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8); 61 res.content().writeBytes(buf); 62 buf.release(); 63 setContentLength(res,res.content().readableBytes()); 64 } 65 //如果是非Keep-Alive,关闭连接 66 ChannelFuture f = ctx.channel().writeAndFlush(res); 67 if (!isKeepAlive(req) || res.getStatus().code() != 200){ 68 f.addListener(ChannelFutureListener.CLOSE); 69 } 70 } 71 72 73 private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception{ 74 //如果HTTP解码失败,返回HTTP异常 75 if (!req.getDecoderResult().isSuccess() 76 || (!"websocket".equals(req.headers().get("Upgrade")))){ 77 sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); 78 return; 79 } 80 //构造握手响应返回,本机测试 81 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket",null,false); 82 handshaker = wsFactory.newHandshaker(req); 83 if (handshaker == null){ 84 WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel()); 85 }else { 86 //把握手消息返回给客户端 87 handshaker.handshake(ctx.channel(),req); 88 } 89 } 90 91 92 @Override 93 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 94 ctx.flush(); 95 } 96 97 @Override 98 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 99 cause.printStackTrace(); 100 ctx.close(); 101 } 102 }