請求報文:前四位(指定報文長度)+報文內容
示例:0010aaooerudyh
1.1、NettyServer類 :啟動TCP服務

package com.bokeyuan.socket.server; import com.bokeyuan.socket.BeanUtil; import com.bokeyuan.socket.config.ConfigConstant; import com.bokeyuan.socket.server.codec.MyByteToMessageCodec; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; import java.nio.charset.Charset; /** * @author: chenly * @date: 2021-07-05 10:42 * @description: * @version: 1.0 */ @Component @Slf4j public class NettyServer extends Thread{ @Override public void run() { startServer(); } private void startServer(){ EventLoopGroup bossGroup = null; EventLoopGroup workGroup = null; ServerBootstrap serverBootstrap = null; ChannelFuture future = null; try { //初始化線程組 bossGroup= new NioEventLoopGroup(); workGroup= new NioEventLoopGroup(); //初始化服務端配置 serverBootstrap= new ServerBootstrap(); //綁定線程組 serverBootstrap.group(bossGroup,workGroup) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(BeanUtil.getBean(MyByteToMessageCodec.class)); ch.pipeline().addLast(BeanUtil.getBean(MtReadTimeoutHandler.class)); ch.pipeline().addLast(new StringDecoder(Charset.forName(ConfigConstant.MsgCode))); ch.pipeline().addLast(new StringEncoder(Charset.forName(ConfigConstant.MsgCode))); ch.pipeline().addLast(BeanUtil.getBean(NettyServerHandler.class)); } }).option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true); future = serverBootstrap.bind(new InetSocketAddress(8090)).sync(); log.info(" *************TCP服務端啟動成功 Port:{}*********** " , 8090); } catch (Exception e) { log.error("TCP服務端啟動異常",e); } finally { if(future!=null){ try { future.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("channel關閉異常:",e); } } if(bossGroup!=null){ //線程組資源回收 bossGroup.shutdownGracefully(); } if(workGroup!=null){ //線程組資源回收 workGroup.shutdownGracefully(); } } } }
1.2、NettyServerHandler類 繼承ChannelInboundHandlerAdapter 類

package com.bokeyuan.socket.server; import com.bokeyuan.socket.MsgHandler; import com.bokeyuan.socket.StringUtil; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.net.InetSocketAddress; /** * @author: void * @date: 2021-07-05 10:43 * @description: * @version: 1.0 */ @Component @Scope("prototype") @Slf4j public class NettyServerHandler extends ChannelInboundHandlerAdapter { @Autowired private MsgHandler msgHandler; /** * * 業務數據處理 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { int len = 4; String response = ""; String recieved = ""; try{ recieved = (String) msg; log.info("服務端接收到的內容為:{}",msg); //小於4位不處理 if(recieved.length()< len){ log.info("報文小於4位"); ctx.close(); return; } String length = recieved.substring(0,4); //非數字不處理 if(!StringUtil.isNumeric(length)){ log.info("報文前四位不是數字"); ctx.close(); return; } //報文長度小於前四位指定的長度 不處理 System.out.println(recieved.getBytes(ConfigConstant.MsgCode).length); if(recieved.getBytes(ConfigConstant.MsgCode).length-len < Integer.parseInt(length)){ log.info("報文長度不夠"); ctx.close(); return; } //去掉前四位表示長度的內容,截取指定長度的內容 System.out.println(recieved); byte[] msgbytes = recieved.getBytes(ConfigConstant.MsgCode); //截取報文前四位長度的報文 byte[] tempMsg = new byte[Integer.parseInt(length)]; System.arraycopy(msgbytes, 4, tempMsg, 0, tempMsg.length); response =msgHandler.handler(new String(tempMsg, ConfigConstant.MsgCode)); log.info("服務端響應的的內容為:{}",response); }catch (Exception e) { log.error("報文解析異常,報文內容為:"+msg, e); }finally { ctx.writeAndFlush(response.getBytes(ConfigConstant.MsgCode)).addListener(ChannelFutureListener.CLOSE); } } /** *從客戶端收到新的數據、讀取完成---調用 * @param ctx * @throws Exception */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { log.info("從客戶端收到新的數據讀取完成********"); if(ctx!=null){ ctx.flush(); } } /** * 客戶端與服務端建立連接--執行 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.channel().read(); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); //此處不能使用ctx.close(),否則客戶端始終無法與服務端建立連接 log.info("客戶端與服務端建立連接:{}", clientIp); } /** * 客戶端與服務端斷連-調用 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { super.channelInactive(ctx); InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); if(ctx!=null){ //斷開連接時,服務端關閉連接,避免造成資源浪費 ctx.close(); } log.info("客戶端與服務端斷連:{}", clientIp); } /** * 當 Netty 由於 IO 錯誤或者處理器在處理事件時拋出的異常 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if(ctx!=null){ //拋出異常,斷開與客戶端的連接 ctx.close(); log.error("連接異常,服務端主動斷開連接{}",cause); } } /** * 服務端read超時-調用 * @param ctx * @param evt * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIp = insocket.getAddress().getHostAddress(); if(ctx!=null){ //超時時斷開連接 ctx.close(); } log.error("服務端read超時:{}", clientIp); } private String getClientIp(ChannelHandlerContext ctx){ InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress(); String clientIP = insocket.getAddress().getHostAddress(); return clientIP; } }
1.3、MsgHandler類 :業務處理

package com.bokeyuan.socket; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.Arrays; import java.util.List; /** * @author: void * @date: 2021-07-05 12:37 * @description: * @version: 1.0 */ @Component @Scope("prototype") @Slf4j public class MsgHandler { /** * 業務出路 * @param msg 請求報文 * @return */ public String handler(String msg){ log.info("收到的報文內容為"+msg); //業務處理 //..... return "success"; } }
1.4、MyByteToMessageCodec類 處理拆包粘包 繼承ByteToMessageCodec類

package com.bokeyuan.socket.server.codec; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageCodec; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.List; @Slf4j @Component @Scope("prototype") public class MyByteToMessageCodec extends ByteToMessageCodec<Object> { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception { int msgLength = 4; if(buf.readableBytes() < msgLength) {// 不足長度4(開始4位代表整個報文長度位),無法獲取長度 return; } //記下readerIndex buf.markReaderIndex(); //獲取前4位表示報文長度的字符串 String lenstr = getMsgLength(buf,msgLength); //轉為整型 int length = toInt(lenstr); if(length <= 0) { ctx.close(); return; } //判斷報文長度是否到達報文前四位指定的長度 if(buf.readableBytes() < length) { //重置到上一次調用markReaderIndex()的readerIndex位置 buf.resetReaderIndex(); return; } //報文內容(不包含前四位,前四位在前面已經被讀取) byte[] data = getBytes(buf); String content = new String(data, ConfigConstant.MsgCode); //msg=報文前四位長度+報文內容 String msg = lenstr+content; out.add(msg); ctx.writeAndFlush(out); } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { byte[] body = (byte[]) msg; out.writeBytes(body); } /** * 讀取ByteBuf字節內容 * @param buf * @return */ private byte[] getBytes(ByteBuf buf){ int readablebytes = buf.readableBytes(); ByteBuf tempBuf = buf.readBytes(readablebytes); byte[] data = new byte[readablebytes];//數據大小 tempBuf.getBytes(0, data); return data; } /** * 讀取ByteBuf字節內容 * @param buf * @return */ private byte[] getBytes2(ByteBuf buf){ byte[] data = new byte[buf.readableBytes()];//數據大小 buf.getBytes(0, data); return data; } /** * 讀取ByteBuf中指定長度內容 * @param buf ByteBuf * @param len 讀取字節長度 * @return */ private String getMsgLength(ByteBuf buf,int len){ byte[] bytes = new byte[len]; ByteBuf lengBuf = buf.readBytes(len); lengBuf.getBytes(0, bytes); return new String(bytes); } private int toInt(String lenstr){ try { return Integer.parseInt(lenstr); } catch (NumberFormatException e) { log.info("報文前四位:{}不是有效數字",lenstr); return 0; } } }
1.5、MyReadTimeoutHandler類 處理客戶端長時間未發數據給服務端情況 繼承ReadTimeoutHandler類

package com.bokeyuan.socket.server; import com.bokeyuan.socket.config.ConfigConstant; import io.netty.handler.timeout.ReadTimeoutHandler; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Scope; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; @Slf4j @Component @Scope("prototype") public class MyReadTimeoutHandler extends ReadTimeoutHandler { public MyReadTimeoutHandler() { //客戶端長時間沒有發送數據給服務端,socket服務端主動斷開 super(ConfigConstant.READ_TIMEOUT,TimeUnit.SECONDS); } }
1.6、BeanUtil類

package com.bokeyuan.socket; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; /** * @description: * @author: void * @create: 2019-11-07 **/ @Component public class BeanUtil implements ApplicationContextAware { private static ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { if (BeanUtil.applicationContext == null) { BeanUtil.applicationContext = applicationContext; } } /** * Get Bean by clazz. * * @param clazz Class * @param <T> class type * @return Bean instance */ public static <T> T getBean(Class<T> clazz) { if (applicationContext == null) { return null; } return applicationContext.getBean(clazz); } @SuppressWarnings("unchecked") public static <T> T getBean(String beanId) { if (applicationContext == null) { return null; } return (T) applicationContext.getBean(beanId); } }
1.7、常量類

package com.bokeyuan.socket.config; /** * @author: void * @date: 2021-09-03 14:06 * @description: * @version: 1.0 */ public class ConfigConstant { public static String MsgCode = "GBK"; /**服務端讀取超時時間*/ public static long READ_TIMEOUT; }
1.8、啟動類

package com; import com.bokeyuan.socket.server.NettyServer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; /** * @author : void * @version : 2.0 * @date : 2020/6/11 10:08 */ @SpringBootApplication public class Application implements CommandLineRunner { @Autowired private NettyServer nettyServer; @Override public void run(String... args) throws Exception { nettyServer.start(); } public static void main(String[] args) { SpringApplication.run(Application.class,args); } }
出現過的問題及解決方法
1、MyByteToMessageCodec is not a @Sharable handler, so can't be added or removed multiple times.
解決方法
在MyByteToMessageCodec類上添加@Scope("prototype")設置為多例模式
2、報錯 java.lang.UnsupportedOperationException: direct buffer
使用以下代碼讀取Bytebuf報了下圖錯誤
這個寫法jdk1.6支持,jdk1.8不支持
改用如下方法讀取ByteBuf字節
3、ByteToMessageDecoder中的decode方法執行多次的問題
參考地址:https://blog.csdn.net/u011412234/article/details/54929360?locationNum=1&fps=1
解決方法:
因為使用了錯誤的方法(下圖getBytes2(ByteBuf))讀取ByteBuf中的字節,導致readerIndex沒有變化,一直沒有讀完,所有decode方法一直調用。
改為下圖getBytes(ByteBuf)方法讀取