HangFire的拓展和使用
https://www.cnblogs.com/gnsilence/p/10665951.html
最近由於之前的任務調度框架總出現問題,因此想尋找一個替代品,之前使用的是Quartz.Net,這個框架方便之處就是支持cron表達式適合復雜日期場景使用,以及秒級任務。但是配置比較復雜,而且管理不方便,自己開發了個web管理頁面,不過這個需要額外的單獨線程去統一管理工作狀態,很容易出現問題。
有考慮過 “FluentScheduler” ,使用簡單,但是管理配置也很麻煩,我希望能做到配置簡單,管理方便,高性能。最后想到了以前聽過的hangfire,它的好處就是自帶控制面板,在園子里看了很多相關資料,偶然發現了有人拓展過hangfire通過調用api接口來執行任務,這種方式可以避免依賴本地代碼,方便部署,在此基礎上,我用空閑時間拓展了一下現在已經基本可以滿足需求。
所拓展的功能全部屬於外部拓展,因此hangfire版本可以一直更新,現在已經更新最新版,支持秒級任務
由於更新到最新版hangfire 1.7支持秒級任務,使用的在線表達式生成部分表達式有問題,注掉了秒級任務表達式生成,有時間需要詳細測試更改,可以參考(hangfire官方提供的表達式)
現在已經實現的功能有:
1,部署及調試:只需要配置數據庫連接,然后編譯即可運行,無需建表,支持(redis,mysql, sqlserver)其他數據庫暫時用不到沒測試。推薦使用redis集群。項目中直接添加了redis的存儲包,已經更新StackExchange.Redis到最新版本方便拓展,調試時可以直接調試。部署,只需要發布項目,運行創建windows服務的bat命令,命令已經包含在項目中,或者發布至Linux。
2,周期任務:支持在控制面板頁面上添加周期任務,編輯周期任務,刪除周期任務,手動觸發周期任務,暫停和繼續周期任務(暫停實現的原理是通過set中添加屬性,在job執行前,過濾掉,直接跳過執行,因為hangfire中job一旦創建就失去了控制權,只能通過過濾器去攔截),任務暫停后會查詢狀態並渲染面板列表為紅色字體方便查找哪個任務被暫停。



3,計划任務:在作業選項卡中,計划作業中可以實現添加計划任務,計划任務可以使任務在指定的分鍾后執行,只執行一次。
4,只讀面板:通過配置的用戶名密碼,使用戶只具有讀取面板的權限,這樣可以防止誤操作
1 //只讀面板,只能讀取不能操作
2 app.UseHangfireDashboard("/job-read", new DashboardOptions
3 {
4 AppPath = "#",//返回時跳轉的地址
5 DisplayStorageConnectionString = false,//是否顯示數據庫連接信息
6 IsReadOnlyFunc = Context =>
7 {
8 return true;
9 },
10 Authorization = new[] { new BasicAuthAuthorizationFilter(new BasicAuthAuthorizationFilterOptions
11 {
12 RequireSsl = false,//是否啟用ssl驗證,即https
13 SslRedirect = false,
14 LoginCaseSensitive = true,
15 Users = new []
16 {
17 new BasicAuthAuthorizationUser
18 {
19 Login = "read",
20 PasswordClear = "only"
21 },
22 new BasicAuthAuthorizationUser
23 {
24 Login = "test",
25 PasswordClear = "123456"
26 },
27 new BasicAuthAuthorizationUser
28 {
29 Login = "guest",
30 PasswordClear = "123@123"
31 }
32 }
33 })
34 }
35 });
5,郵件推送:目前使用的方式是,任務錯誤重試達到指定次數后,發送郵件通知,使用的MailKit
1 catch (Exception ex)
2 {
3 //獲取重試次數
4 var count = context.GetJobParameter<string>("RetryCount");
5 context.SetTextColor(ConsoleTextColor.Red);
6 //signalR推送
7 //SendRequest(ConfigSettings.Instance.URL+"/api/Publish/EveryOne", "測試");
8 if (count == "3")//重試達到三次的時候發郵件通知
9 {
10 SendEmail(item.JobName, item.Url, ex.ToString());
11 }
12 logger.Error(ex, "HttpJob.Excute");
13 context.WriteLine($"執行出錯:{ex.Message}");
14 throw;//不拋異常不會執行重試操作
15 }
1 /// <summary>
2 /// 郵件模板
3 /// </summary>
4 /// <param name="jobname"></param>
5 /// <param name="url"></param>
6 /// <param name="exception"></param>
7 /// <returns></returns>
8 private static string SethtmlBody(string jobname, string url, string exception)
9 {
10 var htmlbody = $@"<h3 align='center'>{HangfireHttpJobOptions.SMTPSubject}</h3>
11 <h3>執行時間:</h3>
12 <p>
13 {DateTime.Now}
14 </p>
15 <h3>
16 任務名稱:<span> {jobname} </span><br/>
17 </h3>
18 <h3>
19 請求路徑:{url}
20 </h3>
21 <h3><span></span>
22 執行結果:<br/>
23 </h3>
24 <p>
25 {exception}
26 </p> ";
27 return htmlbody;
28 }
1 //使用redis
2 config.UseRedisStorage(Redis, new Hangfire.Redis.RedisStorageOptions()
3 {
4 FetchTimeout=TimeSpan.FromMinutes(5),
5 Prefix = "{hangfire}:",
6 //活動服務器超時時間
7 InvisibilityTimeout = TimeSpan.FromHours(1),
8 //任務過期檢查頻率
9 ExpiryCheckInterval = TimeSpan.FromHours(1),
10 DeletedListSize = 10000,
11 SucceededListSize = 10000
12 })
13 .UseHangfireHttpJob(new HangfireHttpJobOptions()
14 {
15 SendToMailList = HangfireSettings.Instance.SendMailList,
16 SendMailAddress = HangfireSettings.Instance.SendMailAddress,
17 SMTPServerAddress = HangfireSettings.Instance.SMTPServerAddress,
18 SMTPPort = HangfireSettings.Instance.SMTPPort,
19 SMTPPwd = HangfireSettings.Instance.SMTPPwd,
20 SMTPSubject = HangfireSettings.Instance.SMTPSubject
21 })
6,signalR 推送:宿主程序使用的weapi,因此可以通過webapi推送,這樣做的好處是可以將服務當作推送服務使用,第三方接口也可以利用此來推送,
1 /// <summary>
2 ///用戶加入組處理
3 /// </summary>
4 /// <param name="userid">用戶唯一標識</param>
5 /// <param name="GroupName">組名稱</param>
6 /// <returns></returns>
7 public Task InitUsers(string userid,string GroupName)
8 {
9 Console.WriteLine($"{userid}加入用戶組");
10 Groups.AddToGroupAsync(Context.ConnectionId, GroupName);
11 SignalrGroups.UserGroups.Add(new SignalrGroups()
12 {
13 ConnectionId = Context.ConnectionId,
14 GroupName = GroupName,
15 UserId = userid
16 });
17 return Clients.All.SendAsync("UserJoin", "用戶組數據更新,新增id為:" + Context.ConnectionId + " pid:" + userid);
18 }
19 /// <summary>
20 /// 斷線的時候處理
21 /// </summary>
22 /// <param name="exception"></param>
23 /// <returns></returns>
24 public override Task OnDisconnectedAsync(Exception exception)
25 {
26 //掉線移除用戶,不給其推送
27 var user = SignalrGroups.UserGroups.FirstOrDefault(c => c.ConnectionId == Context.ConnectionId);
28
29 if (user != null)
30 {
31 Console.WriteLine($"用戶:{user.UserId}已離線");
32 SignalrGroups.UserGroups.Remove(user);
33 Groups.RemoveFromGroupAsync(Context.ConnectionId, user.GroupName);
34 }
35 return base.OnDisconnectedAsync(exception);
36 }
1 /// <summary>
2 /// 單個connectionid推送
3 /// </summary>
4 /// <param name="groups"></param>
5 /// <returns></returns>
6 [HttpPost, Route("AnyOne")]
7 public IActionResult AnyOne([FromBody]IEnumerable<SignalrGroups> groups)
8 {
9 if (groups != null && groups.Any())
10 {
11 var ids = groups.Select(c => c.UserId);
12 var list = SignalrGroups.UserGroups.Where(c => ids.Contains(c.UserId));
13 foreach (var item in list)
14 hubContext.Clients.Client(item.ConnectionId).SendAsync("AnyOne", $"{item.ConnectionId}: {item.Content}");
15 }
16 return Ok();
17 }
18
19 /// <summary>
20 /// 全部推送
21 /// </summary>
22 /// <param name="message"></param>
23 /// <returns></returns>
24 [HttpPost, Route("EveryOne")]
25 public IActionResult EveryOne([FromBody] MSG body)
26 {
27 var data = HttpContext.Response.Body;
28 hubContext.Clients.All.SendAsync("EveryOne", $"{body.message}");
29 return Ok();
30 }
31
32 /// <summary>
33 /// 單個組推送
34 /// </summary>
35 /// <param name="group"></param>
36 /// <returns></returns>
37 [HttpPost, Route("AnyGroups")]
38 public IActionResult AnyGroups([FromBody]SignalrGroups group)
39 {
40 if (group != null)
41 {
42 hubContext.Clients.Group(group.GroupName).SendAsync("AnyGroups", $"{group.Content}");
43 }
44 return Ok();
45 }
7,接口健康檢查:因為主要用來調用api接口,因此集成接口健康檢查還是很有必要的,目前使用的方式是配置文件中添加需要檢查的地址
1 /*健康檢查配置項*/
2 "HealthChecks-UI": {
3 /*檢查地址,可以配置當前程序和外部程序*/
4 "HealthChecks": [
5 {
6 "Name": "Hangfire Api 健康檢查",
7 "Uri": "http://localhost:9006/healthz"
8 }
9 ],
10 /*需要檢查的Api地址*/
11 "CheckUrls": [
12 {
13 "Uri": "http://localhost:17600/CityService.svc/HealthyCheck",
14 "httpMethod": "Get"
15 },
16 {
17 "Uri": "http://localhost:9098/CheckHelath",
18 "httpMethod": "Post"
19 },
20 {
21 "Uri": "http://localhost:9067/GrtHelathCheck",
22 "httpMethod": "Get"
23 },
24 {
25 "Uri": "http://localhost:9043/GrtHelathCheck",
26 "httpMethod": "Get"
27 }
28 ],
29 "Webhooks": [], //鈎子配置
30 "EvaluationTimeOnSeconds": 10, //檢測頻率
31 "MinimumSecondsBetweenFailureNotifications": 60, //推送間隔時間
32 "HealthCheckDatabaseConnectionString": "Data Source=\\healthchecksdb" //-> sqlite庫存儲檢查配置及日志信息
33 }
后台會根據配置的指定間隔去檢查服務接口是否可以正常訪問,(這個中間件可以實現很多檢查功能,包括網絡,數據庫,mq等,支持webhook推送等豐富功能,系統用不到因此沒有添加)
健康檢查的配置
1 //添加健康檢查地址
2 HangfireSettings.Instance.HostServers.ForEach(s =>
3 {
4 services.AddHealthChecks().AddUrlGroup(new Uri(s.Uri), s.httpMethod.ToLower() == "post" ? HttpMethod.Post : HttpMethod.Get, $"{s.Uri}");
5 });
1 app.UseHealthChecks("/healthz", new HealthCheckOptions()
2 {
3 Predicate = _ => true,
4 ResponseWriter = UIResponseWriter.WriteHealthCheckUIResponse
5 });
6 app.UseHealthChecks("/health", options);//獲取自定義格式的json數據
7 app.UseHealthChecksUI(setup =>
8 {
9 setup.UIPath = "/hc"; // 健康檢查的UI面板地址
10 setup.ApiPath = "/hc-api"; // 用於api獲取json的檢查數據
11 });
其中,ui配置路徑是在面板中展示檢查結果需要使用的地址
api地址,可以通過接口的方式來調用檢查結果,方便在第三方系統中使用,其數據格式可以自定義

