Hangfire有個機制可以確保所有任務都會被執行,如果當服務器停機了一段時間重新啟動時,在此期間的周期任務會幾乎同時執行。而大部分時候,我們希望同個周期任務每段時間只運行一個就行了。
或者是如果周期任務設置得過於頻繁,當之前的任務還沒執行完,我們也不希望繼續添加周期任務進隊列去排隊執行。
Hangfire有提供一個擴展https://docs.hangfire.io/en/latest/background-processing/throttling.html
同個DisableConcurrentExecution我們可以限制同一個任務每次只會執行一個,但是如果有任務正在執行,這時候又有新任務過來,新任務並不會被刪除而是處於排隊狀態,等待前面的任務執行完。
而且,如果我們的任務用了同一個方法作為入口時(或者說我們需要根據方法的參數來確定是否為同一個任務),此時這個控制就不適用了。
參考了https://gist.github.com/sbosell/3831f5bb893b20e82c72467baf8aefea,我們可以用過濾器來實現,將運行期間進來的任務給取消掉。
代碼的具體實現為:

1 /// <summary> 2 /// 禁用多個排隊項目 3 /// <remarks>同個任務取消並行執行,期間進來的任務不會等待,會被取消</remarks> 4 /// </summary> 5 public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter 6 { 7 private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5); 8 private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任務執行超時時間 9 10 public void OnCreating(CreatingContext filterContext) 11 { 12 var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId"); 13 if (!string.IsNullOrEmpty(recurringJobId)&&!AddFingerprintIfNotExists(filterContext.Connection, recurringJobId)) 14 { 15 filterContext.Canceled = true; 16 } 17 } 18 19 public void OnPerformed(PerformedContext filterContext) 20 { 21 var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId"); 22 if (!string.IsNullOrEmpty(recurringJobId)) 23 { 24 RemoveFingerprint(filterContext.Connection, recurringJobId); 25 } 26 } 27 28 private static bool AddFingerprintIfNotExists(IStorageConnection connection, string recurringJobId) 29 { 30 using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout)) 31 { 32 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(recurringJobId)); 33 34 if (fingerprint != null && 35 fingerprint.ContainsKey("Timestamp") && 36 DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) && 37 DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout)) 38 { 39 // 有任務還未執行完,並且沒有超時 40 return false; 41 } 42 43 // 沒有任務執行,或者該任務已超時 44 connection.SetRangeInHash(GetFingerprintKey(recurringJobId), new Dictionary<string, string> 45 { 46 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") } 47 }); 48 49 return true; 50 } 51 }
在OnCreating方法中,我們讀取RecurringJobId的值,獲取周期任務的id(同樣的id代表同一個周期任務),然后以這個id為key去設置一個超時。如果在此期間,如果拿到了key的值,以及設置的時間還未超時的話,我們通過設置filterContext.Canceled = true取消掉此任務。
使用connection.AcquireDistributedLock在設置鍵值時添加分布式鎖,確保不會同時設置了多個相同的任務。使用connection.SetRangeInHash鍵RecurringJobId作為key,當前時間作為值保存。以此來確保在FingerprintTimeout的超時時間內,同個RecurringJobId的任務只能創建一個。或者等任務執行完后在OnPerformed方法中釋放掉這個鍵值。
在OnPerformed方法中,將我們在創建方法中設置的RecurringJobId key和對應的時間給刪除,這樣OnCreating可以繼續創建同一個RecurringJobId 的任務。
或者是普通觸發的任務,這時候沒有RecurringJobId 我們希望可以同個參數來控制,同樣的參數不能同時執行。我們可以通過這個方法來生成相應的key

1 private static string GetFingerprint(Job job) 2 { 3 var parameters = string.Empty; 4 if (job?.Arguments != null) 5 { 6 parameters = string.Join(".", job.Arguments); 7 } 8 if (job?.Type == null || job.Method == null) 9 { 10 return string.Empty; 11 } 12 var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}"; 13 var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload)); 14 var fingerprint = Convert.ToBase64String(hash); 15 return fingerprint; 16 }
這樣我們就能確保我們希望的同一個任務不會同時在執行,而且周期任務也不會繼續在隊列中排隊
考慮到寫死鎖的key值不太合理,現添加特性來處理。
添加DisableMultipleInstanceAttribute特性,添加WebApiPullDisableMultipleInstance默認實現,取參數列表的第一個參數當做key

