Hangfire只允許同時運行同一個任務


Hangfire有個機制可以確保所有任務都會被執行,如果當服務器停機了一段時間重新啟動時,在此期間的周期任務會幾乎同時執行。而大部分時候,我們希望同個周期任務每段時間只運行一個就行了。

或者是如果周期任務設置得過於頻繁,當之前的任務還沒執行完,我們也不希望繼續添加周期任務進隊列去排隊執行。

Hangfire有提供一個擴展https://docs.hangfire.io/en/latest/background-processing/throttling.html 

同個DisableConcurrentExecution我們可以限制同一個任務每次只會執行一個,但是如果有任務正在執行,這時候又有新任務過來,新任務並不會被刪除而是處於排隊狀態,等待前面的任務執行完。

 

 

而且,如果我們的任務用了同一個方法作為入口時(或者說我們需要根據方法的參數來確定是否為同一個任務),此時這個控制就不適用了。

參考了https://gist.github.com/sbosell/3831f5bb893b20e82c72467baf8aefea,我們可以用過濾器來實現,將運行期間進來的任務給取消掉。

代碼的具體實現為:

 1     /// <summary>
 2     /// 禁用多個排隊項目
 3     /// <remarks>同個任務取消並行執行,期間進來的任務不會等待,會被取消</remarks>
 4     /// </summary>
 5     public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
 6     {
 7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
 8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任務執行超時時間
 9 
10         public void OnCreating(CreatingContext filterContext)
11         {
12             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
13             if (!string.IsNullOrEmpty(recurringJobId)&&!AddFingerprintIfNotExists(filterContext.Connection, recurringJobId))
14             {
15                 filterContext.Canceled = true;
16             }
17         }
18 
19         public void OnPerformed(PerformedContext filterContext)
20         {
21             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
22             if (!string.IsNullOrEmpty(recurringJobId))
23             {
24                 RemoveFingerprint(filterContext.Connection, recurringJobId);
25             }
26         }
27 
28         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string recurringJobId)
29         {
30             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
31             {
32                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(recurringJobId));
33 
34                 if (fingerprint != null &&
35                     fingerprint.ContainsKey("Timestamp") &&
36                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
37                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
38                 {
39                     // 有任務還未執行完,並且沒有超時
40                     return false;
41                 }
42 
43                 // 沒有任務執行,或者該任務已超時
44                 connection.SetRangeInHash(GetFingerprintKey(recurringJobId), new Dictionary<string, string>
45             {
46                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
47             });
48 
49                 return true;
50             }
51         }
View Code

 

在OnCreating方法中,我們讀取RecurringJobId的值,獲取周期任務的id(同樣的id代表同一個周期任務),然后以這個id為key去設置一個超時。如果在此期間,如果拿到了key的值,以及設置的時間還未超時的話,我們通過設置filterContext.Canceled = true取消掉此任務。

 

 使用connection.AcquireDistributedLock在設置鍵值時添加分布式鎖,確保不會同時設置了多個相同的任務。使用connection.SetRangeInHash鍵RecurringJobId作為key,當前時間作為值保存。以此來確保在FingerprintTimeout的超時時間內,同個RecurringJobId的任務只能創建一個。或者等任務執行完后在OnPerformed方法中釋放掉這個鍵值。

在OnPerformed方法中,將我們在創建方法中設置的RecurringJobId key和對應的時間給刪除,這樣OnCreating可以繼續創建同一個RecurringJobId 的任務。

 

 

 

或者是普通觸發的任務,這時候沒有RecurringJobId 我們希望可以同個參數來控制,同樣的參數不能同時執行。我們可以通過這個方法來生成相應的key

 1         private static string GetFingerprint(Job job)
 2         {
 3             var parameters = string.Empty;
 4             if (job?.Arguments != null)
 5             {
 6                 parameters = string.Join(".", job.Arguments);
 7             }
 8             if (job?.Type == null || job.Method == null)
 9             {
10                 return string.Empty;
11             }
12             var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
13             var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
14             var fingerprint = Convert.ToBase64String(hash);
15             return fingerprint;
16         }    
View Code

這樣我們就能確保我們希望的同一個任務不會同時在執行,而且周期任務也不會繼續在隊列中排隊

 


 

考慮到寫死鎖的key值不太合理,現添加特性來處理。

添加DisableMultipleInstanceAttribute特性,添加WebApiPullDisableMultipleInstance默認實現,取參數列表的第一個參數當做key

 1     /// <summary>
 2     /// 后台任務禁用重復任務排隊
 3     /// </summary>
 4     [AttributeUsage(AttributeTargets.Class)]
 5     public class DisableMultipleInstanceAttribute: BackgroundJobAttribute
 6     {
 7         private string _fingerprint;
 8         public DisableMultipleInstanceAttribute(string fingerprint = null)
 9         {
10             _fingerprint = fingerprint;
11         }
12 
13         public virtual string GetFingerprint(IReadOnlyList<object> methodArgs)
14         {
15             return _fingerprint;
16         }
17 
18         public MultiTenancySides MultiTenancySides { get; set; } = MultiTenancySides.Tenant;
19     }
20 
21     /// <summary>
22     /// 接口禁用重復任務排隊
23     /// </summary>
24     public class WebApiPullDisableMultipleInstanceAttribute: DisableMultipleInstanceAttribute
25     {
26         public override string GetFingerprint(IReadOnlyList<object> methodArgs)
27         {
28             return methodArgs[0].ToString();
29         }
30     }
View Code

在基類中添加特性

 

 

 

添加hangfire的JobFilterAttributeFilterProvider的實現CustomJobAttributeFilterProvider,重寫GetTypeAttributes方法,添加我們新增的特性

 1     protected override IEnumerable<JobFilterAttribute> GetTypeAttributes(Job job)
 2         {
 3             foreach (var attribute in ReflectedAttributeCache.GetTypeFilterAttributes(job.Type))
 4             {
 5                 if (attribute is CaptureContextAttribute)
 6                 {
 7                     yield return new CaptureContextMessageAttribute();
 8                 }
 9 
10                 if (attribute is AutomaticRetryAttribute automaticRetry)
11                 {
12                     yield return new AutoRetryAttribute() { Attempts = automaticRetry.AutomaticRetry };
13                 }
14 
15                 if (attribute is WebApiPullDisableMultipleInstanceAttribute apiPullDisable)
16                 {
17                     yield return new DisableMultipleQueuedItemsAttribute(apiPullDisable);
18                 }
19             }
20         }
View Code

在DisableMultipleQueuedItemsAttribute的OnCreating中調用_disableMultipleInstanceAttribute.GetFingerprint獲取分布式鎖的key

  1     /// <summary>
  2     /// 禁用多個排隊項目
  3     /// <remarks>同個任務取消並行執行,期間進來的任務不會等待,會被取消</remarks>
  4     /// </summary>
  5     public class DisableMultipleQueuedItemsAttribute : JobFilterAttribute, IClientFilter, IServerFilter
  6     {
  7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
  8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任務執行超時時間
  9 
 10         private readonly DisableMultipleInstanceAttribute _disableMultipleInstanceAttribute;
 11         public DisableMultipleQueuedItemsAttribute(DisableMultipleInstanceAttribute attribute)
 12         {
 13             _disableMultipleInstanceAttribute = attribute;
 14         }
 15 
 16         public void OnCreating(CreatingContext filterContext)
 17         {
 18             var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.Job.Args);
 19             if (string.IsNullOrEmpty(fingerprintKey))
 20             {
 21                 throw new AppFatalExceptions("唯一鍵為空");
 22             }
 23             if (_disableMultipleInstanceAttribute.MultiTenancySides==MultiTenancySides.Tenant)
 24             {
 25                 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage");
 26                 if (string.IsNullOrEmpty(contextMessage.TenantId))
 27                 {
 28                     throw new AppFatalExceptions("租戶Id為空");
 29                 }
 30                 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}";
 31             }
 32             if (!AddFingerprintIfNotExists(filterContext.Connection, fingerprintKey))
 33             {
 34                 filterContext.Canceled = true;
 35             }
 36         }
 37 
 38         public void OnPerformed(PerformedContext filterContext)
 39         {
 40             var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.BackgroundJob.Job.Args);
 41             if (_disableMultipleInstanceAttribute.MultiTenancySides == MultiTenancySides.Tenant)
 42             {
 43                 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage");
 44                 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}";
 45             }
 46             RemoveFingerprint(filterContext.Connection, fingerprintKey);
 47         }
 48 
 49         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string fingerprintKey)
 50         {
 51             using (connection.AcquireDistributedLock(GetFingerprintLockKey(fingerprintKey), LockTimeout))
 52             {
 53                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(fingerprintKey));
 54 
 55                 if (fingerprint != null &&
 56                     fingerprint.ContainsKey("Timestamp") &&
 57                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
 58                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
 59                 {
 60                     // 有任務還未執行完,並且沒有超時
 61                     return false;
 62                 }
 63 
 64                 // 沒有任務執行,或者該任務已超時
 65                 connection.SetRangeInHash(GetFingerprintKey(fingerprintKey), new Dictionary<string, string>
 66             {
 67                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
 68             });
 69 
 70                 return true;
 71             }
 72         }
 73 
 74         private static void RemoveFingerprint(IStorageConnection connection, string recurringJobId)
 75         {
 76             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
 77             using (var transaction = connection.CreateWriteTransaction())
 78             {
 79                 transaction.RemoveHash(GetFingerprintKey(recurringJobId));
 80                 transaction.Commit();
 81             }
 82         }
 83 
 84         private static string GetFingerprintLockKey(string key)
 85         {
 86             return String.Format("{0}:lock", key);
 87         }
 88 
 89         private static string GetFingerprintKey(string key)
 90         {
 91             return String.Format("fingerprint:{0}", key);
 92         }
 93 
 94 
 95         void IClientFilter.OnCreated(CreatedContext filterContext)
 96         {
 97             
 98         }
 99 
100         void IServerFilter.OnPerforming(PerformingContext filterContext)
101         {
102         }
103 
104     }
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM