0. 簡介
在某些時候我們可能會需要執行后台任務,或者是執行一些周期性的任務。比如說可能每隔 1 個小時要清除某個臨時文件夾內的數據,可能用戶會要針對某一個用戶群來群發一組短信。前面這些就是典型的應用場景,在 Abp 框架里面為我們准備了后台作業和后台工作者來幫助我們解決這個問題。
后台作業與后台工作者的區別是,前者主要用於某些耗時較長的任務,而不想阻塞用戶的時候所使用。后者主要用於周期性的執行某些任務,從 “工作者” 的名字可以看出來,就是一個個工人,而且他們每個工人都擁有單獨的后台線程。
0.1 典型場景
后台作業
- 某個用戶按下了報表按鈕來生成一個需要長時間等待的報表。你添加這個工作到隊列中,當報表生成完畢后,發送報表結果到該用戶的郵箱。
- 在后台作業中發送一封郵件,有些問題可能會導致發送失敗(網絡連接異常,或者主機宕機);由於有后台作業以及持久化機制,在問題排除后,可以重試以保證任務的成功執行。
后台工作者
- 后台工作者能夠周期性地執行舊日志的刪除。
- 后台工作者可以周期性地篩選出非活躍性用戶,並且發送回歸郵件給這些用戶。
1. 啟動流程
后台作業與后台工作者都是通過各自的 Manager(IBackgroundJobManager
/IBackgroundWorkerManager
) 來進行管理的。而這兩個 Manager 分別繼承了 ISingletonDependency
接口,所以在啟動的時候就會自動注入這兩個管理器以便開發人員管理操作。
這里值得注意的一點是,IBackgroundJobManager
接口是 IBackgroundWorker
的派生接口,而 IBackgroudWorker
是歸屬於 IBackgroundWorkerManager
進行管理的。
所以,你可以在 AbpKernelModule
里面看到如下代碼:
public sealed class AbpKernelModule : AbpModule
{
public override void PostInitialize()
{
// 注冊可能缺少的組件
RegisterMissingComponents();
// ... 忽略的代碼
// 各種管理器的初始化操作
// 從配置項中讀取,是否啟用了后台作業功能
if (Configuration.BackgroundJobs.IsJobExecutionEnabled)
{
var workerManager = IocManager.Resolve<IBackgroundWorkerManager>();
// 開始啟動后台工作者
workerManager.Start();
// 增加后台作業管理器
workerManager.Add(IocManager.Resolve<IBackgroundJobManager>());
}
}
}
可以看到,后台作業管理器是作為一個后台工作者被添加到了 IBackgroundWorkerManager
當中來執行的。
2. 代碼分析
2.1 后台工作者
2.1.1 后台工作者管理器
Abp 通過后台工作者管理器來管理后台作業隊列,所以我們首先來看一下后台工作者管理器接口的定義是什么樣子的。
public interface IBackgroundWorkerManager : IRunnable
{
void Add(IBackgroundWorker worker);
}
還是相當簡潔的,就一個 Add
方法用來添加一個新的后台工作者對象。只是在這個地方,可以看到該接口又是集成自 IRunnable
接口,那么該接口的作用又是什么呢?
轉到其定義可以看到,IRunable
接口定義了三個基本的方法:Start()
、Stop()
、WaitStop()
,而且他擁有一個默認實現 RunableBase
,其實就是用來標識一個任務的運行狀態。
public interface IRunnable
{
// 開始執行任務
void Start();
// 停止執行任務
void Stop();
// 阻塞線程,等待任務執行完成后標識為停止。
void WaitToStop();
}
public abstract class RunnableBase : IRunnable
{
// 用於標識任務是否運行的布爾值變量
public bool IsRunning { get { return _isRunning; } }
private volatile bool _isRunning;
// 啟動之后表示任務正在運行
public virtual void Start()
{
_isRunning = true;
}
// 停止之后表示任務結束運行
public virtual void Stop()
{
_isRunning = false;
}
public virtual void WaitToStop()
{
}
}
到目前為止整個代碼都還是比較簡單清晰的,我們接着看 IBackgroundWorkerManager
的默認實現 BackgroundWorkerManager
類,首先我們看一下該類擁有哪些屬性與字段。
public class BackgroundWorkerManager : RunnableBase, IBackgroundWorkerManager, ISingletonDependency, IDisposable
{
private readonly IIocResolver _iocResolver;
private readonly List<IBackgroundWorker> _backgroundJobs;
public BackgroundWorkerManager(IIocResolver iocResolver)
{
_iocResolver = iocResolver;
_backgroundJobs = new List<IBackgroundWorker>();
}
}
在后台工作者管理器類的內部,默認有一個 List 集合,用於維護所有的后台工作者對象。那么其他的 Start()
等方法肯定是基於這個集合進行操作的。
public override void Start()
{
base.Start();
_backgroundJobs.ForEach(job => job.Start());
}
public override void Stop()
{
_backgroundJobs.ForEach(job => job.Stop());
base.Stop();
}
public override void WaitToStop()
{
_backgroundJobs.ForEach(job => job.WaitToStop());
base.WaitToStop();
}
可以看到實現還是比較簡單的,接下來我們繼續看他的 Add()
方法是如何進行操作的?
public void Add(IBackgroundWorker worker)
{
_backgroundJobs.Add(worker);
if (IsRunning)
{
worker.Start();
}
}
在這里我們看到他會針對 IsRunning
進行判定是否立即啟動加入的后台工作者對象。而這個 IsRunning
屬性值唯一產生變化的情況就在於 Start()
方法與 Stop()
方法的調用。
最后肯定也有相關的銷毀方法,用於釋放所有注入的后台工作者對象,並將集合清除。
private bool _isDisposed;
public void Dispose()
{
if (_isDisposed)
{
return;
}
_isDisposed = true;
// 遍歷集合,通過 Ioc 解析器的 Release 方法釋放對象
_backgroundJobs.ForEach(_iocResolver.Release);
// 清空集合
_backgroundJobs.Clear();
}
所以,針對於所有后台工作者的管理,都是通過 IBackgroundWorkerManager
來進行操作的。
2.1.2 后台工作者
看完了管理器,我們來看一下 IBackgroundWorker
后台工作者對象是怎樣的構成。
public interface IBackgroundWorker : IRunnable
{
}
貌似只是一個空的接口,其作用主要是標識某個類型是否為后台工作者,轉到其抽象類實現 BackgroundWorkerBase
,里面只是注入了一些輔助對象與本地化的一些方法。
public abstract class BackgroundWorkerBase : RunnableBase, IBackgroundWorker
{
// 配置管理器
public ISettingManager SettingManager { protected get; set; }
// 工作單元管理器
public IUnitOfWorkManager UnitOfWorkManager
{
get
{
if (_unitOfWorkManager == null)
{
throw new AbpException("Must set UnitOfWorkManager before use it.");
}
return _unitOfWorkManager;
}
set { _unitOfWorkManager = value; }
}
private IUnitOfWorkManager _unitOfWorkManager;
// 獲得當前的工作單元
protected IActiveUnitOfWork CurrentUnitOfWork { get { return UnitOfWorkManager.Current; } }
// 本地化資源管理器
public ILocalizationManager LocalizationManager { protected get; set; }
// 默認的本地化資源的源名稱
protected string LocalizationSourceName { get; set; }
protected ILocalizationSource LocalizationSource
{
get
{
// 如果沒有配置源名稱,直接拋出異常
if (LocalizationSourceName == null)
{
throw new AbpException("Must set LocalizationSourceName before, in order to get LocalizationSource");
}
if (_localizationSource == null || _localizationSource.Name != LocalizationSourceName)
{
_localizationSource = LocalizationManager.GetSource(LocalizationSourceName);
}
return _localizationSource;
}
}
private ILocalizationSource _localizationSource;
// 日志記錄器
public ILogger Logger { protected get; set; }
protected BackgroundWorkerBase()
{
Logger = NullLogger.Instance;
LocalizationManager = NullLocalizationManager.Instance;
}
// ... 其他模板代碼
}
我們接着看繼承並實現了 BackgroundWorkerBase
的類型 PeriodicBackgroundWorkerBase
,從字面意思上來看,該類型應該是一個定時后台工作者基類。
重點在於 Periodic
(定時),從其類型內部的定義可以看到,該類型使用了一個 AbpTimer
對象來進行周期計時與具體工作任務的觸發。我們暫時先不看這個 AbpTimer
,僅僅看 PeriodicBackgroundWorkerBase
的內部實現。
public abstract class PeriodicBackgroundWorkerBase : BackgroundWorkerBase
{
protected readonly AbpTimer Timer;
// 注入 AbpTimer
protected PeriodicBackgroundWorkerBase(AbpTimer timer)
{
Timer = timer;
// 綁定周期執行的任務,這里是 DoWork()
Timer.Elapsed += Timer_Elapsed;
}
public override void Start()
{
base.Start();
Timer.Start();
}
public override void Stop()
{
Timer.Stop();
base.Stop();
}
public override void WaitToStop()
{
Timer.WaitToStop();
base.WaitToStop();
}
private void Timer_Elapsed(object sender, System.EventArgs e)
{
try
{
DoWork();
}
catch (Exception ex)
{
Logger.Warn(ex.ToString(), ex);
}
}
protected abstract void DoWork();
}
可以看到,這里基類綁定了 DoWork()
作為其定時執行的方法,那么用戶在使用的時候直接繼承自該基類,然后重寫 DoWork()
方法即可綁定自己的后台工作者的任務。
2.1.3 AbpTimer 定時器
在上面的基類我們看到,基類的 Start()
、Stop()
、WaitTpStop()
方法都是調用的 AbpTimer
所提供的,所以說 AbpTimer
其實也繼承了 RunableBase
基類並實現其具體的啟動與停止操作。
其實 AbpTimer
的核心就是通過 CLR 的 Timer
來實現周期性任務執行的,不過默認的 Timer
類有兩個比較大的問題。
- CLR 的
Timer
並不會等待你的任務執行完再執行下一個周期的任務,如果你的某個任務耗時過長,超過了Timer
定義的周期。那么Timer
會開啟一個新的線程執行,這樣的話最后我們系統的資源會因為線程大量重復創建而被拖垮。 - 如何知道一個
Timer
所執行的業務方法已經真正地被結束了。
所以 Abp 才會重新封裝一個 AbpTimer
作為一個基礎的計時器。第一個問題的解決方法很簡單,就是在執行具體綁定的業務方法之前,通過 Timer.Change()
方法來讓 Timer
臨時失效。等待業務方法執行完成之后,再將 Timer
的周期置為用戶設定的周期。
// CLR Timer 綁定的回調方法
private void TimerCallBack(object state)
{
lock (_taskTimer)
{
if (!_running || _performingTasks)
{
return;
}
// 暫時讓 Timer 失效
_taskTimer.Change(Timeout.Infinite, Timeout.Infinite);
// 設置執行標識為 TRUE,表示當前的 AbpTimer 正在執行
_performingTasks = true;
}
try
{
// 如果綁定了相應的觸發事件
if (Elapsed != null)
{
// 執行相應的業務方法,這里就是最開始綁定的 DoWork() 方法
Elapsed(this, new EventArgs());
}
}
catch
{
}
finally
{
lock (_taskTimer)
{
// 標識業務方法執行完成
_performingTasks = false;
if (_running)
{
// 更改周期為用戶指定的執行周期,等待下一次觸發
_taskTimer.Change(Period, Timeout.Infinite);
}
Monitor.Pulse(_taskTimer);
}
}
}
針對於第二個問題,Abp 通過 WaitToStop()
方法會阻塞調用這個 Timer
的線程,並且在 _performingTasks
標識位是 false
的時候釋放。
public override void WaitToStop()
{
// 鎖定 CLR 的 Timer 對象
lock (_taskTimer)
{
// 循環檢測
while (_performingTasks)
{
Monitor.Wait(_taskTimer);
}
}
base.WaitToStop();
}
至於其他的 Start()
方法就是使用 CLR 的 Timer
更改其執行周期,而 Stop()
就是直接將 Timer
的周期設置為無限大,使計時器失效。
2.1.4 總結
Abp 后台工作者的核心就是通過 AbpTimer
來實現周期性任務的執行,用戶只需要繼承自 PeriodicBackgroundWorkerBase
,然后將其添加到 IBackgroundWorkerManager
的集合當中。這樣 Abp 在啟動之后就會遍歷這個工作者集合,然后周期執行這些后台工作者綁定的方法。
當然如果你繼承了 PeriodicBackgroundWorkerBase
之后,可以通過設置構造函數的 AbpTimer
來指定自己的執行周期。
2.2 后台作業隊列
后台工作隊列的管理是通過 IBackgroundJobManager
來處理的,而該接口又繼承自 IBackgroundWorker
,所以一整個后台作業隊列就是一個后台工作者,只不過這個工作者有點特殊。
2.2.1 后台作業管理器
IBackgroundJobManager
接口的定義其實就兩個方法,一個 EnqueueAsync<TJob, TArgs>()
用於將一個后台作業加入到執行隊列當中。而 DeleteAsync()
方法呢,顧名思義就是從隊列當中移除指定的后台作業。
首先看一下其默認實現 BackgroundJobManager
,該實現同樣是繼承自 PeriodicBackgroundWorkerBase
並且其默認周期為 5000 ms。
public class BackgroundJobManager : PeriodicBackgroundWorkerBase, IBackgroundJobManager, ISingletonDependency
{
// 事件總線
public IEventBus EventBus { get; set; }
// 輪訓后台作業的間隔,默認值為 5000 毫秒.
public static int JobPollPeriod { get; set; }
// IOC 解析器
private readonly IIocResolver _iocResolver;
// 后台作業隊列存儲
private readonly IBackgroundJobStore _store;
static BackgroundJobManager()
{
JobPollPeriod = 5000;
}
public BackgroundJobManager(
IIocResolver iocResolver,
IBackgroundJobStore store,
AbpTimer timer)
: base(timer)
{
_store = store;
_iocResolver = iocResolver;
EventBus = NullEventBus.Instance;
Timer.Period = JobPollPeriod;
}
}
基礎結構基本上就這個樣子,接下來看一下他的兩個接口方法是如何實現的。
EnqueueAsync<TJob, TArgs>
方法通過傳入指定的后台作業對象和相應的參數,同時還有任務的優先級。將其通過 IBackgroundJobStore
進行持久化,並返回一個任務的唯一 JobId 以便進行刪除操作。
public async Task<string> EnqueueAsync<TJob, TArgs>(TArgs args, BackgroundJobPriority priority = BackgroundJobPriority.Normal, TimeSpan? delay = null)
where TJob : IBackgroundJob<TArgs>
{
// 通過 JobInfo 包裝任務的基本信息
var jobInfo = new BackgroundJobInfo
{
JobType = typeof(TJob).AssemblyQualifiedName,
JobArgs = args.ToJsonString(),
Priority = priority
};
// 如果需要延時執行的話,則用當前時間加上延時的時間作為任務下次運行的時間
if (delay.HasValue)
{
jobInfo.NextTryTime = Clock.Now.Add(delay.Value);
}
// 通過 Store 進行持久話存儲
await _store.InsertAsync(jobInfo);
// 返回后台任務的唯一標識
return jobInfo.Id.ToString();
}
至於刪除操作,在 Manager 內部其實也是通過 IBackgroundJobStore
進行實際的刪除操作的。
public async Task<bool> DeleteAsync(string jobId)
{
// 判斷 jobId 的值是否有效
if (long.TryParse(jobId, out long finalJobId) == false)
{
throw new ArgumentException($"The jobId '{jobId}' should be a number.", nameof(jobId));
}
// 使用 jobId 從 Store 處篩選到 JobInfo 對象的信息
BackgroundJobInfo jobInfo = await _store.GetAsync(finalJobId);
if (jobInfo == null)
{
return false;
}
// 如果存在有 JobInfo 則使用 Store 進行刪除操作
await _store.DeleteAsync(jobInfo);
return true;
}
后台作業管理器實質上是一個周期性執行的后台工作者,那么我們的后台作業是每 5000 ms 執行一次,那么他的 DoWork()
方法又在執行什么操作呢?
protected override void DoWork()
{
// 從 Store 當中獲得等待執行的后台作業集合
var waitingJobs = AsyncHelper.RunSync(() => _store.GetWaitingJobsAsync(1000));
// 遍歷這些等待執行的后台任務,然后通過 TryProcessJob 進行執行
foreach (var job in waitingJobs)
{
TryProcessJob(job);
}
}
可以看到每 5 秒鍾我們的后台作業管理器就會從 IBackgroundJobStore
當中拿到最大 1000 條的后台作業信息,然后遍歷這些信息。通過 TryProcessJob(job)
方法來執行后台作業。
而 TryProcessJob()
方法,本質上就是通過反射構建出一個 IBackgroundJob
對象,然后取得序列化的參數值,通過反射得到的 MethodInfo
對象來執行我們的后台任務。執行完成之后,就會從 Store 當中移除掉執行完成的任務。
針對於在執行過程當中所出現的異常,會通過 IEventBus
觸發一個 AbpHandledExceptionData
事件記錄后台作業執行失敗時的異常信息。並且一旦在執行過程當中出現了任何異常的情況,都會將該任務的 IsAbandoned
字段置為 true
,當該字段為 true
時,該任務將不再回被執行。
PS:就是在
GetWaitingJobsAsync()
方法時,會過濾掉 IsAbandoned 值為true
的任務。
private void TryProcessJob(BackgroundJobInfo jobInfo)
{
try
{
// 任務執行次數自增 1
jobInfo.TryCount++;
// 最后一次執行時間設置為當前時間
jobInfo.LastTryTime = Clock.Now;
// 通過反射取得后台作業的類型
var jobType = Type.GetType(jobInfo.JobType);
// 通過 Ioc 解析器得到一個臨時的后台作業對象,執行完之后既被釋放
using (var job = _iocResolver.ResolveAsDisposable(jobType))
{
try
{
// 通過反射得到后台作業的 Execute 方法
var jobExecuteMethod = job.Object.GetType().GetTypeInfo().GetMethod("Execute");
var argsType = jobExecuteMethod.GetParameters()[0].ParameterType;
var argsObj = JsonConvert.DeserializeObject(jobInfo.JobArgs, argsType);
// 結合持久話存儲的參數信息,調用 Execute 方法進行后台作業
jobExecuteMethod.Invoke(job.Object, new[] { argsObj });
// 執行完成之后從 Store 刪除該任務的信息
AsyncHelper.RunSync(() => _store.DeleteAsync(jobInfo));
}
catch (Exception ex)
{
Logger.Warn(ex.Message, ex);
// 計算下一次執行的時間,一旦超過 2 天該任務都執行失敗,則返回 null
var nextTryTime = jobInfo.CalculateNextTryTime();
if (nextTryTime.HasValue)
{
jobInfo.NextTryTime = nextTryTime.Value;
}
else
{
// 如果為 null 則說明該任務在 2 天的時間內都沒有執行成功,則放棄繼續執行
jobInfo.IsAbandoned = true;
}
// 更新 Store 存儲的任務信息
TryUpdate(jobInfo);
// 觸發異常事件
EventBus.Trigger(
this,
new AbpHandledExceptionData(
new BackgroundJobException(
"A background job execution is failed. See inner exception for details. See BackgroundJob property to get information on the background job.",
ex
)
{
BackgroundJob = jobInfo,
JobObject = job.Object
}
)
);
}
}
}
catch (Exception ex)
{
Logger.Warn(ex.ToString(), ex);
// 表示任務不再執行
jobInfo.IsAbandoned = true;
// 更新 Store
TryUpdate(jobInfo);
}
}
2.2.2 后台作業
后台作業的默認接口定義為 IBackgroundJob<in TArgs>
,他只有一個 Execute(TArgs args)
方法,用於接收指定類型的作業參數,並執行。
一般來說我們不建議直接通過繼承 IBackgroundJob<in TArgs>
來實現后台作業,而是繼承自 BackgroundJob<TArgs>
抽象類。該抽象類內部也沒有什么特別的實現,主要是注入了一些基礎設施,比如說 UOW 與 本地化資源管理器,方便我們開發使用。
后台作業本身是具體執行的對象,而 BackgroundJobInfo
則是存儲了后台作業的 Type 類型和參數,方便在需要執行的時候通過反射的方式執行后台作業。
2.2.2 后台作業隊列存儲
從 IBackgroundJobStore
我們就可以猜到以 Abp 框架的套路,他肯定會有兩種實現,第一種就是基於內存的 InMemoryBackgroundJobStore
。而第二種呢,就是由 Abp.Zero 模塊所提供的基於數據庫的 BackgroundJobStore
。
IBackgroundJobStore
接口所定義的方法基本上就是增刪改查,沒有什么復雜的。
public interface IBackgroundJobStore
{
// 通過 JobId 獲取后台任務信息
Task<BackgroundJobInfo> GetAsync(long jobId);
// 插入一個新的后台任務信息
Task InsertAsync(BackgroundJobInfo jobInfo);
/// <summary>
/// Gets waiting jobs. It should get jobs based on these:
/// Conditions: !IsAbandoned And NextTryTime <= Clock.Now.
/// Order by: Priority DESC, TryCount ASC, NextTryTime ASC.
/// Maximum result: <paramref name="maxResultCount"/>.
/// </summary>
/// <param name="maxResultCount">Maximum result count.</param>
Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount);
/// <summary>
/// Deletes a job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
Task DeleteAsync(BackgroundJobInfo jobInfo);
/// <summary>
/// Updates a job.
/// </summary>
/// <param name="jobInfo">Job information.</param>
Task UpdateAsync(BackgroundJobInfo jobInfo);
}
這里先從簡單的內存 Store 說起,這個 InMemoryBackgroundJobStore
內部使用了一個並行字典來存儲這些任務信息。
public class InMemoryBackgroundJobStore : IBackgroundJobStore
{
private readonly ConcurrentDictionary<long, BackgroundJobInfo> _jobs;
private long _lastId;
public InMemoryBackgroundJobStore()
{
_jobs = new ConcurrentDictionary<long, BackgroundJobInfo>();
}
}
相當簡單,這幾個接口方法基本上就是針對與這個並行字典操作的一層封裝。
public Task<BackgroundJobInfo> GetAsync(long jobId)
{
return Task.FromResult(_jobs[jobId]);
}
public Task InsertAsync(BackgroundJobInfo jobInfo)
{
jobInfo.Id = Interlocked.Increment(ref _lastId);
_jobs[jobInfo.Id] = jobInfo;
return Task.FromResult(0);
}
public Task<List<BackgroundJobInfo>> GetWaitingJobsAsync(int maxResultCount)
{
var waitingJobs = _jobs.Values
// 首先篩選出不再執行的后台任務
.Where(t => !t.IsAbandoned && t.NextTryTime <= Clock.Now)
// 第一次根據后台作業的優先級進行排序,高優先級優先執行
.OrderByDescending(t => t.Priority)
// 再根據執行次數排序,執行次數越少的,越靠前
.ThenBy(t => t.TryCount)
.ThenBy(t => t.NextTryTime)
.Take(maxResultCount)
.ToList();
return Task.FromResult(waitingJobs);
}
public Task DeleteAsync(BackgroundJobInfo jobInfo)
{
_jobs.TryRemove(jobInfo.Id, out _);
return Task.FromResult(0);
}
public Task UpdateAsync(BackgroundJobInfo jobInfo)
{
// 如果是不再執行的任務,刪除
if (jobInfo.IsAbandoned)
{
return DeleteAsync(jobInfo);
}
return Task.FromResult(0);
}
至於持久化到數據庫,無非是注入一個倉儲,然后針對這個倉儲進行增刪查改的操作罷了,這里就不在贅述。
2.2.3 后台作業優先級
后台作業的優先級定義在 BackgroundJobPriority
枚舉當中,一共有 5 個等級,分別是 Low
、BelowNormal
、Normal
、AboveNormal
、High
,他們從最低到最高排列。