Netty實現Tcp服務端


 

請求報文:前四位(指定報文長度)+報文內容

示例:0010aaooerudyh

1.1NettyServer:啟動TCP服務

package com.bokeyuan.socket.server;

import com.bokeyuan.socket.BeanUtil;
import com.bokeyuan.socket.config.ConfigConstant;
import com.bokeyuan.socket.server.codec.MyByteToMessageCodec;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;
import java.nio.charset.Charset;

/**
 * @author: chenly
 * @date: 2021-07-05 10:42
 * @description:
 * @version: 1.0
 */
@Component
@Slf4j
public class NettyServer extends Thread{


    @Override
    public void run() {
        startServer();
    }

    private void startServer(){
        EventLoopGroup bossGroup = null;
        EventLoopGroup workGroup = null;
        ServerBootstrap serverBootstrap = null;
        ChannelFuture future = null;
        try {
            //初始化線程組
            bossGroup= new NioEventLoopGroup();
            workGroup= new NioEventLoopGroup();
            //初始化服務端配置
            serverBootstrap= new ServerBootstrap();
            //綁定線程組
            serverBootstrap.group(bossGroup,workGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch)
                                throws Exception {
                            ch.pipeline().addLast(BeanUtil.getBean(MyByteToMessageCodec.class));
                            ch.pipeline().addLast(BeanUtil.getBean(MtReadTimeoutHandler.class));
                            ch.pipeline().addLast(new StringDecoder(Charset.forName(ConfigConstant.MsgCode)));
                            ch.pipeline().addLast(new StringEncoder(Charset.forName(ConfigConstant.MsgCode)));
                            ch.pipeline().addLast(BeanUtil.getBean(NettyServerHandler.class));

                        }
                    }).option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            future = serverBootstrap.bind(new InetSocketAddress(8090)).sync();
            log.info(" *************TCP服務端啟動成功 Port:{}*********** " , 8090);
        } catch (Exception e) {
            log.error("TCP服務端啟動異常",e);
        } finally {
            if(future!=null){
                try {
                    future.channel().closeFuture().sync();
                } catch (InterruptedException e) {
                    log.error("channel關閉異常:",e);
                }
            }
            if(bossGroup!=null){
                //線程組資源回收
                bossGroup.shutdownGracefully();
            }

            if(workGroup!=null){
                //線程組資源回收
                workGroup.shutdownGracefully();
            }


        }
    }

}
View Code

 

1.2NettyServerHandler 繼承ChannelInboundHandlerAdapter 類

package com.bokeyuan.socket.server;

import com.bokeyuan.socket.MsgHandler;
import com.bokeyuan.socket.StringUtil;
import com.bokeyuan.socket.config.ConfigConstant;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.net.InetSocketAddress;

/**
 * @author: void
 * @date: 2021-07-05 10:43
 * @description:
 * @version: 1.0
 */
@Component
@Scope("prototype")
@Slf4j
public class NettyServerHandler extends ChannelInboundHandlerAdapter {

