服務端
package org.zln.netty.five.timer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 時間服務器服務端 * Created by sherry on 16/11/5. */ public class TimerServer { /** * 服務端綁定端口號 */ private int PORT; public TimerServer(int PORT) { this.PORT = PORT; } /** * 日志 */ private static Logger logger = LoggerFactory.getLogger(TimerServer.class); public void bind() { /* NioEventLoopGroup是線程池組 包含了一組NIO線程,專門用於網絡事件的處理 bossGroup:服務端,接收客戶端連接 workGroup:進行SocketChannel的網絡讀寫 */ EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workGroup = new NioEventLoopGroup(); try { /* ServerBootstrap:用於啟動NIO服務的輔助類,目的是降低服務端的開發復雜度 */ ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024)//配置TCP參數,能夠設置很多,這里就只設置了backlog=1024, .childHandler(new TimerServerInitializer());//綁定I/O事件處理類 logger.debug("綁定端口號:" + PORT + ",等待同步成功"); /* bind:綁定端口 sync:同步阻塞方法,等待綁定完成,完成后返回 ChannelFuture ,主要用於通知回調 */ ChannelFuture channelFuture = serverBootstrap.bind(PORT).sync(); logger.debug("等待服務端監聽窗口關閉"); /* closeFuture().sync():為了阻塞,服務端鏈路關閉后才退出.也是一個同步阻塞方法 */ channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { logger.error(e.getMessage(), e); } finally { logger.debug("優雅退出,釋放線程池資源"); bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); } } }
1 package org.zln.netty.five.timer; 2 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelPipeline; 5 import io.netty.channel.socket.SocketChannel; 6 import io.netty.handler.codec.LineBasedFrameDecoder; 7 import io.netty.handler.codec.string.StringDecoder; 8 9 /** 10 * Created by sherry on 16/11/5. 11 */ 12 public class TimerServerInitializer extends ChannelInitializer<SocketChannel> { 13 @Override 14 protected void initChannel(SocketChannel socketChannel) throws Exception { 15 16 ChannelPipeline pipeline = socketChannel.pipeline(); 17 18 pipeline.addLast(new LineBasedFrameDecoder(1024)); 19 pipeline.addLast(new StringDecoder()); 20 pipeline.addLast(new TimerServerHandler()); 21 22 23 24 } 25 }
1 package org.zln.netty.five.timer; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 10 import java.text.SimpleDateFormat; 11 import java.util.Date; 12 13 /** 14 * Handler主要用於對網絡事件進行讀寫操作,是真正的業務類 15 * 通常只需要關注 channelRead 和 exceptionCaught 方法 16 * Created by sherry on 16/11/5. 17 */ 18 public class TimerServerHandler extends ChannelHandlerAdapter { 19 20 /** 21 * 日志 22 */ 23 private Logger logger = LoggerFactory.getLogger(TimerServerHandler.class); 24 25 private static int count = 0; 26 27 @Override 28 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 29 30 String body = (String) msg; 31 logger.debug("第 "+(++count)+" 次收到請求 - "+body); 32 33 String timeNow = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss SSS").format(new Date())+System.lineSeparator(); 34 35 //獲取發送給客戶端的數據 36 ByteBuf resBuf = Unpooled.copiedBuffer(timeNow.getBytes("UTF-8")); 37 38 ctx.writeAndFlush(resBuf); 39 } 40 41 42 @Override 43 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { 44 //將消息發送隊列中的消息寫入到SocketChannel中發送給對方 45 logger.debug("channelReadComplete"); 46 ctx.flush(); 47 } 48 49 @Override 50 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 51 //發生異常時,關閉 ChannelHandlerContext,釋放ChannelHandlerContext 相關的句柄等資源 52 logger.error(cause.getMessage(),cause); 53 ctx.close(); 54 } 55 }
客戶端
1 package org.zln.netty.five.timer; 2 3 import io.netty.bootstrap.Bootstrap; 4 import io.netty.channel.ChannelFuture; 5 import io.netty.channel.ChannelOption; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.nio.NioSocketChannel; 9 import org.slf4j.Logger; 10 import org.slf4j.LoggerFactory; 11 12 /** 13 * 時間服務器客戶端 14 * Created by sherry on 16/11/5. 15 */ 16 public class TimerClient { 17 /** 18 * 日志 19 */ 20 private Logger logger = LoggerFactory.getLogger(TimerServer.class); 21 22 private String HOST; 23 private int PORT; 24 25 public TimerClient(String HOST, int PORT) { 26 this.HOST = HOST; 27 this.PORT = PORT; 28 } 29 30 public void connect(){ 31 //配置客戶端NIO線程組 32 EventLoopGroup eventLoopGroup = new NioEventLoopGroup(); 33 try { 34 Bootstrap bootstrap = new Bootstrap(); 35 bootstrap.group(eventLoopGroup) 36 .channel(NioSocketChannel.class) 37 .option(ChannelOption.TCP_NODELAY,true) 38 .handler(new TimerClientInitializer()); 39 //發起異步連接操作 40 logger.debug("發起異步連接操作 - start"); 41 ChannelFuture channelFuture = bootstrap.connect(HOST,PORT).sync(); 42 logger.debug("發起異步連接操作 - end"); 43 //等待客戶端鏈路關閉 44 logger.debug("等待客戶端鏈路關閉 - start"); 45 channelFuture.channel().closeFuture().sync(); 46 logger.debug("等待客戶端鏈路關閉 - end"); 47 } catch (InterruptedException e) { 48 logger.error(e.getMessage(),e); 49 }finally { 50 //優雅的關閉 51 eventLoopGroup.shutdownGracefully(); 52 } 53 } 54 }
1 package org.zln.netty.five.timer; 2 3 import io.netty.channel.ChannelInitializer; 4 import io.netty.channel.ChannelPipeline; 5 import io.netty.channel.socket.SocketChannel; 6 import io.netty.handler.codec.LineBasedFrameDecoder; 7 import io.netty.handler.codec.string.StringDecoder; 8 9 /** 10 * Created by sherry on 16/11/5. 11 */ 12 public class TimerClientInitializer extends ChannelInitializer<SocketChannel> { 13 @Override 14 protected void initChannel(SocketChannel socketChannel) throws Exception { 15 ChannelPipeline pipeline = socketChannel.pipeline(); 16 pipeline.addLast(new LineBasedFrameDecoder(1024)); 17 pipeline.addLast(new StringDecoder()); 18 pipeline.addLast(new TimerClientHandler()); 19 } 20 }
1 package org.zln.netty.five.timer; 2 3 import io.netty.buffer.ByteBuf; 4 import io.netty.buffer.Unpooled; 5 import io.netty.channel.ChannelHandlerAdapter; 6 import io.netty.channel.ChannelHandlerContext; 7 import org.slf4j.Logger; 8 import org.slf4j.LoggerFactory; 9 10 import java.io.UnsupportedEncodingException; 11 12 /** 13 * Created by sherry on 16/11/5. 14 */ 15 public class TimerClientHandler extends ChannelHandlerAdapter { 16 17 /** 18 * 日志 19 */ 20 private Logger logger = LoggerFactory.getLogger(TimerClientHandler.class); 21 22 private static int count = 0; 23 24 @Override 25 public void channelActive(ChannelHandlerContext ctx) throws Exception { 26 logger.debug("客戶端連接上了服務端"); 27 28 //發送請求 29 ByteBuf reqBuf = null; 30 for (int i = 0; i < 100; i++) { 31 reqBuf = getReq("GET TIME"+System.lineSeparator()); 32 ctx.writeAndFlush(reqBuf); 33 } 34 35 36 } 37 38 /** 39 * 將字符串包裝成ByteBuf 40 * @param s 41 * @return 42 */ 43 private ByteBuf getReq(String s) throws UnsupportedEncodingException { 44 byte[] data = s.getBytes("UTF-8"); 45 ByteBuf reqBuf = Unpooled.buffer(data.length); 46 reqBuf.writeBytes(data); 47 return reqBuf; 48 } 49 50 @Override 51 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { 52 String body = (String) msg; 53 logger.debug("這是收到的第 "+(++count)+" 筆響應 -- "+body); 54 } 55 56 @Override 57 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { 58 ctx.close(); 59 } 60 }
這里主要使用 LineBasedFrameDecoder 和 StringDecoder 來實現解決粘包問題
原理如下:
LineBasedFrameDecoder 依次遍歷 ByteBuf 中的可讀字節,判斷是否有 \n 或 \r\n,如果有,就作為結束位置。從可讀索引到結束位置區間的字節組成一行。它是以換行符為結束標志的解碼器。支持攜帶結束符或者不懈怠結束符兩種解碼方式。同時支持配置單行的最大長度。如果讀取到了最大長度仍舊沒有發現換行符,就會拋出異常,同時忽略掉之前讀到的數據。
StringDecoder 的作用就是講接收到的對象轉化成字符串,然后繼續調用handler。這樣就不需要再handler中手動將對象轉化成字符串了,直接強制轉化就行。
LineBasedFrameDecoder+StringDecoder組合就是按行切割的文本解碼器,用來解決TCP的粘包和拆包問題。
