netty網絡通信中的tcp拆包問題


    工作中的一個項目,我們的一個應用與銀行系統進行tcp通信的時候,銀行下送的報文有時會分多次返回。在tcp中這種數據包分多次小數據包發送的情況成為拆包問題。

其中一個,也是最常見的思路就是在報文的報文頭部分規定某一段代表本次發送的完整報文的長度,這樣接收方就會心中有數,在沒有接收到這個長度的報文之前,認為本次通信未完成,數據包還不完整,從而繼續等待下去。之前曾經遇到過這樣的問題,那時候是用的java socket逐個字節對報文進行接收,直到看到結尾符為止。

    只是這次項目原來的程序員用的netty框架,一開始沒有注意到如何在netty正確處理拆包問題。導致后續投產后,銀行返回的報文出現沒有完整接收的情況,截斷在中文漢字處產生亂碼,導致異常。

    下面介紹如何在netty中處理拆包問題。

    

server端代碼:

public class NettyServer {
    public static void main(String[] args) {
        ServerBootstrap bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new StringDecoder(), new StringEncoder(), new ServerHandler());
            }
        });

        // Bind and start to accept incoming connections.
        Channel bind = bootstrap.bind(new InetSocketAddress(8000));
        System.out.println("Server已經啟動,監聽端口: " + bind.getLocalAddress() + ", 等待客戶端注冊。。。");
    }

    private static class ServerHandler extends SimpleChannelHandler {
        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (e.getMessage() instanceof String) {
                String message = (String) e.getMessage();
                System.out.println("Client發來:" + message);

              //  e.getChannel().write("Server已收到剛發送的:" + message+"\n");
                e.getChannel().write("000287<?xml version=\"1.0\" encoding=\"GB18030\"?><root><head><TransCode>1002</TransCode><TransDate>20161025</TransDate><TransTime>092745</TransTime>"+
   "<SeqNo>2016110542160157</SeqNo><ZoneCode>HZCQ</ZoneCode><TransRltCode>-25330</TransRltCode><TransRltMsg>000</TransRltMsg></head><body></body></root>");
               

                System.out.println("\n等待客戶端輸入。。。");
            }

            super.messageReceived(ctx, e);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
            super.exceptionCaught(ctx, e);
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("有一個客戶端注冊上來了。。。");
            System.out.println("Client:" + e.getChannel().getRemoteAddress());
            System.out.println("Server:" + e.getChannel().getLocalAddress());
            System.out.println("\n等待客戶端輸入。。。");
            super.channelConnected(ctx, e);
        }
    }
}

 

client端代碼:

public class NettyClient {

    public static void main(String[] args) {
        // Configure the client.
        ClientBootstrap bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

        // Set up the default event pipeline.
        bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() throws Exception {
                return Channels.pipeline(new DeliDecoder(),
                                        new StringEncoder(), 
                                        new ClientHandler());
            }
        });

        // Start the connection attempt.
        ChannelFuture future = bootstrap.connect(new InetSocketAddress("localhost", 8000));

        // Wait until the connection is closed or the connection attempt fails.
        future.getChannel().getCloseFuture().awaitUninterruptibly();

        // Shut down thread pools to exit.
        bootstrap.releaseExternalResources();
    }

    private static class ClientHandler extends SimpleChannelHandler {
        private BufferedReader sin = new BufferedReader(new InputStreamReader(System.in));

        @Override
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            if (e.getMessage() instanceof String) {
                String message = (String) e.getMessage();
                System.out.println(message);
                e.getChannel().write(sin.readLine());
            }

            super.messageReceived(ctx, e);
        }

        @Override
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
            System.out.println("已經與Server建立連接。。。。");
            System.out.println("\n請輸入要發送的信息:");
            super.channelConnected(ctx, e);

            e.getChannel().write("ddddd");
        }
    }
}

拆包問題解決的關鍵:

public class DeliDecoder extends FrameDecoder{
    
    private static final Logger LOG = Logger.getLogger(DeliDecoder.class);
    private final int headLen = 6; //銀行回傳的報文前6位為報文長度,前6位不計算在長度內

    @Override
    protected Object decode(ChannelHandlerContext chc, Channel channel,
            ChannelBuffer buffer) throws Exception {
        LOG.info("進入DeliDecoder.decode()");
        
         if (buffer.readableBytes() < headLen) {
            return null;    //return null表示繼續讀取,下同
        }
         LOG.info("buffer copy...");
         ChannelBuffer buffer2 = buffer.copy();    //直接用buffer.array()可能會報UnsupportedOperationException,故使用其copy
         LOG.info("buffer copy done");
         byte[] arr = buffer2.array();
         LOG.info("buffer array init");
         String temStr = new String(arr, "GB18030");
         LOG.info(temStr);
         int dataLength = Integer.parseInt(temStr.substring(0, 6));
        LOG.info("dataLength : " + dataLength);
        
        if (buffer.readableBytes() < dataLength + headLen) {
            return null;
        }

        buffer.skipBytes(headLen);        //從第7位開始讀取報文正文
        byte[] decoded = new byte[dataLength];
        buffer.readBytes(decoded);
        String msg = new String(decoded, "GB18030");
        return msg;
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM