背景
我司在很久之前,一位很久之前的同事寫過一個文檔轉圖片的服務,具體業務如下:
- 用戶在客戶端上傳文檔,可以是ppt,word,pdf 等格式,用戶上傳完成可以在客戶端預覽上傳的文檔,預覽的時候采用的是圖片形式(不要和我說用別的方式預覽,現在已經來不及了)
- 當用戶把文檔上傳到雲端之后(阿里雲),把文檔相關的信息記錄在數據庫,然后等待轉碼完成
- 服務器有一個轉碼服務(其實就是一個windows service)不停的在輪訓待轉碼的數據,如果有待轉碼的數據,則從數據庫取出來,然后根據文檔的網絡地址下載到本地進行轉碼(轉成多張圖片)
- 當文檔轉碼完畢,把轉碼出來的圖片上傳到雲端,並把雲端圖片的信息記錄到數據庫
- 客戶端有預覽需求的時候,根據數據庫來判斷有沒有轉碼成功,如果成功,則獲取數據來顯示。
文檔預覽的整體過程如以上所說,老的轉碼服務現在什么問題呢?
- 由於一個文檔同時只能被一個線程進行轉碼操作,所以老的服務采用了把待轉碼數據划分管道的思想,一共有六個管道,映射到數據庫大體就是 Id=》管道ID 這個樣子。
- 一個控制台程序,根據配置文件信息,讀取某一個管道待轉碼的文檔,然后單線程進行轉碼操作
- 一共有六個管道,所以服務器上起了六個cmd的黑窗口……
- 有的時候個別文檔由於格式問題或者其他問題 轉碼過程中會卡住,具體的表現為:停止了轉碼操作。
- 如果程序卡住了,需要運維人員重新啟動轉碼cmd窗口(這種維護比較蛋疼)
后來機緣巧合,這個程序的維護落到的菜菜頭上,維護了一周左右,大約重啟了10多次,終於忍受不了了,重新搞一個吧。仔細分析過后,刨除實際文檔轉碼的核心操作之外,整個轉碼流程其實還有很多注意點
- 需要保證轉碼服務不被卡住,如果和以前一樣就沒有必要重新設計了
- 盡量避免開多個進程的方式,其實在這個業務場景下,多個進程和多個線程作用是一致的。
- 每個文檔只能被轉碼一次,如果一個文檔被轉碼多次,不僅浪費了服務器資源,而且還有可能會有數據不一致的情況發生
- 轉碼失敗的文檔需要有一定次數的重試,因為一次失敗不代表第二次失敗,所以一定要給失敗的文檔再次被操作的機會
- 因為程序不停的把文檔轉碼成本地圖片,所以需要保證這些文件在轉碼完成在服務器上刪除,不然的話,時間長了會生成很多無用的文件
說了這么多,其實需要注意的點還是很多的。以整個的轉碼流程來說,本質上是一個任務池的生產和消費問題,任務池中的任務就是待轉碼的文檔,生產者不停的把待轉碼文檔丟進任務池,消費者不停的把任務池中文檔轉碼完成。
線程池
這很顯然和線程池很類似,菜菜之前就寫過一個線程池的文章,有興趣的同學可以去翻翻歷史。今天我們就以這個線程池來解決這個轉碼問題。線程池的本質是初始化一定數目的線程,不停的執行任務。
//線程池定義
public class LXThreadPool:IDisposable
{
bool PoolEnable = true; //線程池是否可用
List<Thread> ThreadContainer = null; //線程的容器
ConcurrentQueue<ActionData> JobContainer = null; //任務的容器
int _maxJobNumber; //線程池最大job容量
ConcurrentDictionary<string, DateTime> JobIdList = new ConcurrentDictionary<string, DateTime>(); //job的副本,用於排除某個job 是否在運行中
public LXThreadPool(int threadNumber,int maxJobNumber=1000)
{
if(threadNumber<=0 || maxJobNumber <= 0)
{
throw new Exception("線程池初始化失敗");
}
_maxJobNumber = maxJobNumber;
ThreadContainer = new List<Thread>(threadNumber);
JobContainer = new ConcurrentQueue<ActionData>();
for (int i = 0; i < threadNumber; i++)
{
var t = new Thread(RunJob);
t.Name = $"轉碼線程{i}";
ThreadContainer.Add(t);
t.Start();
}
//清除超時任務的線程
var tTimeOutJob = new Thread(CheckTimeOutJob);
tTimeOutJob.Name = $"清理超時任務線程";
tTimeOutJob.Start();
}
//往線程池添加一個線程,返回線程池的新線程數
public int AddThread(int number=1)
{
if(!PoolEnable || ThreadContainer==null || !ThreadContainer.Any() || JobContainer==null|| !JobContainer.Any())
{
return 0;
}
while (number <= 0)
{
var t = new Thread(RunJob);
ThreadContainer.Add(t);
t.Start();
number -= number;
}
return ThreadContainer?.Count ?? 0;
}
//向線程池添加一個任務,返回0:添加任務失敗 1:成功
public int AddTask(Action<object> job, object obj,string actionId, Action<Exception> errorCallBack = null)
{
if (JobContainer != null)
{
if(JobContainer.Count>= _maxJobNumber)
{
return 0;
}
//首先排除10分鍾還沒轉完的
var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
if(timeoOutJobList!=null&& timeoOutJobList.Any())
{
foreach (var timeoutJob in timeoOutJobList)
{
JobIdList.TryRemove(timeoutJob.Key,out DateTime v);
}
}
if (!JobIdList.Any(s => s.Key == actionId))
{
if(JobIdList.TryAdd(actionId, DateTime.Now))
{
JobContainer.Enqueue(new ActionData { Job = job, Data = obj, ActionId = actionId, ErrorCallBack = errorCallBack });
return 1;
}
else
{
return 101;
}
}
else
{
return 100;
}
}
return 0;
}
private void RunJob()
{
while (JobContainer != null && PoolEnable)
{
//任務列表取任務
ActionData job = null;
JobContainer?.TryDequeue(out job);
if (job == null)
{
//如果沒有任務則休眠
Thread.Sleep(20);
continue;
}
try
{
//執行任務
job.Job.Invoke(job.Data);
}
catch (Exception error)
{
//異常回調
if (job != null&& job.ErrorCallBack!=null)
{
job?.ErrorCallBack(error);
}
}
finally
{
if (!JobIdList.TryRemove(job.ActionId,out DateTime v))
{
}
}
}
}
//終止線程池
public void Dispose()
{
PoolEnable = false;
JobContainer = null;
if (ThreadContainer != null)
{
foreach (var t in ThreadContainer)
{
//強制線程退出並不好,會有異常
t.Join();
}
ThreadContainer = null;
}
}
//清理超時的任務
private void CheckTimeOutJob()
{
//首先排除10分鍾還沒轉完的
var timeoOutJobList = JobIdList.Where(s => s.Value.AddMinutes(10) < DateTime.Now);
if (timeoOutJobList != null && timeoOutJobList.Any())
{
foreach (var timeoutJob in timeoOutJobList)
{
JobIdList.TryRemove(timeoutJob.Key, out DateTime v);
}
}
System.Threading.Thread.Sleep(60000);
}
}
public class ActionData
{
//任務的id,用於排重
public string ActionId { get; set; }
//執行任務的參數
public object Data { get; set; }
//執行的任務
public Action<object> Job { get; set; }
//發生異常時候的回調方法
public Action<Exception> ErrorCallBack { get; set; }
}
以上就是一個線程池的具體實現,和具體的業務無關,完全可以用於任何適用於線程池的場景,其中有一個注意點,我新加了任務的標示,主要用於排除重復的任務被投放多次(只排除正在運行中的任務)。當然代碼不是最優的,有需要的同學可以自己去優化
使用線程池
接下來,我們利用以上的線程池來完成我們的文檔轉碼任務,首先我們啟動的時候初始化一個線程池,並啟動一個獨立線程來不停的往線程池來輸送任務,順便起了一個監控線程去監視發送任務的線程
string lastResId = null;
string lastErrorResId = null;
Dictionary<string, int> ResErrNumber = new Dictionary<string, int>(); //轉碼失敗的資源重試次數
int MaxErrNumber = 5;//最多轉碼錯誤的資源10次
Thread tPutJoj = null;
LXThreadPool pool = new LXThreadPool(4,100);
public void OnStart()
{
//初始化一個線程發送轉碼任務
tPutJoj = new Thread(PutJob);
tPutJoj.IsBackground = true;
tPutJoj.Start();
//初始化 監控線程
var tMonitor = new Thread(MonitorPutJob);
tMonitor.IsBackground = true;
tMonitor.Start();
}
//監視發放job的線程
private void MonitorPutJob()
{
while (true)
{
if(tPutJoj == null|| !tPutJoj.IsAlive)
{
Log.Error($"發送轉碼任務線程停止==========");
tPutJoj = new Thread(PutJob);
tPutJoj.Start();
Log.Error($"發送轉碼任務線程重新初始化並啟動==========");
}
System.Threading.Thread.Sleep(5000);
}
}
private void PutJob()
{
while (true)
{
try
{
//先搜索等待轉碼的
var fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Wait }, 30, lastResId);
Log.Error($"拉取待轉碼記錄===總數:lastResId:{lastResId},結果:{fileList?.Count() ?? 0}");
if (fileList == null || !fileList.Any())
{
lastResId = null;
Log.Error($"待轉碼數量為0,開始拉取轉碼失敗記錄,重新轉碼==========");
//如果無待轉,則把出錯的 嘗試
fileList = DocResourceRegisterProxy.GetFileList(new int[] { (int)FileToImgStateEnum.Error, (int)FileToImgStateEnum.TimeOut, (int)FileToImgStateEnum.Fail }, 1, lastErrorResId);
if (fileList == null || !fileList.Any())
{
lastErrorResId = null;
}
else
{
// Log.Error($"開始轉碼失敗記錄:{JsonConvert.SerializeObject(fileList)}");
List<DocResourceRegister> errFilter = new List<DocResourceRegister>();
foreach (var errRes in fileList)
{
if (ResErrNumber.TryGetValue(errRes.res_id, out int number))
{
if (number > MaxErrNumber)
{
Log.Error($"資源:{errRes.res_id} 轉了{MaxErrNumber}次不成功,放棄===========");
continue;
}
else
{
errFilter.Add(errRes);
ResErrNumber[errRes.res_id] = number + 1;
}
}
else
{
ResErrNumber.Add(errRes.res_id, 1);
errFilter.Add(errRes);
}
}
fileList = errFilter;
if (fileList.Any())
{
lastErrorResId = fileList.Select(s => s.res_id).Max();
}
}
}
else
{
lastResId = fileList.Select(s => s.res_id).Max();
}
if (fileList != null && fileList.Any())
{
foreach (var file in fileList)
{
//如果 任務投放線程池失敗,則等待一面繼續投放
int poolRet = 0;
while (poolRet <= 0)
{
poolRet = pool.AddTask(s => {
AliFileService.ConvertToImg(file.res_id + $".{file.res_ext}", FileToImgFac.Instance(file.res_ext));
}, file, file.res_id);
if (poolRet <= 0 || poolRet > 1)
{
Log.Error($"發放轉碼任務失敗==========線程池返回結果:{poolRet}");
System.Threading.Thread.Sleep(1000);
}
}
}
}
//每一秒去數據庫取一次數據
System.Threading.Thread.Sleep(3000);
}
catch
{
continue;
}
}
}
以上就是發放任務,線程池執行任務的所有代碼,由於具體的轉碼代碼涉及到隱私,這里不在提供,如果有需要可以私下找菜菜索要,雖然我深知還有更優的方式,但是我覺得線程池這樣的思想可能會對部分人有幫助,其中任務超時的核心代碼如下(采用了polly插件):
var policy= Policy.Timeout(TimeSpan.FromSeconds(this.TimeOut), onTimeout: (context, timespan, task) =>
{
ret.State=Enum.FileToImgStateEnum.TimeOut;
});
policy.Execute(s=>{
.....
});
把你的更優方案寫在留言區吧,2020年大家越來越好