Quartz.Net分布式任務管理平台


       無關主題:一段時間沒有更新文章了,與自己心里的堅持還是背馳,雖然這期間在公司做了統計分析,由於資源分配問題,自己或多或少的原因,確實拖得有點久了,自己這段時間也有點松懈,借口就不說那么多了,還是進入主題吧。


       前言:我相信大多數人公司的業務上都有定時任務這么個功能,我們公司也不例外,剛來公司的時候使用Quartz.Net為我們組做了第一個任務,大致流程是新建一個控制台程序,引用需要的程序集,Execute方法中寫着咱們需要定時的任務的業務邏輯,同樣這邊需要用的一些數據庫操作類引用的是WebApi項目中的一些類庫,然后拿着服務去服務器上部署。隨着時間的推移問題和不方便性慢慢被暴露,總結一下這樣的方式在我公司發生的問題

第一:隨着任務量的增加,我會分配新的同事去開發這樣的任務,可能由於我沒有交流清楚,導致新的同事會自己下載所需的程序集,上面已經說了服務會引用WebApi中的類庫會導致同樣的程序集本版不匹配,同樣改變Api中的一些程序集依然會出現這樣的問題。

第二:業務邏輯集成在了服務類中很多時候產品“大大”會更改邏輯由於部署的方式 那么我們需要下載服務重新編譯再次部署。

第三:任務在服務器上沒有監聽那么意味着服務掛了我們沒有感知,案例:我們組會和其他組有着共同的任務,由於一些原因導致數據可能沒有正確的相應,這個服務會去檢測,客戶問我昨晚的數據怎么到現在還是這樣,排查后發現服務已經掛了。

第四:我們的服務中充滿了業務邏輯導致邏輯分散,同樣壓力交給了定時服務,那么會導致服務的運行周期和任務執行的時間會沖突。

第五:就是可能部署的人今天請假了,導致這樣的部署方式變得不在那么容易。

那么問題出現了,看見了總不能當作沒發生,開始自行研究得到今天要分享的主題,但是針對上面的問題我相信很多的解決方案,在各位園友的公司中使用了上述方式沒問題也是有可能的。


      主題:為了解決上面的問題,便捷性,做出如下“架構圖”:

 

對應我們的項目層次圖如下:

說一下兩幅圖的對應關系按順序稱一圖和二圖:

一圖中的Quartz服務節點對應這二圖中的Quartz.Net_RemoteServer

一圖中的任務基類對應着二圖中的Quartz.Net_JobBase

一圖中的任務子類對應着二圖中的TestJob1,TestJob2

對於Mvc站點中我們使用了兩層,關於分層這一說比較藝術不作過多討論,我們項目中沒有什么業務邏輯Web直接和“倉儲”直接做交互,使用的是EF(ORM);表現層我們使用BootStrap和Vue.Js完成前端工作。·

 

 

先簡單對一圖作一下解釋:

第一部分:Quartz服務節點是我們任務運行的調度器,既然是分布式,我們會將調度器部署在三台服務器,Quartz默認是基於內存的既然我們要分布式 ,我們需要持久化,本版本是基於SqlServer,同時Quartz框架在數據庫中用鎖實現了每個任務只會被一台服務器調用,那么當某個服務器上的調度器掛掉之后,Quartz會檢測發現掛了之后會使用其他服務器上的節點來接手掛掉節點中的所有任務,這個都是Quartz自身提供的。具體我們看Quartz.Net_RemoteServer類庫:

 

