在學完netty基礎部分后,你可能很難想到它的使用場景,本章就介紹一個netty的使用場景--websocket協議的應用。
WebSocket是一種在單個TCP連接上進行全雙工通信的協議。WebSocket使得客戶端和服務器之間的數據交換變得更加簡單,允許服務端主動向客戶端推送數據。在WebSocket API中,瀏覽器和服務器只需要完成一次握手,兩者之間就直接可以創建持久性的連接,並進行雙向數據傳輸。
其次websocke支持很多種數據傳輸,如:二進制流、文件、文本信息等等。
一、服務端模塊
1.引入maven與啟動類
本文是基於spring boot 2.0,netty4.0開發的,這里只展示netty的包
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.25.Final</version> </dependency>
因為是服務端,所以需要在spring boot自帶的啟動類中同時啟動netty服務
/** * 聲明CommandLineRunner接口,實現run方法,就能給啟動項目同時啟動netty服務 */ @SpringBootApplication public class ThemApplication implements CommandLineRunner { /** * netty服務 */ @Autowired ServerByNetty serverByNetty; public static void main(String[] args) { SpringApplication.run(ThemApplication.class, args); } @Override public void run(String... args) throws Exception { serverByNetty.startServer(); } }
2.netty服務端類
/** * 基於websocket的服務端代碼 */ @Configuration public class ServerByNetty { /** * 服務端啟動類 * @throws Exception */ public void startServer() throws Exception { // netty基本操作,兩個線程組 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup wokerGroup = new NioEventLoopGroup(); try{ //netty的啟動類 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup,wokerGroup).channel(NioServerSocketChannel.class) //記錄日志的handler,netty自帶的 .handler(new LoggingHandler(LogLevel.INFO)) .option(ChannelOption.SO_KEEPALIVE,true) .option(ChannelOption.SO_BACKLOG,1024*1024*10) //設置handler .childHandler(new ChannelInitializer< SocketChannel >(){ @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); //websocket協議本身是基於Http協議的,所以需要Http解碼器 pipeline.addLast("http-codec",new HttpServerCodec()); //以塊的方式來寫的處理器 pipeline.addLast("http-chunked",new ChunkedWriteHandler()); //netty是基於分段請求的,HttpObjectAggregator的作用是將請求分段再聚合,參數是聚合字節的最大長度 pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*1024)); //這個是websocket的handler,是netty提供的,也可以自定義,建議就用默認的 pipeline.addLast(new WebSocketServerProtocolHandler("/hello",null,true,65535)); //自定義的handler,處理服務端傳來的消息 pipeline.addLast(new WebSocketHandle()); } }); ChannelFuture channelFuture = serverBootstrap.bind(new InetSocketAddress(8899)).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); wokerGroup.shutdownGracefully(); } } }
在服務端類中需要注意的是使用的handler,一共有5個,前面三個都是HTTP的編解碼器,WebSocketServerProtocolHandler則是websocket的handler,這個的作用主要是用來解決HTTP握手等問題。雖然可以自己實現,但是推薦采用這個默認的handler,它能夠解決很多未知的問題。
3.自定義的業務處理handler
這里最主要的地方就是消息推送,其實只要你把IP存起來,發送消息就會非常簡單。
/** * 自定義的handler類 */ @Configuration public class WebSocketHandle extends SimpleChannelInboundHandler<Object> { //客戶端組 public static ChannelGroup channelGroup; static { channelGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); } //存儲ip和channel的容器 private static ConcurrentMap<String, Channel> channelMap = new ConcurrentHashMap<>(); /** * Handler活躍狀態,表示連接成功 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("與客戶端連接成功"); channelGroup.add(ctx.channel()); } /** * * @param ctx * @param msg * @throws Exception */ protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { //文本消息 if (msg instanceof TextWebSocketFrame) { //第一次連接成功后,給客戶端發送消息 sendMessageAll(); //獲取當前channel綁定的IP地址 InetSocketAddress ipSocket = (InetSocketAddress)ctx.channel().remoteAddress(); String address = ipSocket.getAddress().getHostAddress(); System.out.println("address為:"+address); //將IP和channel的關系保存 if (!channelMap.containsKey(address)){ channelMap.put(address,ctx.channel()); } } //二進制消息 if (msg instanceof BinaryWebSocketFrame) { System.out.println("收到二進制消息:" + ((BinaryWebSocketFrame) msg).content().readableBytes()); BinaryWebSocketFrame binaryWebSocketFrame = new BinaryWebSocketFrame(Unpooled.buffer().writeBytes("hello".getBytes())); //給客戶端發送的消息 ctx.channel().writeAndFlush(binaryWebSocketFrame); } //ping消息 if (msg instanceof PongWebSocketFrame) { System.out.println("客戶端ping成功"); } //關閉消息 if (msg instanceof CloseWebSocketFrame) { System.out.println("客戶端關閉,通道關閉"); Channel channel = ctx.channel(); channel.close(); } } /** * 未注冊狀態 * * @param ctx * @throws Exception */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { System.out.println("等待連接"); } /** * 非活躍狀態,沒有連接遠程主機的時候。 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("客戶端關閉"); channelGroup.remove(ctx.channel()); } /** * 異常處理 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接異常:"+cause.getMessage()); ctx.close(); } /** * 給指定用戶發內容 * 后續可以掉這個方法推送消息給客戶端 */ public void sendMessage(String address){ Channel channel=channelMap.get(address); String message="你好,這是指定消息發送"; channel.writeAndFlush(new TextWebSocketFrame(message)); } /** * 群發消息 */ public void sendMessageAll(){ String meesage="這是群發信息"; channelGroup.writeAndFlush(new TextWebSocketFrame(meesage)); } }
因為我們采用了websocket自帶的handler,所以不需要我自己再去解決HTTP握手的問題,我們只需要對客戶端發送過來的數據進行轉換和業務處理。
至此,服務端的代碼就已經完成了。
二、客戶端模塊
客戶端模塊中,就可以有多種實現了。可以采用JS實現網頁版的聊天工具,也可以在安卓端實現客戶端。這里使用的java實現一個客戶端。
1.客戶端類
/** * 基於websocket的netty客戶端 * */ public class ClientByNetty { public static void main(String[] args) throws Exception { //netty基本操作,線程組 EventLoopGroup group = new NioEventLoopGroup(); //netty基本操作,啟動類 Bootstrap boot = new Bootstrap(); boot.option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .group(group) .handler(new LoggingHandler(LogLevel.INFO)) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); pipeline.addLast("http-codec",new HttpClientCodec()); pipeline.addLast("aggregator",new HttpObjectAggregator(1024*1024*10)); pipeline.addLast("hookedHandler", new WebSocketClientHandler()); } }); //websocke連接的地址,/hello是因為在服務端的websockethandler設置的 URI websocketURI = new URI("ws://localhost:8899/hello"); HttpHeaders httpHeaders = new DefaultHttpHeaders(); //進行握手 WebSocketClientHandshaker handshaker = WebSocketClientHandshakerFactory.newHandshaker(websocketURI, WebSocketVersion.V13, (String) null, true, httpHeaders); //客戶端與服務端連接的通道,final修飾表示只會有一個 final Channel channel = boot.connect(websocketURI.getHost(), websocketURI.getPort()).sync().channel(); WebSocketClientHandler handler = (WebSocketClientHandler) channel.pipeline().get("hookedHandler"); handler.setHandshaker(handshaker); handshaker.handshake(channel); //阻塞等待是否握手成功 handler.handshakeFuture().sync(); System.out.println("握手成功"); //給服務端發送的內容,如果客戶端與服務端連接成功后,可以多次掉用這個方法發送消息 sengMessage(channel); } public static void sengMessage(Channel channel){ //發送的內容,是一個文本格式的內容 String putMessage="你好,我是客戶端"; TextWebSocketFrame frame = new TextWebSocketFrame(putMessage); channel.writeAndFlush(frame).addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()) { System.out.println("消息發送成功,發送的消息是:"+putMessage); } else { System.out.println("消息發送失敗 " + channelFuture.cause().getMessage()); } } }); } }
客戶端代碼中需要注意很多地方:
(1) 客戶端的http編解碼器是HttpClientCodec與服務端是不一樣的,服務端是HttpServerCodec
(2) URI中的地址用的websocket的協議 ws:,而/hello則是服務端設置的通道地址,類似於HTTP的接口地址
(3) 當客戶端與服務端連接成功后,就可以通過調用sengMessage方法給服務端發送消息,只要這個連接沒有斷開就能夠一直發
(4) 調用發送消息的方法,一定要等待握手成功后發送
2.客戶端的業務handler
public class WebSocketClientHandler extends SimpleChannelInboundHandler<Object> { //握手的狀態信息 WebSocketClientHandshaker handshaker; //netty自帶的異步處理 ChannelPromise handshakeFuture; @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("當前握手的狀態"+this.handshaker.isHandshakeComplete()); Channel ch = ctx.channel(); FullHttpResponse response; //進行握手操作 if (!this.handshaker.isHandshakeComplete()) { try { response = (FullHttpResponse)msg; //握手協議返回,設置結束握手 this.handshaker.finishHandshake(ch, response); //設置成功 this.handshakeFuture.setSuccess(); System.out.println("服務端的消息"+response.headers()); } catch (WebSocketHandshakeException var7) { FullHttpResponse res = (FullHttpResponse)msg; String errorMsg = String.format("握手失敗,status:%s,reason:%s", res.status(), res.content().toString(CharsetUtil.UTF_8)); this.handshakeFuture.setFailure(new Exception(errorMsg)); } } else if (msg instanceof FullHttpResponse) { response = (FullHttpResponse)msg; throw new IllegalStateException("Unexpected FullHttpResponse (getStatus=" + response.status() + ", content=" + response.content().toString(CharsetUtil.UTF_8) + ')'); } else { //接收服務端的消息 WebSocketFrame frame = (WebSocketFrame)msg; //文本信息 if (frame instanceof TextWebSocketFrame) { TextWebSocketFrame textFrame = (TextWebSocketFrame)frame; System.out.println("客戶端接收的消息是:"+textFrame.text()); } //二進制信息 if (frame instanceof BinaryWebSocketFrame) { BinaryWebSocketFrame binFrame = (BinaryWebSocketFrame)frame; System.out.println("BinaryWebSocketFrame"); } //ping信息 if (frame instanceof PongWebSocketFrame) { System.out.println("WebSocket Client received pong"); } //關閉消息 if (frame instanceof CloseWebSocketFrame) { System.out.println("receive close frame"); ch.close(); } } } /** * Handler活躍狀態,表示連接成功 * * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("與服務端連接成功"); } /** * 非活躍狀態,沒有連接遠程主機的時候。 * * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("主機關閉"); } /** * 異常處理 * @param ctx * @param cause * @throws Exception */ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("連接異常:"+cause.getMessage()); ctx.close(); } public void handlerAdded(ChannelHandlerContext ctx) { this.handshakeFuture = ctx.newPromise(); } public WebSocketClientHandshaker getHandshaker() { return handshaker; } public void setHandshaker(WebSocketClientHandshaker handshaker) { this.handshaker = handshaker; } public ChannelPromise getHandshakeFuture() { return handshakeFuture; } public void setHandshakeFuture(ChannelPromise handshakeFuture) { this.handshakeFuture = handshakeFuture; } public ChannelFuture handshakeFuture() { return this.handshakeFuture; } }
在handler中,我們可以驗證握手是否成功,就使用handshaker.isHandshakeComplete()的方法,如果false就表示握手失敗。如果是握手失敗客戶端就無法接收服務端的消息,所以如果當你要驗證消息是否成功到達客戶端的時候,可以采用這個方法。
運行結果,先運行服務端,再運行客戶端:
服務端界面
客戶端界面: