說下hangfire吧


最近因工作需要開發計划任務模塊(嚴格來說應該是修改bug吧,其他同事負責的)接觸到了Hangfire。早前聽同事說hangfire有點坑,懷着好奇,趁這兩天bug改的差不多了,在github上面down了hangfire源碼,下面分享一下,自己讀hangfire源碼的一些理解,和工作中需要注意的地方。介紹大概分為以下幾個部分吧。1.准備工作,2.簡單使用,3.源碼分析,4.避坑。需要說明一下接觸hangfire源碼的時間不長,也就幾天時間理解不到位,或者說錯了的,希望在評論指正。
准備工作:hangfire源代碼的代碼量不多,github地址: https://github.com/HangfireIO/Hangfire,有興趣的朋友可以自己下載瞅瞅源碼。功能上大概可以分為客戶端模式和服務端模式。用到的技術大概有Multi Thread、Expression、Dapper、Cron等。可以這么說,它的定時任務完全就是基於多線程協作實現的。因為是多線程環境,所以個人覺得看起來有點費力。
簡單使用:.Net&.Net Core環境都可以使用,下面就以.Net Core的使用為例。
1.客戶端和服務端獨立部署
client端
 1 public IServiceProvider ConfigureServices(IServiceCollection services)
 2         {
 3             // 其他代碼
 4              services.AddHangfire(config =>
 5             {
 6                  config.UseSqlServerStorage(...);
 7             });
 8         }
 9  
10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
11         {
12             // 其他代碼...
13             // 啟用Dashboard看板
14             app.UseHangfireDashboard();
15         }

 

server端
 1 public void Configuration(IAppBuilder app)
 2         {
 3             GlobalConfiguration.Configuration
 4                  .UseSqlServerStorage("連接字符串", new SqlServerStorageOptions
 5                 {
 6                     // options
 7                 });
 8             app.UseHangfireServer(new BackgroundJobServerOptions
 9             {
10             });
11         }
12  
13  
或者
1 services.AddHangfireServer(options =>
2                     {
3                         // 基於IHostedService接口實現
4                     });
PS:server端還有一種實現方式,實現IHostedService接口 其實跟上面的使用方法一樣的,注入到服務就ok,在程序啟動階段會自動執行IHostedService接口的兩個方法,可以簡單看下IHostedService接口的定義。
1   public interface IHostedService
2   {
3     Task StartAsync(CancellationToken cancellationToken);
4     Task StopAsync(CancellationToken cancellationToken);
5   }
接口就定義了兩個方法,start是在程序啟動的時候執行,當然stop就是在程序停止的時候執行。我們用一張圖簡單描繪一下它的執行時機,圖是盜的。
以上就是hangfire的client端和server端分開部署的一個簡單應用,下面我們看下第二種,client&server部署在同一台機器上。
2.客戶端和服務端統一部署
1 public void Configuration(IAppBuilder app)
2 {
3     GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置數據庫連接
4     
5     app.UseHangfireServer(); // 啟用server
6     app.UseHangfireDashboard(); // 啟用看板
7 }

 

簡單的幾行代碼,當然我也只會簡單的用法。以上服務注入並執行,接下來就是往hangfire里面添加任務。
1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 立即執行
2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延后執行
3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循環執行,支持cron表達式
簡單使用就到這吧,我們繼續大綱的第三部分,源碼分析。
 
源碼分析
客戶端模式就不用說了,說白了就是往hangfire數據庫里面寫任務,我們主要是看看服務端的執行原理。我們先找到入口,也可以看做是NetCore里面的一個中間件吧。看代碼
1 app.UseHangfireServer(); // 啟用server
UseHangfireServer實現
 1 public static IAppBuilder UseHangfireServer(
 2             [NotNull] this IAppBuilder builder,
 3             [NotNull] JobStorage storage,
 4             [NotNull] BackgroundJobServerOptions options,
 5             [NotNull] params IBackgroundProcess[] additionalProcesses)
 6         {
 7             // 其他代碼...
 8             var server = new BackgroundJobServer(options, storage,  additionalProcesses); 
 9             
10             return builder;
11         }

 

UseHangfireServer擴展方法實現里面,比較重要的一行代碼就是創建BackgroundJobServer,BackgroundJobServer實現了IBackgroundProcessingServer接口,server的啟動也就是間接在它的構造器里面完成的。我們不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer類的定義。
 1 // IBackgroundProcessingServer
 2 public interface IBackgroundProcessingServer : IDisposable
 3     {
 4         void SendStop();
 5         bool WaitForShutdown(TimeSpan timeout);
 6         Task WaitForShutdownAsync(CancellationToken cancellationToken);
 7     }
 8  
 9 // BackgroundJobServer
10 public class BackgroundJobServer : IBackgroundProcessingServer
11     {
12         // 其他成員...
13         public BackgroundJobServer(
14             [NotNull] BackgroundJobServerOptions options,
15             [NotNull] JobStorage storage,
16             [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses,
17             [CanBeNull] IJobFilterProvider filterProvider,
18             [CanBeNull] JobActivator activator,
19             [CanBeNull] IBackgroundJobFactory factory,
20             [CanBeNull] IBackgroundJobPerformer performer,
21             [CanBeNull] IBackgroundJobStateChanger stateChanger)
22         {
23             // 其他代碼
24             var processes = new List<IBackgroundProcessDispatcherBuilder>();
25             processes.AddRange(GetRequiredProcesses(filterProvider, activator,  factory, performer, stateChanger));
26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1)));
27             var properties = new Dictionary<string, object>
28             {
29                 { "Queues", options.Queues },
30                 { "WorkerCount", options.WorkerCount }
31             };
32             
33             _processingServer = new BackgroundProcessingServer(
34                 storage,
35                 processes,
36                 properties,
37                 GetProcessingServerOptions());
38         }
39         public void SendStop()
40         {
41         }
42         public void Dispose()
43         {
44         }
45         [Obsolete("This method is a stub. There is no need to call the `Start`  method. Will be removed in version 2.0.0.")]
46         public void Start()
47         {
48         }
49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
50         public void Stop()
51         {
52         }
53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
54         public void Stop(bool force)
55         {
56         }
57         public bool WaitForShutdown(TimeSpan timeout)
58         {
59         }
60         public Task WaitForShutdownAsync(CancellationToken cancellationToken)
61         {
62         }

 

IBackgroundProcessingServer接口里面的這幾個方法都是跟停用server,取消任務清理資源相關的。BackgroundJobServer類里面真正完成接口的實現是由BackgroundProcessingServer類型的同名函數實現,這個對象是在構造函數里面初始化的,在初始化BackgroundProcessingServer類型的同時,創建了若干IBackgroundProcessDispatcherBuilder實現類BackgroundProcessDispatcherBuilder的實例,hangfire默認實現了7種dispatcher,我們任務、日志、心跳等等獨立線程都是由它的Create方法完成,這個地方不算server啟動主線,會在后面細說。我們繼續看看BackgroundProcessingServer這個類型。這里需要注意的是里面有幾個方法好像是被停用了,start、stop等方法,官方也注釋了,被刪除了。start方法被停用了,難道我們的server啟動是在BackgroundProcessingServer類型里面?繼續看BackgroundProcessingServer的定義。
 1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer
 2     {
 3         // 其他成員
 4         internal BackgroundProcessingServer(
 5             [NotNull] BackgroundServerProcess process,
 6             [NotNull] BackgroundProcessingServerOptions options)
 7         {
 8             _process = process ?? throw new ArgumentNullException(nameof(process));
 9             _options = options ?? throw new ArgumentNullException(nameof(options));
10             _dispatcher = CreateDispatcher();
11 #if !NETSTANDARD1_3
12             AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload;
13             AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload;
14 #endif
15         }
16         public void SendStop()
17         {
18         }
19         public bool WaitForShutdown(TimeSpan timeout)
20         {
21         }
22         public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
23         {
24         }
25         public void Dispose()
26         {
27             
28         }
29         private void OnCurrentDomainUnload(object sender, EventArgs args)
30         {
31             
32         }
33         private IBackgroundDispatcher CreateDispatcher()
34         {
35             var execution = new BackgroundExecution(
36                 _stoppingCts.Token,
37                 new BackgroundExecutionOptions
38                 {
39                     Name = nameof(BackgroundServerProcess),
40                     ErrorThreshold = TimeSpan.Zero,
41                     StillErrorThreshold = TimeSpan.Zero,
42                     RetryDelay = retry => _options.RestartDelay
43                 });
44             return new BackgroundDispatcher(
45                 execution,
46                 RunServer,
47                 execution,
48                 ThreadFactory);
49         }
50         private void RunServer(Guid executionId, object state)
51         {
52             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
53         }
54         private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart)
55         {
56             yield return new Thread(threadStart)
57             {
58                 IsBackground = true,
59                 Name = $"{nameof(BackgroundServerProcess)}  #{Interlocked.Increment(ref _lastThreadId)}",
60             };
61         }
62     }

 

果不其然,server的啟動快要揭開神秘的面紗了,RunServer?翻譯過來應該是啟動服務吧,我們暫且不去管他,先記一下這個有個runserver,我們繼續跟蹤。在構造函數里面調用了一個CreateDispatcher()的方法,我們看下它的實現
 1 private IBackgroundDispatcher CreateDispatcher()
 2         {
 3             var execution = new BackgroundExecution(
 4                 _stoppingCts.Token,
 5                 new BackgroundExecutionOptions
 6                 {
 7                     Name = nameof(BackgroundServerProcess),
 8                     ErrorThreshold = TimeSpan.Zero,
 9                     StillErrorThreshold = TimeSpan.Zero,
10                     RetryDelay = retry => _options.RestartDelay
11                 });
12             return new BackgroundDispatcher(
13                 execution,
14                 RunServer,
15                 execution,
16                 ThreadFactory);
17         }

 

在CreateDispatcher方法里面返回了一個BackgroundDispatcher,字面意思好像是后台分發器,並且指定了回調runserver,BackgroundDispatcher實現了IBackgroundDispatcher接口,我們先看下它們的定義。
 1 // IBackgroundDispatcher
 2 public interface IBackgroundDispatcher : IDisposable
 3     {
 4         bool Wait(TimeSpan timeout);
 5         Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
 6     }
 7  
 8 // BackgroundDispatcher
 9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher
10     {
11         // 其他成員
12         public BackgroundDispatcher(
13             [NotNull] IBackgroundExecution execution,
14             [NotNull] Action<Guid, object> action,
15             [CanBeNull] object state,
16             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
17         {
18             if (threadFactory == null) throw new  ArgumentNullException(nameof(threadFactory));
19             _execution = execution ?? throw new  ArgumentNullException(nameof(execution));
20             _action = action ?? throw new ArgumentNullException(nameof(action));
21             _state = state;
22 #if !NETSTANDARD1_3
23             AppDomainUnloadMonitor.EnsureInitialized();
24 #endif
25             var threads = threadFactory(DispatchLoop)?.ToArray();
26             if (threads == null || threads.Length == 0)
27             {
28                 throw new ArgumentException("At least one unstarted thread should be  created.", nameof(threadFactory));
29             }
30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0))
31             {
32                 throw new ArgumentException("All the threads should be non-null and  in the ThreadState.Unstarted state.", nameof(threadFactory));
33             }
34             _stopped = new CountdownEvent(threads.Length);
35             foreach (var thread in threads)
36             {
37                 thread.Start();
38             }
39         }
40         public bool Wait(TimeSpan timeout)
41         {
42             return _stopped.WaitHandle.WaitOne(timeout);
43         }
44         public async Task WaitAsync(TimeSpan timeout, CancellationToken  cancellationToken)
45         {
46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false);
47         }
48         public void Dispose()
49         {
50         }
51         public override string ToString()
52         {
53         }
54         private void DispatchLoop()
55         {
56             try
57             {
58                 _execution.Run(_action, _state);
59             }
60             catch (Exception ex)
61             {
62  
63             }
64             finally
65             {
66                 try
67                 {
68                     _stopped.Signal();
69                 }
70                 catch (ObjectDisposedException)
71                 {
72  
73                 }
74             }
75         }
76     }

 

從IBackgroundDispatcher接口的定義來看,分發器應該是負責協調資源處理,我們具體看看BackgroundDispatcher的實現。以上代碼就是server的啟動執行核心代碼並且我以加粗,其實就是啟動線程Loop執行。在DispatchLoop方法里面間接調用了我上面說的runserver方法。在runserver方法里面實現了整個server端的初始化工作。我們接着看DispatchLoop方法的實現 ,在這個方法里面調用了IBackgroundExecution接口的run方法,繼續IBackgroundExecution接口的定義。
1 public interface IBackgroundExecution : IDisposable
2     {
3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object  state);
4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  object state);
5     }

 

就兩方法,run包含同步和異步,看看它的唯一實現類BackgroundExecution。
 1   internal sealed class BackgroundExecution : IBackgroundExecution
 2     {
 3                 // 其他成員
 4         public void Run(Action<Guid, object> callback, object state)
 5         {
 6             if (callback == null) throw new ArgumentNullException(nameof(callback));
 7             var executionId = Guid.NewGuid();
 8            
 9             {
10 #if !NETSTANDARD1_3
11                 try
12 #endif
13                 {
14                     HandleStarted(executionId, out var nextDelay);
15                     while (true)
16                     {
17                         // Don't place anything here.
18                         try
19                         {
20                            
21                             if (StopRequested) break;
22                             if (nextDelay > TimeSpan.Zero)
23                             {
24                                 HandleDelay(executionId, nextDelay);
25                             }
26                             callback(executionId, state);
27                             HandleSuccess(out nextDelay);
28                         }
29 #if !NETSTANDARD1_3
30                         catch (ThreadAbortException) when  (AppDomainUnloadMonitor.IsUnloading)
31                         {
32                             // Our thread is aborted due to AppDomain unload. It's  better to give up to
33                             // not to cause the host to be more aggressive.
34                             throw;
35                         }
36 #endif
37                         catch (OperationCanceledException) when (StopRequested)
38                         {
39                             break;
40                         }
41                         catch (Exception ex)
42                         {
43                             HandleException(executionId, ex, out nextDelay);
44                         }
45                     }
46                     HandleStop(executionId);
47                 }
48 #if !NETSTANDARD1_3
49                 catch (ThreadAbortException ex)
50                 {
51                     HandleThreadAbort(executionId, ex);
52                 }
53 #endif
54             }
55         }
56 }

 

hangfire里面所有的獨立線程都是通過run方法執行,然后回調到自己的實現類Execute方法,自此每個獨立的功能線程就循環干着自己獨立的工作(這個后面會單獨分析RecurringJobScheduler)。繼續我們的主線,server啟動,我們以run的同步方法為例,第一個線程(我們就叫它主線程吧)啟動了一個while循環,在循環里面並且callback調用了我們的runserver方法。
    
1 private void RunServer(Guid executionId, object state)
2         {
3             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
4         }

 

在runserver方法里面的實現很簡單,直接調用了_process的execute方法,我們簡單看下_process類型IBackgroundServerProcess的定義。
1 internal interface IBackgroundServerProcess
2     {
3         void Execute(
4             Guid executionId,
5             BackgroundExecution execution,
6             CancellationToken stoppingToken,
7             CancellationToken stoppedToken,
8             CancellationToken shutdownToken);
9     }

 

IBackgroundServerProcess的定義就一個execute方法,這個接口的工作其實就是初始化server服務端,我們看看它的唯一實現類BackgroundServerProcess。
  1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess
  2     {
  3         
  4         // 其他成員
  5         public BackgroundServerProcess(
  6             [NotNull] JobStorage storage,
  7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,
  8             [NotNull] BackgroundProcessingServerOptions options,
  9             [NotNull] IDictionary<string, object> properties)
 10         {
 11             if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));
 12  
 13  
 14             _storage = storage ?? throw new ArgumentNullException(nameof(storage));
 15             _options = options ?? throw new ArgumentNullException(nameof(options));
 16             _properties = properties ?? throw new ArgumentNullException(nameof(properties));
 17  
 18  
 19             var builders = new List<IBackgroundProcessDispatcherBuilder>();
 20             builders.AddRange(GetRequiredProcesses()); // 添加默認的工作dispatcher也就是獨立線程
 21             builders.AddRange(GetStorageComponents());
 22             builders.AddRange(dispatcherBuilders);
 23  
 24  
 25             _dispatcherBuilders = builders.ToArray();
 26         }
 27  
 28  
 29         public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
 30             CancellationToken stoppedToken, CancellationToken shutdownToken)  // server初始化
 31         {
 32             var serverId = GetServerId();
 33             Stopwatch stoppedAt = null;
 34  
 35  
 36             void HandleRestartSignal()
 37             {
 38                 if (!stoppingToken.IsCancellationRequested)
 39                 {
 40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
 41                 }
 42             }
 43             using (var restartCts = new CancellationTokenSource())
 44             using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
 45             using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
 46             using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
 47             using (restartStoppingCts.Token.Register(HandleStopRestartSignal))
 48             using (stoppingToken.Register(HandleStoppingSignal))
 49             using (stoppedToken.Register(HandleStoppedSignal))
 50             using (shutdownToken.Register(HandleShutdownSignal))
 51             using (restartCts.Token.Register(HandleRestartSignal))
 52             {
 53                 var context = new BackgroundServerContext(
 54                     serverId,
 55                     _storage,
 56                     _properties,
 57                     restartStoppingCts.Token,
 58                     restartStoppedCts.Token,
 59                     restartShutdownCts.Token);
 60                 var dispatchers = new List<IBackgroundDispatcher>();
 61                 CreateServer(context);
 62                 try
 63                 {
 64                     // ReSharper disable once AccessToDisposedClosure
 65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 創建守護線程
 66                     {
 67                         StartDispatchers(context, dispatchers); // 啟動hangfire默認初始化的所有獨立任務線程
 68                         execution.NotifySucceeded();
 69                         WaitForDispatchers(context, dispatchers);
 70  
 71  
 72                         restartCts.Cancel();
 73  
 74                         heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
 75                     }
 76                 }
 77                 finally
 78                 {
 79                     DisposeDispatchers(dispatchers);
 80                     ServerDelete(context, stoppedAt);
 81                 }
 82             }
 83         }
 84  
 85  
 86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 創建守護線程
 87         {
 88             return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
 89                 .UseBackgroundPool(threadCount: 1)
 90                 .Create(context, _options);
 91         }
 92  
 93  
 94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日志和任務監控線程
 95         {
 96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
 97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
 98         }
 99         private string GetServerId() // 獲取serverid
100         {
101             var serverName = _options.ServerName
102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
103                  ?? Environment.GetEnvironmentVariable("HOSTNAME");
104             var guid = Guid.NewGuid().ToString();
105  
106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid;
107         }
108  
109         
110         private void CreateServer(BackgroundServerContext context) // 創建server,寫入Server數據表
111         {
112             var stopwatch = Stopwatch.StartNew();
113             using (var connection = _storage.GetConnection())
114             {
115                 connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
116             }
117             stopwatch.Stop();
118  
119  
120             ServerJobCancellationToken.AddServer(context.ServerId);
121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
122         }
123  
124  
125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 啟動所有獨立的任務線程,包括我們的隊列計划、循環計划、日志、守護等等線程
126         {
127  
128             foreach (var dispatcherBuilder in _dispatcherBuilders)
129             {
130                 dispatchers.Add(dispatcherBuilder.Create(context, _options));
131             }
132         }
133  
134     }

 

以上代碼我有做精簡處理,不要糾結里面的實現,代碼注釋也比較詳細。下面我做一個簡單的總結吧,第一個線程(暫時叫主線程吧)從startup里面調用usehangfireserver擴展方法-》啟動一個新的worker線程用於初始化&啟動server-》主程返回-》啟動hangfire所有任務線程-》創建的第一個worker線程掛起(用於處理所有任務線程的資源釋放)。server的初始化工作大概就是這些,下面詳細看看hangfire的任務線程的執行原理,這里我們以RecurringJobScheduler循環任務為例。
 
RecurringJobScheduler實現機制
還記得上面提到的7個dispatcher任務線程的創建嗎?這7個默認的任務線程初始化就發生在上面加粗的代碼里面StartDispatchers方法,我們看代碼。
1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher> dispatchers)
2         {
3                // 其他代碼...
4             foreach (var dispatcherBuilder in _dispatcherBuilders)
5             {
6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化獨立任務線程
7             }
8         }

 

遍歷_dispatcherBuilders數組,7種任務類型,分別調用它們的Create方法。繼續看create方法。
   
 1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) // 第一步
 2         {
 3             // 其他代碼
 4             var execution = new BackgroundExecution(
 5                 context.StoppingToken,
 6                 new BackgroundExecutionOptions
 7                 {
 8                     Name = _process.GetType().Name,
 9                     RetryDelay = options.RetryDelay
10                 }); // 定義自己的execution
11             return new BackgroundDispatcher( // 創建BackgroundDispatcher 
12                 execution,
13                 ExecuteProcess, // 指定回調
14                 Tuple.Create(_process, context, execution), // 創建三元組上下文,注意一下1元組這個對象
15                 _threadFactory);
16         }
17  
18 public BackgroundDispatcher(  // 第二步
19             [NotNull] IBackgroundExecution execution,
20             [NotNull] Action<Guid, object> action,
21             [CanBeNull] object state,
22             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
23         {
24    
25             _state = state;
26  
27             var threads = threadFactory(DispatchLoop)?.ToArray();
28            
29             foreach (var thread in threads)
30             {
31                 thread.Start(); // 執行線程
32             }
33         }
34  
35 private void DispatchLoop() // 第三步
36         {
37             try
38             {
39                 _execution.Run(_action, _state);  // 在run里面回調_action
40             }
41             catch (Exception ex)
42             {
43             }
44             finally
45             {
46                 try
47                 {
48                     _stopped.Signal();
49                 }
50                 catch (ObjectDisposedException)
51                 {
52                 }
53             }
54         }
55  
56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回調方法,對應上面的指定回調
57         {
58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state;
59             var serverContext = tuple.Item2;
60             var context = new BackgroundProcessContext( // 創建公共上下文
61                 serverContext.ServerId,
62                 serverContext.Storage,
63                 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value),
64                 executionId,
65                 serverContext.StoppingToken,
66                 serverContext.StoppedToken,
67                 serverContext.ShutdownToken);
68             while (!context.IsStopping)
69             {
70                 tuple.Item1.Execute(context); // 執行自己元組對應的實例
71                 tuple.Item3.NotifySucceeded();
72             }
73         }

 

上面有點亂啊,我大概簡單串起來說一下。第一步在create方法里面創建了BackgroundDispatcher並指定了元組參數-》第二步綁定線程的執行函數Loop並且執行-》第三步執行Loop並且回調_action委托-》第四步_action參數對應的函數地址就是ExecuteProcess,最后在ExecuteProcess里面通過元組參數調用對應的任務類型,自此7種任務類型啟動並開始工作。以上代碼還有個細節需要說明一下,Tuple.Create(_process, context, execution)。元組的第一個參數,其類型為IBackgroundProcess,看下定義。
1 public interface IBackgroundProcess : IServerProcess
2     {
3         void Execute([NotNull] BackgroundProcessContext context);
4     }

 

接口就定義了一個方法,沒什么特別的,但是它的幾個實現類就是我們單獨的任務類,我們下面要說的RecurringJobScheduler循環任務類也實現了這個接口。到此我們的RecurringJobScheduler循環定時任務線程就算開始執行了。
RecurringJobScheduler循環定時任務機制
照舊看下這個類型的定義
 1 public class RecurringJobScheduler : IBackgroundProcess
 2     {
 3         // 其他代碼
 4         public RecurringJobScheduler(
 5             [NotNull] IBackgroundJobFactory factory,
 6             TimeSpan pollingDelay,
 7             [NotNull] ITimeZoneResolver timeZoneResolver,
 8             [NotNull] Func<DateTime> nowFactory)
 9         {
10             if (factory == null) throw new ArgumentNullException(nameof(factory));
11             if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory));
12             if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver));
13  
14  
15             _factory = factory;
16             _nowFactory = nowFactory;
17             _timeZoneResolver = timeZoneResolver;
18             _pollingDelay = pollingDelay;
19             _profiler = new SlowLogProfiler(_logger);
20         }
21  
22  
23         /// <inheritdoc />
24         public void Execute(BackgroundProcessContext context) // 實現方法
25         {
26             if (context == null) throw new ArgumentNullException(nameof(context));
27  
28  
29             var jobsEnqueued = 0;
30  
31  
32             while (EnqueueNextRecurringJobs(context)) // 從數據庫獲取定時任務
33             {
34                 jobsEnqueued++;
35  
36  
37                 if (context.IsStopping)
38                 {
39                     break;
40                 }
41             }
42  
43  
44             if (jobsEnqueued != 0)
45             {
46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
47             }
48  
49  
50             if (_pollingDelay > TimeSpan.Zero)
51             {
52                 context.Wait(_pollingDelay);
53             }
54             else
55             {
56                 var now = _nowFactory();
57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now);
58             }
59         }
60     }

 

承上,調用元組的第一個參數的execute方法,RecurringJobScheduler的execute方法得以執行,該方法就干一件事,每隔15秒從數據庫獲取待執行的計划,每次1000條數據。通過EnqueueNextRecurringJobs方法獲取任務。
 1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)
 2         {
 3             return UseConnectionDistributedLock(context.Storage, connection => 
 4             {
 5                 var result = false;
 6                 if (IsBatchingAvailable(connection))
 7                 {
 8                     var now = _nowFactory();
 9                     var timestamp = JobHelper.ToTimestamp(now);
10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); // 從數據庫里面查詢
11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false;
12                     foreach (var recurringJobId in recurringJobIds)
13                     {
14                         if (context.IsStopping) return false;
15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))// 排隊執行
16                         {
17                             result = true;
18                         }
19                     }
20                 }
21                 else
22                 {
23                     for (var i = 0; i < BatchSize; i++)
24                     {
25                         if (context.IsStopping) return false;
26                         var now = _nowFactory();
27                         var timestamp = JobHelper.ToTimestamp(now);
28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
29                         if (recurringJobId == null) return false;
30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))
31                         {
32                             return false;
33                         }
34                     }
35                 }
36                 return result;
37             });
38         }

 

GetFirstByLowestScoreFromSet方法從數據庫Set表里面查詢top1000數據,條件是key為recurring-jobs字符串(表示定時任務)並且 時間范圍是0到當前時間。隨后遍歷這些jobids,排隊執行,往下看TryEnqueueBackgroundJob方法的實現。
 1 private bool EnqueueBackgroundJob(
 2             BackgroundProcessContext context,
 3             IStorageConnection connection,
 4             string recurringJobId,
 5             DateTime now)
 6         {
 7             // 其他代碼
 8             using (connection.AcquireDistributedRecurringJobLock(recurringJobId,  LockTimeout))
 9             {
10                 try
11                 {
12                     var recurringJob = connection.GetRecurringJob(recurringJobId,  _timeZoneResolver, now);
13                     if (recurringJob == null)
14                     {
15                         using (var transaction = connection.CreateWriteTransaction())
16                         {
17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId);
18                             transaction.Commit();
19                         }
20                         return false;
21                     }
22           
23                     BackgroundJob backgroundJob = null;
24                     IReadOnlyDictionary<string, string> changedFields;
25                     if (recurringJob.TrySchedule(out var nextExecution, out var error))
26                     {
27                         if (nextExecution.HasValue && nextExecution <= now)
28                         {
29                             backgroundJob = _factory.TriggerRecurringJob(context.Storage,  connection, _profiler, recurringJob, now);
30                             if (String.IsNullOrEmpty(backgroundJob?.Id))
31                             {
32                                 _logger.Debug($"Recurring job '{recurringJobId}' execution  at '{nextExecution}' has been canceled.");
33                             }
34                         }
35                         recurringJob.IsChanged(out changedFields, out nextExecution);
36                     }
37                     else if (recurringJob.RetryAttempt < MaxRetryAttemptCount)
38                     {
39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1);
40                         
41                         _logger.WarnException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be retried in {delay}.", error);
42                         recurringJob.ScheduleRetry(delay, out changedFields, out  nextExecution);
43                     }
44                     else
45                     {
46                         _logger.ErrorException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be disabled.", error);
47                         recurringJob.Disable(error, out changedFields, out nextExecution);
48                     }
49               
50                     using (var transaction = connection.CreateWriteTransaction())
51                     {
52                         if (backgroundJob != null)
53                         {
54                             _factory.StateMachine.EnqueueBackgroundJob(
55                                 context.Storage,
56                                 connection,
57                                 transaction,
58                                 recurringJob,
59                                 backgroundJob,
60                                 "Triggered by recurring job scheduler",
61                                 _profiler);
62                         }
63                         transaction.UpdateRecurringJob(recurringJob, changedFields,  nextExecution, _logger);
64                         transaction.Commit();
65                         return true;
66                     }
67                 }
68                 catch (TimeZoneNotFoundException ex)
69                 {
70                 catch (Exception ex)
71                 {
72    
73                 }
74                 return false;
75             }
76         }

 

需要注意的地方我都有加粗,該方法大概流程是:1.GetRecurringJob根據jobid從Hash表里面查詢一條完整的定時任務,2.TrySchedule獲取該任務的下次執行時間,如果下次執行時間小於當前,執行這條任務(並非真正執行定時任務,只是往job表里面寫數據,真正執行任務由worker完成),3.獲取下次執行時間&所有任務字段,4.狀態機修改任務狀態。定時任務就這樣周而復始的重復執行以上流程。這里簡單說下worker的執行機制,其實際就是輪詢檢索job表里面的數據執行任務表達式樹,worker在hangfire里面默認開啟了20個線程。第三部分就到這吧。
 
避坑
簡單說下個人在改bug期間遇到的一些問題啊。
1.時區問題,在添加定時任務時如果不指定時區信息,默認使用的是utc時間,我們中國是東8區,也就是說解析出來的執行時間會晚8個小時執行。解決辦法有幾種可以通過全局指定options的ITimeZoneResolver屬性指定,也可以通過AddorUpdate方法指定,如果是指定時區信息,需要注意看板上面的異常信息,如果有異常會導致任務不執行,時區信息它是從系統里面檢索出來的,沒有就拋異常。就這樣吧。
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM