消息服務框架使用案例之--大文件上傳(斷點續傳)功能
一、分塊上傳和斷點續傳原理
在我們的一個產品應用中,客戶需要上傳大量的文件到服務器,其中不乏很大的視頻文件。雖然可以使用FTP這樣成熟穩定的工具,但客戶表示不會使用FTP工具,並且我們產品也覺得客戶從我們軟件在切換到FTP用戶體驗不好,如果做成后台腳本調用FTP上傳那么進度信息很難呈現到我們軟件上。最終,決定我們自己做文件上傳功能。
大文件上傳受限於服務器每次處理數據的能力,不能一次傳輸完成,所以分塊上傳是必然的了,由於上傳時間可能較長,中途可能因為網絡或者人為原因終止上傳,所以還需要斷點上傳功能。
分塊上傳實際上是在客戶端分塊讀取文件,然后在服務器分塊寫入文件,每次讀寫記錄下讀寫的起始位置,也就是文件的偏移量,和要讀寫的數據長度。在上傳過程中,每完成一個文件數據塊的寫入,就向客戶端返回一次信息,客戶端據此進行下一文件數據塊的讀取。
斷點續傳功能也比較好實現,就是上傳過程中將文件在服務器寫為臨時文件,等全部寫完了(文件上傳完),將此臨時文件重命名為正式文件即可,如果中途上傳中斷過,下次上傳的時候根據當前臨時文件大小,作為在客戶端讀取文件的偏移量,從此位置繼續讀取文件數據塊,上傳到服務器從此偏移量繼續寫入文件即可。
二、消息服務框架實現文件上傳
假設我們將每一個文件數據塊看做一份“消息”,那么文件上傳本質上就是客戶端和服務器兩端頻繁的消息交互而已。消息服務框架(MSF)是一個集成了服務容器和消息訪問的框架,正好可以用來做文件上傳應用。具體做法就是在服務端,編寫一個“文件上傳服務”,在客戶端,編寫一個調用上傳服務的回調方法即可。
2.1,文件上傳服務端
新建一個MSF服務類:
public class FilesService : ServiceBase { }
然后添加一個處理上傳文件的方法:
/// <summary> /// 批量上傳文件(通過回調客戶端的方式,支持斷點續傳) /// </summary> /// <param name="list">文件列表</param> /// <returns></returns> public UploadResult UploadFiles(List<UploadFileInfos> list) { int uploadCount = 0; foreach (var uploadInfo in list) { string pathfile = string.Empty; try { pathfile = this.MapServerPath(uploadInfo.FilePath); if (!Directory.Exists(Path.GetDirectoryName(pathfile))) { Directory.CreateDirectory(Path.GetDirectoryName(pathfile)); } if (File.Exists(pathfile)) { FileInfo fi = new FileInfo(pathfile); if (fi.Length == uploadInfo.Size && fi.LastWriteTime == uploadInfo.FileModifyTime) { Console.WriteLine("文件 {0} {1}", pathfile, "已上傳,跳過"); continue;//文件已上傳,跳過 } else { fi.Delete(); } } //"斷點"上傳的文件 long offset = 0; //上傳的分部文件名稱增加一個文件長度數字,避免下次客戶端上傳的時候,修改了內容。 //如果文件上傳了一部分,的確修改了內容,那么原來上傳的部分文件就丟棄了。 string partFile = pathfile + uploadInfo.Size + ".part"; if (File.Exists(partFile)) { FileInfo fi = new FileInfo(partFile); offset = fi.Length; } while (offset < uploadInfo.Size) { uploadInfo.Offset = offset; uploadInfo.Length = MaxReadSize; if (uploadInfo.Offset + uploadInfo.Length > uploadInfo.Size) uploadInfo.Length = (int)(uploadInfo.Size - uploadInfo.Offset); //回調客戶端,通知上傳文件塊 var data = GetUploadFileData(uploadInfo); if (data.Length == 0) { //如果有長度為零的文件表示客戶讀取文件失敗,終止上傳操作 throw new Exception("讀取客戶端文件失敗(Length=0),終止上傳操作"); } if (data.Length != uploadInfo.Length) throw new Exception("網絡異常:上傳的文件流數據塊大小與預期的不一致"); //等待上次寫完 resetEvent.WaitOne(); //異步寫文件 System.Threading.Thread t = new System.Threading.Thread(new System.Threading.ParameterizedThreadStart(obj => { WriteFileInfo wfi = (WriteFileInfo)obj; CurWriteFile(wfi.FileName, wfi.WriteData, wfi.Offset); })); t.Start(new WriteFileInfo() { FileName = partFile, WriteData = data, Offset = offset }); offset += uploadInfo.Length; } resetEvent.WaitOne(); //重命名到正常文件名 File.Move(partFile, pathfile); System.IO.File.SetLastWriteTime(pathfile, uploadInfo.FileModifyTime); uploadCount++; resetEvent.Set(); } catch (Exception ex) { resetEvent.Set(); return new UploadResult() { Success = false, FilesCount = 0, Message = ex.Message }; }//end try } //end for return new UploadResult() { Success = true, FilesCount = list.Count }; }
在這個方法中,有一個重要方法,
//回調客戶端,通知上傳文件塊
var data = GetUploadFileData(uploadInfo);
它調用了MSF框架服務上下文的回調函數CallBackFunction,來讀取客戶端文件數據的,代碼如下:
private byte[] GetUploadFileData(UploadFileInfos fileinfo) { return base.CurrentContext.CallBackFunction<UploadFileInfos, byte[]>(fileinfo); }
另外,服務端寫文件的方法CurWriteFile 實現如下:
/// <summary> /// 將服務器端獲取到的字節流寫入文件 /// </summary> /// <param name="pReadByte">流</param> /// <param name="fileName">文件名</param> /// <param name="offset">要寫入文件的位置</param> public void CurWriteFile(string fileName, byte[] pReadByte, long offset) { FileStream pFileStream = null; try { pFileStream = new FileStream(fileName, FileMode.OpenOrCreate); pFileStream.Seek(offset, SeekOrigin.Begin); pFileStream.Write(pReadByte, 0, pReadByte.Length); } catch(Exception ex) { throw new Exception("寫文件塊失敗,寫入位置:"+offset+",文件名:"+fileName+",錯誤原因:"+ex.Message); } finally { if (pFileStream != null) pFileStream.Close(); resetEvent.Set(); } }
2.2,文件上傳客戶端
現在看文件上傳客戶端代碼,如何提供服務端需要的文件讀取回調函數:
ServiceRequest request = new ServiceRequest(); request.ServiceName = "FilesService"; request.MethodName = "UploadFiles"; request.Parameters = new object[] { infos }; Proxy srvProxy = new Proxy(); srvProxy.ServiceBaseUri = string.Format("net.tcp://{0}", serverHost); srvProxy.ErrorMessage += srvProxy_ErrorMessage; Task<UploadResult> result= srvProxy.RequestServiceAsync<UploadResult, UploadFileInfos, byte[]>(request, uploadingInfo => { //action委托方法顯示進度給客戶端 action(new UploadStateArg() { State = uploadingInfo.Offset + uploadingInfo.Length >= uploadingInfo.Size ? UploadState.Success: UploadState.Uploading, ProgressFile = uploadingInfo.FilePath, ProcessValue = Convert.ToInt32(uploadingInfo.Offset * 100 / uploadingInfo.Size), TotalProcessValue = Convert.ToInt32((uploadingInfo.UploadIndex +1) * 100 / index) }); Console.WriteLine(">>>Debug:Path:{0},FilePath:{1}",folder, uploadingInfo.FilePath); var fullName = Path.IsPathRooted(folder)? folder + uploadingInfo.FilePath : uploadingInfo.FilePath; Console.WriteLine(">>>服務器讀取客戶端文件:{0},偏移量:{1} 長度:{2}", fullName, uploadingInfo.Offset, uploadingInfo.Length); return ReadFileData(fullName, uploadingInfo.Offset, uploadingInfo.Length); } );
在上面的方法中, srvProxy.RequestServiceAsync泛型方法需要3個參數,第一個參數是服務的結果類型,第二個參數是提供給服務端回調方法(前面的base.CurrentContext.CallBackFunction方法)的參數,第三個參數是服務回調方法的結果。srvProxy.RequestServiceAsync 的回調方法的參數 uploadingInfo 是服務器推送過來的消息,里面包含了需要讀取的文件信息,包括文件名,偏移量,讀取長度等信息。
其中,客戶端讀取文件的方法 ReadFileData 實現如下:
/// <summary> /// 讀取文件返回字節流 /// </summary> /// <param name="fileName">文件路徑</param> /// <param name="offset">要讀取的文件流的位置</param> /// <param name="length">要讀取的文件塊大小</param> /// <returns></returns> private byte[] ReadFileData(string fileName, long offset, int length) { FileStream pFileStream = null; byte[] pReadByte = new byte[0]; try { pFileStream = new FileStream(fileName, FileMode.Open, FileAccess.Read); BinaryReader r = new BinaryReader(pFileStream); r.BaseStream.Seek(offset, SeekOrigin.Begin); pReadByte = r.ReadBytes(length); return pReadByte; } catch { return pReadByte; } finally { if (pFileStream != null) pFileStream.Close(); } }
這樣,在一次文件上傳的“請求-響應”過程中,MSF的服務端進行了多次回調客戶端的操作,客戶端根據服務端推送過來的參數信息來精確的讀取服務端需要的文件數據。一個支持斷點續傳的大文件上傳服務,使用MSF框架就做好了。
三、其它
本文使用到的其它相關服務端對象的代碼定義如下:
/// <summary> /// 上傳狀態枚舉 /// </summary> public enum UploadState { /// <summary> /// 上傳成功 /// </summary> Success, /// <summary> /// 上傳中 /// </summary> Uploading, /// <summary> /// 錯誤 /// </summary> Error } /// <summary> /// 上傳狀態參數 /// </summary> public class UploadStateArg { /// <summary> /// 上傳狀態 /// </summary> public UploadState State { get; set; } /// <summary> /// 上傳的文件名 /// </summary> public string ProgressFile { get; set; } /// <summary> /// 處理的消息,如果出錯,這里是錯誤消息 /// </summary> public string Message { get; set; } /// <summary> /// 處理進度(百分比) /// </summary> public int ProcessValue { get; set; } /// <summary> /// 總體處理進度(百分比) /// </summary> public int TotalProcessValue { get; set; } }
如果你不清楚如何使用MSF來實現本文的功能,請先閱讀下面的文章:
建議你讀完相關的其它兩篇文章:
“一切都是消息”--MSF(消息服務框架)之【請求-響應】模式
“一切都是消息”--MSF(消息服務框架)之【發布-訂閱】模式
讀完后,建議你再讀讀MSF的理論總結:
有關消息服務框架(MSF)更多的討論,請加我們QQ群討論,群號:18215717 ,加群口令:消息服務框架
