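We download files from the internet all the time, and the experience is very intuitive: there is a download button, I click it, and the file gradually arrives on my local disk. It feels just like a copy operation.
But since this is the internet, the data must travel over the network. So how exactly is it transferred?
There are two ways to serve a download. One is to download a static file resource directly, which needs no application code. The other is to have application code generate the content on the fly and write it out to the client.
When we download a resource file directly, it feels like an operation on the file itself, no different from copying. But what happens when application code produces the download?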
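1. File upload and download demo
Template code for this is easy to find online and gets the job done. It basically sets a few headers and then writes the data to the response.
demo1. The server receives an uploaded file and, in the same request, sends a file back to the client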
@PostMapping("fileUpDownTest")
@ResponseBody
public Object fileUpDownTest(@ModelAttribute EncSingleDocFileReqModel reqModel,
                             MultipartFile file,
                             HttpServletResponse response) {
    // Does two things: 1. receives the uploaded file; 2. sends a file back to the uploader.
    // In other words a two-way file transfer; the returned file can be anything you produce after processing.
    String tmpPath = saveMultipartToLocalFile(file);
    outputEncFileStream(tmpPath, response);
    System.out.println("path:" + tmpPath);
    return null;
}

/**
 * Saves the uploaded file to a local path.
 *
 * @param file the uploaded file
 * @return the local storage path
 */
private String saveMultipartToLocalFile(MultipartFile file) {
    try (InputStream inputStream = file.getInputStream()) {
        // Write the file into the temp directory
        String originalName = file.getOriginalFilename();
        int dotIndex = originalName == null ? -1 : originalName.lastIndexOf('.');
        // Fall back to an empty suffix when the name has no extension
        String fileSuffix = dotIndex == -1 ? "" : originalName.substring(dotIndex);
        File tmpFile = File.createTempFile(file.getName(), ".tmp" + fileSuffix);
        FileUtils.copyInputStreamToFile(inputStream, tmpFile);
        return tmpFile.getCanonicalPath();
    } catch (Exception e) {
        log.error("[encrypt file] failed to process the file stream: " + file.getName(), e);
        throw new EncryptSysException("0011110", "failed to receive the file");
    }
}

/**
 * Writes the file to the response as a stream.
 *
 * @param encFileLocalPath path of the file to send
 * @param response         the servlet response
 */
private void outputEncFileStream(String encFileLocalPath, HttpServletResponse response) {
    File outFile = new File(encFileLocalPath);
    OutputStream os = null;
    InputStream inputStream = null;
    try {
        response.reset();
        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
        // response.setHeader("Content-Length", file.getContentLength()+"");
        // File.getName() strips the directory part whichever separator the platform uses
        String outputFileName = outFile.getName();
        response.setHeader("Content-Disposition",
                String.format("attachment; filename=%s", URLEncoder.encode(outputFileName, "UTF-8")));
        response.setContentType("application/octet-stream; charset=utf-8");
        response.setHeader("Pragma", "no-cache");
        response.setHeader("Expires", "0");
        inputStream = new FileInputStream(outFile);
        // Write the file content
        os = CommonUtil.readInputStream(inputStream, response.getOutputStream());
    } catch (Exception re) {
        log.error("failed to output the file stream,", re);
        throw new RuntimeException("0011113: failed to output the encrypted file");
    } finally {
        if (os != null) {
            try {
                os.flush();
                os.close();
            } catch (IOException e) {
                log.error("failed to close the output stream", e);
            }
        }
        if (inputStream != null) {
            try {
                inputStream.close();
            } catch (IOException e) {
                log.error("failed to close the encrypted file input stream", e);
            }
        }
    }
}
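When developing, all we deal with are all-encompassing objects such as Request and Response; we simply ask them for whatever we need. That convenience also takes away the chance to see what is really going on.
demo2. The server forwards a file to another server, then receives and processes the file stream that comes back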
/**
 * Sends a local file to the encryption server and writes the encrypted result to the client.
 *
 * @param localFilePath the file to download
 * @return the file stream
 */
@GetMapping("transLocalFileToEnc")
public Object transLocalFileToEnc(@ModelAttribute EncSingleDocFileReqModel reqModel,
                                  @RequestParam String localFilePath,
                                  HttpServletResponse response) {
    File localFileIns = new File(localFilePath);
    if (!localFileIns.exists()) {
        return ResponseInfoBuilderUtil.fail("specified file not found");
    }
    try (InputStream sourceFileInputStream = new FileInputStream(localFileIns)) {
        // URL of the upload endpoint on the other server; here we simulate an encryption request to this machine
        String url = "http://localhost:8082/encrypt/testEnc";
        // File.getName() drops the directory part, so the name no longer starts with a separator
        String filename = localFileIns.getName();
        Object object = null;
        // Create the HttpClient instance
        CloseableHttpClient aDefault = HttpClients.createDefault();
        try {
            HttpPost httpPost = new HttpPost(url);
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            // With this, the other service can receive the file part
            builder.addBinaryBody("file", sourceFileInputStream,
                    ContentType.create("multipart/form-data"), URLEncoder.encode(filename, "utf-8"));
            builder.addTextBody("systemCode", "self");
            String encOutputFilename = filename;
            builder.addTextBody("encOutputFileName", encOutputFilename);
            HttpEntity entity = builder.build();
            httpPost.setEntity(entity);
            ResponseHandler<Object> rh = new ResponseHandler<Object>() {
                @Override
                public Object handleResponse(HttpResponse re) throws IOException {
                    HttpEntity entity = re.getEntity();
                    if (entity.getContentType().toString().contains("application/json")) {
                        // Uses the response content type to decide whether a file stream came back; not rigorous
                        String retMsg = EntityUtils.toString(entity, "UTF-8");
                        return JSONObject.parseObject(retMsg, ResponseInfo.class);
                    }
                    InputStream input = entity.getContent();
                    // String result = EntityUtils.toString(entity, "UTF-8");
                    // Write to the response stream
                    OutputStream os = null;
                    try {
                        response.setHeader("Cache-Control", "no-cache, no-store, must-revalidate");
                        // response.setHeader("Content-Length", file.getContentLength()+"");
                        response.setHeader("Content-Disposition",
                                String.format("attachment; filename=%s", URLEncoder.encode(filename, "UTF-8")));
                        response.setContentType("application/octet-stream; charset=utf-8");
                        response.setHeader("Pragma", "no-cache");
                        response.setHeader("Expires", "0");
                        // Write the returned stream into the temp directory
                        File tmpFile = File.createTempFile(filename, "");
                        FileUtils.copyInputStreamToFile(input, tmpFile);
                        String encFilePathTmp = tmpFile.getCanonicalPath();
                        File encFileIns = new File(encFilePathTmp);
                        if (encFileIns.exists()) {
                            FileInputStream zipStream = new FileInputStream(encFileIns);
                            os = CommonUtil.readInputStream(zipStream, response.getOutputStream());
                        }
                    } finally {
                        if (os != null) {
                            os.flush();
                            os.close();
                        }
                    }
                    // The file stream has been written to the client
                    return Boolean.TRUE;
                }
            };
            object = aDefault.execute(httpPost, rh);
            return object == Boolean.TRUE ? "Encryption succeeded, go download the file!" : object;
        } catch (Exception e) {
            log.error("", e);
        } finally {
            try {
                aDefault.close();
            } catch (IOException e) {
                log.error("failed to close the client", e);
            }
        }
    } catch (FileNotFoundException e) {
        log.error("the file to encrypt does not exist", e);
    } catch (IOException e) {
        log.error("failed to read the file to encrypt", e);
    }
    return "processing failed";
}

// The socket-writing logic is extracted so that it can be controlled in one place
/**
 * Copies the input stream to the output stream.
 *
 * @param inputStream the input stream (closed when the copy finishes)
 * @param os          the output stream to write to
 * @return the output stream; it is flushed once every 5000 blocks
 * @throws IOException if reading or writing fails
 */
public static OutputStream readInputStream(InputStream inputStream, OutputStream os) throws IOException {
    byte[] bytes = new byte[2048];
    int i = 0;
    int read = 0;
    // Copy in 2048-byte blocks to keep memory usage low
    while ((read = inputStream.read(bytes)) != -1) {
        os.write(bytes, 0, read);
        i++;
        // Flush to the network every 5000 blocks
        if (i % 5000 == 0) {
            os.flush();
        }
    }
    inputStream.close();
    return os;
}
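This merely uses back-end code to reproduce a front-end form submission; there is no trick to it. It does illustrate one point, though: file streams can move freely between servers, as long as both sides follow the protocol. (Note: the code above may require these pom dependencies: org.apache.httpcomponents:httpclient:4.5.6 and org.apache.httpcomponents:httpmime:4.5.6.)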
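The copy loop at the heart of both demos can be looked at on its own. The following is a minimal sketch, not the author's CommonUtil: StreamCopySketch and copy are hypothetical names, and unlike the demo helper it leaves closing the streams to the caller.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; not part of the original CommonUtil.
final class StreamCopySketch {

    /** Copies everything from in to out in fixed-size blocks and returns the byte count. */
    static long copy(InputStream in, OutputStream out) throws IOException {
        byte[] buffer = new byte[8192];
        long total = 0;
        int read;
        while ((read = in.read(buffer)) != -1) {
            out.write(buffer, 0, read);
            total += read;
        }
        out.flush();
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "123content\r\nover".getBytes(StandardCharsets.UTF_8);
        // try-with-resources closes both streams even if the copy fails
        try (InputStream in = new ByteArrayInputStream(data);
             ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            System.out.println(copy(in, out) + " bytes copied");
        }
    }
}
```

Returning the byte count rather than the stream also gives the caller a value it could use for logging or for a Content-Length check.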
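2. File handling in the HTTP protocol
In general, the file uploads and downloads we deal with on the internet are based on HTTP. So to understand how they really work, it is enough to look at the HTTP protocol.
Using the demo above, we can see what the data looks like during an upload. Capturing the traffic with Fiddler gives the following: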
POST http://localhost:8082/test/fileUpDownTest?systemCode=1111&outputFileName=111 HTTP/1.1
Host: localhost:8082
Connection: keep-alive
Content-Length: 197
Accept: */*
X-Requested-With: XMLHttpRequest
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/81.0.4044.129 Safari/537.36 OPR/68.0.3618.63
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
Origin: http://localhost:8082
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: cors
Sec-Fetch-Dest: empty
Referer: http://localhost:8082/swagger-ui.html
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7
Cookie: JSESSIONID=40832A6766FB11E105717690AEF826AA

------WebKitFormBoundaryen2ZJyNfx7WhA3yO
Content-Disposition: form-data; name="file"; filename="123.txt"
Content-Type: text/plain

123content
over
------WebKitFormBoundaryen2ZJyNfx7WhA3yO
Content-Disposition: form-data; name="file2"; filename="123-2.txt"
Content-Type: text/plain

2222content
2over
------WebKitFormBoundaryen2ZJyNfx7WhA3yO--
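Because Fiddler decodes the traffic, HTTP/1.1 headers are plain text, and the uploaded file is itself text, everything we see here is readable. For this test I used a file named 123.txt containing just a few characters: "123content\nover".
An HTTP request starts with a request line (method, URL, protocol version), followed by one header per line. The first few lines here are standard and need no further explanation.
The ones relevant to us are: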
Content-Type: multipart/form-data; boundary=----WebKitFormBoundaryen2ZJyNfx7WhA3yO
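Content-Type is an important identifying field. For a file upload, multipart/form-data indicates that the request body consists of multiple parts, which is exactly what a file upload request is. The boundary parameter that follows is the delimiter separating the parts in the body; it should be generated randomly for each request so that it cannot collide with the payload data.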
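Content-Length: 197
This value is the length of the payload in bytes, computed by the browser. After receiving it, the server reads exactly that many bytes and then considers the transfer complete.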
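To make the boundary and Content-Length concrete, here is a minimal sketch that assembles a one-part multipart/form-data body by hand and counts its bytes. MultipartBodySketch, newBoundary and buildBody are hypothetical names, and the sketch assumes a text file with CRLF line endings. With the boundary from the capture and only the 123.txt part, the body comes to 197 bytes, the same value as the Content-Length header above, which suggests that the header was recorded for a single-file request.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper for illustration; browsers and HTTP client libraries do this for you.
final class MultipartBodySketch {

    /** Returns a random boundary that is very unlikely to occur in the payload. */
    static String newBoundary() {
        return "----SketchBoundary" + UUID.randomUUID().toString().replace("-", "");
    }

    /** Builds a multipart/form-data body containing a single text file part. */
    static byte[] buildBody(String boundary, String field, String filename, String content) {
        String crlf = "\r\n";
        String body = "--" + boundary + crlf
                + "Content-Disposition: form-data; name=\"" + field
                + "\"; filename=\"" + filename + "\"" + crlf
                + "Content-Type: text/plain" + crlf
                + crlf                              // empty line: the part headers end here
                + content + crlf
                + "--" + boundary + "--" + crlf;    // closing delimiter
        return body.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String boundary = "----WebKitFormBoundaryen2ZJyNfx7WhA3yO";
        byte[] body = buildBody(boundary, "file", "123.txt", "123content\r\nover");
        System.out.println("Content-Type: multipart/form-data; boundary=" + boundary);
        System.out.println("Content-Length: " + body.length);
    }
}
```

Note that each delimiter line is the boundary value prefixed with two extra dashes, and the final one also ends with two dashes.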
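The HTTP body starts after the first occurrence of two consecutive line breaks (CRLF CRLF), that is, after the first empty line. (So a header value must never contain this sequence; if it would, encode it before sending, otherwise the message will be misparsed.)
Each part is delimited by the boundary. A part's content can be a file, a stream, or a plain key-value pair.
Given this data format, the server parses it in reverse to recover the content.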
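The reverse parsing can be sketched in a few lines. This is a simplified, text-only illustration under stated assumptions: MultipartParseSketch, boundaryOf and parse are hypothetical names, the boundary is assumed to be unquoted, and the whole body is assumed to fit in a String. A real server works on bytes and streams file parts to disk.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical text-only parser for illustration.
final class MultipartParseSketch {

    private static final Pattern NAME = Pattern.compile("form-data;\\s*name=\"([^\"]*)\"");

    /** Extracts the boundary parameter from a Content-Type header value. */
    static String boundaryOf(String contentType) {
        for (String param : contentType.split(";")) {
            String p = param.trim();
            if (p.startsWith("boundary=")) {
                return p.substring("boundary=".length());
            }
        }
        throw new IllegalArgumentException("no boundary parameter in: " + contentType);
    }

    /** Maps each part's name to its content. */
    static Map<String, String> parse(String body, String boundary) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String chunk : body.split(Pattern.quote("--" + boundary))) {
            int headerEnd = chunk.indexOf("\r\n\r\n");   // empty line ends the part headers
            if (headerEnd < 0) {
                continue;                                  // preamble or the closing "--"
            }
            Matcher m = NAME.matcher(chunk.substring(0, headerEnd));
            if (!m.find()) {
                continue;                                  // no form-data name, skip the part
            }
            String content = chunk.substring(headerEnd + 4);
            if (content.endsWith("\r\n")) {                // CRLF that precedes the next delimiter
                content = content.substring(0, content.length() - 2);
            }
            parts.put(m.group(1), content);
        }
        return parts;
    }
}
```

Applied to the captured request, parse would yield two entries, file and file2, each mapped to the text of the corresponding upload.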
If the server responds with a file download, the response looks like this:
HTTP/1.1 200
Cache-Control: no-cache, no-store, must-revalidate
Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
Pragma: no-cache
Expires: 0
Content-Type: application/octet-stream;charset=utf-8
Transfer-Encoding: chunked
Date: Sun, 17 May 2020 05:30:57 GMT

10
123content
over
0
Key fields:
Content-Disposition: attachment; filename=file5983940017135638617.tmp.txt
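This field says that the response should be downloaded and saved locally as an attachment, which almost all browsers support. If you write your own client code to receive it, you can do whatever you like with it; it is only a marker. filename is the default name offered when the user saves the file; if a file with that name already exists locally, browsers usually append a suffix such as (1) to avoid overwriting it.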
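The demo code passes the file name through URLEncoder.encode and puts it in a plain filename= parameter, which turns a space into '+'. One common alternative, shown here as a hedged sketch rather than as what the demos do, is the filename* form defined for Content-Disposition in RFC 6266, which carries a percent-encoded UTF-8 name. ContentDispositionSketch and attachment are hypothetical names.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;

// Hypothetical helper for illustration.
final class ContentDispositionSketch {

    /** Builds an attachment header value using the filename* (UTF-8, percent-encoded) form. */
    static String attachment(String filename) {
        try {
            String encoded = URLEncoder.encode(filename, "UTF-8")
                    .replace("+", "%20")    // URLEncoder encodes a space as '+'
                    .replace("*", "%2A");   // URLEncoder leaves '*' unencoded
            return "attachment; filename*=UTF-8''" + encoded;
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException("UTF-8 is always supported", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("Content-Disposition: " + attachment("a b.txt"));
    }
}
```

In a servlet the result would be passed to response.setHeader("Content-Disposition", ...), in the same place the demos set the header today.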
Content-Type: application/octet-stream;charset=utf-8
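This indicates a binary file, which means browsers generally cannot render it themselves. Of course, this too is only a hint; what you actually write out does not matter, because the file content is restored as long as the received bytes are written to the file in order. As before, the actual file content begins after the first empty line (two consecutive line breaks).
The output above has no Content-Length field, so the size of the download cannot be known up front. Instead the response uses Transfer-Encoding: chunked: each chunk is preceded by its size in hexadecimal (10 here, that is, 16 bytes), and a chunk of size 0 marks the end. As a result the browser cannot tell what fraction has been downloaded and so cannot show a progress bar. The downloaded data is not affected, but this is generally best avoided.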
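How a client recovers the file from such a chunked body can be sketched as follows. This is an illustration only, with hypothetical names (ChunkedDecodeSketch, readLine, decode); it ignores trailers and assumes well-formed input. HTTP client libraries normally do this decoding transparently.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical decoder for illustration.
final class ChunkedDecodeSketch {

    /** Reads one CRLF-terminated line and returns it without the line break. */
    static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\r') {
                in.read();                 // consume the '\n' that follows
                break;
            }
            line.write(b);
        }
        return new String(line.toByteArray(), StandardCharsets.US_ASCII);
    }

    /** Decodes a chunked body: hex size line, data, CRLF, repeated until a zero-size chunk. */
    static byte[] decode(InputStream in) throws IOException {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        while (true) {
            String sizeLine = readLine(in);
            int semicolon = sizeLine.indexOf(';');       // ignore chunk extensions
            String hex = semicolon >= 0 ? sizeLine.substring(0, semicolon) : sizeLine;
            int size = Integer.parseInt(hex.trim(), 16);
            if (size == 0) {
                return body.toByteArray();               // last chunk reached
            }
            byte[] chunk = new byte[size];
            int offset = 0;
            while (offset < size) {
                int n = in.read(chunk, offset, size - offset);
                if (n == -1) {
                    throw new IOException("stream ended inside a chunk");
                }
                offset += n;
            }
            body.write(chunk, 0, size);
            readLine(in);                                // CRLF after the chunk data
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] raw = "10\r\n123content\r\nover\r\n0\r\n\r\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(decode(new ByteArrayInputStream(raw)).length + " bytes decoded");
    }
}
```

The total size only becomes known once the zero-size chunk arrives, which is why no progress percentage can be shown while the transfer is running.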
After adding Content-Length, the response looks like this:
HTTP/1.1 200
Cache-Control: no-cache, no-store, must-revalidate
Content-Disposition: attachment; filename=file4383190990004865558.tmp.txt
Pragma: no-cache
Expires: 0
Content-Type: application/octet-stream;charset=utf-8
Content-Length: 16
Date: Sun, 17 May 2020 07:26:47 GMT

123content
over
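That is how HTTP handles files. As long as your request follows the protocol, the other end can accept your upload; as long as the server writes its response according to the protocol, the browser can download the file.
For more on HTTP headers, see: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers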
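3. What lies behind HTTP upload and download?
As we know, HTTP is an application-layer protocol built on top of TCP. The previous section's discussion of uploading and downloading was also at the application layer. Put plainly: if we treat the network as a black box, we assume the box hands us correct data, and from that data we can parse out the file.
In fact, TCP is a reliable transport protocol. As for how it is reliable, well, put it this way: traffic on the network is complex and unordered. When you send data from one endpoint to another, it goes out via the IP protocol as multiple one-way packets. These are at the mercy of the external environment: some packets may be lost, some sent later may arrive earlier, and so on. If loss, reordering, and duplication are not handled, the data arriving from the network would be unusable (in effect, corrupted).
TCP was designed specifically to deal with these problems, and the details are quite involved. In short, once you use TCP you no longer need to worry about the messy network: you can trust that the data handed to you by the operating system's TCP layer is ordered and complete. You can read a book or look up more material online (books are more reliable, just more demanding of time and energy). This article is a good reference: http://www.ruanyifeng.com/blog/2017/06/tcp-protocol.html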
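4. How is file upload handling implemented in Java?
We have now seen how HTTP handles file uploads, but how is this actually implemented? If you were given only a socket library as the entry point, how would you handle these HTTP requests?
You can think of it roughly like this:
  1. Receive the headers and determine that this is a file upload.
  2. Extract the boundary and the Content-Length and keep them for later.
  3. Continue reading the network stream. When a part is key-value data, keep it in an in-memory buffer; when a part is a file, create a temporary file and write the incoming data to it until that part is complete, then record the temporary file's information.
  4. After all the data specified by the HTTP message has been read, wrap it in a request object and hand it to the application code; once the application has finished, send the response to the client.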
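The first of these steps can be sketched against a plain InputStream, such as the one a socket provides. This is a minimal sketch and not how Tomcat does it: RawRequestSketch and its members are hypothetical names. It reads the header block, detects a multipart upload, and reads exactly Content-Length bytes into memory; splitting the body by boundary and spilling file parts to temporary files is left out.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical request reader for illustration; not Tomcat's implementation.
final class RawRequestSketch {
    String requestLine;
    final Map<String, String> headers = new LinkedHashMap<>(); // keys are lower-cased
    byte[] body = new byte[0];

    /** True when the Content-Type header announces a multipart/form-data upload. */
    boolean isMultipartUpload() {
        return headers.getOrDefault("content-type", "")
                .toLowerCase(Locale.ROOT).startsWith("multipart/form-data");
    }

    /** Reads the request line and headers up to the empty line, then Content-Length body bytes. */
    static RawRequestSketch read(InputStream in) throws IOException {
        RawRequestSketch request = new RawRequestSketch();
        request.requestLine = readLine(in);
        String line;
        while (!(line = readLine(in)).isEmpty()) {       // the empty line ends the headers
            int colon = line.indexOf(':');
            if (colon > 0) {
                request.headers.put(line.substring(0, colon).trim().toLowerCase(Locale.ROOT),
                        line.substring(colon + 1).trim());
            }
        }
        int length = Integer.parseInt(request.headers.getOrDefault("content-length", "0"));
        byte[] body = new byte[length];
        int offset = 0;
        while (offset < length) {                        // stop after exactly Content-Length bytes
            int n = in.read(body, offset, length - offset);
            if (n == -1) {
                throw new IOException("body shorter than Content-Length");
            }
            offset += n;
        }
        request.body = body;
        return request;
    }

    /** Reads one CRLF-terminated line without the line break; returns "" at end of stream. */
    private static String readLine(InputStream in) throws IOException {
        ByteArrayOutputStream line = new ByteArrayOutputStream();
        int b;
        while ((b = in.read()) != -1) {
            if (b == '\r') {
                in.read();                               // consume the '\n' that follows
                break;
            }
            line.write(b);
        }
        return new String(line.toByteArray(), StandardCharsets.ISO_8859_1);
    }

    public static void main(String[] args) throws IOException {
        String raw = "POST /upload HTTP/1.1\r\nContent-Type: multipart/form-data; boundary=XYZ\r\n"
                + "Content-Length: 5\r\n\r\nhello";
        RawRequestSketch request = read(new ByteArrayInputStream(raw.getBytes(StandardCharsets.ISO_8859_1)));
        System.out.println(request.isMultipartUpload() + " " + request.body.length);
    }
}
```

Reading byte by byte keeps the sketch short; a real server reads into a buffer and parses from there.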
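Take Tomcat as an example: it parses each parameter value in turn.
If you are interested, first look at how it accepts HTTP requests (based on NIO sockets). The rough flow, which is also its thread model, is: Acceptor -> Poller -> SocketProcessor.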

// org.apache.tomcat.util.net.NioEndpoint.Acceptor @Override public void run() { int errorDelay = 0; // Loop until we receive a shutdown command while (running) { // Loop if endpoint is paused while (paused && running) { state = AcceptorState.PAUSED; try { Thread.sleep(50); } catch (InterruptedException e) { // Ignore } } if (!running) { break; } state = AcceptorState.RUNNING; try { //if we have reached max connections, wait countUpOrAwaitConnection(); SocketChannel socket = null; try { // Accept the next incoming connection from the server // socket // Nio 的 ServerSocketChannelImpl, 阻塞等待socket accept 事件 socket = serverSock.accept(); } catch (IOException ioe) { // We didn't get a socket countDownConnection(); if (running) { // Introduce delay if necessary errorDelay = handleExceptionWithDelay(errorDelay); // re-throw throw ioe; } else { break; } } // Successful accept, reset the error delay errorDelay = 0; // Configure the socket if (running && !paused) { // setSocketOptions() will hand the socket off to // an appropriate processor if successful // 處理socket事件 if (!setSocketOptions(socket)) { closeSocket(socket); } } else { closeSocket(socket); } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("endpoint.accept.fail"), t); } } state = AcceptorState.ENDED; } /** * Process the specified connection. 
* @param socket The socket channel * @return <code>true</code> if the socket was correctly configured * and processing may continue, <code>false</code> if the socket needs to be * close immediately */ protected boolean setSocketOptions(SocketChannel socket) { // Process the connection try { //disable blocking, APR style, we are gonna be polling it // 組裝channel,交給 Pollor socket.configureBlocking(false); Socket sock = socket.socket(); socketProperties.setProperties(sock); NioChannel channel = nioChannels.pop(); if (channel == null) { SocketBufferHandler bufhandler = new SocketBufferHandler( socketProperties.getAppReadBufSize(), socketProperties.getAppWriteBufSize(), socketProperties.getDirectBuffer()); if (isSSLEnabled()) { channel = new SecureNioChannel(socket, bufhandler, selectorPool, this); } else { channel = new NioChannel(socket, bufhandler); } } else { channel.setIOChannel(socket); channel.reset(); } // 添加到 Pollor 隊列中,Poller 的獲取使用輪詢方式獲取 getPoller0().register(channel); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); try { log.error("",t); } catch (Throwable tt) { ExceptionUtils.handleThrowable(tt); } // Tell to close the socket return false; } return true; } /** * Return an available poller in true round robin fashion. * * @return The next poller in sequence */ public Poller getPoller0() { // 第1次取1,第2次取2,第3次取1... 輪詢 int idx = Math.abs(pollerRotater.incrementAndGet()) % pollers.length; return pollers[idx]; } // org.apache.tomcat.util.net.NioEndpoint.Poller#register /** * Registers a newly created socket with the poller. 
* * @param socket The newly created socket */ public void register(final NioChannel socket) { socket.setPoller(this); NioSocketWrapper ka = new NioSocketWrapper(socket, NioEndpoint.this); socket.setSocketWrapper(ka); ka.setPoller(this); ka.setReadTimeout(getSocketProperties().getSoTimeout()); ka.setWriteTimeout(getSocketProperties().getSoTimeout()); ka.setKeepAliveLeft(NioEndpoint.this.getMaxKeepAliveRequests()); ka.setSecure(isSSLEnabled()); ka.setReadTimeout(getConnectionTimeout()); ka.setWriteTimeout(getConnectionTimeout()); PollerEvent r = eventCache.pop(); // 注冊OP_READ事件,給selector使用 ka.interestOps(SelectionKey.OP_READ);//this is what OP_REGISTER turns into. // 將socket信息添加到 PollerEvent 中 if ( r==null) r = new PollerEvent(socket,ka,OP_REGISTER); else r.reset(socket,ka,OP_REGISTER); addEvent(r); } // 添加事件並喚醒selector // org.apache.tomcat.util.net.NioEndpoint.Poller#addEvent private void addEvent(PollerEvent event) { events.offer(event); // 正在select()阻塞中的 selector, wakeupCounter=-1, 即可被喚醒狀態 if ( wakeupCounter.incrementAndGet() == 0 ) selector.wakeup(); } // step2. Poller 使用selector池處理讀就緒事件 /** * The background thread that adds sockets to the Poller, checks the * poller for triggered events and hands the associated socket off to an * appropriate processor as events occur. 
*/ @Override public void run() { // Loop until destroy() is called while (true) { boolean hasEvents = false; try { if (!close) { // events() 會檢查是否有acceptor提交過來的 PollerEvent, 如果有,會先初始化event // 向selector注冊讀事件等等,以便后續 select() 生效 hasEvents = events(); if (wakeupCounter.getAndSet(-1) > 0) { //if we are here, means we have other stuff to do //do a non blocking select keyCount = selector.selectNow(); } else { keyCount = selector.select(selectorTimeout); } wakeupCounter.set(0); } if (close) { events(); timeout(0, false); try { selector.close(); } catch (IOException ioe) { log.error(sm.getString("endpoint.nio.selectorCloseFail"), ioe); } break; } } catch (Throwable x) { ExceptionUtils.handleThrowable(x); log.error("",x); continue; } //either we timed out or we woke up, process events first if ( keyCount == 0 ) hasEvents = (hasEvents | events()); Iterator<SelectionKey> iterator = keyCount > 0 ? selector.selectedKeys().iterator() : null; // Walk through the collection of ready keys and dispatch // any active event. while (iterator != null && iterator.hasNext()) { SelectionKey sk = iterator.next(); NioSocketWrapper attachment = (NioSocketWrapper)sk.attachment(); // Attachment may be null if another thread has called // cancelledKey() if (attachment == null) { iterator.remove(); } else { // 把key監聽移除,然后去處理具體key, 網絡接入成功 iterator.remove(); processKey(sk, attachment); } }//while //process timeouts timeout(keyCount,hasEvents); }//while getStopLatch().countDown(); } // org.apache.tomcat.util.net.NioEndpoint.Poller#processKey protected void processKey(SelectionKey sk, NioSocketWrapper attachment) { try { if ( close ) { cancelledKey(sk); } else if ( sk.isValid() && attachment != null ) { if (sk.isReadable() || sk.isWritable() ) { // sendfile if ( attachment.getSendfileData() != null ) { processSendfile(sk,attachment, false); } else { // 取消事件監聽,那么后續如何讀數據呢? 
// 這意味着當前socket將會從epoll的表中移除掉,不再被其管理,但並不影響后續的read // 后續的read() 操作將以bio等式展開 unreg(sk, attachment, sk.readyOps()); boolean closeSocket = false; // Read goes before write // 優先處理讀事件,再處理寫事件 if (sk.isReadable()) { if (!processSocket(attachment, SocketEvent.OPEN_READ, true)) { closeSocket = true; } } if (!closeSocket && sk.isWritable()) { if (!processSocket(attachment, SocketEvent.OPEN_WRITE, true)) { closeSocket = true; } } if (closeSocket) { cancelledKey(sk); } } } } else { //invalid key cancelledKey(sk); } } catch ( CancelledKeyException ckx ) { cancelledKey(sk); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error("",t); } } // org.apache.tomcat.util.net.AbstractEndpoint#processSocket /** * Process the given SocketWrapper with the given status. Used to trigger * processing as if the Poller (for those endpoints that have one) * selected the socket. * * @param socketWrapper The socket wrapper to process * @param event The socket event to be processed * @param dispatch Should the processing be performed on a new * container thread * * @return if processing was triggered successfully */ public boolean processSocket(SocketWrapperBase<S> socketWrapper, SocketEvent event, boolean dispatch) { try { if (socketWrapper == null) { return false; } // 使用線程池處理單個讀事件 SocketProcessorBase<S> sc = processorCache.pop(); if (sc == null) { sc = createSocketProcessor(socketWrapper, event); } else { sc.reset(socketWrapper, event); } // 線程池默認10個核心線程 // 此處的線程池並非原生jdk的線程池ThreadPoolExecutor,而是經過tomcat繼承過來的 org.apache.tomcat.util.threads.ThreadPoolExecutor, 主要用於做一次統計類工作 // 最終的socket處理將會由 SocketProcessor 進行統一調度具體的Handler處理 Executor executor = getExecutor(); if (dispatch && executor != null) { executor.execute(sc); } else { sc.run(); } } catch (RejectedExecutionException ree) { getLog().warn(sm.getString("endpoint.executor.fail", socketWrapper) , ree); return false; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); // This means we got an OOM or similar creating a thread, 
or that // the pool and its queue are full getLog().error(sm.getString("endpoint.process.fail"), t); return false; } return true; } // 以上過程,請求就從 Poller 中提交到了 SocketProcessor 了,將由 SocketProcessor 進行統一處理 // org.apache.tomcat.util.net.NioEndpoint.SocketProcessor#doRun @Override protected void doRun() { NioChannel socket = socketWrapper.getSocket(); SelectionKey key = socket.getIOChannel().keyFor(socket.getPoller().getSelector()); try { int handshake = -1; try { if (key != null) { if (socket.isHandshakeComplete()) { // No TLS handshaking required. Let the handler // process this socket / event combination. handshake = 0; } else if (event == SocketEvent.STOP || event == SocketEvent.DISCONNECT || event == SocketEvent.ERROR) { // Unable to complete the TLS handshake. Treat it as // if the handshake failed. handshake = -1; } else { handshake = socket.handshake(key.isReadable(), key.isWritable()); // The handshake process reads/writes from/to the // socket. status may therefore be OPEN_WRITE once // the handshake completes. However, the handshake // happens when the socket is opened so the status // must always be OPEN_READ after it completes. It // is OK to always set this as it is only used if // the handshake completes. 
event = SocketEvent.OPEN_READ; } } } catch (IOException x) { handshake = -1; if (log.isDebugEnabled()) log.debug("Error during SSL handshake",x); } catch (CancelledKeyException ckx) { handshake = -1; } if (handshake == 0) { SocketState state = SocketState.OPEN; // Process the request from this socket if (event == null) { state = getHandler().process(socketWrapper, SocketEvent.OPEN_READ); } else { // org.apache.coyote.AbstractProtocol$ConnectionHandler // 根據具體協議,創建不同的processor處理 如: Http11Processor // 此處hander統一調用外部類的父類處理為: org.apache.coyote.AbstractProtocol$ConnectionHandler state = getHandler().process(socketWrapper, event); } // 如果具體協議處理結果是 CLOSED, 那么就把該close關閉掉 // 從這個意義上來說,普通的請求實際上都是進行長連接的(當然了,客戶端一般會主動再調用一個close(),這就沒法了) if (state == SocketState.CLOSED) { close(socket, key); } } else if (handshake == -1 ) { close(socket, key); } else if (handshake == SelectionKey.OP_READ){ socketWrapper.registerReadInterest(); } else if (handshake == SelectionKey.OP_WRITE){ socketWrapper.registerWriteInterest(); } } catch (CancelledKeyException cx) { socket.getPoller().cancelledKey(key); } catch (VirtualMachineError vme) { ExceptionUtils.handleThrowable(vme); } catch (Throwable t) { log.error("", t); socket.getPoller().cancelledKey(key); } finally { socketWrapper = null; event = null; //return to cache // 復用 proccosor 處理器 if (running && !paused) { processorCache.push(this); } } } } // 以上,就是整個http請求如何轉交給應用處理的大體流程了。 // 不過還有一個問題:就是http請求處理完成之后,是關閉連接不是保持連接又當如何判定呢? // 實際上它是通過協議處理完成后返回一個 SocketState 來決定的,你如果有興趣,請繼續往下: // org.apache.coyote.AbstractProtocol.ConnectionHandler#process // 該 ConnectionHandler 將會統一管理實際的可復用的 Processor, 並針對無效的請求直接返回 SocketState.CLOSED, 以便直接關閉會話 @Override public SocketState process(SocketWrapperBase<S> wrapper, SocketEvent status) { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.process", wrapper.getSocket(), status)); } if (wrapper == null) { // Nothing to do. Socket has been closed. 
return SocketState.CLOSED; } S socket = wrapper.getSocket(); // 針對socket的處理,ConnectionHandler又使用了一個可復用的容器進行管理processors, 避免大量創建processor的開銷 Processor processor = connections.get(socket); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.connectionsGet", processor, socket)); } // Async timeouts are calculated on a dedicated thread and then // dispatched. Because of delays in the dispatch process, the // timeout may no longer be required. Check here and avoid // unnecessary processing. if (SocketEvent.TIMEOUT == status && (processor == null || !processor.isAsync() || !processor.checkAsyncTimeoutGeneration())) { // This is effectively a NO-OP return SocketState.OPEN; } if (processor != null) { // Make sure an async timeout doesn't fire getProtocol().removeWaitingProcessor(processor); } else if (status == SocketEvent.DISCONNECT || status == SocketEvent.ERROR) { // Nothing to do. Endpoint requested a close and there is no // longer a processor associated with this socket. return SocketState.CLOSED; } ContainerThreadMarker.set(); try { if (processor == null) { String negotiatedProtocol = wrapper.getNegotiatedProtocol(); if (negotiatedProtocol != null) { UpgradeProtocol upgradeProtocol = getProtocol().getNegotiatedProtocol(negotiatedProtocol); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); } else if (negotiatedProtocol.equals("http/1.1")) { // Explicitly negotiated the default protocol. // Obtain a processor below. } else { // TODO: // OpenSSL 1.0.2's ALPN callback doesn't support // failing the handshake with an error if no // protocol can be negotiated. Therefore, we need to // fail the connection here. Once this is fixed, // replace the code below with the commented out // block. 
if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); } return SocketState.CLOSED; /* * To replace the code above once OpenSSL 1.1.0 is * used. // Failed to create processor. This is a bug. throw new IllegalStateException(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", negotiatedProtocol)); */ } } } if (processor == null) { processor = recycledProcessors.pop(); if (getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.processorPop", processor)); } } if (processor == null) { processor = getProtocol().createProcessor(); register(processor); } processor.setSslSupport( wrapper.getSslSupport(getProtocol().getClientCertProvider())); // Associate the processor with the connection connections.put(socket, processor); SocketState state = SocketState.CLOSED; do { // 該state最終會由 具體的processor決定, 如: Http11Processor state = processor.process(wrapper, status); if (state == SocketState.UPGRADING) { // Get the HTTP upgrade handler UpgradeToken upgradeToken = processor.getUpgradeToken(); // Retrieve leftover input ByteBuffer leftOverInput = processor.getLeftoverInput(); if (upgradeToken == null) { // Assume direct HTTP/2 connection UpgradeProtocol upgradeProtocol = getProtocol().getUpgradeProtocol("h2c"); if (upgradeProtocol != null) { processor = upgradeProtocol.getProcessor( wrapper, getProtocol().getAdapter()); wrapper.unRead(leftOverInput); // Associate with the processor with the connection connections.put(socket, processor); } else { if (getLog().isDebugEnabled()) { getLog().debug(sm.getString( "abstractConnectionHandler.negotiatedProcessor.fail", "h2c")); } return SocketState.CLOSED; } } else { HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); // Release the Http11 processor to be re-used release(processor); // Create the upgrade processor processor = getProtocol().createUpgradeProcessor(wrapper, upgradeToken); if 
(getLog().isDebugEnabled()) { getLog().debug(sm.getString("abstractConnectionHandler.upgradeCreate", processor, wrapper)); } wrapper.unRead(leftOverInput); // Mark the connection as upgraded wrapper.setUpgraded(true); // Associate with the processor with the connection connections.put(socket, processor); // Initialise the upgrade handler (which may trigger // some IO using the new protocol which is why the lines // above are necessary) // This cast should be safe. If it fails the error // handling for the surrounding try/catch will deal with // it. if (upgradeToken.getInstanceManager() == null) { httpUpgradeHandler.init((WebConnection) processor); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.init((WebConnection) processor); } finally { upgradeToken.getContextBind().unbind(false, oldCL); } } } } } while ( state == SocketState.UPGRADING); if (state == SocketState.LONG) { // In the middle of processing a request/response. Keep the // socket associated with the processor. Exact requirements // depend on type of long poll longPoll(wrapper, processor); if (processor.isAsync()) { getProtocol().addWaitingProcessor(processor); } } else if (state == SocketState.OPEN) { // In keep-alive but between requests. OK to recycle // processor. Continue to poll for the next request. connections.remove(socket); release(processor); wrapper.registerReadInterest(); } else if (state == SocketState.SENDFILE) { // Sendfile in progress. If it fails, the socket will be // closed. If it works, the socket either be added to the // poller (or equivalent) to await more data or processed // if there are any pipe-lined requests remaining. } else if (state == SocketState.UPGRADED) { // Don't add sockets back to the poller if this was a // non-blocking write otherwise the poller may trigger // multiple read events which may lead to thread starvation // in the connector. The write() method will add this socket // to the poller if necessary. 
if (status != SocketEvent.OPEN_WRITE) { longPoll(wrapper, processor); } } else if (state == SocketState.SUSPENDED) { // Don't add sockets back to the poller. // The resumeProcessing() method will add this socket // to the poller. } else { // Connection closed. OK to recycle the processor. Upgrade // processors are not recycled. connections.remove(socket); if (processor.isUpgrade()) { UpgradeToken upgradeToken = processor.getUpgradeToken(); HttpUpgradeHandler httpUpgradeHandler = upgradeToken.getHttpUpgradeHandler(); InstanceManager instanceManager = upgradeToken.getInstanceManager(); if (instanceManager == null) { httpUpgradeHandler.destroy(); } else { ClassLoader oldCL = upgradeToken.getContextBind().bind(false, null); try { httpUpgradeHandler.destroy(); } finally { try { instanceManager.destroyInstance(httpUpgradeHandler); } catch (Throwable e) { ExceptionUtils.handleThrowable(e); getLog().error(sm.getString("abstractConnectionHandler.error"), e); } upgradeToken.getContextBind().unbind(false, oldCL); } } } else { release(processor); } } // 將處理的狀態,返回給總控 Processor, 以便決定是否close socket return state; } catch(java.net.SocketException e) { // SocketExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.socketexception.debug"), e); } catch (java.io.IOException e) { // IOExceptions are normal getLog().debug(sm.getString( "abstractConnectionHandler.ioexception.debug"), e); } catch (ProtocolException e) { // Protocol exceptions normally mean the client sent invalid or // incomplete data. getLog().debug(sm.getString( "abstractConnectionHandler.protocolexception.debug"), e); } // Future developers: if you discover any other // rare-but-nonfatal exceptions, catch them here, and log as // above. catch (Throwable e) { ExceptionUtils.handleThrowable(e); // any other exception or error is odd. Here we log it // with "ERROR" level, so it will show up even on // less-than-verbose logs. 
getLog().error(sm.getString("abstractConnectionHandler.error"), e); } finally { ContainerThreadMarker.clear(); } // Make sure socket/processor is removed from the list of current // connections connections.remove(socket); release(processor); return SocketState.CLOSED; } // org.apache.coyote.AbstractProcessorLight#process // Http11Processor 繼承該 AbstractProcessorLight, 使用模板方法模式處理細節不同點 @Override public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status) throws IOException { SocketState state = SocketState.CLOSED; Iterator<DispatchType> dispatches = null; do { if (dispatches != null) { DispatchType nextDispatch = dispatches.next(); state = dispatch(nextDispatch.getSocketStatus()); } else if (status == SocketEvent.DISCONNECT) { // Do nothing here, just wait for it to get recycled } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) { state = dispatch(status); if (state == SocketState.OPEN) { // There may be pipe-lined data to read. If the data isn't // processed now, execution will exit this loop and call // release() which will recycle the processor (and input // buffer) deleting any pipe-lined data. To avoid this, // process it now. 
state = service(socketWrapper); } } else if (status == SocketEvent.OPEN_WRITE) { // Extra write event likely after async, ignore state = SocketState.LONG; } else if (status == SocketEvent.OPEN_READ){ // 普通的http請求,將會被處理為 OPEN_READ state = service(socketWrapper); } else { // Default to closing the socket if the SocketEvent passed in // is not consistent with the current state of the Processor state = SocketState.CLOSED; } if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], Status in: [" + status + "], State out: [" + state + "]"); } if (state != SocketState.CLOSED && isAsync()) { state = asyncPostProcess(); if (getLog().isDebugEnabled()) { getLog().debug("Socket: [" + socketWrapper + "], State after async post processing: [" + state + "]"); } } if (dispatches == null || !dispatches.hasNext()) { // Only returns non-null iterator if there are // dispatches to process. dispatches = getIteratorAndClearDispatches(); } // 循環處理請求,直接狀態是 CLOSE, 或者異步結束 } while (state == SocketState.ASYNC_END || dispatches != null && state != SocketState.CLOSED); return state; } // org.apache.coyote.http11.Http11Processor#service // 具體的協議處理方法,將返回處理結果狀態,決定是否關閉 socket @Override public SocketState service(SocketWrapperBase<?> socketWrapper) throws IOException { // 由 RequestInfo 來管理整個處理的生命周期 // STAGE_PARSE -> STAGE_PREPARE -> STAGE_SERVICE -> STAGE_ENDINPUT -> STAGE_ENDOUTPUT -> STAGE_KEEPALIVE -> STAGE_ENDED RequestInfo rp = request.getRequestProcessor(); rp.setStage(org.apache.coyote.Constants.STAGE_PARSE); // Setting up the I/O setSocketWrapper(socketWrapper); inputBuffer.init(socketWrapper); outputBuffer.init(socketWrapper); // Flags keepAlive = true; openSocket = false; readComplete = true; boolean keptAlive = false; SendfileState sendfileState = SendfileState.DONE; // 此處會循環讀取 inputStream 進行處理,如果只是一次 http 請求,則第一次處理完成之后,第二次將會產生 IOException // 從而觸發socket的關閉過程 while (!getErrorState().isError() && keepAlive && !isAsync() && upgradeToken == null && sendfileState == 
SendfileState.DONE && !endpoint.isPaused()) { // Parsing the request header try { if (!inputBuffer.parseRequestLine(keptAlive)) { if (inputBuffer.getParsingRequestLinePhase() == -1) { return SocketState.UPGRADING; } else if (handleIncompleteRequestLineRead()) { break; } } if (endpoint.isPaused()) { // 503 - Service unavailable response.setStatus(503); setErrorState(ErrorState.CLOSE_CLEAN, null); } else { keptAlive = true; // Set this every time in case limit has been changed via JMX request.getMimeHeaders().setLimit(endpoint.getMaxHeaderCount()); if (!inputBuffer.parseHeaders()) { // We've read part of the request, don't recycle it // instead associate it with the socket openSocket = true; readComplete = false; break; } if (!disableUploadTimeout) { socketWrapper.setReadTimeout(connectionUploadTimeout); } } } catch (IOException e) { if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.header.parse"), e); } setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); break; } catch (Throwable t) { ExceptionUtils.handleThrowable(t); UserDataHelper.Mode logMode = userDataHelper.getNextMode(); if (logMode != null) { String message = sm.getString("http11processor.header.parse"); switch (logMode) { case INFO_THEN_DEBUG: message += sm.getString("http11processor.fallToDebug"); //$FALL-THROUGH$ case INFO: log.info(message, t); break; case DEBUG: log.debug(message, t); } } // 400 - Bad Request response.setStatus(400); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } // Has an upgrade been requested? 
Enumeration<String> connectionValues = request.getMimeHeaders().values("Connection"); boolean foundUpgrade = false; while (connectionValues.hasMoreElements() && !foundUpgrade) { foundUpgrade = connectionValues.nextElement().toLowerCase( Locale.ENGLISH).contains("upgrade"); } if (foundUpgrade) { // Check the protocol String requestedProtocol = request.getHeader("Upgrade"); UpgradeProtocol upgradeProtocol = httpUpgradeProtocols.get(requestedProtocol); if (upgradeProtocol != null) { if (upgradeProtocol.accept(request)) { // TODO Figure out how to handle request bodies at this // point. response.setStatus(HttpServletResponse.SC_SWITCHING_PROTOCOLS); response.setHeader("Connection", "Upgrade"); response.setHeader("Upgrade", requestedProtocol); action(ActionCode.CLOSE, null); getAdapter().log(request, response, 0); InternalHttpUpgradeHandler upgradeHandler = upgradeProtocol.getInternalUpgradeHandler( getAdapter(), cloneRequest(request)); UpgradeToken upgradeToken = new UpgradeToken(upgradeHandler, null, null); action(ActionCode.UPGRADE, upgradeToken); return SocketState.UPGRADING; } } } if (!getErrorState().isError()) { // Setting up filters, and parse some request headers rp.setStage(org.apache.coyote.Constants.STAGE_PREPARE); try { prepareRequest(); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); if (log.isDebugEnabled()) { log.debug(sm.getString("http11processor.request.prepare"), t); } // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } } if (maxKeepAliveRequests == 1) { keepAlive = false; } else if (maxKeepAliveRequests > 0 && socketWrapper.decrementKeepAlive() <= 0) { keepAlive = false; } // Process the request in the adapter if (!getErrorState().isError()) { try { rp.setStage(org.apache.coyote.Constants.STAGE_SERVICE); // 使用適配器處理 request, 並忽略其返回狀態值, 此處真正的協議處理服務,后續就是許多的 filterChain 處理了 getAdapter().service(request, response); // Handle when the response was committed 
before a serious // error occurred. Throwing a ServletException should both // set the status to 500 and set the errorException. // If we fail here, then the response is likely already // committed, so we can't try and set headers. if(keepAlive && !getErrorState().isError() && !isAsync() && statusDropsConnection(response.getStatus())) { setErrorState(ErrorState.CLOSE_CLEAN, null); } } catch (InterruptedIOException e) { setErrorState(ErrorState.CLOSE_CONNECTION_NOW, e); } catch (HeadersTooLargeException e) { log.error(sm.getString("http11processor.request.process"), e); // The response should not have been committed but check it // anyway to be safe if (response.isCommitted()) { setErrorState(ErrorState.CLOSE_NOW, e); } else { response.reset(); response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, e); response.setHeader("Connection", "close"); // TODO: Remove } } catch (Throwable t) { ExceptionUtils.handleThrowable(t); log.error(sm.getString("http11processor.request.process"), t); // 500 - Internal Server Error response.setStatus(500); setErrorState(ErrorState.CLOSE_CLEAN, t); getAdapter().log(request, response, 0); } } // Finish the handling of the request rp.setStage(org.apache.coyote.Constants.STAGE_ENDINPUT); if (!isAsync()) { // If this is an async request then the request ends when it has // been completed. The AsyncContext is responsible for calling // endRequest() in that case. 
// 非異步請求,則處理 input 上下文 // 同時處理output endRequest(); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDOUTPUT); // If there was an error, make sure the request is counted as // and error, and update the statistics counter if (getErrorState().isError()) { response.setStatus(500); } if (!isAsync() || getErrorState().isError()) { request.updateCounters(); // 更換request,response 為空,以便proccor池安全復用 if (getErrorState().isIoAllowed()) { inputBuffer.nextRequest(); outputBuffer.nextRequest(); } } if (!disableUploadTimeout) { int soTimeout = endpoint.getConnectionTimeout(); if(soTimeout > 0) { socketWrapper.setReadTimeout(soTimeout); } else { socketWrapper.setReadTimeout(0); } } rp.setStage(org.apache.coyote.Constants.STAGE_KEEPALIVE); // 正常處理完成一次請求,接着會進入下一次處理流程,一般會以 IOException 結束 sendfileState = processSendfile(socketWrapper); } rp.setStage(org.apache.coyote.Constants.STAGE_ENDED); // 所以,普通http請求,一般都會得到一次 CLOSE_CONNECTION_NOW 的狀態,觸發立即關閉 socket if (getErrorState().isError() || endpoint.isPaused()) { return SocketState.CLOSED; } else if (isAsync()) { return SocketState.LONG; } else if (isUpgrade()) { return SocketState.UPGRADING; } else { if (sendfileState == SendfileState.PENDING) { return SocketState.SENDFILE; } else { if (openSocket) { if (readComplete) { return SocketState.OPEN; } else { return SocketState.LONG; } } else { return SocketState.CLOSED; } } } }
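Once the network request has been accepted, it first passes through the filterChain, and one of the Filters triggers parsing of the request parameters.
For file handling, we start from the branch that deals with multipart/form-data: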
// org.apache.catalina.connector.Request#parseParameters /** * Parse request parameters. */ protected void parseParameters() { parametersParsed = true; Parameters parameters = coyoteRequest.getParameters(); boolean success = false; try { ... String contentType = getContentType(); if (contentType == null) { contentType = ""; } int semicolon = contentType.indexOf(';'); if (semicolon >= 0) { contentType = contentType.substring(0, semicolon).trim(); } else { contentType = contentType.trim(); } // 如果是 multipart 類型數據,解析每個 part 數據 if ("multipart/form-data".equals(contentType)) { parseParts(false); success = true; return; } ... success = true; } finally { if (!success) { parameters.setParseFailedReason(FailReason.UNKNOWN); } } } // 對 mulipart 的處理實現 // org.apache.catalina.connector.Request#parseParts private void parseParts(boolean explicit) { // Return immediately if the parts have already been parsed if (parts != null || partsParseException != null) { return; } ... boolean success = false; // Create a new file upload handler DiskFileItemFactory factory = new DiskFileItemFactory(); try { factory.setRepository(location.getCanonicalFile()); } catch (IOException ioe) { parameters.setParseFailedReason(FailReason.IO_ERROR); partsParseException = ioe; return; } factory.setSizeThreshold(mce.getFileSizeThreshold()); // 使用 ServletFileUplad 組件去解析文件信息 ServletFileUpload upload = new ServletFileUpload(); upload.setFileItemFactory(factory); upload.setFileSizeMax(mce.getMaxFileSize()); upload.setSizeMax(mce.getMaxRequestSize()); parts = new ArrayList<>(); try { // step1. 使用 ServletFileUpload 解析參數為 FileItem List<FileItem> items = upload.parseRequest(new ServletRequestContext(this)); int maxPostSize = getConnector().getMaxPostSize(); int postSize = 0; Charset charset = getCharset(); // step2. 
將解析出的 FileItem 包裝為 ApplicationPart, 添加到當前Request的 parts 容器中 for (FileItem item : items) { ApplicationPart part = new ApplicationPart(item, location); parts.add(part); // 文件名處理 if (part.getSubmittedFileName() == null) { String name = part.getName(); String value = null; try { value = part.getString(charset.name()); } catch (UnsupportedEncodingException uee) { // Not possible } if (maxPostSize >= 0) { // Have to calculate equivalent size. Not completely // accurate but close enough. postSize += name.getBytes(charset).length; if (value != null) { // Equals sign postSize++; // Value length postSize += part.getSize(); } // Value separator postSize++; if (postSize > maxPostSize) { parameters.setParseFailedReason(FailReason.POST_TOO_LARGE); throw new IllegalStateException(sm.getString( "coyoteRequest.maxPostSizeExceeded")); } } parameters.addParameter(name, value); } } success = true; } ... } finally { // This might look odd but is correct. setParseFailedReason() only // sets the failure reason if none is currently set. This code could // be more efficient but it is written this way to be robust with // respect to changes in the remainder of the method. if (partsParseException != null || !success) { parameters.setParseFailedReason(FailReason.UNKNOWN); } } }
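That is the overall upload handling logic: 1. ServletFileUpload parses the request into a List<FileItem>; 2. each FileItem is wrapped in an ApplicationPart and stored in the parts field of the Request for later use. The actual parsing of the file data therefore happens inside ServletFileUpload.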
// ServletFileUplad, 處理文件解析 // org.apache.tomcat.util.http.fileupload.FileUploadBase#parseRequest /** * Processes an <a href="http://www.ietf.org/rfc/rfc1867.txt">RFC 1867</a> * compliant <code>multipart/form-data</code> stream. * * @param ctx The context for the request to be parsed. * * @return A list of <code>FileItem</code> instances parsed from the * request, in the order that they were transmitted. * * @throws FileUploadException if there are problems reading/parsing * the request or storing files. */ public List<FileItem> parseRequest(RequestContext ctx) throws FileUploadException { List<FileItem> items = new ArrayList<>(); boolean successful = false; try { // 依次迭代各part數據 FileItemIterator iter = getItemIterator(ctx); FileItemFactory fac = getFileItemFactory(); if (fac == null) { throw new NullPointerException("No FileItemFactory has been set."); } while (iter.hasNext()) { // 每進行一次迭代,就會創建一個 FileItemStreamImpl, 創建一個新的 InputStream final FileItemStream item = iter.next(); // Don't use getName() here to prevent an InvalidFileNameException. final String fileName = ((FileItemIteratorImpl.FileItemStreamImpl) item).name; // 為每個key 創建一個FileItem,用於輸出數據流 FileItem fileItem = fac.createItem(item.getFieldName(), item.getContentType(), item.isFormField(), fileName); items.add(fileItem); try { // 將socket中的數據流,寫入到 fileItem 創建的臨時文件中,達到框架上傳的目的 // fileItem.getOutputStream() 會創建臨時文件 // item.openStream() 會使用網絡io流作為數據來源,當讀取到 -1, 認為輸入結束了 // 最終會將所有單個part的數據全部寫入當前的臨時文件中 Streams.copy(item.openStream(), fileItem.getOutputStream(), true); } catch (FileUploadIOException e) { throw (FileUploadException) e.getCause(); } catch (IOException e) { throw new IOFileUploadException(String.format("Processing of %s request failed. 
%s", MULTIPART_FORM_DATA, e.getMessage()), e); } final FileItemHeaders fih = item.getHeaders(); fileItem.setHeaders(fih); } successful = true; return items; } catch (FileUploadIOException e) { throw (FileUploadException) e.getCause(); } catch (IOException e) { throw new FileUploadException(e.getMessage(), e); } finally { if (!successful) { // 如果部分失敗,則全部將已上傳部分刪除 for (FileItem fileItem : items) { try { fileItem.delete(); } catch (Exception ignored) { // ignored TODO perhaps add to tracker delete failure list somehow? } } } } }
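That is the whole framework-side parsing of an upload. The rough steps are:
1. Based on the boundary, use the iterator pattern to create an InputStream for each part in turn;
2. On each iteration, create a DiskFileItem instance to hold the data read from that stream;
3. Write the network InputStream into the item's temporary file on disk (items below sizeThreshold stay in memory);
4. Return all the DiskFileItem instances together to the application as input parameters.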
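The steps above can be sketched in plain Java. This is a simplified illustration, not Tomcat's MultipartStream: it assumes the whole request body is already in memory, that lines end with CRLF, and that the boundary string never appears inside a part body. The class MultipartSketch and its methods boundaryOf and parse are hypothetical names made up for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical sketch: splits an in-memory multipart body on its boundary.
final class MultipartSketch {

    /** One parsed part: header names are lower-cased, the body is kept as raw bytes. */
    static final class Part {
        final Map<String, String> headers = new LinkedHashMap<>();
        byte[] body = new byte[0];
    }

    /** Extracts the boundary parameter from a Content-Type value, or returns null. */
    static String boundaryOf(String contentType) {
        if (contentType == null) return null;
        for (String piece : contentType.split(";")) {
            String p = piece.trim();
            if (p.toLowerCase(Locale.ROOT).startsWith("boundary=")) {
                String b = p.substring("boundary=".length()).trim();
                if (b.length() >= 2 && b.startsWith("\"") && b.endsWith("\"")) {
                    b = b.substring(1, b.length() - 1); // strip optional quotes
                }
                return b;
            }
        }
        return null;
    }

    /** Walks the body delimiter by delimiter and collects each part. */
    static List<Part> parse(byte[] body, String boundary) {
        byte[] delimiter = ("--" + boundary).getBytes(StandardCharsets.ISO_8859_1);
        byte[] blankLine = "\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1);
        List<Part> parts = new ArrayList<>();
        int pos = indexOf(body, delimiter, 0); // skips any preamble
        while (pos >= 0) {
            int afterDelimiter = pos + delimiter.length;
            if (afterDelimiter + 1 < body.length
                    && body[afterDelimiter] == '-' && body[afterDelimiter + 1] == '-') {
                break; // "--boundary--" closes the whole body
            }
            int headersEnd = indexOf(body, blankLine, afterDelimiter);
            if (headersEnd < 0) break; // truncated input
            int bodyStart = headersEnd + blankLine.length;
            int next = indexOf(body, delimiter, bodyStart);
            if (next < 0) break; // truncated input
            Part part = new Part();
            int headersStart = afterDelimiter + 2; // skip CRLF after the delimiter line
            if (headersEnd > headersStart) {
                String text = new String(body, headersStart, headersEnd - headersStart,
                        StandardCharsets.ISO_8859_1);
                for (String line : text.split("\r\n")) {
                    int colon = line.indexOf(':');
                    if (colon > 0) {
                        part.headers.put(line.substring(0, colon).trim().toLowerCase(Locale.ROOT),
                                line.substring(colon + 1).trim());
                    }
                }
            }
            int bodyEnd = Math.max(bodyStart, next - 2); // drop the CRLF before the delimiter
            part.body = Arrays.copyOfRange(body, bodyStart, bodyEnd);
            parts.add(part);
            pos = next;
        }
        return parts;
    }

    private static int indexOf(byte[] data, byte[] pattern, int from) {
        outer:
        for (int i = Math.max(from, 0); i <= data.length - pattern.length; i++) {
            for (int j = 0; j < pattern.length; j++) {
                if (data[i + j] != pattern[j]) continue outer;
            }
            return i;
        }
        return -1;
    }

    public static void main(String[] args) {
        String boundary = boundaryOf("multipart/form-data; boundary=----demo");
        String raw = "--" + boundary + "\r\n"
                + "Content-Disposition: form-data; name=\"file\"; filename=\"a.txt\"\r\n\r\n"
                + "hello\r\n"
                + "--" + boundary + "--\r\n";
        for (Part p : parse(raw.getBytes(StandardCharsets.ISO_8859_1), boundary)) {
            System.out.println(p.headers + " -> " + p.body.length + " bytes");
        }
    }
}
```

A real container cannot hold the whole body in memory, which is why Tomcat reads from the socket through a buffer and hands each part out as its own InputStream.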
Next, let's see how FileItemIteratorImpl iterates over the parts based on the boundary:
// 1. 單個文件流的迭代准備工作 // FileItemIteratorImpl 會解析出content-Type, boundary 等信息,為后續迭代准備 // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#FileItemIteratorImpl /** * Creates a new instance. * * @param ctx The request context. * @throws FileUploadException An error occurred while * parsing the request. * @throws IOException An I/O error occurred. */ FileItemIteratorImpl(RequestContext ctx) throws FileUploadException, IOException { if (ctx == null) { throw new NullPointerException("ctx parameter"); } String contentType = ctx.getContentType(); if ((null == contentType) || (!contentType.toLowerCase(Locale.ENGLISH).startsWith(MULTIPART))) { throw new InvalidContentTypeException(String.format( "the request doesn't contain a %s or %s stream, content type header is %s", MULTIPART_FORM_DATA, MULTIPART_MIXED, contentType)); } final long requestSize = ((UploadContext) ctx).contentLength(); InputStream input; // N.B. this is eventually closed in MultipartStream processing if (sizeMax >= 0) { if (requestSize != -1 && requestSize > sizeMax) { throw new SizeLimitExceededException(String.format( "the request was rejected because its size (%s) exceeds the configured maximum (%s)", Long.valueOf(requestSize), Long.valueOf(sizeMax)), requestSize, sizeMax); } // N.B. 
this is eventually closed in MultipartStream processing input = new LimitedInputStream(ctx.getInputStream(), sizeMax) { @Override protected void raiseError(long pSizeMax, long pCount) throws IOException { FileUploadException ex = new SizeLimitExceededException( String.format("the request was rejected because its size (%s) exceeds the configured maximum (%s)", Long.valueOf(pCount), Long.valueOf(pSizeMax)), pCount, pSizeMax); throw new FileUploadIOException(ex); } }; } else { // 從 ServletRequestContext 中獲取inputStream input = ctx.getInputStream(); } String charEncoding = headerEncoding; if (charEncoding == null) { charEncoding = ctx.getCharacterEncoding(); } // 解析 boundary 參數,如: ----WebKitFormBoundarydyG19jnnVWC9U1zY boundary = getBoundary(contentType); if (boundary == null) { IOUtils.closeQuietly(input); // avoid possible resource leak throw new FileUploadException("the request was rejected because no multipart boundary was found"); } notifier = new MultipartStream.ProgressNotifier(listener, requestSize); try { multi = new MultipartStream(input, boundary, notifier); } catch (IllegalArgumentException iae) { IOUtils.closeQuietly(input); // avoid possible resource leak throw new InvalidContentTypeException( String.format("The boundary specified in the %s header is too long", CONTENT_TYPE), iae); } multi.setHeaderEncoding(charEncoding); skipPreamble = true; // 同樣會初始化第一個可被迭代的對象,后續的初始化動作則是由 hasNext() 觸發。 findNextItem(); } // 在迭代輸入數據時,會調用迭代方法 FileItemIteratorImpl next(); 讀取數據 // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#hasNext /** * Returns, whether another instance of {@link FileItemStream} * is available. * * @throws FileUploadException Parsing or processing the * file item failed. * @throws IOException Reading the file item failed. * @return True, if one or more additional file items * are available, otherwise false. 
*/ @Override public boolean hasNext() throws FileUploadException, IOException { if (eof) { return false; } if (itemValid) { return true; } try { // 迭代出下一個part文件 return findNextItem(); } catch (FileUploadIOException e) { // unwrap encapsulated SizeException throw (FileUploadException) e.getCause(); } } // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl#findNextItem /** * Called for finding the next item, if any. * * @return True, if an next item was found, otherwise false. * @throws IOException An I/O error occurred. */ private boolean findNextItem() throws IOException { if (eof) { return false; } if (currentItem != null) { currentItem.close(); currentItem = null; } for (;;) { boolean nextPart; // 首次讀取時,使用新的 boundary 跳過 // 后續則基於首個連續換行符進行數據獲取, \r\n\r\n if (skipPreamble) { nextPart = multi.skipPreamble(); } else { nextPart = multi.readBoundary(); } // 如果沒有讀取到更多的part, 則返回 if (!nextPart) { // 讀取第一個沒有 boundary 的數據,再搜索一次,如果還是沒有 boundary, 則認為數據已結束 if (currentFieldName == null) { // Outer multipart terminated -> No more data eof = true; return false; } // Inner multipart terminated -> Return to parsing the outer multi.setBoundary(boundary); // 將當前字段信息置空,下次如果再讀取不到 boundary, 則讀取結束 currentFieldName = null; continue; } // 以下有更多的 part 輸入 FileItemHeaders headers = getParsedHeaders(multi.readHeaders()); if (currentFieldName == null) { // We're parsing the outer multipart // 如: file, file2 ... 
String fieldName = getFieldName(headers); if (fieldName != null) { String subContentType = headers.getHeader(CONTENT_TYPE); if (subContentType != null && subContentType.toLowerCase(Locale.ENGLISH) .startsWith(MULTIPART_MIXED)) { currentFieldName = fieldName; // Multiple files associated with this field name byte[] subBoundary = getBoundary(subContentType); multi.setBoundary(subBoundary); skipPreamble = true; continue; } // 獲取文件名稱從 header 中 // Content-Disposition: form-data; name="file2"; filename="123-2.txt" String fileName = getFileName(headers); // 創建 FileItemStreamImpl, 以備后續迭代輸出,其構造方法將會創建 stream 實例 // FileItemStreamImpl 是迭代器的一個內部類,共享 multi 對象 // FileItemStreamImpl 會將網絡io流封裝為單個可讀取的inputStream, 備用 currentItem = new FileItemStreamImpl(fileName, fieldName, headers.getHeader(CONTENT_TYPE), fileName == null, getContentLength(headers)); currentItem.setHeaders(headers); notifier.noteItem(); itemValid = true; return true; } } else { String fileName = getFileName(headers); if (fileName != null) { currentItem = new FileItemStreamImpl(fileName, currentFieldName, headers.getHeader(CONTENT_TYPE), false, getContentLength(headers)); currentItem.setHeaders(headers); notifier.noteItem(); itemValid = true; return true; } } // 其他情況,直接丟棄當前 body 數據 multi.discardBodyData(); } } // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#FileItemStreamImpl /** * Creates a new instance. * * @param pName The items file name, or null. * @param pFieldName The items field name. * @param pContentType The items content type, or null. * @param pFormField Whether the item is a form field. * @param pContentLength The items content length, if known, or -1 * @throws IOException Creating the file item failed. 
*/ FileItemStreamImpl(String pName, String pFieldName, String pContentType, boolean pFormField, long pContentLength) throws IOException { name = pName; fieldName = pFieldName; contentType = pContentType; formField = pFormField; // 創建一個inputStream 的流,備讀 // 讀到哪里算是結束呢? 當 read() 返回-1時,認為輸入結束了 final ItemInputStream itemStream = multi.newInputStream(); InputStream istream = itemStream; if (fileSizeMax != -1) { if (pContentLength != -1 && pContentLength > fileSizeMax) { FileSizeLimitExceededException e = new FileSizeLimitExceededException( String.format("The field %s exceeds its maximum permitted size of %s bytes.", fieldName, Long.valueOf(fileSizeMax)), pContentLength, fileSizeMax); e.setFileName(pName); e.setFieldName(pFieldName); throw new FileUploadIOException(e); } // 限制最大大小,該值用 spring.http.multipart.max-file-size 配置, 但往往還會同時有一個參數需要配置: spring.http.multipart.max-request-size, // 因為當文件很大時,請求也會非常大,所以一般會要求同時設置這兩個參數,否則都會拋出超出最大允許大小限制異常 // 使用 LimitedInputStream 包裝提供的讀寫大小判定功能,原始istream接收網絡io輸入 istream = new LimitedInputStream(istream, fileSizeMax) { @Override protected void raiseError(long pSizeMax, long pCount) throws IOException { itemStream.close(true); FileSizeLimitExceededException e = new FileSizeLimitExceededException( String.format("The field %s exceeds its maximum permitted size of %s bytes.", fieldName, Long.valueOf(pSizeMax)), pCount, pSizeMax); e.setFieldName(fieldName); e.setFileName(name); throw new FileUploadIOException(e); } }; } stream = istream; } // 為每個part創建一個 DiskFileItem, 用於存儲網絡io的文件流,備用 // org.apache.tomcat.util.http.fileupload.disk.DiskFileItemFactory#createItem /** * Create a new {@link DiskFileItem} * instance from the supplied parameters and the local factory * configuration. * * @param fieldName The name of the form field. * @param contentType The content type of the form field. * @param isFormField <code>true</code> if this is a plain form field; * <code>false</code> otherwise. 
* @param fileName The name of the uploaded file, if any, as supplied * by the browser or other client. * * @return The newly created file item. */ @Override public FileItem createItem(String fieldName, String contentType, boolean isFormField, String fileName) { DiskFileItem result = new DiskFileItem(fieldName, contentType, isFormField, fileName, sizeThreshold, repository); result.setDefaultCharset(defaultCharset); return result; } // next() 進行迭代, 使用 itemValid 來控制單次迭代使用 /** * Returns the next available {@link FileItemStream}. * * @throws java.util.NoSuchElementException No more items are * available. Use {@link #hasNext()} to prevent this exception. * @throws FileUploadException Parsing or processing the * file item failed. * @throws IOException Reading the file item failed. * @return FileItemStream instance, which provides * access to the next file item. */ @Override public FileItemStream next() throws FileUploadException, IOException { if (eof || (!itemValid && !hasNext())) { throw new NoSuchElementException(); } itemValid = false; return currentItem; }
// 2. 從網絡io流到臨時文件流的寫入 // 主要分為三步:網絡io流的獲取;本地文件輸出流的獲取;流的對接; // 網絡io流即是在創建 FileItemStreamImpl 時生成的 stream; // org.apache.tomcat.util.http.fileupload.FileUploadBase.FileItemIteratorImpl.FileItemStreamImpl#openStream /** * Returns an input stream, which may be used to * read the items contents. * * @return Opened input stream. * @throws IOException An I/O error occurred. */ @Override public InputStream openStream() throws IOException { if (((Closeable) stream).isClosed()) { throw new FileItemStream.ItemSkippedException(); } return stream; } // 而本地文件輸出流則是一個本地臨時文件,用於對接任意大小的輸入流 // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getOutputStream /** * Returns an {@link java.io.OutputStream OutputStream} that can * be used for storing the contents of the file. * * @return An {@link java.io.OutputStream OutputStream} that can be used * for storing the contents of the file. * * @throws IOException if an error occurs. */ @Override public OutputStream getOutputStream() throws IOException { if (dfos == null) { // 創建臨時文件輸出 File outputFile = getTempFile(); // 使用 DeferredFileOutputStream 包裝臨時文件, dfos = new DeferredFileOutputStream(sizeThreshold, outputFile); } return dfos; } // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getTempFile /** * Creates and returns a {@link java.io.File File} representing a uniquely * named temporary file in the configured repository path. The lifetime of * the file is tied to the lifetime of the <code>FileItem</code> instance; * the file will be deleted when the instance is garbage collected. * <p> * <b>Note: Subclasses that override this method must ensure that they return the * same File each time.</b> * * @return The {@link java.io.File File} to be used for temporary storage. 
*/ protected File getTempFile() { if (tempFile == null) { File tempDir = repository; if (tempDir == null) { tempDir = new File(System.getProperty("java.io.tmpdir")); } // uid 同進程相同, getUniqueId() 會返回一個自增的id, 保證進程唯一,加上 uid 后,就可以確定臨時文件唯一了 String tempFileName = String.format("upload_%s_%s.tmp", UID, getUniqueId()); tempFile = new File(tempDir, tempFileName); } return tempFile; } /** * Returns an identifier that is unique within the class loader used to * load this class, but does not have random-like appearance. * * @return A String with the non-random looking instance identifier. */ private static String getUniqueId() { final int limit = 100000000; int current = COUNTER.getAndIncrement(); String id = Integer.toString(current); // If you manage to get more than 100 million of ids, you'll // start getting ids longer than 8 characters. if (current < limit) { id = ("00000000" + id).substring(id.length()); } return id; } // 最后,將網絡io流對接到臨時文件流中,完成數據的接收 // org.apache.tomcat.util.http.fileupload.util.Streams#copy /** * Copies the contents of the given {@link InputStream} * to the given {@link OutputStream}. Shortcut for * <pre> * copy(pInputStream, pOutputStream, new byte[8192]); * </pre> * * @param inputStream The input stream, which is being read. * It is guaranteed, that {@link InputStream#close()} is called * on the stream. * @param outputStream The output stream, to which data should * be written. May be null, in which case the input streams * contents are simply discarded. * @param closeOutputStream True guarantees, that {@link OutputStream#close()} * is called on the stream. False indicates, that only * {@link OutputStream#flush()} should be called finally. * * @return Number of bytes, which have been copied. * @throws IOException An I/O error occurred. 
*/ public static long copy(InputStream inputStream, OutputStream outputStream, boolean closeOutputStream) throws IOException { // 8096 return copy(inputStream, outputStream, closeOutputStream, new byte[DEFAULT_BUFFER_SIZE]); } /** * Copies the contents of the given {@link InputStream} * to the given {@link OutputStream}. * * @param inputStream The input stream, which is being read. * It is guaranteed, that {@link InputStream#close()} is called * on the stream. * @param outputStream The output stream, to which data should * be written. May be null, in which case the input streams * contents are simply discarded. * @param closeOutputStream True guarantees, that {@link OutputStream#close()} * is called on the stream. False indicates, that only * {@link OutputStream#flush()} should be called finally. * @param buffer Temporary buffer, which is to be used for * copying data. * @return Number of bytes, which have been copied. * @throws IOException An I/O error occurred. */ public static long copy(InputStream inputStream, OutputStream outputStream, boolean closeOutputStream, byte[] buffer) throws IOException { OutputStream out = outputStream; InputStream in = inputStream; try { long total = 0; for (;;) { // 阻塞式獲取文件數據流,寫入到臨時文件中 // 所以,如果我們上傳超大文件時,實際上有相當大部分的時間,只是框架和客戶端在交互,應用代碼則不會感知到 // 直到所有上傳已完成 int res = in.read(buffer); if (res == -1) { break; } if (res > 0) { total += res; if (out != null) { out.write(buffer, 0, res); } } } if (out != null) { // 關閉輸出流,即關閉臨時文件實例 if (closeOutputStream) { out.close(); } else { out.flush(); } out = null; } in.close(); in = null; return total; } finally { IOUtils.closeQuietly(in); if (closeOutputStream) { IOUtils.closeQuietly(out); } } }
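Summary: the key point is parsing by boundary, and the hard part is wiring the I/O streams together. Both receiving the request stream and calling os.flush() when responding to the client involve fairly complex I/O work.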
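One piece of that stream wiring is the DeferredFileOutputStream that DiskFileItem hands out: data stays in memory until it grows past sizeThreshold, and only then is it moved into the temporary file. The following is a minimal sketch of that idea, not Tomcat's implementation; the class SpillingOutputStream and its members are hypothetical names used only here.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch: buffer in memory first, spill to a file once the threshold is passed.
final class SpillingOutputStream extends OutputStream {
    private final int threshold;
    private final Path spillFile;
    private ByteArrayOutputStream memory = new ByteArrayOutputStream();
    private OutputStream file; // stays null until the threshold is exceeded
    private long written;

    SpillingOutputStream(int threshold, Path spillFile) {
        this.threshold = threshold;
        this.spillFile = spillFile;
    }

    @Override
    public void write(int b) throws IOException {
        write(new byte[] {(byte) b}, 0, 1);
    }

    @Override
    public void write(byte[] buf, int off, int len) throws IOException {
        if (file == null && written + len > threshold) {
            // Switch over: copy what was buffered so far into the file, then drop the buffer.
            file = Files.newOutputStream(spillFile);
            memory.writeTo(file);
            memory = null;
        }
        OutputStream target = (file != null) ? file : memory;
        target.write(buf, off, len);
        written += len;
    }

    /** True while no byte has been written to disk. */
    boolean isInMemory() {
        return file == null;
    }

    /** The buffered bytes, or null once the data lives in the file. */
    byte[] getData() {
        return memory == null ? null : memory.toByteArray();
    }

    @Override
    public void flush() throws IOException {
        if (file != null) file.flush();
    }

    @Override
    public void close() throws IOException {
        if (file != null) file.close();
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("spill_", ".tmp");
        try (SpillingOutputStream out = new SpillingOutputStream(8, tmp)) {
            out.write("small".getBytes());
            System.out.println("in memory: " + out.isInMemory());
            out.write(" but growing".getBytes());
            System.out.println("in memory: " + out.isInMemory());
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

This is why a small form field never needs a temporary file, while a large upload ends up on disk without the application having to choose.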
5. Cleanup after use
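Uploads from external requests are usually received into temporary files, as in the implementation above. Temporary or not, if nothing cleans them up, every upload leaves another file behind, disk usage keeps growing, and sooner or later the disk fills up. A good framework should not let that happen.
For example, in Spring, DispatcherServlet actively cleans up once it has finished using the MultipartFile.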
// org.springframework.web.servlet.DispatcherServlet#doDispatch /** * Process the actual dispatching to the handler. * <p>The handler will be obtained by applying the servlet's HandlerMappings in order. * The HandlerAdapter will be obtained by querying the servlet's installed HandlerAdapters * to find the first that supports the handler class. * <p>All HTTP methods are handled by this method. It's up to HandlerAdapters or handlers * themselves to decide which methods are acceptable. * @param request current HTTP request * @param response current HTTP response * @throws Exception in case of any kind of processing failure */ protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception { HttpServletRequest processedRequest = request; HandlerExecutionChain mappedHandler = null; boolean multipartRequestParsed = false; WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); try { ModelAndView mv = null; Exception dispatchException = null; try { // 如果是文件類型,則會用 StandardMultipartHttpServletRequest 再包裝一層的處理 processedRequest = checkMultipart(request); multipartRequestParsed = (processedRequest != request); // Determine handler for the current request. mappedHandler = getHandler(processedRequest); if (mappedHandler == null || mappedHandler.getHandler() == null) { noHandlerFound(processedRequest, response); return; } // Determine handler adapter for the current request. HandlerAdapter ha = getHandlerAdapter(mappedHandler.getHandler()); // Process last-modified header, if supported by the handler. 
String method = request.getMethod(); boolean isGet = "GET".equals(method); if (isGet || "HEAD".equals(method)) { long lastModified = ha.getLastModified(request, mappedHandler.getHandler()); if (logger.isDebugEnabled()) { logger.debug("Last-Modified value for [" + getRequestUri(request) + "] is: " + lastModified); } if (new ServletWebRequest(request, response).checkNotModified(lastModified) && isGet) { return; } } if (!mappedHandler.applyPreHandle(processedRequest, response)) { return; } // Actually invoke the handler. mv = ha.handle(processedRequest, response, mappedHandler.getHandler()); if (asyncManager.isConcurrentHandlingStarted()) { return; } applyDefaultViewName(processedRequest, mv); mappedHandler.applyPostHandle(processedRequest, response, mv); } catch (Exception ex) { dispatchException = ex; } catch (Throwable err) { // As of 4.3, we're processing Errors thrown from handler methods as well, // making them available for @ExceptionHandler methods and other scenarios. dispatchException = new NestedServletException("Handler dispatch failed", err); } processDispatchResult(processedRequest, response, mappedHandler, mv, dispatchException); } catch (Exception ex) { triggerAfterCompletion(processedRequest, response, mappedHandler, ex); } catch (Throwable err) { triggerAfterCompletion(processedRequest, response, mappedHandler, new NestedServletException("Handler processing failed", err)); } finally { if (asyncManager.isConcurrentHandlingStarted()) { // Instead of postHandle and afterCompletion if (mappedHandler != null) { mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response); } } else { // Clean up any resources used by a multipart request. // 如果本次有解析文件,則會做清理操作 if (multipartRequestParsed) { cleanupMultipart(processedRequest); } } } } // org.springframework.web.servlet.DispatcherServlet#cleanupMultipart /** * Clean up any resources used by the given multipart request (if any). 
* @param request current HTTP request * @see MultipartResolver#cleanupMultipart */ protected void cleanupMultipart(HttpServletRequest request) { // 獲取 MultipartHttpServletRequest 實例 MultipartHttpServletRequest multipartRequest = WebUtils.getNativeRequest(request, MultipartHttpServletRequest.class); if (multipartRequest != null) { // 清理文件 this.multipartResolver.cleanupMultipart(multipartRequest); } } // org.springframework.web.multipart.support.StandardServletMultipartResolver#cleanupMultipart @Override public void cleanupMultipart(MultipartHttpServletRequest request) { if (!(request instanceof AbstractMultipartHttpServletRequest) || ((AbstractMultipartHttpServletRequest) request).isResolved()) { // To be on the safe side: explicitly delete the parts, // but only actual file parts (for Resin compatibility) try { for (Part part : request.getParts()) { if (request.getFile(part.getName()) != null) { // 調用各part部分的接口,delete() part.delete(); } } } catch (Throwable ex) { LogFactory.getLog(getClass()).warn("Failed to perform cleanup of multipart items", ex); } } } // org.apache.catalina.core.ApplicationPart#delete @Override public void delete() throws IOException { fileItem.delete(); } // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#delete /** * Deletes the underlying storage for a file item, including deleting any * associated temporary disk file. Although this storage will be deleted * automatically when the <code>FileItem</code> instance is garbage * collected, this method can be used to ensure that this is done at an * earlier time, thus preserving system resources. 
*/ @Override public void delete() { cachedContent = null; // 獲取臨時文件實例,如果存在則刪除 File outputFile = getStoreLocation(); if (outputFile != null && !isInMemory() && outputFile.exists()) { outputFile.delete(); } } // org.apache.tomcat.util.http.fileupload.disk.DiskFileItem#getStoreLocation /** * Returns the {@link java.io.File} object for the <code>FileItem</code>'s * data's temporary location on the disk. Note that for * <code>FileItem</code>s that have their data stored in memory, * this method will return <code>null</code>. When handling large * files, you can use {@link java.io.File#renameTo(java.io.File)} to * move the file to new location without copying the data, if the * source and destination locations reside within the same logical * volume. * * @return The data file, or <code>null</code> if the data is stored in * memory. */ public File getStoreLocation() { if (dfos == null) { return null; } if (isInMemory()) { return null; } return dfos.getFile(); }
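The idea is roughly: 1. determine whether the request involved a file upload; 2. take the ApplicationPart instances from the request; 3. iterate over them and delete the temporary file behind each FileItem.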
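The same "use it, then always delete it" pattern applies to temporary files created in application code, such as the one written by saveMultipartToLocalFile in the demo. Below is a minimal sketch using only the JDK. The class TempUploadSketch, the interface PathHandler, and the method withTempCopy are hypothetical names for this example and are not part of Spring or Tomcat.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch: the temporary file lives only as long as the handler runs.
final class TempUploadSketch {

    /** Work to be done with the temporary file; may throw IOException. */
    interface PathHandler<T> {
        T handle(Path file) throws IOException;
    }

    /**
     * Copies the upload stream into a temporary file, runs the handler on it,
     * and deletes the file afterwards whether or not the handler succeeded.
     */
    static <T> T withTempCopy(InputStream upload, PathHandler<T> handler) throws IOException {
        Path tmp = Files.createTempFile("upload_", ".tmp");
        try {
            Files.copy(upload, tmp, StandardCopyOption.REPLACE_EXISTING);
            return handler.handle(tmp);
        } finally {
            Files.deleteIfExists(tmp); // cleanup happens on success and on failure
        }
    }

    public static void main(String[] args) throws IOException {
        InputStream fakeUpload = new ByteArrayInputStream("demo content".getBytes());
        Long size = TempUploadSketch.<Long>withTempCopy(fakeUpload, file -> Files.size(file));
        System.out.println("processed " + size + " bytes, temp file already removed");
    }
}
```

Putting the delete in a finally block mirrors what doDispatch does with cleanupMultipart: the cleanup does not depend on the handler finishing normally.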
6. Resumable downloads
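HTTP/1.1 added standard headers for this. The request header Range (for example Range: bytes=0-2000) asks the server for part of a file, and the response header Content-Range (for example Content-Range: bytes 100-2000/<total length>) states which part is being returned, normally with status 206 Partial Content. The server only needs to read the requested byte range and return it.
The benefit is that the client gets more control. As long as the client records the offset when needed, it can, after a network failure or similar problem, request again from the point where the transfer was cut off and continue the unfinished download. That is resumable transfer.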
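On the server side, honoring a range request comes down to three small steps: parse the Range header, read exactly those bytes, and describe them in Content-Range. The sketch below uses only the JDK and handles a single range. The class RangeSketch and its methods are hypothetical names, and multi-range requests (Range: bytes=0-10,20-30) are deliberately treated as unsupported.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical sketch of single-range handling for resumable downloads.
final class RangeSketch {

    /**
     * Parses "bytes=a-b", "bytes=a-" or "bytes=-n" against a file length.
     * Returns {start, end} (both inclusive), or null if the header is missing,
     * malformed, multi-range, or cannot be satisfied.
     */
    static long[] parseRange(String header, long fileLength) {
        if (header == null || !header.startsWith("bytes=")) return null;
        String spec = header.substring("bytes=".length()).trim();
        if (spec.indexOf(',') >= 0) return null; // multi-range not handled in this sketch
        int dash = spec.indexOf('-');
        if (dash < 0) return null;
        String first = spec.substring(0, dash).trim();
        String last = spec.substring(dash + 1).trim();
        try {
            long start;
            long end;
            if (first.isEmpty()) {
                // Suffix form: the last n bytes of the file.
                if (last.isEmpty()) return null;
                long n = Long.parseLong(last);
                if (n <= 0) return null;
                start = Math.max(0, fileLength - n);
                end = fileLength - 1;
            } else {
                start = Long.parseLong(first);
                end = last.isEmpty() ? fileLength - 1 : Math.min(Long.parseLong(last), fileLength - 1);
            }
            if (start < 0 || start >= fileLength || start > end) return null;
            return new long[] {start, end};
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Builds the Content-Range value for a 206 response. */
    static String contentRange(long start, long end, long total) {
        return "bytes " + start + "-" + end + "/" + total;
    }

    /** Reads bytes start..end (inclusive); assumes the range fits in an int-sized array. */
    static byte[] readRange(Path file, long start, long end) throws IOException {
        byte[] buf = new byte[(int) (end - start + 1)];
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(start);
            raf.readFully(buf);
        }
        return buf;
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("range_", ".txt");
        try {
            Files.write(tmp, "0123456789".getBytes(StandardCharsets.US_ASCII));
            long total = Files.size(tmp);
            // A client that already holds the first 4 bytes resumes from offset 4.
            long[] r = parseRange("bytes=4-", total);
            System.out.println(contentRange(r[0], r[1], total));
            System.out.println(new String(readRange(tmp, r[0], r[1]), StandardCharsets.US_ASCII));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

A client resumes by sending Range: bytes=<bytes already saved>- and appending the response body to its partial file. A production client would also confirm that the file has not changed on the server in the meantime, which this sketch does not cover.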
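We took HTTP as the starting point to explain the basic principles of file upload and download, and used Tomcat's request processing as a sample of how the protocol is implemented. In practice there are many other protocols: IM tools and download managers usually have their own transfer protocols. They are not identical, but the general approach should be much the same.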