netty 3.x 實現http server和遇到的坑


先轉載一篇 【初學與研發之NETTY】netty3之文件上傳 http://blog.csdn.net/mcpang/article/details/41139859

 

客戶端:
[java] view plain copy

    package netty3.socket.client;  
      
    import static org.jboss.netty.channel.Channels.pipeline;  
      
    import java.io.File;  
    import java.net.InetSocketAddress;  
    import java.util.List;  
    import java.util.concurrent.Executors;  
      
    import org.jboss.netty.bootstrap.ClientBootstrap;  
    import org.jboss.netty.buffer.ChannelBuffer;  
    import org.jboss.netty.channel.Channel;  
    import org.jboss.netty.channel.ChannelFuture;  
    import org.jboss.netty.channel.ChannelHandlerContext;  
    import org.jboss.netty.channel.ChannelPipeline;  
    import org.jboss.netty.channel.ChannelPipelineFactory;  
    import org.jboss.netty.channel.ExceptionEvent;  
    import org.jboss.netty.channel.MessageEvent;  
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;  
    import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;  
    import org.jboss.netty.handler.codec.http.DefaultHttpRequest;  
    import org.jboss.netty.handler.codec.http.HttpChunk;  
    import org.jboss.netty.handler.codec.http.HttpClientCodec;  
    import org.jboss.netty.handler.codec.http.HttpHeaders;  
    import org.jboss.netty.handler.codec.http.HttpMethod;  
    import org.jboss.netty.handler.codec.http.HttpRequest;  
    import org.jboss.netty.handler.codec.http.HttpRequestEncoder;  
    import org.jboss.netty.handler.codec.http.HttpResponse;  
    import org.jboss.netty.handler.codec.http.HttpResponseDecoder;  
    import org.jboss.netty.handler.codec.http.HttpVersion;  
    import org.jboss.netty.handler.codec.http.multipart.DefaultHttpDataFactory;  
    import org.jboss.netty.handler.codec.http.multipart.HttpDataFactory;  
    import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestEncoder;  
    import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData;  
    import org.jboss.netty.handler.stream.ChunkedWriteHandler;  
    import org.jboss.netty.util.CharsetUtil;  
      
    /**
     * Netty 3.x HTTP client that uploads one file to the server at 127.0.0.1:2777 as a
     * multipart POST and collects the text the server sends back.
     *
     * <p>The connection attempt is started in the constructor. Call {@link #uploadFile(File)} to
     * send a file, then {@link #shutdownClient()} to wait for the channel to close and to free
     * the resources held by the bootstrap.
     */
    public class UploadFileClient
    {
        private final ClientBootstrap bootstrap;

        private final ChannelFuture future;

        private final HttpDataFactory factory;

        // Text returned by the server once it has processed the upload.
        private final StringBuffer retMsg = new StringBuffer();

        // Ensures the asynchronous shutdown triggered by exceptionCaught() is started only once.
        private final java.util.concurrent.atomic.AtomicBoolean shutdownStarted =
                new java.util.concurrent.atomic.AtomicBoolean(false);

        /**
         * Creates the bootstrap, installs the pipeline factory and starts connecting to
         * 127.0.0.1:2777 with a 3 second connect timeout.
         */
        public UploadFileClient()
        {
            bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));
            bootstrap.setPipelineFactory(new UploadChannelFactory());

            // Connect timeout: 3s
            bootstrap.setOption("connectTimeoutMillis", 3000);

            future = bootstrap.connect(new InetSocketAddress("127.0.0.1", 2777));

            // Threshold that decides whether upload data is kept in memory or on disk,
            // so that a large upload does not exhaust the heap.
            factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE);
        }

        /**
         * Waits (blocking) until the upload channel is closed, then releases the bootstrap's
         * resources and removes temporary files created by the data factory.
         *
         * <p>This method blocks, so it must not be called from a Netty I/O thread.
         */
        public void shutdownClient()
        {
            // Wait for the data channel to close
            future.getChannel().getCloseFuture().awaitUninterruptibly();

            bootstrap.releaseExternalResources();

            // Really clean all temporary files if they still exist
            factory.cleanAllHttpDatas();
        }

        /**
         * Returns the feedback received from the server while sending the file.
         *
         * @return the server's feedback text accumulated so far
         */
        public String getRetMsg()
        {
            return retMsg.toString();
        }

        /**
         * Uploads the given file to the server. Does nothing when the file is not readable or
         * when the request body could not be prepared.
         *
         * @param file the file to upload
         */
        public void uploadFile(File file)
        {
            if (!file.canRead())
            {
                return;
            }

            // Simple Post form: factory used for big attributes
            List<InterfaceHttpData> bodylist = formpost(file);
            if (bodylist != null)
            {
                // Multipart Post form: factory used
                uploadFileToServer(file.getName(), factory, bodylist);
            }
        }

        /**
         * Builds the list of body parts for the upload: a "getform" attribute and the file
         * itself under the name "myfile".
         *
         * @param file the file to add to the body
         * @return the body parts, or {@code null} when they could not be created
         */
        private List<InterfaceHttpData> formpost(File file)
        {
            // Throw-away request: it only exists so that the encoder can build the body parts.
            HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "");

            try
            {
                HttpPostRequestEncoder bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, false);
                bodyRequestEncoder.addBodyAttribute("getform", "POST");
                bodyRequestEncoder.addBodyFileUpload("myfile", file, "application/x-zip-compressed", false);

                // The body list is reused by the multipart request built in uploadFileToServer().
                return bodyRequestEncoder.getBodyListAttributes();
            }
            catch(Exception e)
            {
                // should not be since args are not null
                e.printStackTrace();
                return null;
            }
        }

        /**
         * Sends the prepared body parts to the server as a multipart POST whose URI is the
         * file name. Blocks until the connection attempt has finished and, for a chunked body,
         * until the body has been written.
         *
         * @param fileName name of the uploaded file, used as the request URI
         * @param factory  data factory handed to the body encoder
         * @param bodylist body parts produced by {@link #formpost(File)}
         */
        private void uploadFileToServer(String fileName, HttpDataFactory factory, List<InterfaceHttpData> bodylist)
        {
            // Wait until the connection attempt succeeds or fails.
            Channel channel = future.awaitUninterruptibly().getChannel();
            if (!future.isSuccess())
            {
                future.getCause().printStackTrace();
                bootstrap.releaseExternalResources();
                return;
            }

            // Prepare the HTTP request.
            HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, fileName);

            // Ask the server to close the connection once it has received the file.
            request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);

            // Use the PostBody encoder
            HttpPostRequestEncoder bodyRequestEncoder = null;
            try
            {
                bodyRequestEncoder = new HttpPostRequestEncoder(factory, request, true);
                bodyRequestEncoder.setBodyHttpDatas(bodylist);
                bodyRequestEncoder.finalizeRequest();
            }
            catch(Exception e)
            {
                // should not be since no null args
                e.printStackTrace();

                // Without a finalized encoder there is nothing valid to send. Close the channel
                // so that shutdownClient() does not wait forever, and stop here instead of
                // running into a NullPointerException below.
                if (bodyRequestEncoder != null)
                {
                    bodyRequestEncoder.cleanFiles();
                }
                channel.close();
                return;
            }
            System.out.println("開始時間:"+System.currentTimeMillis());
            // send request
            channel.write(request);

            // test if request was chunked and if so, finish the write
            if (bodyRequestEncoder.isChunked())
            {
                channel.write(bodyRequestEncoder).awaitUninterruptibly();
            }

            // Now no more use of file representation (and list of HttpData)
            bodyRequestEncoder.cleanFiles();
        }

        /**
         * Builds the client pipeline: HTTP codec, chunked writer and the response handler.
         */
        private class UploadChannelFactory implements ChannelPipelineFactory
        {

            public ChannelPipeline getPipeline() throws Exception
            {
                ChannelPipeline pipeline = pipeline();

                // HttpClientCodec already combines the request encoder and the response decoder,
                // so no separate HttpRequestEncoder/HttpResponseDecoder is registered.
                pipeline.addLast("codec", new HttpClientCodec());
                pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
                pipeline.addLast("handler", new UploadClientHandler());

                return pipeline;
            }
        }

        /**
         * Collects the server's response (plain or chunked) into {@code retMsg}.
         */
        private class UploadClientHandler extends SimpleChannelUpstreamHandler
        {
            // True while the chunks of a chunked response are still being received.
            private boolean readingChunks;

            /**
             * Receives a message returned by the server.
             *
             * @param ctx context of the channel the message arrived on
             * @param e   the message event
             */
            public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception
            {
                if (readingChunks)
                {
                    HttpChunk chunk = (HttpChunk)e.getMessage();
                    if (chunk.isLast())
                    {
                        // The server's message has been received completely.
                        readingChunks = false;
                    }
                    else
                    {
                        // Keep appending the content sent by the server.
                        retMsg.append(chunk.getContent().toString(CharsetUtil.UTF_8));
                    }
                    return;
                }

                HttpResponse response = (HttpResponse)e.getMessage();

                // Status 200 and more chunks follow: switch to chunk-reading mode.
                if (response.getStatus().getCode() == 200 && response.isChunked())
                {
                    readingChunks = true;
                    return;
                }

                // The server answered and nothing else follows.
                ChannelBuffer content = response.getContent();
                if (content.readable())
                {
                    retMsg.append(content.toString(CharsetUtil.UTF_8));
                }
            }

            /**
             * Called when an exception occurs while receiving or sending.
             *
             * @param ctx context of the affected channel
             * @param e   the exception event
             */
            public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception
            {
                System.out.println("異常--:" + e.getCause());
                e.getChannel().close();

                // Release the client's resources after a failure. shutdownClient() blocks on the
                // close future and releases the bootstrap's thread pools; neither may be done
                // on a Netty I/O thread, so the work is handed to a separate thread.
                if (shutdownStarted.compareAndSet(false, true))
                {
                    new Thread(new Runnable()
                    {
                        public void run()
                        {
                            shutdownClient();
                        }
                    }, "upload-client-shutdown").start();
                }
            }
        }
    }


 

服務端:
[java] view plain copy

    package netty3.socket.server;  
      
    import static org.jboss.netty.channel.Channels.pipeline;  
      
    import java.net.InetSocketAddress;  
    import java.util.concurrent.Executors;  
      
    import org.jboss.netty.bootstrap.ServerBootstrap;  
    import org.jboss.netty.channel.ChannelPipeline;  
    import org.jboss.netty.channel.ChannelPipelineFactory;  
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;  
    import org.jboss.netty.handler.codec.http.HttpRequestDecoder;  
    import org.jboss.netty.handler.codec.http.HttpResponseEncoder;  
    import org.jboss.netty.handler.stream.ChunkedWriteHandler;  
      
    /**
     * Starts the Netty 3.x HTTP upload server on 127.0.0.1:2777.
     *
     * <p>Constructing an instance binds the port immediately. {@link #getInstance()} offers a
     * lazily created shared instance.
     */
    public class InitServer
    {
        private static InitServer sockServer = null;

        // Owned by the instance: a static field would be overwritten by every new InitServer.
        private final ServerBootstrap bootstrap;

        /**
         * Returns the shared server, creating (and thereby binding) it on first use.
         * Synchronized so that concurrent first calls cannot create two servers.
         *
         * @return the shared {@code InitServer}
         */
        public static synchronized InitServer getInstance()
        {
            if (sockServer == null)
            {
                sockServer = new InitServer();
            }
            return sockServer;
        }

        /**
         * Builds the server pipeline (HTTP decoder/encoder, chunked writer, upload handler) and
         * binds to 127.0.0.1:2777.
         */
        public InitServer()
        {
            bootstrap = new ServerBootstrap(new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

            bootstrap.setPipelineFactory(new ChannelPipelineFactory()
            {
                public ChannelPipeline getPipeline() throws Exception
                {
                    // A new ServerHandler per pipeline: the handler keeps per-request state.
                    ChannelPipeline serverPipeline = pipeline();
                    serverPipeline.addLast("decoder", new HttpRequestDecoder());
                    serverPipeline.addLast("encoder", new HttpResponseEncoder());
                    serverPipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
                    serverPipeline.addLast("handler", new ServerHandler());

                    return serverPipeline;
                }

            });

            bootstrap.bind(new InetSocketAddress("127.0.0.1", 2777));
        }

        /**
         * Releases the resources held by the bootstrap.
         *
         * <p>NOTE(review): the channel returned by {@code bind()} is not kept, so it is not
         * closed here before the resources are released - confirm whether the bound channel
         * should be closed first.
         */
        public void shutdownServer()
        {
            bootstrap.releaseExternalResources();
        }
    }


 
[java] view plain copy

    package netty3.socket.server;  
      
    import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CACHE_CONTROL;  
    import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;  
    import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.DATE;  
    import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.EXPIRES;  
    import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.LAST_MODIFIED;  
    import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;  
      
    import java.io.File;  
    import java.io.FileNotFoundException;  
    import java.io.IOException;  
    import java.io.RandomAccessFile;  
    import java.io.UnsupportedEncodingException;  
    import java.net.URLDecoder;  
    import java.text.SimpleDateFormat;  
    import java.util.Calendar;  
    import java.util.Date;  
    import java.util.GregorianCalendar;  
    import java.util.HashMap;  
    import java.util.List;  
    import java.util.Locale;  
    import java.util.Map;  
    import java.util.Random;  
    import java.util.TimeZone;  
      
    import javax.activation.MimetypesFileTypeMap;  
      
    import netty3.socket.client.SendMsgClient;  
      
    import org.jboss.netty.buffer.ChannelBuffer;  
    import org.jboss.netty.buffer.ChannelBuffers;  
    import org.jboss.netty.channel.Channel;  
    import org.jboss.netty.channel.ChannelFuture;  
    import org.jboss.netty.channel.ChannelFutureListener;  
    import org.jboss.netty.channel.ChannelFutureProgressListener;  
    import org.jboss.netty.channel.ChannelHandlerContext;  
    import org.jboss.netty.channel.ChannelStateEvent;  
    import org.jboss.netty.channel.Channels;  
    import org.jboss.netty.channel.DefaultFileRegion;  
    import org.jboss.netty.channel.ExceptionEvent;  
    import org.jboss.netty.channel.FileRegion;  
    import org.jboss.netty.channel.MessageEvent;  
    import org.jboss.netty.channel.SimpleChannelHandler;  
    import org.jboss.netty.handler.codec.frame.TooLongFrameException;  
    import org.jboss.netty.handler.codec.http.DefaultHttpRequest;  
    import org.jboss.netty.handler.codec.http.DefaultHttpResponse;  
    import org.jboss.netty.handler.codec.http.HttpChunk;  
    import org.jboss.netty.handler.codec.http.HttpHeaders;  
    import org.jboss.netty.handler.codec.http.HttpMethod;  
    import org.jboss.netty.handler.codec.http.HttpRequest;  
    import org.jboss.netty.handler.codec.http.HttpResponse;  
    import org.jboss.netty.handler.codec.http.HttpResponseStatus;  
    import org.jboss.netty.handler.codec.http.HttpVersion;  
    import org.jboss.netty.handler.codec.http.multipart.Attribute;  
    import org.jboss.netty.handler.codec.http.multipart.DefaultHttpDataFactory;  
    import org.jboss.netty.handler.codec.http.multipart.DiskFileUpload;  
    import org.jboss.netty.handler.codec.http.multipart.FileUpload;  
    import org.jboss.netty.handler.codec.http.multipart.HttpDataFactory;  
    import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestDecoder;  
    import org.jboss.netty.handler.codec.http.multipart.HttpPostRequestDecoder.EndOfDataDecoderException;  
    import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData;  
    import org.jboss.netty.handler.codec.http.multipart.InterfaceHttpData.HttpDataType;  
    import org.jboss.netty.handler.ssl.SslHandler;  
    import org.jboss.netty.handler.stream.ChunkedFile;  
    import org.jboss.netty.util.CharsetUtil;  
      
    public class ServerHandler extends SimpleChannelHandler  
    {  
        public static final String HTTP_DATE_FORMAT = "EEE, dd MMM yyyy HH:mm:ss zzz";  
          
        public static final String HTTP_DATE_GMT_TIMEZONE = "GMT";  
          
        public static final int HTTP_CACHE_SECONDS = 60;  
          
        private static final HttpDataFactory factory = new DefaultHttpDataFactory(DefaultHttpDataFactory.MINSIZE); // Disk if size exceed MINSIZE  
          
        private HttpPostRequestDecoder decoder;  
          
        private HttpRequest request;  
          
        private String receiveFileName = "";  
          
        private Map<String, String> msgMap = new HashMap<String, String>();  
          
        private boolean readingChunks = false;  
          
        static  
        {  
            DiskFileUpload.baseDirectory = "/home/build1/file_test/";  
        }  
          
        public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception  
        {  
            if (e.getMessage() instanceof HttpRequest)  
            {  
                HttpRequest request = (DefaultHttpRequest)e.getMessage();  
                String uri = sanitizeUri(request.getUri());  
              
                System.out.println(request.isChunked());  
      
                if (request.getMethod() == HttpMethod.POST)  
                {  
                        // 接收客戶端上傳的文件  
                        receiveFileName = uri;  
                        this.request = request;  
                          
                        // clean previous FileUpload if Any  
                        if (decoder != null)  
                        {  
                            decoder.cleanFiles();  
                            decoder = null;  
                        }  
                          
                        // if GET Method: should not try to create a HttpPostRequestDecoder  
                        try  
                        {  
                            decoder = new HttpPostRequestDecoder(factory, request);  
                        }  
                        catch(Exception e1)  
                        {  
                            e1.printStackTrace();  
                            writeResponse(e.getChannel(), "接收文件信息時出現異常:" + e1.toString());  
                            Channels.close(e.getChannel());  
                            return;  
                        }  
                          
                        if (!request.isChunked())  
                        {  
                                         readHttpDataAllReceive(e.getChannel());  
                                         writeResponse(e.getChannel(), "服務端文件接收完畢!");  
                                    }  
                }  
            }  
            else  
            {  
                // New chunk is received  
                HttpChunk chunk = (HttpChunk)e.getMessage();  
                // example of reading only if at the end  
                if (!chunk.isLast())  
                {  
                    try  
                    {  
                        decoder.offer(chunk);  
                    }  
                    catch(Exception e1)  
                    {  
                        e1.printStackTrace();  
                        writeResponse(e.getChannel(), "接收文件數據時出現異常:" + e1.toString());  
                        Channels.close(e.getChannel());  
                        return;  
                    }  
          
                    // example of reading chunk by chunk (minimize memory usage due to Factory)  
                    readHttpDataChunkByChunk();  
                  
                } else {  
                    readHttpDataAllReceive(e.getChannel()); //最后數據 //writeResponse(e.getChannel(), "服務端數據接收完畢!");  
                    String sendMsg = msgMap.get("sendMsg");  
                    System.out.println("服務端收到消息:" + sendMsg);  
      
                    sendReturnMsg(ctx, HttpResponseStatus.OK, "服務端返回的消息!");  
                }  
            }  
        }  
          
        /** 
         * Example of reading all InterfaceHttpData from finished transfer 
         */  
        private void readHttpDataAllReceive(Channel channel)  
        {  
            List<InterfaceHttpData> datas;  
            try  
            {  
                datas = decoder.getBodyHttpDatas();  
            }  
            catch(Exception e1)  
            {  
                e1.printStackTrace();  
                writeResponse(channel, "接收文件數據時出現異常:" + e1.toString());  
                Channels.close(channel);  
                return;  
            }  
              
            for (InterfaceHttpData data : datas)  
            {  
                writeHttpData(data);  
            }  
        }  
          
        /** 
         * Example of reading request by chunk and getting values from chunk to chunk 
         */  
        private void readHttpDataChunkByChunk()  
        {  
            try  
            {  
                while(decoder.hasNext())  
                {  
                    InterfaceHttpData data = decoder.next();  
                    if (data != null)  
                    {  
                        // new value  
                        writeHttpData(data);  
                    }  
                }  
            }  
            catch(EndOfDataDecoderException e1)  
            {  
                e1.printStackTrace();  
            }  
        }  
          
        private void writeHttpData(InterfaceHttpData data)  
        {  
            if (data.getHttpDataType() == HttpDataType.FileUpload)  
            {  
                FileUpload fileUpload = (FileUpload)data;  
                if (fileUpload.isCompleted())  
                {  
                    try  
                    {  
                        Random r = new Random();  
                        StringBuffer fileNameBuf = new StringBuffer();  
                        fileNameBuf.append(DiskFileUpload.baseDirectory).append("U").append(System.currentTimeMillis());  
                        fileNameBuf.append(String.valueOf(r.nextInt(10))).append(String.valueOf(r.nextInt(10)));  
                        fileNameBuf.append(receiveFileName.substring(receiveFileName.lastIndexOf(".")));  
      
                        fileUpload.renameTo(new File(fileNameBuf.toString()));  
                    }  
                    catch(IOException e)  
                    {  
                        e.printStackTrace();  
                    }  
                    System.out.println("結束時間:"+System.currentTimeMillis());  
                }  
                else  
                {  
                    System.out.println("\tFile to be continued but should not!\r\n");  
                }  
            }  
            else if (data.getHttpDataType() == HttpDataType.Attribute)   
            {  
                Attribute attribute = (Attribute)data;  
                try  
                {  
                    msgMap.put(attribute.getName(), attribute.getString());  
                }  
                catch(IOException e)  
                {  
                    e.printStackTrace();  
                }  
            }  
        }  
          
        private void writeResponse(Channel channel, String retMsg)  
        {  
            // Convert the response content to a ChannelBuffer.  
            ChannelBuffer buf = ChannelBuffers.copiedBuffer(retMsg, CharsetUtil.UTF_8);  
      
            // Decide whether to close the connection or not.  
            boolean close = HttpHeaders.Values.CLOSE.equalsIgnoreCase(request.getHeader(HttpHeaders.Names.CONNECTION))  
                    || request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)  
                    && !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request.getHeader(HttpHeaders.Names.CONNECTION));  
      
            // Build the response object.  
            HttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);  
            response.setContent(buf);  
            response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/plain; charset=UTF-8");  
      
            if (!close)  
            {  
                // There's no need to add 'Content-Length' header  
                // if this is the last response.  
                response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));  
            }  
      
            // Write the response.  
            ChannelFuture future = channel.write(response);  
            // Close the connection after the write operation is done if necessary.  
            if (close)  
            {  
                future.addListener(ChannelFutureListener.CLOSE);  
            }  
        }  
          
        private String sanitizeUri(String uri)  
        {  
            try  
            {  
                uri = URLDecoder.decode(uri, "UTF-8");  
            }  
            catch(UnsupportedEncodingException e)  
            {  
                try  
                {  
                    uri = URLDecoder.decode(uri, "ISO-8859-1");  
                }  
                catch(UnsupportedEncodingException e1)  
                {  
                    throw new Error();  
                }  
            }  
      
            return uri;  
        }  
      
        /** 
         * 方法描述:設置請求響應的header信息 
         * @param response 請求響應對象 
         * @param fileToCache 下載文件 
         */  
        private static void setContentTypeHeader(HttpResponse response, File fileToCache)  
        {  
            MimetypesFileTypeMap mimeTypesMap = new MimetypesFileTypeMap();  
            response.setHeader(CONTENT_TYPE, mimeTypesMap.getContentType(fileToCache.getPath()));  
              
            SimpleDateFormat dateFormatter = new SimpleDateFormat(HTTP_DATE_FORMAT, Locale.US);  
            dateFormatter.setTimeZone(TimeZone.getTimeZone(HTTP_DATE_GMT_TIMEZONE));  
      
            // Date header  
            Calendar time = new GregorianCalendar();  
            response.setHeader(DATE, dateFormatter.format(time.getTime()));  
      
            // Add cache headers  
            time.add(Calendar.SECOND, HTTP_CACHE_SECONDS);  
            response.setHeader(EXPIRES, dateFormatter.format(time.getTime()));  
            response.setHeader(CACHE_CONTROL, "private, max-age=" + HTTP_CACHE_SECONDS);  
            response.setHeader(LAST_MODIFIED, dateFormatter.format(new Date(fileToCache.lastModified())));  
        }  
          
        /** 
         * 方法描述:給客戶端發送反饋消息 
         * @param ctx 發送消息的通道 
         * @param status 狀態 
         * @param retMsg 反饋消息 
         */  
        private static void sendReturnMsg(ChannelHandlerContext ctx, HttpResponseStatus status, String retMsg)  
        {  
            HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);  
            response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");  
            response.setContent(ChannelBuffers.copiedBuffer(retMsg, CharsetUtil.UTF_8));  
      
            // 信息發送成功后,關閉連接通道  
            ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);  
        }  
          
        public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception  
        {  
            if (decoder != null)  
            {  
                decoder.cleanFiles();  
            }  
            System.out.println("連接斷開:" + e.getChannel().getRemoteAddress().toString());  
        }  
          
        public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception  
        {  
            String remoteIp = e.getChannel().getRemoteAddress().toString();  
            System.out.println(remoteIp.substring(1, remoteIp.indexOf(":")));  
            System.out.println("收到連接:" + e.getChannel().getRemoteAddress().toString());  
        }  
      
      
        @Override  
        public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception  
        {  
            Channel ch = e.getChannel();  
            Throwable cause = e.getCause();  
            if (cause instanceof TooLongFrameException)  
            {  
                return;  
            }  
      
            System.err.println("連接的通道出現異常:" + cause.toString());  
            if (ch.isConnected())  
            {  
                System.out.println("連接還沒有關閉!");  
                ch.close();  
            }  
        }  
      
      
      
    }  

 

我在此基礎上仿寫了一個http server,但是當執行到 readHttpDataAllReceive(e.getChannel()); //最后數據 的方法體里的 datas = decoder.getBodyHttpDatas(); 時就會報錯。我一開始選擇忽略這個異常(不return),繼續走下面的讀取代碼;雖然程序不再報錯,但是最后獲取到的數據丟失了一部分。
回頭看看 datas = decoder.getBodyHttpDatas(); 這里報的異常是 org.jboss.netty.handler.codec.http.multipart.HttpPostRequestDecoder$NotEnoughDataDecoderException。
我們看看netty 3.07的源碼
/**
 * Excerpt quoted from Netty's HttpPostRequestDecoder.
 * Returns bodyListHttpData, but only once isLastChunk has been set; until then it
 * throws NotEnoughDataDecoderException.
 */
public List<InterfaceHttpData> getBodyHttpDatas()
            throws NotEnoughDataDecoderException {
        // The body is not complete until the last chunk has been seen.
        if (!isLastChunk) {
            throw new NotEnoughDataDecoderException();
        }
        return bodyListHttpData;
    }

isLastChunk=false的時候就會觸發這個異常。但是我們之前的代碼流程不是已經到了最后一個chunk了嗎,怎么回事?我們再看看設置isLastChunk的代碼(offer方法):

/**
 * Excerpt quoted from Netty's HttpPostRequestDecoder.
 * Appends the content of the given chunk to the not-yet-decoded data, sets isLastChunk
 * when the chunk reports isLast(), and then calls parseBody().
 */
public void offer(HttpChunk chunk) throws ErrorDataDecoderException {
        ChannelBuffer chunked = chunk.getContent();
        if (undecodedChunk == null) {
            undecodedChunk = chunked;
        } else {
            //undecodedChunk = ChannelBuffers.wrappedBuffer(undecodedChunk, chunk.getContent());
            // less memory usage
            undecodedChunk = ChannelBuffers.wrappedBuffer(
                    undecodedChunk, chunked);
        }
        // In this excerpt, this is the only place where isLastChunk is set to true.
        if (chunk.isLast()) {
            isLastChunk = true;
        }
        parseBody();
    }

offer方法里會觸發isLastChunk=true, 那問題就清晰了,我們再回到readHttpDataAllReceive(e.getChannel()); //最后數據 這段代碼里,在這之前也加上

 
          
// Offer the last chunk as well, so that the decoder sets isLastChunk before getBodyHttpDatas() is called.
try { decoder.offer(chunk); } catch(Exception e1) { e1.printStackTrace();  return; } 
 

問題解決,接收數據也完整

stackoverflow也提及過這個問題

https://stackoverflow.com/questions/23989217/posting-data-to-netty-with-apache-httpclient

To solve the problem you either need to offer() all chunks (HttpContent) of a message to HttpPostRequestDecoder before calling getBodyHttpDatas(), or alternatively you can just add the HttpObjectAggregator handler right before your handler to the channel's pipeline. If you do so, HttpObjectAggregator will collect all chunks for you and produce a single FullHttpRequest in place of multiple chunks. Passing FullHttpRequest instead of an ordinary HttpRequest to HttpPostRequestDecoder's constructor eliminates need to offer() chunks.

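意思是:在調用getBodyHttpDatas()之前,必須先用offer()方法把同一個http請求的所有chunk(包括最后一個)都交給decoder,讓message完整;或者在自己的handler之前加一個聚合handler,由它把所有chunk合並成一個完整的請求,這樣就不需要再逐個offer()了。注意引文中的HttpObjectAggregator和FullHttpRequest是netty 4的API,netty 3.x中對應的聚合handler是HttpChunkAggregator。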

 

剛開始寫這個http server的時候,客戶端發送數據很少,包體不會拆分成若干個chunk,一切都很正常,后來當客戶端發送數據很大的時候,使用之前的httpRequest根本沒有辦法獲取到數據

PS: 一個http請求的包體被拆分成若干個chunk后,這些chunk的channelid是一樣的,我們可以通過這個channelid來重新組裝數據



 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM