作者:RobanLee
原創文章,轉載請注明: 蘿卜李 http://www.robanlee.com
源碼在這里: https://github.com/robanlee123/RobCron
時間有限,就不詳細注釋,有問題或者意見歡迎@我,也歡迎大家批評指正.
本文所必須的一些資料如下:
1. NODEJS ==> 可以去NODEJS.ORG下載最新的源碼.
2. Redis ==> Redis.io
3. KUE ==> Nodejs的一個開源隊列系統
4. NODE-SCHEDULE ==> NODEJS 一個開源調度系統
廢話不多說,先來介紹任務的流程:
1. NODEJS或者PHP或者其他語言 寫入REDIS 一個計划任務, 比如每分鍾做某件事,這里就用SAYHELLO來代替好了
2. 使用NODEJS讀取這個任務,並將它轉換為NODE的調度任務(node-schedule 來完成)
3. 調度器[node-schedule]根據設定的規則來分發任務.
4. KUE接受任務,並且加入列隊,執行.
5. DONE
STEP1: 創建一個任務
/** * Add task * @author Robanlee@gmail.com */ //加載函數,集中加載一些LIB,這個源碼請參照最后的附屬文件 var loader = require('./loader'); function addTask(opts){ new loader(this); //默認設置 this.opts = { keyIDs:'schedule:job:ids', keyLists:'schedule:job:list', keyJob:'schedule:job:' } //合並配置,類似JQUERY: extend this.mergeParams(opts); }; //Merge options addTask.prototype.mergeParams = function ( param ){ if(undefined === this.opts ) { return false; } for(var x in param) { if(param[x] != undefined && '' != param[x]) { this.opts[x] = param[x]; } } }; //添加數據方法 addTask.prototype.pushData = function ( data ){ if(undefined == data ) { console.log('--ERROR:data is null'); return false; } this.getIncr.call(this,function(response,obj){ var id = response; obj.redisClient.rpush(obj.opts.keyLists,id,function(err,response){ if(err) throw err; }); data.id = id; var m = obj.redisClient.multi(); for(var x in data) { m.hset( obj.opts.keyJob+id,x,data[x] ); } m.exec(function(err,response){ if(err) throw err; console.log('[info] Task: ['+data.name+'] has been set successful!'); }); }); }; //獲取REDIS目前的自增ID addTask.prototype.getIncr = function (callBack){ var obj = this; this.redisClient.incr(this.opts.keyIDs,function(err,response){ console.log("[info] Current id is : " + response); callBack(response, obj); }); };
加載這個lib 寫入一個DEMO:
var data = { 'name':'taskDemo', 'created':Date.now(), 'state':1, 'type':'untitled', 'rule':'*/1 * * * *' //這個任務規則可以為CRONTAB的規則,這個表示每分鍾執行一次 }; var job = new addTask(); job.pushData(data);
執行這個腳本,如果一切正常,你會看到如下信息:
NODEJS 輸出:
REDIS:
接下來就是獲取數據,並且轉換為調度任務了,
源碼:
var loader = require('./loader'); var taskLog = require("./TaskLog"); function scheduleTask(){ new loader(this); this.opts = { keyIDs:'schedule:job:ids', keyLists:'schedule:job:list', keyJob:'schedule:job:' } this.task = { taskDemo:undefined }; //監聽取消任務操作 this.listenCancel(); }; scheduleTask.prototype.setScheduleTask = function (data,obj){ this.task[data.name] = this.libs['node-schedule'].scheduleJob(data.rule,function(){ obj.setQueue(data); console.log('[info] Task :' + data.name + ' has been set in queue!'); }); }; scheduleTask.prototype.setQueue = function (datas){ var jobs = this.libs.kue.createQueue(); jobs.create(datas.name,{ 'name:':datas.name, 'state':1 }).save(); console.log("[info] Task ["+datas.name+"] has been queued!"); this.setLog(datas); }; scheduleTask.prototype.setLog = function (responseData){ var logData = { jobid:responseData.id, name:responseData.name, result:1 }; new taskLog(logData); console.log("[info] Task has been loged"); }; scheduleTask.prototype.getJob = function (){ this.getJobIndex.call(this,function(response,obj){ for(var x in response ) { obj.redisClient.hgetall(obj.opts.keyJob+response[x],function(err,data){ console.log("[info] Task:["+data.name+"] has been loaded!"); obj.setScheduleTask(data, obj); }); } }); }; scheduleTask.prototype.getJobIndex = function(callBack){ //Read tasks from <list schedule:job:list> var o = this; this.redisClient.lrange(this.opts.keyLists,0,-1,function(err,response){ if(err) throw err; callBack(response, o); }); }; scheduleTask.prototype.listenCancel = function (){ var job = this.libs.kue.createQueue(); var that = this; job.process('cancelJob',function(data,done){ that.task[data.data.data].cancel(); console.log('[info] Task: '+data.data.data + ' has been canceled') ; done(); }); }
執行代碼:
var x = new scheduleTask(); x.getJob();
等待一分鍾后,NODEJS控制台會輸出(這個任務在沒有取消的情況下,每分鍾都會執行):
第二分鍾:
REDIS 現在的數據:
這個數據中增加了KUE的一些任務, q:job:[]:inactive 這個就標識任務還未被執行,執行后的任務狀態有
complete active failed delay 四種
至此,就只剩下執行任務的步驟了
var loader = require('./loader'); function execTask(){ new loader(this); var job = this.libs.kue.createQueue(); job.process('taskDemo',function(data,done){ console.log('[info] Task:'+data.type+'#'+data.id+' has been executed successful!'); //DONE之前可以做你想要做的事情 done(); //千萬別忘記調用此方法 }); } //添加一個取消定時任務的KUE任務 execTask.prototype.addCancelJob = function (){ var job =this.libs.kue.createQueue(); job.create('cancelJob', {data:'taskDemo'}).save(); console.log('[info] Task: cancelJob has been send!'); }
執行這個腳本:
var et = new execTask(); //取消定時任務 et.addCancelJob();
執行后會有2個結果
1. 程序會執行當前列隊里的任務.
2. 定時任務會被取消,下一分鍾后任務不會再由SCHEDULE分配
任務執行結果:
取消任務的回應:
注意最后一行…