通過接口調用
1 [{
2 "id": 1,
3 "status": "Unhealthy",
4 "onStateFrom": "2019-04-07T18:00:09.6996751+08:00",
5 "lastExecuted": "2019-04-07T18:05:03.4761739+08:00",
6 "uri": "http://localhost:53583/healthz",
7 "name": "Hangfire Api 健康檢查",
8 "discoveryService": null,
9 "entries": [{
10 "id": 1,
11 "name": "http://localhost:17600/CityService.svc/HealthyCheck",
12 "status": "Unhealthy",
13 "description": "An error occurred while sending the request.",
14 "duration": "00:00:04.3907375"
15 }, {
16 "id": 2,
17 "name": "http://localhost:9098/CheckHelath",
18 "status": "Unhealthy",
19 "description": "An error occurred while sending the request.",
20 "duration": "00:00:04.4140310"
21 }, {
22 "id": 3,
23 "name": "http://localhost:9067/GrtHelathCheck",
24 "status": "Unhealthy",
25 "description": "An error occurred while sending the request.",
26 "duration": "00:00:04.3847367"
27 }, {
28 "id": 4,
29 "name": "http://localhost:9043/GrtHelathCheck",
30 "status": "Unhealthy",
31 "description": "An error occurred while sending the request.",
32 "duration": "00:00:04.4499007"
33 }],
34 "history": []
35 }]
1 {
2 "status": "Unhealthy",
3 "errors": [{
4 "key": "http://localhost:17600/CityService.svc/HealthyCheck",
5 "value": "Unhealthy"
6 }, {
7 "key": "http://localhost:9098/CheckHelath",
8 "value": "Unhealthy"
9 }, {
10 "key": "http://localhost:9067/GrtHelathCheck",
11 "value": "Unhealthy"
12 }, {
13 "key": "http://localhost:9043/GrtHelathCheck",
14 "value": "Unhealthy"
15 }]
16 }
1 //重寫json報告數據,可用於遠程調用獲取健康檢查結果
2 var options = new HealthCheckOptions
3 {
4 ResponseWriter = async (c, r) =>
5 {
6 c.Response.ContentType = "application/json";
7
8 var result = JsonConvert.SerializeObject(new
9 {
10 status = r.Status.ToString(),
11 errors = r.Entries.Select(e => new { key = e.Key, value = e.Value.Status.ToString() })
12 });
13 await c.Response.WriteAsync(result);
14 }
15 };
8,通過接口添加任務:添加編輯周期任務,添加計划任務,觸發周期任務,刪除周期任務,多個任務連續一次執行的任務
1 /// <summary>
2 /// 添加一個隊列任務立即被執行
3 /// </summary>
4 /// <param name="httpJob"></param>
5 /// <returns></returns>
6 [HttpPost, Route("AddBackGroundJob")]
7 public JsonResult AddBackGroundJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
8 {
9 var addreslut = string.Empty;
10 try
11 {
12 addreslut = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null));
13 }
14 catch (Exception ec)
15 {
16 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
17 }
18 return Json(new Message() { Code = true, ErrorMessage = "" });
19 }
20
21 /// <summary>
22 /// 添加一個周期任務
23 /// </summary>
24 /// <param name="httpJob"></param>
25 /// <returns></returns>
26 [HttpPost, Route("AddOrUpdateRecurringJob")]
27 public JsonResult AddOrUpdateRecurringJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
28 {
29 try
30 {
31 RecurringJob.AddOrUpdate(httpJob.JobName, () => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), httpJob.Corn, TimeZoneInfo.Local);
32 }
33 catch (Exception ec)
34 {
35 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
36 }
37 return Json(new Message() { Code = true, ErrorMessage = "" });
38 }
39
40 /// <summary>
41 /// 刪除一個周期任務
42 /// </summary>
43 /// <param name="jobname"></param>
44 /// <returns></returns>
45 [HttpGet,Route("DeleteJob")]
46 public JsonResult DeleteJob(string jobname)
47 {
48 try
49 {
50 RecurringJob.RemoveIfExists(jobname);
51 }
52 catch (Exception ec)
53 {
54 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
55 }
56 return Json(new Message() { Code = true, ErrorMessage = "" });
57 }
58 /// <summary>
59 /// 手動觸發一個任務
60 /// </summary>
61 /// <param name="jobname"></param>
62 /// <returns></returns>
63 [HttpGet, Route("TriggerRecurringJob")]
64 public JsonResult TriggerRecurringJob(string jobname)
65 {
66 try
67 {
68 RecurringJob.Trigger(jobname);
69 }
70 catch (Exception ec)
71 {
72 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
73 }
74 return Json(new Message() { Code = true, ErrorMessage = "" });
75 }
76 /// <summary>
77 /// 添加一個延遲任務
78 /// </summary>
79 /// <param name="httpJob">httpJob.DelayFromMinutes(延遲多少分鍾執行)</param>
80 /// <returns></returns>
81 [HttpPost, Route("AddScheduleJob")]
82 public JsonResult AddScheduleJob([FromBody] Hangfire.HttpJob.Server.HttpJobItem httpJob)
83 {
84 var reslut = string.Empty;
85 try
86 {
87 reslut = BackgroundJob.Schedule(() => Hangfire.HttpJob.Server.HttpJob.Excute(httpJob, httpJob.JobName, null), TimeSpan.FromMinutes(httpJob.DelayFromMinutes));
88 }
89 catch (Exception ec)
90 {
91 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
92 }
93 return Json(new Message() { Code = true, ErrorMessage = "" });
94 }
95 /// <summary>
96 /// 添加連續任務,多個任務依次執行,只執行一次
97 /// </summary>
98 /// <param name="httpJob"></param>
99 /// <returns></returns>
100 [HttpPost, Route("AddContinueJob")]
101 public JsonResult AddContinueJob([FromBody] List<Hangfire.HttpJob.Server.HttpJobItem> httpJobItems)
102 {
103 var reslut = string.Empty;
104 var jobid = string.Empty;
105 try
106 {
107 httpJobItems.ForEach(k =>
108 {
109 if (!string.IsNullOrEmpty(jobid))
110 {
111 jobid = BackgroundJob.ContinueJobWith(jobid, () => RunContinueJob(k));
112 }
113 else
114 {
115 jobid = BackgroundJob.Enqueue(() => Hangfire.HttpJob.Server.HttpJob.Excute(k, k.JobName, null));
116 }
117 });
118 reslut = "true";
119 }
120 catch (Exception ec)
121 {
122 return Json(new Message() { Code = false, ErrorMessage = ec.ToString() });
123 }
124 return Json(new Message() { Code = true, ErrorMessage = "" });
125 }
這樣做的好處是有效利用了宿主的webapi,而且無需登錄控制面板操作就能實現任務管理,方便集成管理到其他系統中
防止多個實例的任務並行執行,即一個任務未執行完成,另一個相同的任務開始執行,可以使用分布式鎖來解決
通過特性來添加任務重試時間間隔(hangfire 1.7 新增,單位/秒),重試次數,隊列名稱,任務名稱,以及分布式鎖超時時間
1 /// <summary>
2 /// 執行任務,DelaysInSeconds(重試時間間隔/單位秒)
3 /// </summary>
4 /// <param name="item"></param>
5 /// <param name="jobName"></param>
6 /// <param name="context"></param>
7 [AutomaticRetry(Attempts = 3, DelaysInSeconds = new[] { 30, 60, 90 }, LogEvents = true, OnAttemptsExceeded = AttemptsExceededAction.Fail)]
8 [DisplayName("Api任務:{1}")]
9 [Queue("apis")]
10 [JobFilter(timeoutInSeconds: 3600)]
1 //設置分布式鎖,分布式鎖會阻止兩個相同的任務並發執行,用任務名稱和方法名稱作為鎖
2 var jobresource = $"{filterContext.BackgroundJob.Job.Args[1]}.{filterContext.BackgroundJob.Job.Method.Name}";
3 var locktimeout = TimeSpan.FromSeconds(_timeoutInSeconds);
4 try
5 {
6 //判斷任務是否被暫停
7 using (var connection = JobStorage.Current.GetConnection())
8 {
9 var conts = connection.GetAllItemsFromSet($"JobPauseOf:{filterContext.BackgroundJob.Job.Args[1]}");
10 if (conts.Contains("true"))
11 {
12 filterContext.Canceled = true;//任務被暫停不執行直接跳過
13 return;
14 }
15 }
16 //申請分布式鎖
17 var distributedLock = filterContext.Connection.AcquireDistributedLock(jobresource, locktimeout);
18 filterContext.Items["DistributedLock"] = distributedLock;
19 }
20 catch (Exception ec)
21 {
22 //獲取鎖超時,取消任務,任務會默認置為成功
23 filterContext.Canceled = true;
24 logger.Info($"任務{filterContext.BackgroundJob.Job.Args[1]}超時,任務id{filterContext.BackgroundJob.Id}");
25 }
1 if (!filterContext.Items.ContainsKey("DistributedLock"))
2 {
3 throw new InvalidOperationException("找不到分布式鎖,沒有為該任務申請分布式鎖.");
4 }
5 //釋放分布式鎖
6 var distributedLock = (IDisposable)filterContext.Items["DistributedLock"];
7 distributedLock.Dispose();
通過過濾器來設置任務過期時間,過期后自動在數據庫刪除歷史記錄
1 public void OnStateApplied(ApplyStateContext context, IWriteOnlyTransaction transaction)
2 {
3 //設置過期時間,任務將在三天后過期,過期的任務會自動被掃描並刪除
4 context.JobExpirationTimeout = TimeSpan.FromDays(3);
5 }
redis集群下,測試秒級任務
集群為windws環境下,一個主節點四個從節點,(使用時需要在redis連接中配置全部集群連接,主節點和從節點),目前用不到linux環境,沒有進行測試。