Program類中我們將配置調度器這里使用代碼配置:

 1   class Program
 2     {
 3         static void Main(string[] args)
 4 
 5         {
 6             var properties = new NameValueCollection();
 7             properties["quartz.scheduler.instanceName"] = "RemoteServer";
 8             properties["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz";
 9             properties["quartz.threadPool.threadCount"] = "5";
10             properties["quartz.threadPool.threadPriority"] = "Normal";
11             properties["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz";
12             properties["quartz.scheduler.exporter.port"] = "555";//端口號
13             properties["quartz.scheduler.exporter.bindName"] = "QuartzScheduler";//名稱
14             properties["quartz.scheduler.exporter.channelType"] = "tcp";//通道名
15             properties["quartz.scheduler.exporter.channelName"] = "httpQuartz";
16             properties["quartz.scheduler.exporter.rejectRemoteRequests"] = "true";
17             properties["quartz.jobStore.clustered"] = "true";//集群配置
18             //下面為指定quartz持久化數據庫的配置
19             properties["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz";
20             properties["quartz.jobStore.tablePrefix"] = "QRTZ_";
21             properties["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.SqlServerDelegate, Quartz";
22             properties["quartz.jobStore.dataSource"] = "myDS";
23             properties["quartz.dataSource.myDS.connectionString"] = @"Data Source=.;Initial Catalog=QuartzManager;User ID=sa;Password=123456";
24             properties["quartz.dataSource.myDS.provider"] = "SqlServer-20";
25             properties["quartz.scheduler.instanceId"] = "AUTO";
26             var schedulerFactory = new StdSchedulerFactory(properties);
27             var scheduler = schedulerFactory.GetScheduler();
28             scheduler.ListenerManager.AddJobListener(new MyJobListener(), GroupMatcher<JobKey>.AnyGroup());
29             scheduler.Start();
30 
31         }
32     }

這是Quartz代碼配置,很多項感興趣的大家可以自行搜索一下意思;這邊注釋了客戶端將要用的信息,同時我們增加監聽器來監聽運行服務的狀況,Quartz提供了三種監聽IJobListener,ITriggerListener,ISchedulerListener,和相應的實現類JobListenerSupport,TriggerListenerSupport,SchedulerListenerSupport,在這里我們使用的job的監聽器同時我們繼承實現類就好了,因為很多很方法不需要。

我們會在任務完成時向數據庫中寫入此任務當前狀態,或者異常信息

 1                 jobId = Convert.ToInt32(context.JobDetail.JobDataMap["jobId"]);
 2                 name = context.Scheduler.GetTriggerState(context.Trigger.Key).ToString();
 3                 triggerState = _changeType(context.Scheduler.GetTriggerState(context.Trigger.Key));
 4                 customerJobInfoModel = _customerJobInfoRepository.LoadCustomerInfo(x => x.Id == jobId);
 5                 customerJobInfoModel.TriggerState = triggerState;
 6                 if (jobException == null)
 7                 {
 8 
 9 
10                     Console.WriteLine("任務編號{0};執行時間:{1},狀態:{2}", context.JobDetail.JobDataMap["jobId"], DateTime.Now, name);
11                 }
12                 else
13                 {
14                     customerJobInfoModel.Exception = jobException.Message;
15                     Console.WriteLine("jobId{0}執行失敗:{1}", context.JobDetail.JobDataMap["jobId"], jobException.Message);
16                 }
17                 _customerJobInfoRepository.UpdateCustomerJobInfo(customerJobInfoModel);

 

第二部分:要想實現任務執行我們得實現Quartz提供得Excute方法,也就是咱們的具體任務類,任務基類的設計來目的是:首先它是個抽象類,我們的任務基類會引用所需程序集並實現Excute方法,並提供子類必須要實現的方法,這樣我們具體開發任務的時候繼承我們的基類實現基類中的方法 而我們任務子類除了引用了基類不會在引用其他第三方程序集。繼續看Quartz.Net_RemoteServer類庫:

 

在這里就像看到一樣我們將需要的程序集在基類中引用包括Quartz.dll,日志等,BaseJob封裝了任務執行所必要的方法,和子類必須實現的方法:

 1   public abstract class JobBase : IJob
 2     {
 3         public string RequestUrl { get; set; }
 4         public JobBase()
 5         {
 6             SetRequestUrl();
 7         }
 8         public virtual void Execute(IJobExecutionContext context)
 9         {
10             try
11             {
12                 HttpClient hc = new HttpClient();
13                 hc.GetAsync(RequestUrl);
14             }
15             catch (Exception ex)
16             {
17                 throw new Exception(ex.ToString());
18             }
19         }
20         public abstract void SetRequestUrl();
21     }

同時可以看到我們的執行的任務本質上就是調用了Api中實現業務的接口,至此我們將業務邏輯和和部分壓力點轉移到Api中是我們任務做的東西和職責很明確:根據運行周期來執行一個業務,業務本身並不屬於自己的功能點,接下來我們在看看具體子類,上面的這些東西在我這邊完成后,想加任務的同事只需要編寫任務子類就可以在通過管理界面操作,子類的代碼如下:

 1 using Quartz.Net_JobItem;
 2 using System;
 3 using System.Collections.Generic;
 4 using System.Linq;
 5 using System.Text;
 6 using System.Threading.Tasks;
 7 
 8 namespace TestJob1
 9 {
10     public class Job1:JobBase
11     {
12         public Job1() {
13             SetRequestUrl();
14         }
15         public override void SetRequestUrl()
16         {
17             base.RequestUrl = "http://localhost:53582/QuartzJobManage/Test";
18         }
19     }
20 }

可以看到已經簡單的動動小手指就可以了,輕松完成,同時咱們可以看到我們除了引用了基類沒有引用其他第三方的框架,來避免上面的問題出現。基類在編譯dll已經放入了Quartz的服務節點和Web站點中。子類我們同樣需要放入着兩個類庫中,方便Quartz服務節點可以找到要執行的任務,和Web站點做反射需要

第三部分:Mvc站點將提供可視化的管理界面。首先看一下界面的模樣:

導航欄是Quartz任務所有的狀態,列表是我們任務的具體信息,在一圖中中我們將在任務可視化界面中提供:添加任務,運行任務,修改任務運行周期,暫停任務,恢復任務,刪除任務操作,也是目前我們需要在公司上線的第一個版本所有功能點。每個狀態下的任務能使用的功能會有所不一樣,接下來將逐一展示以上提到的功能點實現:

1:添加功能:我們需要將Quartz需要的任務的和我們需要的信息存入到我們自己的表中,以便運行任務的時候從表中取出任務信息提供Quartz。

添加任務的界面,我們需要給出這樣的信息以及咱們自己編寫的任務子類的dll,我們在這會將dll保存到Quartz服務節點和Web站點中。任務類全名是命名空間加上子類的名字。其他都是Quartz文檔上都是會介紹的。程序集名稱和上傳的文件匹配。具體代碼如下:

 1    [HttpPost]
 2         /// <summary>
 3         /// 添加任務
 4         /// </summary>
 5         /// <param name="jobName">任務名稱</param>
 6         /// <param name="jobGroupName">任務所在組名稱</param>
 7         /// <param name="triggerName">觸發器名稱</param>
 8         /// <param name="triggerGroupName">觸發器所在的組名稱</param>
 9         /// <param name="cron">執行周期表達式</param>
10         /// <param name="dllName">任務程序集名稱(xxxx.dll)</param>
11         /// <param name="fullJobName">任務類全名</param>
12         /// <param name="jobDescription">任務描述</param>
13         /// <param name="jobstartTime">開始時間</param>
14         /// <returns></returns>
15 
16         public JsonResult AddJob(string jobName, string jobGroupName, string triggerName, string triggerGroupName, string cron, string dllName, string fullJobName, string jobDescription, DateTime jobStartTime)
17         {
18             HttpPostedFileBase dllFile = Request.Files[0] as HttpPostedFileBase;
19             if (dllFile == null || dllFile.ContentLength <= 0)
20             {
21                 return Json(ResponseDataFactory.CreateAjaxResponseData("-10003", "無任務文件", null));
22             }
23             _savePathInWeb(dllFile);
24             //TODO:添加到數據庫自己的表
25             var jobId = _customerJobInfoRepository.AddCustomerJobInfo(jobName, jobGroupName, triggerName, triggerGroupName, cron, dllName, fullJobName, jobDescription, jobStartTime);
26             _savePathInRemoteServer(dllFile);
27             return Json(ResponseDataFactory.CreateAjaxResponseData("1", "添加成功", jobId));
28 
29         }

添加完之后界面會變成如下這樣:

可以看到這個任務可以執行運行和刪除,運行周期是10秒鍾執行一次。下面接着點擊運行讓這個任務跑起來:

這樣這個任務就以咱們設置的運行周期運行起來,同時運行的任務提供了暫停,刪除,更改運行周期操作:先看任務的運行效果,Quartz服務節點鍾我們使用監聽器對服務的監聽同時輸出了一些信息:

同時運行任務執行背后的執行邏輯為:

 1   [HttpPost]
 2         /// <summary>
 3         /// 啟動任務
 4         /// </summary>
 5         /// <param name="jobId">任務編號</param>
 6         /// <returns></returns>
 7         public JsonResult RunJob(int jobId)
 8         {
 9             var ajaxResponseData = _operateJob(jobId, (jobDetail) => { jobDetail.TriggerState = 0; _customerJobInfoRepository.UpdateCustomerJobInfo(jobDetail); return _jobHelper.RunJob(jobDetail); });
10             return Json(ajaxResponseData);
11         }
 1         /// <summary>
 2         /// 運行任務
 3         /// </summary>
 4         /// <param name="jobInfo">任務信息</param>
 5         /// <returns></returns>
 6         public bool RunJob(Customer_JobInfo jobInfo)
 7         {
 8             Assembly assembly = Assembly.LoadFile(AppDomain.CurrentDomain.BaseDirectory + $"bin/{jobInfo.DLLName}");
 9             var type = assembly.GetType(jobInfo.FullJobName);
10             JobKey jobKey = _createJobKey(jobInfo.JobName, jobInfo.JobGroupName);
11             if (!_scheduler.CheckExists(jobKey))
12             {
13                 IJobDetail job = JobBuilder.Create(type)
14                     .WithIdentity(jobKey)
15                     .UsingJobData(_createJobDataMap("jobId", jobInfo.Id))
16                     .Build();
17 
18                 CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.CronSchedule(jobInfo.Cron);
19                 ITrigger trigger = TriggerBuilder.Create().StartNow()//StartAt(DateTime.SpecifyKind(jobInfo.JobStartTime, DateTimeKind.Local))
20                     .WithIdentity(jobInfo.TriggerName, jobInfo.TriggerGroupName)
21                     .ForJob(jobKey)
22                     .WithSchedule(scheduleBuilder.WithMisfireHandlingInstructionFireAndProceed())
23                     .Build();
24           
25 
26                 _scheduler.ScheduleJob(job, trigger);
27 
28             }
29             return true;
30         }

這里就用了到了我們第一步添加任務時上傳的dll和任務類的全名 我們通過加載程序集通過全名找到我們需要執行的任務類,同時假如額外信息,JobId,以便監聽器可以使用id更新自己的表。同樣我們可以更改運行周期來調節任務調度的頻次:

那么在更改運行周期之后我們需要重新構建觸發器

 1   /// <summary>
 2         /// 更改任務運行周期
 3         /// </summary>
 4         /// <param name="jobInfo">任務信息</param>
 5         /// <returns></returns>
 6         public bool ModifyJobCron(Customer_JobInfo jobInfo)
 7         {
 8             CronScheduleBuilder scheduleBuilder = CronScheduleBuilder.CronSchedule(jobInfo.Cron);
 9             var triggerKey = _createTriggerKey(jobInfo.TriggerName, jobInfo.TriggerGroupName);
10             ITrigger trigger = TriggerBuilder.Create().StartAt(DateTime.SpecifyKind(jobInfo.JobStartTime, DateTimeKind.Local))
11                     .WithIdentity(jobInfo.TriggerName, jobInfo.TriggerGroupName)
12                    .WithSchedule(scheduleBuilder.WithMisfireHandlingInstructionDoNothing())
13                     .Build();
14             _scheduler.RescheduleJob(triggerKey, trigger);
15             return true;
16         }

這個點是需要注意的!!!

接下來我們使用暫停功能:

暫停列表中我們對此任務可進行恢復和刪除操作,那么恢復操作就是恢復任務運行,刪除就是從調度器中刪除這個任務同時我們將自己的表中標記刪除。

到現在我們的整體流程就已經走了,但是最終我們的主題是分布式,當然這個已經能滿足一部分需求了,所以接下來由於環境有限我這邊會啟動兩個服務節點能模擬多個服務器然后我們關閉掉一個服務節點看另一個是否能拿到任務進行,啟動任務后我們可以看到:

當我們關閉剛才正在調度此任務的調度器之后看看另一個調度器的結果:

可以看到 另一個調度器執行剛才那個任務,原理是 我們在服務端設置了檢查時間每個調度器會以這個時間去數據庫鍾檢測另外的節點是否正常,當掛掉后會從數據庫鍾取出任務信息再次調度。至此我們的分布式調度平台就大致分享完了。由於是第一版,所以很多地方還不合理或者需要改進會在之后的時間磨合中不斷去完善。

最后定時框架有很多,大家要的是 第一符合自己的業務,第二自己能熟悉掌控的去選擇技術。


 

     題外:分享一些在這次主題鍾關於代碼這一塊的東西,當然對於大牛來說這都是小兒科了,大家可以看到這個項目代碼量並不是很大,沒有什么業務邏輯,層次也很簡單。

1:因為沒有什么業務邏輯的處理,我們不需要很多邏輯層,所以大家對分層這塊一定要具體情況具體對待。

2:面向抽象編程基本的東西我們還是要去遵守,即使像這個簡單的層次的項目,我項目中使用的MEF進行依賴注入的

 1     [Export]
 2     public class QuartzJobManageController : Controller
 3     {
 4 
 5         [Import("CustomerJobInfoRepository")]
 6         public ICustomerJobInfoRepository _customerJobInfoRepository { get; set; }
 7         [Import("JobHelper")]
 8         private JobHelper _jobHelper { get; set; }
 9         public QuartzJobManageController()
10         {
11         }
12 }
   [Export("CustomerJobInfoRepository",typeof(ICustomerJobInfoRepository))]
    internal class CustomerJobInfoRepository : ICustomerJobInfoRepository
    {
        private readonly QuartzManagerEntities _dbContext;
        public CustomerJobInfoRepository()
        {
            _dbContext = DbContextFactory.DbContext;
        }
}

同時可以看到我們實現類鍾的訪問級別,很多時候 上層依然可以看到具體的實現,接口形同虛設,包括上面所提到的任務基類和任務子類,一方面解決了我們程序集不同人可能引入不同版本,本質上還是抽象使依賴降到最低

3:同時一些工廠的使用,大家可以看看設計原則和設計模式

給大家一下參考文章:http://www.cnblogs.com/aflyun/p/6515813.html

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM