本文参考《Netty权威指南》
├── WebSocketServerHandler.java
├── WebSocketServer.java
└── wsclient.html
package com.xh.netty.test11; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.stream.ChunkedWriteHandler; /** * Created by root on 1/8/18. */ public class WebSocketServer { public void run(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("http-codec", new HttpServerCodec()); pipeline.addLast("aggregator", new HttpObjectAggregator(65536)); pipeline.addLast("http-chunked", new ChunkedWriteHandler()); pipeline.addLast("handler", new WebSocketServerHandler()); } }); Channel ch = b.bind(port).sync().channel(); System.out.println("websocketserver start port at " + port); ch.closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) { int port = 8080; if (args.length > 0) { port = Integer.valueOf(args[0]); } new WebSocketServer().run(port); } }
package com.xh.netty.test11; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.CharsetUtil; import java.util.Date; import java.util.logging.Logger; import static io.netty.handler.codec.http.HttpHeaderUtil.isKeepAlive; import static io.netty.handler.codec.http.HttpHeaderUtil.setContentLength; /** * Created by root on 1/8/18. */ public class WebSocketServerHandler extends SimpleChannelInboundHandler { private WebSocketServerHandshaker handshaker; @Override protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg.toString()); //http if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) {//websocket handleWebsocketFrame(ctx, (WebSocketFrame) msg); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } private void handleWebsocketFrame(ChannelHandlerContext ctx, WebSocketFrame msg) { //关闭链路指令 if (msg instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) msg.retain()); return; } //PING 消息 if (msg instanceof PingWebSocketFrame) { ctx.write(new PongWebSocketFrame(msg.content().retain())); return; } //非文本 if (!(msg instanceof TextWebSocketFrame)) { throw new UnsupportedOperationException(String.format("%s frame type not support", msg.getClass().getName())); } //应答消息 String requset = ((TextWebSocketFrame) msg).text(); ctx.channel().write(new TextWebSocketFrame(requset + " >>>>Now is " + new Date().toString())); } private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest msg) { //HTTP 请异常 if (!msg.decoderResult().isSuccess() || !"websocket".equals(msg.headers().get("Upgrade"))) { System.out.println(msg.decoderResult().isSuccess()); System.out.println(msg.headers().get("Upgrade")); sendHttpResponse(ctx, msg, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //握手 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("ws://localhost:8080/websocket", null, false); handshaker = wsFactory.newHandshaker(msg); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), msg); } } private void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest msg, FullHttpResponse resp) { //响应 if (resp.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(resp.status().toString(), CharsetUtil.UTF_8); resp.content().writeBytes(buf); buf.release(); setContentLength(resp, resp.content().readableBytes()); } //非Keep-Alive,关闭链接 ChannelFuture future = ctx.channel().writeAndFlush(resp); if (!isKeepAlive(resp) || resp.status().code() != 200) { future.addListener(ChannelFutureListener.CLOSE); } } }
<!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Title</title> </head> <body> <form onsubmit="return false;"> <input type="text" name="msg" value="NETTY"> <button onclick="send(this.form.msg.value)">send</button> <br> <textarea id="resText"> </textarea> </form> </body> <script> var socket; if (!window.WebSocket) { window.WebSocket = window.MozWebSocket; } if (window.WebSocket) { socket = new WebSocket("ws://127.0.0.1:8080/websocket"); socket.onmessage = function (event) { var ta = document.getElementById("resText"); ta.value = ""; ta.value = event.data; }; socket.onopen = function (event) { alert("浏览器支持WebSocket"); var ta = document.getElementById("resText"); ta.value = ""; ta.value = "浏览器支持WebSocket"; }; socket.onclose = function (event) { var ta = document.getElementById("resText"); ta.value = ""; ta.value = "关闭WebSocket"; } } else { alert("浏览器不支持WebSocket"); } function send(msg) { if (!window.WebSocket) { return; } if (socket.readyState == WebSocket.OPEN) { socket.send(msg); } else { alert("建立连接失败") } } </script> </html>