    @Autowired
    private MsgHandler msgHandler;
    /**
     *
     * 業務數據處理
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        int len = 4;
        String response = "";
        String recieved = "";
        try{
            recieved  = (String) msg;
            log.info("服務端接收到的內容為:{}",msg);
            //小於4位不處理
            if(recieved.length()< len){
                log.info("報文小於4位");
                ctx.close();
                return;
            }
            String length = recieved.substring(0,4);
            //非數字不處理
            if(!StringUtil.isNumeric(length)){
                log.info("報文前四位不是數字");
                ctx.close();
                return;
            }
            //報文長度小於前四位指定的長度 不處理
            System.out.println(recieved.getBytes(ConfigConstant.MsgCode).length);
            if(recieved.getBytes(ConfigConstant.MsgCode).length-len < Integer.parseInt(length)){
                log.info("報文長度不夠");
                ctx.close();
                return;
            }
            //去掉前四位表示長度的內容,截取指定長度的內容
            System.out.println(recieved);
            byte[] msgbytes = recieved.getBytes(ConfigConstant.MsgCode);
            //截取報文前四位長度的報文
            byte[] tempMsg = new byte[Integer.parseInt(length)];
            System.arraycopy(msgbytes, 4, tempMsg, 0, tempMsg.length);
            response =msgHandler.handler(new String(tempMsg, ConfigConstant.MsgCode));
            log.info("服務端響應的的內容為:{}",response);
        }catch (Exception e) {
            log.error("報文解析異常,報文內容為:"+msg, e);
        }finally {
            ctx.writeAndFlush(response.getBytes(ConfigConstant.MsgCode)).addListener(ChannelFutureListener.CLOSE);
        }
    }

    /**
     *從客戶端收到新的數據、讀取完成---調用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        log.info("從客戶端收到新的數據讀取完成********");
        if(ctx!=null){
            ctx.flush();
        }
    }

    /**
     * 客戶端與服務端建立連接--執行
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        ctx.channel().read();
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        //此處不能使用ctx.close(),否則客戶端始終無法與服務端建立連接
        log.info("客戶端與服務端建立連接:{}", clientIp);
    }


    /**
     * 客戶端與服務端斷連-調用
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        super.channelInactive(ctx);
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        if(ctx!=null){
            //斷開連接時,服務端關閉連接,避免造成資源浪費
            ctx.close();
        }
        log.info("客戶端與服務端斷連:{}", clientIp);
    }


    /**
     * 當 Netty 由於 IO 錯誤或者處理器在處理事件時拋出的異常
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        if(ctx!=null){
            //拋出異常,斷開與客戶端的連接
            ctx.close();
            log.error("連接異常,服務端主動斷開連接{}",cause);
        }

    }

    /**
     * 服務端read超時-調用
     * @param ctx
     * @param evt
     * @throws Exception
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {

        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIp = insocket.getAddress().getHostAddress();
        if(ctx!=null){
            //超時時斷開連接
            ctx.close();
        }
        log.error("服務端read超時:{}", clientIp);
    }



    private String  getClientIp(ChannelHandlerContext ctx){
        InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();
        String clientIP = insocket.getAddress().getHostAddress();
        return clientIP;
    }
}
View Code

 

1.3MsgHandler類 :業務處理

package com.bokeyuan.socket;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.Arrays;
import java.util.List;

/**
 * @author: void
 * @date: 2021-07-05 12:37
 * @description:
 * @version: 1.0
 */
@Component
@Scope("prototype")
@Slf4j
public class MsgHandler {


    /**
     * 業務出路
     * @param msg 請求報文
     * @return
     */
    public String handler(String msg){
        log.info("收到的報文內容為"+msg);
        //業務處理
        //.....
        return "success";

    }


}
View Code

 

1.4MyByteToMessageCodec類 處理拆包粘包 繼承ByteToMessageCodec類

package com.bokeyuan.socket.server.codec;
import com.bokeyuan.socket.config.ConfigConstant;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageCodec;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.List;

@Slf4j
@Component
@Scope("prototype")
public class MyByteToMessageCodec extends ByteToMessageCodec<Object>
{

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception
    {
        int msgLength = 4;
        if(buf.readableBytes() < msgLength)
        {// 不足長度4(開始4位代表整個報文長度位),無法獲取長度
            return;
        }
        //記下readerIndex
        buf.markReaderIndex();

        //獲取前4位表示報文長度的字符串
        String lenstr = getMsgLength(buf,msgLength);
        //轉為整型
        int length = toInt(lenstr);
        if(length <= 0)
        {
           ctx.close();
            return;
        }

        //判斷報文長度是否到達報文前四位指定的長度
        if(buf.readableBytes() < length)
        {
            //重置到上一次調用markReaderIndex()的readerIndex位置
            buf.resetReaderIndex();
            return;
        }

        //報文內容(不包含前四位,前四位在前面已經被讀取)
        byte[] data = getBytes(buf);
        String content = new String(data, ConfigConstant.MsgCode);

        //msg=報文前四位長度+報文內容
        String msg = lenstr+content;
        out.add(msg);
        ctx.writeAndFlush(out);
    }

    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception
    {
        byte[] body = (byte[]) msg;
        out.writeBytes(body);
    }


    /**
     * 讀取ByteBuf字節內容
     * @param buf
     * @return
     */
    private byte[] getBytes(ByteBuf buf){
        int readablebytes = buf.readableBytes();
        ByteBuf tempBuf = buf.readBytes(readablebytes);
        byte[] data = new byte[readablebytes];//數據大小
        tempBuf.getBytes(0, data);
        return data;
    }