1 /// <summary> 2 /// 后台任務禁用重復任務排隊 3 /// </summary> 4 [AttributeUsage(AttributeTargets.Class)] 5 public class DisableMultipleInstanceAttribute: BackgroundJobAttribute 6 { 7 private string _fingerprint; 8 public DisableMultipleInstanceAttribute(string fingerprint = null) 9 { 10 _fingerprint = fingerprint; 11 } 12 13 public virtual string GetFingerprint(IReadOnlyList<object> methodArgs) 14 { 15 return _fingerprint; 16 } 17 18 public MultiTenancySides MultiTenancySides { get; set; } = MultiTenancySides.Tenant; 19 } 20 21 /// <summary> 22 /// 接口禁用重復任務排隊 23 /// </summary> 24 public class WebApiPullDisableMultipleInstanceAttribute: DisableMultipleInstanceAttribute 25 { 26 public override string GetFingerprint(IReadOnlyList<object> methodArgs) 27 { 28 return methodArgs[0].ToString(); 29 } 30 }
在基類中添加特性
添加hangfire的JobFilterAttributeFilterProvider的實現CustomJobAttributeFilterProvider,重寫GetTypeAttributes方法,添加我們新增的特性

1 protected override IEnumerable<JobFilterAttribute> GetTypeAttributes(Job job) 2 { 3 foreach (var attribute in ReflectedAttributeCache.GetTypeFilterAttributes(job.Type)) 4 { 5 if (attribute is CaptureContextAttribute) 6 { 7 yield return new CaptureContextMessageAttribute(); 8 } 9 10 if (attribute is AutomaticRetryAttribute automaticRetry) 11 { 12 yield return new AutoRetryAttribute() { Attempts = automaticRetry.AutomaticRetry }; 13 } 14 15 if (attribute is WebApiPullDisableMultipleInstanceAttribute apiPullDisable) 16 { 17 yield return new DisableMultipleQueuedItemsAttribute(apiPullDisable); 18 } 19 } 20 }
在DisableMultipleQueuedItemsAttribute的OnCreating中調用_disableMultipleInstanceAttribute.GetFingerprint獲取分布式鎖的key

1 /// <summary> 2 /// 禁用多個排隊項目 3 /// <remarks>同個任務取消並行執行,期間進來的任務不會等待,會被取消</remarks> 4 /// </summary> 5 public class DisableMultipleQueuedItemsAttribute : JobFilterAttribute, IClientFilter, IServerFilter 6 { 7 private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5); 8 private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任務執行超時時間 9 10 private readonly DisableMultipleInstanceAttribute _disableMultipleInstanceAttribute; 11 public DisableMultipleQueuedItemsAttribute(DisableMultipleInstanceAttribute attribute) 12 { 13 _disableMultipleInstanceAttribute = attribute; 14 } 15 16 public void OnCreating(CreatingContext filterContext) 17 { 18 var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.Job.Args); 19 if (string.IsNullOrEmpty(fingerprintKey)) 20 { 21 throw new AppFatalExceptions("唯一鍵為空"); 22 } 23 if (_disableMultipleInstanceAttribute.MultiTenancySides==MultiTenancySides.Tenant) 24 { 25 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage"); 26 if (string.IsNullOrEmpty(contextMessage.TenantId)) 27 { 28 throw new AppFatalExceptions("租戶Id為空"); 29 } 30 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}"; 31 } 32 if (!AddFingerprintIfNotExists(filterContext.Connection, fingerprintKey)) 33 { 34 filterContext.Canceled = true; 35 } 36 } 37 38 public void OnPerformed(PerformedContext filterContext) 39 { 40 var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.BackgroundJob.Job.Args); 41 if (_disableMultipleInstanceAttribute.MultiTenancySides == MultiTenancySides.Tenant) 42 { 43 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage"); 44 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}"; 45 } 46 RemoveFingerprint(filterContext.Connection, fingerprintKey); 47 } 48 49 private static bool AddFingerprintIfNotExists(IStorageConnection connection, string fingerprintKey) 50 { 51 using (connection.AcquireDistributedLock(GetFingerprintLockKey(fingerprintKey), LockTimeout)) 52 { 53 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(fingerprintKey)); 54 55 if (fingerprint != null && 56 fingerprint.ContainsKey("Timestamp") && 57 DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) && 58 DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout)) 59 { 60 // 有任務還未執行完,並且沒有超時 61 return false; 62 } 63 64 // 沒有任務執行,或者該任務已超時 65 connection.SetRangeInHash(GetFingerprintKey(fingerprintKey), new Dictionary<string, string> 66 { 67 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") } 68 }); 69 70 return true; 71 } 72 } 73 74 private static void RemoveFingerprint(IStorageConnection connection, string recurringJobId) 75 { 76 using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout)) 77 using (var transaction = connection.CreateWriteTransaction()) 78 { 79 transaction.RemoveHash(GetFingerprintKey(recurringJobId)); 80 transaction.Commit(); 81 } 82 } 83 84 private static string GetFingerprintLockKey(string key) 85 { 86 return String.Format("{0}:lock", key); 87 } 88 89 private static string GetFingerprintKey(string key) 90 { 91 return String.Format("fingerprint:{0}", key); 92 } 93 94 95 void IClientFilter.OnCreated(CreatedContext filterContext) 96 { 97 98 } 99 100 void IServerFilter.OnPerforming(PerformingContext filterContext) 101 { 102 } 103 104 }