    /**
     * 讀取ByteBuf字節內容
     * @param buf
     * @return
     */
    private byte[] getBytes2(ByteBuf buf){
        byte[] data = new byte[buf.readableBytes()];//數據大小
        buf.getBytes(0, data);
        return data;
    }

    /**
     * 讀取ByteBuf中指定長度內容
     * @param buf ByteBuf
     * @param len 讀取字節長度
     * @return
     */
    private String getMsgLength(ByteBuf buf,int len){
        byte[] bytes = new byte[len];
        ByteBuf lengBuf = buf.readBytes(len);
        lengBuf.getBytes(0, bytes);
        return  new String(bytes);
    }


    private int toInt(String lenstr){
        try {
            return Integer.parseInt(lenstr);
        } catch (NumberFormatException e) {
            log.info("報文前四位:{}不是有效數字",lenstr);
            return 0;
        }

    }

}
View Code

 

1.5MyReadTimeoutHandler 處理客戶端長時間未發數據給服務端情況 繼承ReadTimeoutHandler類

package com.bokeyuan.socket.server;

import com.bokeyuan.socket.config.ConfigConstant;
import io.netty.handler.timeout.ReadTimeoutHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Slf4j
@Component
@Scope("prototype")
public class MyReadTimeoutHandler extends ReadTimeoutHandler {

    public MyReadTimeoutHandler()
    {
        //客戶端長時間沒有發送數據給服務端,socket服務端主動斷開
        super(ConfigConstant.READ_TIMEOUT,TimeUnit.SECONDS);
    }

   
}
View Code

 

1.6、BeanUtil類

package com.bokeyuan.socket;

import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.stereotype.Component;

/**
 * @description:
 * @author: void
 * @create: 2019-11-07
 **/
@Component
public class BeanUtil implements ApplicationContextAware {

    private static ApplicationContext applicationContext;

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if (BeanUtil.applicationContext == null) {
            BeanUtil.applicationContext = applicationContext;
        }
    }

    /**
     * Get Bean by clazz.
     *
     * @param clazz Class
     * @param <T>   class type
     * @return Bean instance
     */
    public static <T> T getBean(Class<T> clazz) {
        if (applicationContext == null) {
            return null;
        }
        return applicationContext.getBean(clazz);
    }

    @SuppressWarnings("unchecked")
    public static <T> T getBean(String beanId) {
        if (applicationContext == null) {
            return null;
        }
        return (T) applicationContext.getBean(beanId);
    }

}
View Code

 

1.7、常量類

package com.bokeyuan.socket.config;

/**
 * @author: void
 * @date: 2021-09-03 14:06
 * @description:
 * @version: 1.0
 */
public class ConfigConstant {

    public static String MsgCode = "GBK";
    /**服務端讀取超時時間*/
    public static long READ_TIMEOUT;
}
View Code

 

1.8、啟動類

package com;
import com.bokeyuan.socket.server.NettyServer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author : void
 * @version : 2.0
 * @date : 2020/6/11 10:08
 */
@SpringBootApplication
public class Application implements CommandLineRunner {

    @Autowired
    private NettyServer nettyServer;


    @Override
    public void run(String... args) throws Exception {
        nettyServer.start();
    }

    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }
}
View Code

 

 

出現過的問題及解決方法

1MyByteToMessageCodec is not a @Sharable handler, so can't be added or removed multiple times.

解決方法

在MyByteToMessageCodec類上添加@Scope("prototype")設置為多例模式

 

2、報錯 java.lang.UnsupportedOperationException: direct buffer

使用以下代碼讀取Bytebuf報了下圖錯誤

 

 

 

 

 

 這個寫法jdk1.6支持,jdk1.8不支持

 改用如下方法讀取ByteBuf字節

 

 

 

 

3ByteToMessageDecoder中的decode方法執行多次的問題

 

 參考地址:https://blog.csdn.net/u011412234/article/details/54929360?locationNum=1&fps=1

 解決方法:

因為使用了錯誤的方法(下圖getBytes2(ByteBuf))讀取ByteBuf中的字節,導致readerIndex沒有變化,一直沒有讀完,所有decode方法一直調用。

改為下圖getBytes(ByteBuf)方法讀取

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM